From 62ad7b345a7c15be6c81591fdaa0a35d44aa2bb0 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Mon, 11 Aug 2025 18:09:11 +0200 Subject: [PATCH] Make code more readable --- .../src/scheduler/process_batch.rs | 58 +++---------------- 1 file changed, 8 insertions(+), 50 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index e3d7dc999..729582cca 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -11,7 +11,6 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use milli::update::Settings as MilliSettings; use roaring::RoaringBitmap; -use time::OffsetDateTime; use super::create_batch::Batch; use crate::processing::{ @@ -538,7 +537,6 @@ impl IndexScheduler { fn remove_datetimes( wtxn: &mut RwTxn<'_>, - range: RangeInclusive, mut to_remove: HashMap, db: Database, ) -> Result<()> { @@ -546,6 +544,10 @@ impl IndexScheduler { return Ok(()); } + let min = to_remove.keys().min().cloned().unwrap_or(i128::MAX); + let max = to_remove.keys().max().cloned().unwrap_or(i128::MIN); + let range = min..=max; + let mut iter = db.rev_range_mut(wtxn, &range)?; while let Some(i) = iter.next() { let (timestamp, mut tasks) = i?; @@ -564,31 +566,6 @@ impl IndexScheduler { Ok(()) } - trait UnixTimestampNanosOpt { - fn unix_timestamp_nanos(self) -> Option; - } - impl UnixTimestampNanosOpt for Option { - fn unix_timestamp_nanos(self) -> Option { - self.map(|dt| dt.unix_timestamp_nanos()) - } - } - impl UnixTimestampNanosOpt for i128 { - fn unix_timestamp_nanos(self) -> Option { - Some(self) - } - } - - /// Extends a range to include the given value - fn extend_range(value: impl UnixTimestampNanosOpt, range: &mut RangeInclusive) { - let Some(value) = value.unix_timestamp_nanos() else { return }; - if value < *range.start() { - *range = value..=*range.end(); - } - if value > *range.end() { - *range = *range.start()..=value; - } - } - progress.update_progress(TaskDeletionProgress::RetrievingTasks); let rtxn = self.env.read_txn()?; @@ -607,9 +584,6 @@ impl IndexScheduler { let mut affected_kinds = HashSet::new(); let mut affected_canceled_by = RoaringBitmap::new(); let mut affected_batches: HashMap = HashMap::new(); // The tasks that have been removed *per batches*. - let mut enqueued_range = i128::MAX..=i128::MIN; - let mut started_range = i128::MAX..=i128::MIN; - let mut finished_range = i128::MAX..=i128::MIN; let mut tasks_enqueued_to_remove: HashMap = HashMap::new(); let mut tasks_started_to_remove: HashMap = HashMap::new(); let mut tasks_finished_to_remove: HashMap = HashMap::new(); @@ -625,17 +599,14 @@ impl IndexScheduler { affected_kinds.insert(task.kind.as_kind()); let enqueued_at = task.enqueued_at.unix_timestamp_nanos(); - extend_range(enqueued_at, &mut enqueued_range); tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id); - if let Some(started_at) = task.started_at.unix_timestamp_nanos() { - extend_range(started_at, &mut started_range); - tasks_started_to_remove.entry(started_at).or_default().insert(task_id); + if let Some(started_at) = task.started_at { + tasks_started_to_remove.entry(started_at.unix_timestamp_nanos()).or_default().insert(task_id); } - if let Some(finished_at) = task.finished_at.unix_timestamp_nanos() { - extend_range(finished_at, &mut finished_range); - tasks_finished_to_remove.entry(finished_at).or_default().insert(task_id); + if let Some(finished_at) = task.finished_at { + tasks_finished_to_remove.entry(finished_at.unix_timestamp_nanos()).or_default().insert(task_id); } if let Some(canceled_by) = task.canceled_by { @@ -669,9 +640,6 @@ impl IndexScheduler { let mut to_remove_from_kinds = HashMap::new(); // 4. Read affected batches' tasks - let mut batches_enqueued_range = i128::MAX..=i128::MIN; - let mut batches_started_range = i128::MAX..=i128::MIN; - let mut batches_finished_range = i128::MAX..=i128::MIN; let mut batches_enqueued_to_remove: HashMap = HashMap::new(); let mut batches_started_to_remove: HashMap = HashMap::new(); let mut batches_finished_to_remove: HashMap = HashMap::new(); @@ -745,8 +713,6 @@ impl IndexScheduler { if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at { let earliest = earliest.unix_timestamp_nanos(); let oldest = oldest.unix_timestamp_nanos(); - extend_range(earliest, &mut batches_enqueued_range); - extend_range(oldest, &mut batches_enqueued_range); batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id); batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id); } else { @@ -759,11 +725,9 @@ impl IndexScheduler { )); } let started_at = batch.started_at.unix_timestamp_nanos(); - extend_range(started_at, &mut batches_started_range); batches_started_to_remove.entry(started_at).or_default().insert(batch_id); if let Some(finished_at) = batch.finished_at { let finished_at = finished_at.unix_timestamp_nanos(); - extend_range(finished_at, &mut batches_finished_range); batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id); } @@ -780,21 +744,18 @@ impl IndexScheduler { remove_datetimes( wtxn, - enqueued_range, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at, )?; remove_datetimes( wtxn, - started_range, tasks_started_to_remove, self.queue.tasks.started_at, )?; remove_datetimes( wtxn, - finished_range, tasks_finished_to_remove, self.queue.tasks.finished_at, )?; @@ -814,21 +775,18 @@ impl IndexScheduler { remove_datetimes( wtxn, - batches_enqueued_range, batches_enqueued_to_remove, self.queue.batches.enqueued_at, )?; remove_datetimes( wtxn, - batches_started_range, batches_started_to_remove, self.queue.batches.started_at, )?; remove_datetimes( wtxn, - batches_finished_range, batches_finished_to_remove, self.queue.batches.finished_at, )?;