From ba0f0c3c30ae88498b163c913d4967aa25b44cb3 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Fri, 8 Aug 2025 14:55:24 +0200 Subject: [PATCH] Optim G: Use the datetime delete function on batches --- .../src/scheduler/process_batch.rs | 176 ++++++++++++------ 1 file changed, 116 insertions(+), 60 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index d42a6bad7..e678f4505 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,12 +1,12 @@ use std::collections::{BTreeSet, HashMap, HashSet}; -use std::ops::Bound; +use std::ops::{Bound, RangeInclusive}; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; -use meilisearch_types::heed::{RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; -use meilisearch_types::milli::{self, ChannelCongestion}; +use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use milli::update::Settings as MilliSettings; @@ -22,7 +22,7 @@ use crate::utils::{ remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task, ProcessingBatch, }; -use crate::{Error, IndexScheduler, Result, TaskId}; +use crate::{Error, IndexScheduler, Result, TaskId, BEI128}; #[derive(Debug, Default)] pub struct ProcessBatchInfo { @@ -102,8 +102,7 @@ impl IndexScheduler { } } - let mut deleted_tasks = - self.delete_matched_tasks(&matched_tasks, &progress)?; + let mut deleted_tasks = self.delete_matched_tasks(&matched_tasks, &progress)?; for task in tasks.iter_mut() { task.status = Status::Succeeded; @@ -537,6 +536,32 @@ impl IndexScheduler { Some((start, end)) }) } + + fn remove_datetimes( + wtxn: &mut RwTxn<'_>, + range: RangeInclusive, + mut to_remove: HashMap, + db: Database, + ) -> Result<()> { + if !to_remove.is_empty() { + let mut iter = db.rev_range_mut(wtxn, &range)?; + while let Some(i) = iter.next() { + let (timestamp, mut tasks) = i?; + if let Some(to_remove_tasks) = to_remove.remove(×tamp) { + tasks -= &to_remove_tasks; + if tasks.is_empty() { + // safety: We don't keep references to the database + unsafe { iter.del_current()? }; + } else { + // safety: We don't keep references to the database + unsafe { iter.put_current(×tamp, &tasks)? }; + } + } + } + } + Ok(()) + } + progress.update_progress(TaskDeletionProgress::RetrievingTasks); let rtxn = self.env.read_txn()?; @@ -562,6 +587,10 @@ impl IndexScheduler { progress.update_progress(task_progress); let mut min_enqueued = i128::MAX; let mut max_enqueued = i128::MIN; + let mut min_started = i128::MAX; + let mut max_started = i128::MIN; + let mut min_finished = i128::MAX; + let mut max_finished = i128::MIN; let mut enqueued_to_remove: HashMap = HashMap::new(); let mut started_to_remove: HashMap = HashMap::new(); let mut finished_to_remove: HashMap = HashMap::new(); @@ -589,22 +618,22 @@ impl IndexScheduler { if let Some(started_at) = task.started_at { let started_at = started_at.unix_timestamp_nanos(); - if started_at < min_enqueued { - min_enqueued = started_at; + if started_at < min_started { + min_started = started_at; } - if started_at > max_enqueued { - max_enqueued = started_at; + if started_at > max_started { + max_started = started_at; } started_to_remove.entry(started_at).or_default().insert(task_id); } if let Some(finished_at) = task.finished_at { let finished_at = finished_at.unix_timestamp_nanos(); - if finished_at < min_enqueued { - min_enqueued = finished_at; + if finished_at < min_finished { + min_finished = finished_at; } - if finished_at > max_enqueued { - max_enqueued = finished_at; + if finished_at > max_finished { + max_finished = finished_at; } finished_to_remove.entry(finished_at).or_default().insert(task_id); } @@ -629,28 +658,24 @@ impl IndexScheduler { let wtxn = &mut wtxn_owned; progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); - for (mut to_remove, db) in [ - (enqueued_to_remove, &self.queue.tasks.enqueued_at), - (started_to_remove, &self.queue.tasks.started_at), - (finished_to_remove, &self.queue.tasks.finished_at), - ] { - if !to_remove.is_empty() { - let mut iter = db.rev_range_mut(wtxn, &(min_enqueued..=max_enqueued))?; - while let Some(i) = iter.next() { - let (timestamp, mut tasks) = i?; - if let Some(to_remove_tasks) = to_remove.remove(×tamp) { - tasks -= &to_remove_tasks; - if tasks.is_empty() { - // safety: We don't keep references to the database - unsafe { iter.del_current()? }; - } else { - // safety: We don't keep references to the database - unsafe { iter.put_current(×tamp, &tasks)? }; - } - } - } - } - } + remove_datetimes( + wtxn, + min_enqueued..=max_enqueued, + enqueued_to_remove, + self.queue.batches.enqueued_at, + )?; + remove_datetimes( + wtxn, + min_started..=max_started, + started_to_remove, + self.queue.batches.started_at, + )?; + remove_datetimes( + wtxn, + min_finished..=max_finished, + finished_to_remove, + self.queue.batches.finished_at, + )?; progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); let (atomic_progress, task_progress) = AtomicTaskStep::new( @@ -715,6 +740,16 @@ impl IndexScheduler { } let mut to_remove_from_kinds = HashMap::new(); + let mut enqueued_to_remove: HashMap = HashMap::new(); + let mut min_enqueued = i128::MAX; + let mut max_enqueued = i128::MIN; + let mut started_to_remove: HashMap = HashMap::new(); + let mut min_started = i128::MAX; + let mut max_started = i128::MIN; + let mut finished_to_remove: HashMap = HashMap::new(); + let mut min_finished = i128::MAX; + let mut max_finished = i128::MIN; + for (batch_id, to_delete_tasks) in affected_batches { if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? { tasks -= &to_delete_tasks; @@ -722,18 +757,16 @@ impl IndexScheduler { if tasks.is_empty() { if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? { if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { - remove_task_datetime( - wtxn, - self.queue.batches.enqueued_at, - earliest, - batch_id, - )?; - remove_task_datetime( - wtxn, - self.queue.batches.enqueued_at, - oldest, - batch_id, - )?; + let earliest = earliest.unix_timestamp_nanos(); + let oldest = oldest.unix_timestamp_nanos(); + if earliest < min_enqueued { + min_enqueued = earliest; + } + if oldest > max_enqueued { + max_enqueued = oldest; + } + enqueued_to_remove.entry(earliest).or_default().insert(batch_id); + enqueued_to_remove.entry(oldest).or_default().insert(batch_id); } else { // If we don't have the enqueued at in the batch it means the database comes from the v1.12 // and we still need to find the date by scrolling the database @@ -745,19 +778,23 @@ impl IndexScheduler { batch_id, )?; } - remove_task_datetime( - wtxn, - self.queue.batches.started_at, - batch.started_at, - batch_id, - )?; + let started_at = batch.started_at.unix_timestamp_nanos(); + if started_at < min_started { + min_started = started_at; + } + if started_at > max_started { + max_started = started_at; + } + started_to_remove.entry(started_at).or_default().insert(batch_id); if let Some(finished_at) = batch.finished_at { - remove_task_datetime( - wtxn, - self.queue.batches.finished_at, - finished_at, - batch_id, - )?; + let finished_at = finished_at.unix_timestamp_nanos(); + if finished_at < min_finished { + min_finished = finished_at; + } + if finished_at > max_finished { + max_finished = finished_at; + } + finished_to_remove.entry(finished_at).or_default().insert(batch_id); } self.queue.batches.all_batches.delete(wtxn, &batch_id)?; @@ -798,6 +835,25 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } + remove_datetimes( + wtxn, + min_enqueued..=max_enqueued, + enqueued_to_remove, + self.queue.batches.enqueued_at, + )?; + remove_datetimes( + wtxn, + min_started..=max_started, + started_to_remove, + self.queue.batches.started_at, + )?; + remove_datetimes( + wtxn, + min_finished..=max_finished, + finished_to_remove, + self.queue.batches.finished_at, + )?; + for (index, batches) in to_remove_from_indexes { self.queue.tasks.update_index(wtxn, index, |index_tasks| { *index_tasks -= &batches;