diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 2d7f80713..1bb42b026 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -683,24 +683,24 @@ impl IndexScheduler { ); progress.update_progress(task_progress); for index in affected_indexes.iter() { - self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?; + self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &tasks_to_remove)?; atomic_progress.fetch_add(1, Ordering::Relaxed); } for status in affected_statuses.iter() { - self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?; + self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &tasks_to_remove)?; atomic_progress.fetch_add(1, Ordering::Relaxed); } for kind in affected_kinds.iter() { - self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?; + self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &tasks_to_remove)?; atomic_progress.fetch_add(1, Ordering::Relaxed); } progress.update_progress(TaskDeletionProgress::DeletingTasks); - let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); + let (atomic_progress, task_progress) = AtomicTaskStep::new(tasks_to_remove.len() as u32); progress.update_progress(task_progress); - for range in consecutive_ranges(to_delete_tasks.iter()) { + for range in consecutive_ranges(tasks_to_remove.iter()) { self.queue .tasks .all_tasks @@ -709,7 +709,7 @@ impl IndexScheduler { } for canceled_by in affected_canceled_by { if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? { - tasks -= &to_delete_tasks; + tasks -= &tasks_to_remove; if tasks.is_empty() { self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?; } else { @@ -750,6 +750,8 @@ impl IndexScheduler { let mut min_finished = i128::MAX; let mut max_finished = i128::MIN; + let mut batches_to_remove = RoaringBitmap::new(); + for (batch_id, to_delete_tasks) in affected_batches { if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? { tasks -= &to_delete_tasks; @@ -797,8 +799,7 @@ impl IndexScheduler { finished_to_remove.entry(finished_at).or_default().insert(batch_id); } - self.queue.batches.all_batches.delete(wtxn, &batch_id)?; - self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?; + batches_to_remove.insert(batch_id); } } @@ -835,6 +836,16 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } + for range in consecutive_ranges(batches_to_remove.iter()) { + self.queue + .batches + .all_batches + .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; + self.queue + .batch_to_tasks_mapping + .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; + } + remove_datetimes( wtxn, min_enqueued..=max_enqueued, @@ -874,7 +885,7 @@ impl IndexScheduler { wtxn_owned.commit()?; - Ok(to_delete_tasks) + Ok(tasks_to_remove) } /// Cancel each given task from all the databases (if it is cancelable).