This commit is contained in:
Mubelotix
2025-08-12 11:59:00 +02:00
parent 1d46cb30f2
commit e5188699f9

View File

@ -99,7 +99,7 @@ impl IndexScheduler {
} }
} }
let mut deleted_tasks = self.delete_matched_tasks(&matched_tasks, &progress)?; let mut deleted_tasks = self.delete_matched_tasks(matched_tasks.clone(), &progress)?;
for task in tasks.iter_mut() { for task in tasks.iter_mut() {
task.status = Status::Succeeded; task.status = Status::Succeeded;
@ -508,7 +508,7 @@ impl IndexScheduler {
#[allow(clippy::reversed_empty_ranges)] #[allow(clippy::reversed_empty_ranges)]
fn delete_matched_tasks( fn delete_matched_tasks(
&self, &self,
matched_tasks: &RoaringBitmap, matched_tasks: RoaringBitmap,
progress: &Progress, progress: &Progress,
) -> Result<RoaringBitmap> { ) -> Result<RoaringBitmap> {
/// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains. /// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains.
@ -640,8 +640,8 @@ impl IndexScheduler {
// 1. Remove from this list the tasks that we are not allowed to delete // 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?; let all_task_ids = &self.queue.tasks.all_task_ids(&rtxn)?;
let mut to_delete_tasks = all_task_ids & matched_tasks; let mut to_delete_tasks = matched_tasks & all_task_ids;
to_delete_tasks -= &**processing_tasks; to_delete_tasks -= &**processing_tasks;
to_delete_tasks -= &enqueued_tasks; to_delete_tasks -= &enqueued_tasks;
@ -726,6 +726,10 @@ impl IndexScheduler {
let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default(); let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
tasks -= &to_delete_tasks; tasks -= &to_delete_tasks;
// Note: we never delete tasks from the mapping. It's error-prone but intentional (perf)
// We make sure to filter the tasks from the mapping when we read them.
tasks &= all_task_ids;
// We must remove the batch entirely // We must remove the batch entirely
if tasks.is_empty() { if tasks.is_empty() {
to_delete_batches.insert(batch_id); to_delete_batches.insert(batch_id);