diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index f3cddacd2..9c8cf7752 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -9,7 +9,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use milli::update::Settings as MilliSettings; -use roaring::RoaringBitmap; +use roaring::{MultiOps, RoaringBitmap}; use super::create_batch::Batch; use crate::processing::{ @@ -617,12 +617,17 @@ impl IndexScheduler { let rtxn = self.env.read_txn()?; // 1. Remove from this list the tasks that we are not allowed to delete - let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); - let all_task_ids = &self.queue.tasks.all_task_ids(&rtxn)?; + let mut status_tasks = HashMap::new(); + for status in enum_iterator::all::<Status>() { + status_tasks.insert(status, self.queue.tasks.get_status(&rtxn, status)?); + } + let enqueued_tasks = status_tasks.get(&Status::Enqueued).unwrap(); // We added all statuses + let all_task_ids = status_tasks.values().union(); + let mut to_remove_from_statuses = HashMap::new(); let mut to_delete_tasks = all_task_ids.clone() & matched_tasks; to_delete_tasks -= &**processing_tasks; - to_delete_tasks -= &enqueued_tasks; + to_delete_tasks -= enqueued_tasks; // 2. We now have a list of tasks to delete. Read their metadata to list what needs to be updated. 
let mut affected_indexes = HashSet::new(); @@ -679,12 +684,6 @@ impl IndexScheduler { } let mut to_remove_from_indexes = HashMap::new(); - let mut affected_statuses_tasks = HashMap::new(); - for status in &affected_statuses { - affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(&rtxn, *status)?); - } - let mut to_remove_from_statuses = HashMap::new(); - let mut affected_kinds_tasks = HashMap::new(); for kind in &affected_kinds { affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(&rtxn, *kind)?); @@ -707,7 +706,7 @@ impl IndexScheduler { // Note: we never delete tasks from the mapping. It's error-prone but intentional (perf) // We make sure to filter the tasks from the mapping when we read them. - tasks &= all_task_ids; + tasks &= &all_task_ids; // We must remove the batch entirely if tasks.is_empty() { @@ -725,7 +724,7 @@ impl IndexScheduler { } } - for (status, status_tasks) in affected_statuses_tasks.iter() { + for (status, status_tasks) in status_tasks.iter() { if status_tasks.is_disjoint(&tasks) { to_remove_from_statuses .entry(*status) @@ -806,7 +805,8 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } - for (status, mut tasks) in affected_statuses_tasks.into_iter() { + for status in affected_statuses.into_iter() { + let mut tasks = status_tasks.remove(&status).unwrap(); // we inserted all statuses above tasks -= &to_delete_tasks; if tasks.is_empty() { self.queue.tasks.status.delete(wtxn, &status)?;