diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index 884a8e975..608e059eb 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -82,6 +82,7 @@ make_enum_progress! { make_enum_progress! { pub enum TaskDeletionProgress { RetrievingTasks, + RetrievingBatchTasks, RetrievingBatches, DeletingTasksDateTime, DeletingBatchesDateTime, diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 26e0fc29c..e3d7dc999 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::ops::RangeInclusive; use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::sync::atomic::{self, Ordering}; +use std::sync::atomic::Ordering; use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{Database, RoTxn, RwTxn}; @@ -668,7 +668,7 @@ impl IndexScheduler { } let mut to_remove_from_kinds = HashMap::new(); - // 4. Read affected batches to list metadata that needs to be updated. + // 4. Read affected batches' tasks let mut batches_enqueued_range = i128::MAX..=i128::MIN; let mut batches_started_range = i128::MAX..=i128::MIN; let mut batches_finished_range = i128::MAX..=i128::MIN; @@ -677,8 +677,8 @@ impl IndexScheduler { let mut batches_finished_to_remove: HashMap = HashMap::new(); let mut to_delete_batches = RoaringBitmap::new(); let mut tasks_to_remove_earlier = Vec::new(); - progress.update_progress(TaskDeletionProgress::RetrievingBatches); let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys()); + progress.update_progress(TaskDeletionProgress::RetrievingBatchTasks); let (atomic_progress, task_progress) = AtomicBatchStep::new(affected_batches_bitmap.len() as u32); progress.update_progress(task_progress); @@ -689,6 +689,11 @@ impl IndexScheduler { let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default(); tasks -= &to_delete_tasks; + // We must remove the batch entirely + if tasks.is_empty() { + to_delete_batches.insert(batch_id); + } + // We must remove the batch from all the reverse indexes it no longer has tasks for. for (index, index_tasks) in affected_indexes_tasks.iter() { @@ -723,14 +728,15 @@ impl IndexScheduler { // In each of those cases, the persisted data is supposed to // have been deleted already. - // We must remove the batch entirely - if tasks.is_empty() { - to_delete_batches.insert(batch_id); - } else { - atomic_progress.fetch_add(1, Ordering::Relaxed); - } + atomic_progress.fetch_add(1, Ordering::Relaxed); } } + + // 5. Read batches metadata + progress.update_progress(TaskDeletionProgress::RetrievingBatches); + let (atomic_progress, task_progress) = + AtomicBatchStep::new(to_delete_batches.len() as u32); + progress.update_progress(task_progress); for range in consecutive_ranges(to_delete_batches.iter()) { let iter = self.queue.batches.all_batches.range(&rtxn, &range)?; for batch in iter {