diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index 608e059eb..7a222fd7c 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -83,7 +83,6 @@ make_enum_progress! { pub enum TaskDeletionProgress { RetrievingTasks, RetrievingBatchTasks, - RetrievingBatches, DeletingTasksDateTime, DeletingBatchesDateTime, DeletingTasksMetadata, diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 0ae45abac..6434299af 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -3,7 +3,7 @@ use std::ops::RangeInclusive; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; -use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; +use meilisearch_types::batches::BatchId; use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion}; @@ -18,7 +18,7 @@ use crate::processing::{ InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, UpdateIndexProgress, }; -use crate::utils::{remove_n_tasks_datetime_earlier_than, swap_index_uid_in_task, ProcessingBatch}; +use crate::utils::{swap_index_uid_in_task, ProcessingBatch}; use crate::{Error, IndexScheduler, Result, TaskId, BEI128}; #[derive(Debug, Default)] @@ -587,6 +587,54 @@ impl IndexScheduler { Ok(()) } + fn remove_batch_datetimes( + wtxn: &mut RwTxn<'_>, + to_remove: &RoaringBitmap, + db: Database<BEI128, CboRoaringBitmapCodec>, + ) -> Result<()> { + if to_remove.is_empty() { + return Ok(()); + } + + // We iterate over the time database to see which ranges of timestamps need to be removed + let iter = db.iter(wtxn)?; + let mut delete_range_start = None; + let mut delete_ranges = Vec::new(); + let mut to_put: HashMap<i128, RoaringBitmap> =
HashMap::new(); + for i in iter { + let (timestamp, mut current) = i?; + + if current.iter().any(|task_id| to_remove.contains(task_id)) { + current -= to_remove; + + if current.is_empty() { + delete_range_start = Some(timestamp); + } else { + // We could close the deletion range but it's not necessary because the new value will get reinserted anyway + to_put.insert(timestamp, current); + } + } else if let Some(delete_range_start) = delete_range_start.take() { + // Current one must not be deleted so we need to skip it + delete_ranges.push(delete_range_start..timestamp); + } + } + if let Some(delete_range_start) = delete_range_start.take() { + delete_ranges.push(delete_range_start..i128::MAX); + } + + for range in delete_ranges { + db.delete_range(wtxn, &range)?; + } + + for (timestamp, data) in to_put { + db.put(wtxn, &timestamp, &data)?; + } + + Ok(()) + } + + let instant = std::time::Instant::now(); + progress.update_progress(TaskDeletionProgress::RetrievingTasks); let rtxn = self.env.read_txn()?; @@ -667,11 +715,7 @@ impl IndexScheduler { let mut to_remove_from_kinds = HashMap::new(); // 4. Read affected batches' tasks - let mut batches_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new(); - let mut batches_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new(); - let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new(); let mut to_delete_batches = RoaringBitmap::new(); - let mut tasks_to_remove_earlier = Vec::new(); let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys()); progress.update_progress(TaskDeletionProgress::RetrievingBatchTasks); let (atomic_progress, task_progress) = @@ -727,40 +771,6 @@ impl IndexScheduler { } } - // 5. 
Read batches metadata - progress.update_progress(TaskDeletionProgress::RetrievingBatches); - let (atomic_progress, task_progress) = AtomicBatchStep::new(to_delete_batches.len() as u32); - progress.update_progress(task_progress); - for range in consecutive_ranges(to_delete_batches.iter()) { - let iter = self.queue.batches.all_batches.range(&rtxn, &range)?; - for batch in iter { - let (batch_id, batch) = batch?; - - if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { - let earliest = earliest.unix_timestamp_nanos(); - let oldest = oldest.unix_timestamp_nanos(); - batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id); - batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id); - } else { - // If we don't have the enqueued at in the batch it means the database comes from the v1.12 - // and we still need to find the date by scrolling the database - tasks_to_remove_earlier.push(( - batch.started_at, - batch.stats.total_nb_tasks.clamp(1, 2) as usize, - batch_id, - )); - } - let started_at = batch.started_at.unix_timestamp_nanos(); - batches_started_to_remove.entry(started_at).or_default().insert(batch_id); - if let Some(finished_at) = batch.finished_at { - let finished_at = finished_at.unix_timestamp_nanos(); - batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id); - } - - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - } - drop(rtxn); let mut owned_wtxn = self.env.write_txn()?; let wtxn = &mut owned_wtxn; @@ -777,19 +787,11 @@ impl IndexScheduler { // 6. 
Delete batches datetimes progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime); - for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier { - remove_n_tasks_datetime_earlier_than( - wtxn, - self.queue.batches.enqueued_at, - started_at, - nb_tasks, - batch_id, - )?; - } + remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.enqueued_at)?; - remove_datetimes(wtxn, batches_enqueued_to_remove, self.queue.batches.enqueued_at)?; + remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.started_at)?; - remove_datetimes(wtxn, batches_started_to_remove, self.queue.batches.started_at)?; + remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.finished_at)?; remove_datetimes(wtxn, batches_finished_to_remove, self.queue.batches.finished_at)?;