diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index dfba22df1..26e0fc29c 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,7 +1,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::ops::RangeInclusive; use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::sync::atomic::Ordering; +use std::sync::atomic::{self, Ordering}; use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{Database, RoTxn, RwTxn}; @@ -668,7 +668,7 @@ impl IndexScheduler { } let mut to_remove_from_kinds = HashMap::new(); - // 3. Read affected batches to list metadata that needs to be updated. + // 4. Read affected batches to list metadata that needs to be updated. let mut batches_enqueued_range = i128::MAX..=i128::MIN; let mut batches_started_range = i128::MAX..=i128::MIN; let mut batches_finished_range = i128::MAX..=i128::MIN; @@ -689,46 +689,7 @@ impl IndexScheduler { let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default(); tasks -= &to_delete_tasks; - // We must remove the batch entirely - if tasks.is_empty() { - if let Some(batch) = self.queue.batches.get_batch(&rtxn, batch_id)? { - if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { - let earliest = earliest.unix_timestamp_nanos(); - let oldest = oldest.unix_timestamp_nanos(); - extend_range(earliest, &mut batches_enqueued_range); - extend_range(oldest, &mut batches_enqueued_range); - batches_enqueued_to_remove - .entry(earliest) - .or_default() - .insert(batch_id); - batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id); - } else { - // If we don't have the enqueued at in the batch it means the database comes from the v1.12 - // and we still need to find the date by scrolling the database - tasks_to_remove_earlier.push(( - batch.started_at, - batch.stats.total_nb_tasks.clamp(1, 2) as usize, - batch_id, - )); - } - let started_at = batch.started_at.unix_timestamp_nanos(); - extend_range(started_at, &mut batches_started_range); - batches_started_to_remove.entry(started_at).or_default().insert(batch_id); - if let Some(finished_at) = batch.finished_at { - let finished_at = finished_at.unix_timestamp_nanos(); - extend_range(finished_at, &mut batches_finished_range); - batches_finished_to_remove - .entry(finished_at) - .or_default() - .insert(batch_id); - } - - to_delete_batches.insert(batch_id); - } - } - - // Anyway, we must remove the batch from all its reverse indexes. - // Check if those are affected by the task deletion. + // We must remove the batch from all the reverse indexes it no longer has tasks for. for (index, index_tasks) in affected_indexes_tasks.iter() { if index_tasks.intersection_len(&tasks) == 0 { @@ -762,6 +723,44 @@ impl IndexScheduler { // In each of those cases, the persisted data is supposed to // have been deleted already. + // We must remove the batch entirely + if tasks.is_empty() { + to_delete_batches.insert(batch_id); + } else { + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + } + } + for range in consecutive_ranges(to_delete_batches.iter()) { + let iter = self.queue.batches.all_batches.range(&rtxn, &range)?; + for batch in iter { + let (batch_id, batch) = batch?; + + if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { + let earliest = earliest.unix_timestamp_nanos(); + let oldest = oldest.unix_timestamp_nanos(); + extend_range(earliest, &mut batches_enqueued_range); + extend_range(oldest, &mut batches_enqueued_range); + batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id); + batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id); + } else { + // If we don't have the enqueued at in the batch it means the database comes from the v1.12 + // and we still need to find the date by scrolling the database + tasks_to_remove_earlier.push(( + batch.started_at, + batch.stats.total_nb_tasks.clamp(1, 2) as usize, + batch_id, + )); + } + let started_at = batch.started_at.unix_timestamp_nanos(); + extend_range(started_at, &mut batches_started_range); + batches_started_to_remove.entry(started_at).or_default().insert(batch_id); + if let Some(finished_at) = batch.finished_at { + let finished_at = finished_at.unix_timestamp_nanos(); + extend_range(finished_at, &mut batches_finished_range); + batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id); + } + atomic_progress.fetch_add(1, Ordering::Relaxed); } } @@ -770,7 +769,7 @@ impl IndexScheduler { let mut owned_wtxn = self.env.write_txn()?; let wtxn = &mut owned_wtxn; - // 4. Remove task datetimes + // 5. Remove task datetimes progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); remove_datetimes( @@ -794,7 +793,7 @@ impl IndexScheduler { self.queue.tasks.finished_at, )?; - // 8. Delete batches datetimes + // 6. Delete batches datetimes progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime); for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier { @@ -828,7 +827,7 @@ impl IndexScheduler { self.queue.batches.finished_at, )?; - // 5. Remove tasks from indexes, statuses, and kinds + // 7. Remove tasks from indexes, statuses, and kinds progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); let (atomic_progress, task_progress) = AtomicTaskStep::new( (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, @@ -850,7 +849,7 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } - // 9. Remove batches metadata from indexes, statuses, and kinds + // 8. Remove batches metadata from indexes, statuses, and kinds progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata); for (index, batches) in to_remove_from_indexes { @@ -871,7 +870,7 @@ impl IndexScheduler { })?; } - // 6. Delete tasks + // 9. Delete tasks progress.update_progress(TaskDeletionProgress::DeletingTasks); let (atomic_progress, task_progress) = AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32); @@ -893,7 +892,7 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } - // 7. Delete batches + // 10. Delete batches progress.update_progress(TaskDeletionProgress::DeletingBatches); let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32); progress.update_progress(task_progress);