Load status tasks only once

This commit is contained in:
Mubelotix
2025-08-12 18:38:42 +02:00
parent 808b9c71fa
commit 0b89783a3f

View File

@ -9,7 +9,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings; use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap; use roaring::{MultiOps, RoaringBitmap};
use super::create_batch::Batch; use super::create_batch::Batch;
use crate::processing::{ use crate::processing::{
@ -617,12 +617,17 @@ impl IndexScheduler {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
// 1. Remove from this list the tasks that we are not allowed to delete // 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
let all_task_ids = &self.queue.tasks.all_task_ids(&rtxn)?; let mut status_tasks = HashMap::new();
for status in enum_iterator::all::<Status>() {
status_tasks.insert(status, self.queue.tasks.get_status(&rtxn, status)?);
}
let enqueued_tasks = status_tasks.get(&Status::Enqueued).unwrap(); // We added all statuses
let all_task_ids = status_tasks.values().union();
let mut to_remove_from_statuses = HashMap::new();
let mut to_delete_tasks = all_task_ids.clone() & matched_tasks; let mut to_delete_tasks = all_task_ids.clone() & matched_tasks;
to_delete_tasks -= &**processing_tasks; to_delete_tasks -= &**processing_tasks;
to_delete_tasks -= &enqueued_tasks; to_delete_tasks -= enqueued_tasks;
// 2. We now have a list of tasks to delete. Read their metadata to list what needs to be updated. // 2. We now have a list of tasks to delete. Read their metadata to list what needs to be updated.
let mut affected_indexes = HashSet::new(); let mut affected_indexes = HashSet::new();
@ -679,12 +684,6 @@ impl IndexScheduler {
} }
let mut to_remove_from_indexes = HashMap::new(); let mut to_remove_from_indexes = HashMap::new();
let mut affected_statuses_tasks = HashMap::new();
for status in &affected_statuses {
affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(&rtxn, *status)?);
}
let mut to_remove_from_statuses = HashMap::new();
let mut affected_kinds_tasks = HashMap::new(); let mut affected_kinds_tasks = HashMap::new();
for kind in &affected_kinds { for kind in &affected_kinds {
affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(&rtxn, *kind)?); affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(&rtxn, *kind)?);
@ -707,7 +706,7 @@ impl IndexScheduler {
// Note: we never delete tasks from the mapping. It's error-prone but intentional (perf) // Note: we never delete tasks from the mapping. It's error-prone but intentional (perf)
// We make sure to filter the tasks from the mapping when we read them. // We make sure to filter the tasks from the mapping when we read them.
tasks &= all_task_ids; tasks &= &all_task_ids;
// We must remove the batch entirely // We must remove the batch entirely
if tasks.is_empty() { if tasks.is_empty() {
@ -725,7 +724,7 @@ impl IndexScheduler {
} }
} }
for (status, status_tasks) in affected_statuses_tasks.iter() { for (status, status_tasks) in status_tasks.iter() {
if status_tasks.is_disjoint(&tasks) { if status_tasks.is_disjoint(&tasks) {
to_remove_from_statuses to_remove_from_statuses
.entry(*status) .entry(*status)
@ -806,7 +805,8 @@ impl IndexScheduler {
atomic_progress.fetch_add(1, Ordering::Relaxed); atomic_progress.fetch_add(1, Ordering::Relaxed);
} }
for (status, mut tasks) in affected_statuses_tasks.into_iter() { for status in affected_statuses.into_iter() {
let mut tasks = status_tasks.remove(&status).unwrap(); // we inserted all statuses above
tasks -= &to_delete_tasks; tasks -= &to_delete_tasks;
if tasks.is_empty() { if tasks.is_empty() {
self.queue.tasks.status.delete(wtxn, &status)?; self.queue.tasks.status.delete(wtxn, &status)?;