diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs
index 1c0b63441..884a8e975 100644
--- a/crates/index-scheduler/src/processing.rs
+++ b/crates/index-scheduler/src/processing.rs
@@ -84,11 +84,11 @@ make_enum_progress! {
         RetrievingTasks,
         RetrievingBatches,
         DeletingTasksDateTime,
+        DeletingBatchesDateTime,
         DeletingTasksMetadata,
+        DeletingBatchesMetadata,
         DeletingTasks,
         DeletingBatches,
-        DeletingBatchesDateTime,
-        DeletingBatchesMetadata,
     }
 }

diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs
index 5211b8eb7..dfba22df1 100644
--- a/crates/index-scheduler/src/scheduler/process_batch.rs
+++ b/crates/index-scheduler/src/scheduler/process_batch.rs
@@ -1,5 +1,5 @@
 use std::collections::{BTreeSet, HashMap, HashSet};
-use std::ops::{Bound, RangeInclusive};
+use std::ops::RangeInclusive;
 use std::panic::{catch_unwind, AssertUnwindSafe};
 use std::sync::atomic::Ordering;

@@ -7,10 +7,11 @@ use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
 use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
-use meilisearch_types::tasks::{self, Details, IndexSwap, Kind, KindWithContent, Status, Task};
+use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
 use milli::update::Settings as MilliSettings;
 use roaring::RoaringBitmap;
+use time::OffsetDateTime;

 use super::create_batch::Batch;
 use crate::processing::{
@@ -18,10 +19,7 @@ use crate::processing::{
     InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
     UpdateIndexProgress,
 };
-use crate::utils::{
-    remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
-    ProcessingBatch,
-};
+use crate::utils::{remove_n_tasks_datetime_earlier_than, swap_index_uid_in_task, ProcessingBatch};
 use crate::{Error, IndexScheduler, Result, TaskId, BEI128};

 #[derive(Debug, Default)]
@@ -508,15 +506,16 @@ impl IndexScheduler {
     /// Delete each given task from all the databases (if it is deleteable).
     ///
     /// Return the number of tasks that were actually deleted.
+    #[allow(clippy::reversed_empty_ranges)]
     fn delete_matched_tasks(
         &self,
         matched_tasks: &RoaringBitmap,
         progress: &Progress,
     ) -> Result<RoaringBitmap> {
-        fn consecutive_ranges<I>(iter: I) -> impl Iterator<Item = (u32, u32)>
-        where
-            I: IntoIterator<Item = u32>,
-        {
+        /// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains.
+        fn consecutive_ranges(
+            iter: impl IntoIterator<Item = u32>,
+        ) -> impl Iterator<Item = RangeInclusive<u32>> {
             let mut iter = iter.into_iter().peekable();

             std::iter::from_fn(move || {
@@ -533,7 +532,7 @@ impl IndexScheduler {
                     }
                 }

-                Some((start, end))
+                Some(start..=end)
             })
         }

@@ -543,25 +542,53 @@ impl IndexScheduler {
             mut to_remove: HashMap<i128, RoaringBitmap>,
             db: Database<BEI128, CboRoaringBitmapCodec>,
         ) -> Result<()> {
-            if !to_remove.is_empty() {
-                let mut iter = db.rev_range_mut(wtxn, &range)?;
-                while let Some(i) = iter.next() {
-                    let (timestamp, mut tasks) = i?;
-                    if let Some(to_remove_tasks) = to_remove.remove(&timestamp) {
-                        tasks -= &to_remove_tasks;
-                        if tasks.is_empty() {
-                            // safety: We don't keep references to the database
-                            unsafe { iter.del_current()? };
-                        } else {
-                            // safety: We don't keep references to the database
-                            unsafe { iter.put_current(&timestamp, &tasks)? };
-                        }
+            if to_remove.is_empty() {
+                return Ok(());
+            }
+
+            let mut iter = db.rev_range_mut(wtxn, &range)?;
+            while let Some(i) = iter.next() {
+                let (timestamp, mut tasks) = i?;
+                if let Some(to_remove_tasks) = to_remove.remove(&timestamp) {
+                    tasks -= &to_remove_tasks;
+                    if tasks.is_empty() {
+                        // safety: We don't keep references to the database
+                        unsafe { iter.del_current()? };
+                    } else {
+                        // safety: We don't keep references to the database
+                        unsafe { iter.put_current(&timestamp, &tasks)? };
+                    }
                 }
             }
+            Ok(())
         }
+        trait UnixTimestampNanosOpt {
+            fn unix_timestamp_nanos(self) -> Option<i128>;
+        }
+        impl UnixTimestampNanosOpt for Option<OffsetDateTime> {
+            fn unix_timestamp_nanos(self) -> Option<i128> {
+                self.map(|dt| dt.unix_timestamp_nanos())
+            }
+        }
+        impl UnixTimestampNanosOpt for i128 {
+            fn unix_timestamp_nanos(self) -> Option<i128> {
+                Some(self)
+            }
+        }
+
+        /// Extends a range to include the given value
+        fn extend_range(value: impl UnixTimestampNanosOpt, range: &mut RangeInclusive<i128>) {
+            let Some(value) = value.unix_timestamp_nanos() else { return };
+            if value < *range.start() {
+                *range = value..=*range.end();
+            }
+            if value > *range.end() {
+                *range = *range.start()..=value;
+            }
+        }
+
         progress.update_progress(TaskDeletionProgress::RetrievingTasks);

         let rtxn = self.env.read_txn()?;
@@ -569,37 +596,27 @@ impl IndexScheduler {
         // 1. Remove from this list the tasks that we are not allowed to delete
         let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
         let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
-
         let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?;
         let mut to_delete_tasks = all_task_ids & matched_tasks;
         to_delete_tasks -= &**processing_tasks;
         to_delete_tasks -= &enqueued_tasks;

-        // 2. We now have a list of tasks to delete, delete them
+        // 2. We now have a list of tasks to delete. Read their metadata to list what needs to be updated.
         let mut affected_indexes = HashSet::new();
         let mut affected_statuses = HashSet::new();
         let mut affected_kinds = HashSet::new();
         let mut affected_canceled_by = RoaringBitmap::new();
-        // The tasks that have been removed *per batches*.
-        let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
-
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
-        progress.update_progress(task_progress);
-        let mut min_enqueued = i128::MAX;
-        let mut max_enqueued = i128::MIN;
-        let mut min_started = i128::MAX;
-        let mut max_started = i128::MIN;
-        let mut min_finished = i128::MAX;
-        let mut max_finished = i128::MIN;
+        let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new(); // The tasks that have been removed *per batches*.
+        let mut enqueued_range = i128::MAX..=i128::MIN;
+        let mut started_range = i128::MAX..=i128::MIN;
+        let mut finished_range = i128::MAX..=i128::MIN;
         let mut tasks_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
         let mut tasks_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
         let mut tasks_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
+        progress.update_progress(task_progress);

         for range in consecutive_ranges(to_delete_tasks.iter()) {
-            let iter = self
-                .queue
-                .tasks
-                .all_tasks
-                .range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
+            let iter = self.queue.tasks.all_tasks.range(&rtxn, &range)?;
             for task in iter {
                 let (task_id, task) = task?;
@@ -608,33 +625,16 @@ impl IndexScheduler {
                 affected_kinds.insert(task.kind.as_kind());

                 let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
-                if enqueued_at < min_enqueued {
-                    min_enqueued = enqueued_at;
-                }
-                if enqueued_at > max_enqueued {
-                    max_enqueued = enqueued_at;
-                }
+                extend_range(enqueued_at, &mut enqueued_range);
                 tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);

-                if let Some(started_at) = task.started_at {
-                    let started_at = started_at.unix_timestamp_nanos();
-                    if started_at < min_started {
-                        min_started = started_at;
-                    }
-                    if started_at > max_started {
-                        max_started = started_at;
-                    }
+                if let Some(started_at) = task.started_at.unix_timestamp_nanos() {
+                    extend_range(started_at, &mut started_range);
                     tasks_started_to_remove.entry(started_at).or_default().insert(task_id);
                 }

-                if let Some(finished_at) = task.finished_at {
-                    let finished_at = finished_at.unix_timestamp_nanos();
-                    if finished_at < min_finished {
-                        min_finished = finished_at;
-                    }
-                    if finished_at > max_finished {
-                        max_finished = finished_at;
-                    }
+                if let Some(finished_at) = task.finished_at.unix_timestamp_nanos() {
+                    extend_range(finished_at, &mut finished_range);
                     tasks_finished_to_remove.entry(finished_at).or_default().insert(task_id);
                 }

@@ -648,6 +648,7 @@ impl IndexScheduler {
             }
         }

+        // 3. Read the tasks by indexes, statuses and kinds
         let mut affected_indexes_tasks = HashMap::new();
         for index in &affected_indexes {
             affected_indexes_tasks
@@ -667,43 +668,35 @@ impl IndexScheduler {
         }

         let mut to_remove_from_kinds = HashMap::new();
+        // 3. Read affected batches to list metadata that needs to be updated.
+        let mut batches_enqueued_range = i128::MAX..=i128::MIN;
+        let mut batches_started_range = i128::MAX..=i128::MIN;
+        let mut batches_finished_range = i128::MAX..=i128::MIN;
         let mut batches_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut batches_min_enqueued = i128::MAX;
-        let mut batches_max_enqueued = i128::MIN;
         let mut batches_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut batches_min_started = i128::MAX;
-        let mut batches_max_started = i128::MIN;
         let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut batches_min_finished = i128::MAX;
-        let mut batches_max_finished = i128::MIN;
-
         let mut to_delete_batches = RoaringBitmap::new();
         let mut tasks_to_remove_earlier = Vec::new();
-
         progress.update_progress(TaskDeletionProgress::RetrievingBatches);
         let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys());
+        let (atomic_progress, task_progress) =
+            AtomicBatchStep::new(affected_batches_bitmap.len() as u32);
+        progress.update_progress(task_progress);
         for range in consecutive_ranges(affected_batches_bitmap.iter()) {
-            let iter = self
-                .queue
-                .batch_to_tasks_mapping
-                .range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-            for i in iter {
-                let (batch_id, mut tasks) = i?;
+            let iter = self.queue.batch_to_tasks_mapping.range(&rtxn, &range)?;
+            for batch in iter {
+                let (batch_id, mut tasks) = batch?;
                 let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
-
                 tasks -= &to_delete_tasks;
+
                 // We must remove the batch entirely
                 if tasks.is_empty() {
                     if let Some(batch) = self.queue.batches.get_batch(&rtxn, batch_id)? {
                         if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
                             let earliest = earliest.unix_timestamp_nanos();
                             let oldest = oldest.unix_timestamp_nanos();
-                            if earliest < batches_min_enqueued {
-                                batches_min_enqueued = earliest;
-                            }
-                            if oldest > batches_max_enqueued {
-                                batches_max_enqueued = oldest;
-                            }
+                            extend_range(earliest, &mut batches_enqueued_range);
+                            extend_range(oldest, &mut batches_enqueued_range);
                             batches_enqueued_to_remove
                                 .entry(earliest)
                                 .or_default()
@@ -719,21 +712,11 @@ impl IndexScheduler {
                             ));
                         }
                         let started_at = batch.started_at.unix_timestamp_nanos();
-                        if started_at < batches_min_started {
-                            batches_min_started = started_at;
-                        }
-                        if started_at > batches_max_started {
-                            batches_max_started = started_at;
-                        }
+                        extend_range(started_at, &mut batches_started_range);
                         batches_started_to_remove.entry(started_at).or_default().insert(batch_id);
                         if let Some(finished_at) = batch.finished_at {
                             let finished_at = finished_at.unix_timestamp_nanos();
-                            if finished_at < batches_min_finished {
-                                batches_min_finished = finished_at;
-                            }
-                            if finished_at > batches_max_finished {
-                                batches_max_finished = finished_at;
-                            }
+                            extend_range(finished_at, &mut batches_finished_range);
                             batches_finished_to_remove
                                 .entry(finished_at)
                                 .or_default()
@@ -745,7 +728,7 @@ impl IndexScheduler {
                 }

                 // Anyway, we must remove the batch from all its reverse indexes.
-                // The only way to do that is to check
+                // Check
                 // if those are affected by the task deletion.
                 for (index, index_tasks) in affected_indexes_tasks.iter() {
                     if index_tasks.intersection_len(&tasks) == 0 {
@@ -773,43 +756,85 @@ impl IndexScheduler {
                            .insert(batch_id);
                    }
                }
+
+                // Note: no need to delete the persisted task data since
+                // we can only delete succeeded, failed, and canceled tasks.
+                // In each of those cases, the persisted data is supposed to
+                // have been deleted already.
+
+                atomic_progress.fetch_add(1, Ordering::Relaxed);
             }
         }

-        // Note: don't delete the persisted task data since
-        // we can only delete succeeded, failed, and canceled tasks.
-        // In each of those cases, the persisted data is supposed to
-        // have been deleted already.
-
         drop(rtxn);
-        let mut wtxn_owned = self.env.write_txn()?;
-        let wtxn = &mut wtxn_owned;
+        let mut owned_wtxn = self.env.write_txn()?;
+        let wtxn = &mut owned_wtxn;

+        // 4. Remove task datetimes
         progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
+
         remove_datetimes(
             wtxn,
-            min_enqueued..=max_enqueued,
+            enqueued_range,
             tasks_enqueued_to_remove,
+            self.queue.tasks.enqueued_at,
+        )?;
+
+        remove_datetimes(
+            wtxn,
+            started_range,
+            tasks_started_to_remove,
+            self.queue.tasks.started_at,
+        )?;
+
+        remove_datetimes(
+            wtxn,
+            finished_range,
+            tasks_finished_to_remove,
+            self.queue.tasks.finished_at,
+        )?;
+
+        // 8. Delete batches datetimes
+        progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
+
+        for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
+            remove_n_tasks_datetime_earlier_than(
+                wtxn,
+                self.queue.batches.enqueued_at,
+                started_at,
+                nb_tasks,
+                batch_id,
+            )?;
+        }
+
+        remove_datetimes(
+            wtxn,
+            batches_enqueued_range,
+            batches_enqueued_to_remove,
             self.queue.batches.enqueued_at,
         )?;
+
         remove_datetimes(
             wtxn,
-            min_started..=max_started,
-            tasks_started_to_remove,
+            batches_started_range,
+            batches_started_to_remove,
             self.queue.batches.started_at,
         )?;
+
         remove_datetimes(
             wtxn,
-            min_finished..=max_finished,
-            tasks_finished_to_remove,
+            batches_finished_range,
+            batches_finished_to_remove,
             self.queue.batches.finished_at,
         )?;

+        // 5. Remove tasks from indexes, statuses, and kinds
         progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
         let (atomic_progress, task_progress) = AtomicTaskStep::new(
             (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
         );
         progress.update_progress(task_progress);
+
         for index in affected_indexes.iter() {
             self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
             atomic_progress.fetch_add(1, Ordering::Relaxed);
@@ -825,70 +850,7 @@ impl IndexScheduler {
             atomic_progress.fetch_add(1, Ordering::Relaxed);
         }

-        progress.update_progress(TaskDeletionProgress::DeletingTasks);
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
-        progress.update_progress(task_progress);
-        for range in consecutive_ranges(to_delete_tasks.iter()) {
-            self.queue
-                .tasks
-                .all_tasks
-                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }
-        for canceled_by in affected_canceled_by {
-            if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
-                tasks -= &to_delete_tasks;
-                if tasks.is_empty() {
-                    self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?;
-                } else {
-                    self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
-                }
-            }
-        }
-
-        progress.update_progress(TaskDeletionProgress::DeletingBatches);
-
-        for range in consecutive_ranges(to_delete_batches.iter()) {
-            self.queue
-                .batches
-                .all_batches
-                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-            self.queue
-                .batch_to_tasks_mapping
-                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-        }
-
-        progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
-
-        for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
-            remove_n_tasks_datetime_earlier_than(
-                wtxn,
-                self.queue.batches.enqueued_at,
-                started_at,
-                nb_tasks,
-                batch_id,
-            )?;
-        }
-
-        remove_datetimes(
-            wtxn,
-            batches_min_enqueued..=batches_max_enqueued,
-            batches_enqueued_to_remove,
-            self.queue.batches.enqueued_at,
-        )?;
-        remove_datetimes(
-            wtxn,
-            batches_min_started..=batches_max_started,
-            batches_started_to_remove,
-            self.queue.batches.started_at,
-        )?;
-        remove_datetimes(
-            wtxn,
-            batches_min_finished..=batches_max_finished,
-            batches_finished_to_remove,
-            self.queue.batches.finished_at,
-        )?;
-
+        // 9. Remove batches metadata from indexes, statuses, and kinds
         progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);

         for (index, batches) in to_remove_from_indexes {
@@ -909,7 +871,39 @@ impl IndexScheduler {
             })?;
         }

-        wtxn_owned.commit()?;
+        // 6. Delete tasks
+        progress.update_progress(TaskDeletionProgress::DeletingTasks);
+        let (atomic_progress, task_progress) =
+            AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32);
+        progress.update_progress(task_progress);
+        for range in consecutive_ranges(to_delete_tasks.iter()) {
+            self.queue.tasks.all_tasks.delete_range(wtxn, &range)?;
+            atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
+        }
+
+        for canceled_by in affected_canceled_by {
+            if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
+                tasks -= &to_delete_tasks;
+                if tasks.is_empty() {
+                    self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?;
+                } else {
+                    self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
+                }
+            }
+            atomic_progress.fetch_add(1, Ordering::Relaxed);
+        }
+
+        // 7. Delete batches
+        progress.update_progress(TaskDeletionProgress::DeletingBatches);
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32);
+        progress.update_progress(task_progress);
+        for range in consecutive_ranges(to_delete_batches.iter()) {
+            self.queue.batches.all_batches.delete_range(wtxn, &range)?;
+            self.queue.batch_to_tasks_mapping.delete_range(wtxn, &range)?;
+            atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
+        }
+
+        owned_wtxn.commit()?;

         Ok(to_delete_tasks)
     }
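
For reference, here is a minimal standalone sketch of the `consecutive_ranges` helper this patch reworks. The middle of the loop is reconstructed rather than copied from the hunks above, and the `main` demo is illustrative only, not part of the patch. The idea it shows is the one the diff relies on: a sorted stream of `u32` IDs collapses into `RangeInclusive<u32>` values, so `delete_matched_tasks` can issue one `range`/`delete_range` call per contiguous block of task or batch IDs instead of one LMDB operation per ID.

```rust
use std::ops::RangeInclusive;

/// Given a sorted iterator of `u32`, yield inclusive ranges of consecutive values.
fn consecutive_ranges(
    iter: impl IntoIterator<Item = u32>,
) -> impl Iterator<Item = RangeInclusive<u32>> {
    let mut iter = iter.into_iter().peekable();

    std::iter::from_fn(move || {
        let start = iter.next()?;
        let mut end = start;

        // Grow the current range while the next value is contiguous.
        while let Some(&next) = iter.peek() {
            if next == end + 1 {
                end = next;
                iter.next();
            } else {
                break;
            }
        }

        Some(start..=end)
    })
}

fn main() {
    // Sparse but mostly consecutive IDs collapse into three ranges.
    let ids = [0u32, 1, 2, 3, 7, 9, 10];
    let ranges: Vec<RangeInclusive<u32>> = consecutive_ranges(ids).collect();
    assert_eq!(ranges, vec![0..=3, 7..=7, 9..=10]);
    println!("{ranges:?}"); // [0..=3, 7..=7, 9..=10]
}
```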