From 6f8d788aa8ffd25c2c6b40c6be985e607d1135ef Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Fri, 8 Aug 2025 15:30:15 +0200 Subject: [PATCH] Optim I: read batches before writing --- .../src/scheduler/process_batch.rs | 273 +++++++++--------- 1 file changed, 144 insertions(+), 129 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 1bb42b026..17c33d477 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -7,7 +7,7 @@ use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion}; -use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; +use meilisearch_types::tasks::{self, Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use milli::update::Settings as MilliSettings; use roaring::RoaringBitmap; @@ -571,9 +571,9 @@ impl IndexScheduler { let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?; - let mut tasks_to_remove = all_task_ids & matched_tasks; - tasks_to_remove -= &**processing_tasks; - tasks_to_remove -= &enqueued_tasks; + let mut to_delete_tasks = all_task_ids & matched_tasks; + to_delete_tasks -= &**processing_tasks; + to_delete_tasks -= &enqueued_tasks; // 2. We now have a list of tasks to delete, delete them let mut affected_indexes = HashSet::new(); @@ -583,7 +583,7 @@ impl IndexScheduler { // The tasks that have been removed *per batches*. let mut affected_batches: HashMap = HashMap::new(); - let (atomic_progress, task_progress) = AtomicTaskStep::new(tasks_to_remove.len() as u32); + let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); progress.update_progress(task_progress); let mut min_enqueued = i128::MAX; let mut max_enqueued = i128::MIN; @@ -591,10 +591,10 @@ impl IndexScheduler { let mut max_started = i128::MIN; let mut min_finished = i128::MAX; let mut max_finished = i128::MIN; - let mut enqueued_to_remove: HashMap = HashMap::new(); - let mut started_to_remove: HashMap = HashMap::new(); - let mut finished_to_remove: HashMap = HashMap::new(); - for range in consecutive_ranges(tasks_to_remove.iter()) { + let mut tasks_enqueued_to_remove: HashMap = HashMap::new(); + let mut tasks_started_to_remove: HashMap = HashMap::new(); + let mut tasks_finished_to_remove: HashMap = HashMap::new(); + for range in consecutive_ranges(to_delete_tasks.iter()) { let iter = self .queue .tasks @@ -614,7 +614,7 @@ impl IndexScheduler { if enqueued_at > max_enqueued { max_enqueued = enqueued_at; } - enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id); + tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id); if let Some(started_at) = task.started_at { let started_at = started_at.unix_timestamp_nanos(); @@ -624,7 +624,7 @@ impl IndexScheduler { if started_at > max_started { max_started = started_at; } - started_to_remove.entry(started_at).or_default().insert(task_id); + tasks_started_to_remove.entry(started_at).or_default().insert(task_id); } if let Some(finished_at) = task.finished_at { @@ -635,7 +635,7 @@ impl IndexScheduler { if finished_at > max_finished { max_finished = finished_at; } - finished_to_remove.entry(finished_at).or_default().insert(task_id); + tasks_finished_to_remove.entry(finished_at).or_default().insert(task_id); } if let Some(canceled_by) = task.canceled_by { @@ -648,158 +648,87 @@ impl IndexScheduler { } } - // Note: don't delete the persisted task data since - // we can only delete succeeded, failed, and canceled tasks. - // In each of those cases, the persisted data is supposed to - // have been deleted already. - - drop(rtxn); - let mut wtxn_owned = self.env.write_txn()?; - let wtxn = &mut wtxn_owned; - - progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); - remove_datetimes( - wtxn, - min_enqueued..=max_enqueued, - enqueued_to_remove, - self.queue.batches.enqueued_at, - )?; - remove_datetimes( - wtxn, - min_started..=max_started, - started_to_remove, - self.queue.batches.started_at, - )?; - remove_datetimes( - wtxn, - min_finished..=max_finished, - finished_to_remove, - self.queue.batches.finished_at, - )?; - - progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); - let (atomic_progress, task_progress) = AtomicTaskStep::new( - (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, - ); - progress.update_progress(task_progress); - for index in affected_indexes.iter() { - self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &tasks_to_remove)?; - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - - for status in affected_statuses.iter() { - self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &tasks_to_remove)?; - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - - for kind in affected_kinds.iter() { - self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &tasks_to_remove)?; - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - - progress.update_progress(TaskDeletionProgress::DeletingTasks); - let (atomic_progress, task_progress) = AtomicTaskStep::new(tasks_to_remove.len() as u32); - progress.update_progress(task_progress); - for range in consecutive_ranges(tasks_to_remove.iter()) { - self.queue - .tasks - .all_tasks - .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - for canceled_by in affected_canceled_by { - if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? { - tasks -= &tasks_to_remove; - if tasks.is_empty() { - self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?; - } else { - self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?; - } - } - } - progress.update_progress(TaskDeletionProgress::DeletingBatches); - let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32); - progress.update_progress(batch_progress); - let mut affected_indexes_tasks = HashMap::new(); for index in &affected_indexes { affected_indexes_tasks - .insert(index.as_str(), self.queue.tasks.index_tasks(wtxn, index)?); + .insert(index.as_str(), self.queue.tasks.index_tasks(&rtxn, index)?); } let mut to_remove_from_indexes = HashMap::new(); let mut affected_statuses_tasks = HashMap::new(); for status in &affected_statuses { - affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(wtxn, *status)?); + affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(&rtxn, *status)?); } let mut to_remove_from_statuses = HashMap::new(); let mut affected_kinds_tasks = HashMap::new(); for kind in &affected_kinds { - affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(wtxn, *kind)?); + affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(&rtxn, *kind)?); } let mut to_remove_from_kinds = HashMap::new(); - let mut enqueued_to_remove: HashMap = HashMap::new(); - let mut min_enqueued = i128::MAX; - let mut max_enqueued = i128::MIN; - let mut started_to_remove: HashMap = HashMap::new(); - let mut min_started = i128::MAX; - let mut max_started = i128::MIN; - let mut finished_to_remove: HashMap = HashMap::new(); - let mut min_finished = i128::MAX; - let mut max_finished = i128::MIN; + let mut batches_enqueued_to_remove: HashMap = HashMap::new(); + let mut batches_min_enqueued = i128::MAX; + let mut batches_max_enqueued = i128::MIN; + let mut batches_started_to_remove: HashMap = HashMap::new(); + let mut batches_min_started = i128::MAX; + let mut batches_max_started = i128::MIN; + let mut batches_finished_to_remove: HashMap = HashMap::new(); + let mut batches_min_finished = i128::MAX; + let mut batches_max_finished = i128::MIN; - let mut batches_to_remove = RoaringBitmap::new(); + let mut to_delete_batches = RoaringBitmap::new(); + let mut tasks_to_remove_earlier = Vec::new(); + progress.update_progress(TaskDeletionProgress::RetrievingBatches); + let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32); + progress.update_progress(batch_progress); for (batch_id, to_delete_tasks) in affected_batches { - if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? { + if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(&rtxn, &batch_id)? { tasks -= &to_delete_tasks; // We must remove the batch entirely if tasks.is_empty() { - if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? { + if let Some(batch) = self.queue.batches.get_batch(&rtxn, batch_id)? { if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { let earliest = earliest.unix_timestamp_nanos(); let oldest = oldest.unix_timestamp_nanos(); - if earliest < min_enqueued { - min_enqueued = earliest; + if earliest < batches_min_enqueued { + batches_min_enqueued = earliest; } - if oldest > max_enqueued { - max_enqueued = oldest; + if oldest > batches_max_enqueued { + batches_max_enqueued = oldest; } - enqueued_to_remove.entry(earliest).or_default().insert(batch_id); - enqueued_to_remove.entry(oldest).or_default().insert(batch_id); + batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id); + batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id); } else { // If we don't have the enqueued at in the batch it means the database comes from the v1.12 // and we still need to find the date by scrolling the database - remove_n_tasks_datetime_earlier_than( - wtxn, - self.queue.batches.enqueued_at, + tasks_to_remove_earlier.push(( batch.started_at, batch.stats.total_nb_tasks.clamp(1, 2) as usize, batch_id, - )?; + )); } let started_at = batch.started_at.unix_timestamp_nanos(); - if started_at < min_started { - min_started = started_at; + if started_at < batches_min_started { + batches_min_started = started_at; } - if started_at > max_started { - max_started = started_at; + if started_at > batches_max_started { + batches_max_started = started_at; } - started_to_remove.entry(started_at).or_default().insert(batch_id); + batches_started_to_remove.entry(started_at).or_default().insert(batch_id); if let Some(finished_at) = batch.finished_at { let finished_at = finished_at.unix_timestamp_nanos(); - if finished_at < min_finished { - min_finished = finished_at; + if finished_at < batches_min_finished { + batches_min_finished = finished_at; } - if finished_at > max_finished { - max_finished = finished_at; + if finished_at > batches_max_finished { + batches_max_finished = finished_at; } - finished_to_remove.entry(finished_at).or_default().insert(batch_id); + batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id); } - batches_to_remove.insert(batch_id); + to_delete_batches.insert(batch_id); } } @@ -836,7 +765,79 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } - for range in consecutive_ranges(batches_to_remove.iter()) { + // Note: don't delete the persisted task data since + // we can only delete succeeded, failed, and canceled tasks. + // In each of those cases, the persisted data is supposed to + // have been deleted already. + + drop(rtxn); + let mut wtxn_owned = self.env.write_txn()?; + let wtxn = &mut wtxn_owned; + + progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); + remove_datetimes( + wtxn, + min_enqueued..=max_enqueued, + tasks_enqueued_to_remove, + self.queue.batches.enqueued_at, + )?; + remove_datetimes( + wtxn, + min_started..=max_started, + tasks_started_to_remove, + self.queue.batches.started_at, + )?; + remove_datetimes( + wtxn, + min_finished..=max_finished, + tasks_finished_to_remove, + self.queue.batches.finished_at, + )?; + + progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); + let (atomic_progress, task_progress) = AtomicTaskStep::new( + (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, + ); + progress.update_progress(task_progress); + for index in affected_indexes.iter() { + self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + + for status in affected_statuses.iter() { + self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + + for kind in affected_kinds.iter() { + self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + + progress.update_progress(TaskDeletionProgress::DeletingTasks); + let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); + progress.update_progress(task_progress); + for range in consecutive_ranges(to_delete_tasks.iter()) { + self.queue + .tasks + .all_tasks + .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + for canceled_by in affected_canceled_by { + if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? { + tasks -= &to_delete_tasks; + if tasks.is_empty() { + self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?; + } else { + self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?; + } + } + } + + progress.update_progress(TaskDeletionProgress::DeletingBatches); + + for range in consecutive_ranges(to_delete_batches.iter()) { self.queue .batches .all_batches @@ -846,25 +847,39 @@ impl IndexScheduler { .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; } + progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime); + + for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier { + remove_n_tasks_datetime_earlier_than( + wtxn, + self.queue.batches.enqueued_at, + started_at, + nb_tasks, + batch_id, + )?; + } + remove_datetimes( wtxn, - min_enqueued..=max_enqueued, - enqueued_to_remove, + batches_min_enqueued..=batches_max_enqueued, + batches_enqueued_to_remove, self.queue.batches.enqueued_at, )?; remove_datetimes( wtxn, - min_started..=max_started, - started_to_remove, + batches_min_started..=batches_max_started, + batches_started_to_remove, self.queue.batches.started_at, )?; remove_datetimes( wtxn, - min_finished..=max_finished, - finished_to_remove, + batches_min_finished..=batches_max_finished, + batches_finished_to_remove, self.queue.batches.finished_at, )?; + progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata); + for (index, batches) in to_remove_from_indexes { self.queue.tasks.update_index(wtxn, index, |index_tasks| { *index_tasks -= &batches; @@ -885,7 +900,7 @@ impl IndexScheduler { wtxn_owned.commit()?; - Ok(tasks_to_remove) + Ok(to_delete_tasks) } /// Cancel each given task from all the databases (if it is cancelable).