diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 8c453f72c..8a584509f 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -99,7 +99,7 @@ impl IndexScheduler { } } - let mut deleted_tasks = self.delete_matched_tasks(&matched_tasks, &progress)?; + let mut deleted_tasks = self.delete_matched_tasks(matched_tasks.clone(), &progress)?; for task in tasks.iter_mut() { task.status = Status::Succeeded; @@ -508,7 +508,7 @@ impl IndexScheduler { #[allow(clippy::reversed_empty_ranges)] fn delete_matched_tasks( &self, - matched_tasks: &RoaringBitmap, + matched_tasks: RoaringBitmap, progress: &Progress, ) -> Result { /// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains. @@ -640,8 +640,8 @@ impl IndexScheduler { // 1. Remove from this list the tasks that we are not allowed to delete let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); - let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?; - let mut to_delete_tasks = all_task_ids & matched_tasks; + let all_task_ids = &self.queue.tasks.all_task_ids(&rtxn)?; + let mut to_delete_tasks = matched_tasks & all_task_ids; to_delete_tasks -= &**processing_tasks; to_delete_tasks -= &enqueued_tasks; @@ -726,6 +726,10 @@ impl IndexScheduler { let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default(); tasks -= &to_delete_tasks; + // Note: we never delete tasks from the mapping. It's error-prone but intentional (perf) + // We make sure to filter the tasks from the mapping when we read them. + tasks &= all_task_ids; + // We must remove the batch entirely if tasks.is_empty() { to_delete_batches.insert(batch_id);