From 60a1188a58eed54e96ca1be8f99d4beb5c54b201 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 12 Aug 2025 09:33:38 +0200 Subject: [PATCH] Optim L: Delete datetimes by range --- .../src/scheduler/process_batch.rs | 90 ++++++++++--------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 729582cca..0ae45abac 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -548,20 +548,41 @@ impl IndexScheduler { let max = to_remove.keys().max().cloned().unwrap_or(i128::MIN); let range = min..=max; - let mut iter = db.rev_range_mut(wtxn, &range)?; - while let Some(i) = iter.next() { - let (timestamp, mut tasks) = i?; - if let Some(to_remove_tasks) = to_remove.remove(×tamp) { - tasks -= &to_remove_tasks; - if tasks.is_empty() { - // safety: We don't keep references to the database - unsafe { iter.del_current()? }; + // We iterate over the time database to see which ranges of timestamps need to be removed + let lazy_db = db.lazily_decode_data(); + let iter = lazy_db.range(wtxn, &range)?; + let mut delete_range_start = None; + let mut delete_ranges = Vec::new(); + let mut to_put: HashMap = HashMap::new(); + for i in iter { + let (timestamp, data) = i?; + + if let Some(to_remove) = to_remove.remove(×tamp) { + let mut current = data.decode().unwrap(); // TODO + current -= &to_remove; + + if current.is_empty() { + delete_range_start = Some(timestamp); } else { - // safety: We don't keep references to the database - unsafe { iter.put_current(×tamp, &tasks)? }; + // We could close the deletion range but it's not necessary because the new value will get reinserted anyway + to_put.insert(timestamp, current); } + } else if let Some(delete_range_start) = delete_range_start.take() { + // Current one must not be deleted so we need to skip it + delete_ranges.push(delete_range_start..timestamp); } } + if let Some(delete_range_start) = delete_range_start.take() { + delete_ranges.push(delete_range_start..i128::MAX); + } + + for range in delete_ranges { + db.delete_range(wtxn, &range)?; + } + + for (timestamp, data) in to_put { + db.put(wtxn, ×tamp, &data)?; + } Ok(()) } @@ -602,11 +623,17 @@ impl IndexScheduler { tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id); if let Some(started_at) = task.started_at { - tasks_started_to_remove.entry(started_at.unix_timestamp_nanos()).or_default().insert(task_id); + tasks_started_to_remove + .entry(started_at.unix_timestamp_nanos()) + .or_default() + .insert(task_id); } if let Some(finished_at) = task.finished_at { - tasks_finished_to_remove.entry(finished_at.unix_timestamp_nanos()).or_default().insert(task_id); + tasks_finished_to_remove + .entry(finished_at.unix_timestamp_nanos()) + .or_default() + .insert(task_id); } if let Some(canceled_by) = task.canceled_by { @@ -702,8 +729,7 @@ impl IndexScheduler { // 5. Read batches metadata progress.update_progress(TaskDeletionProgress::RetrievingBatches); - let (atomic_progress, task_progress) = - AtomicBatchStep::new(to_delete_batches.len() as u32); + let (atomic_progress, task_progress) = AtomicBatchStep::new(to_delete_batches.len() as u32); progress.update_progress(task_progress); for range in consecutive_ranges(to_delete_batches.iter()) { let iter = self.queue.batches.all_batches.range(&rtxn, &range)?; @@ -742,23 +768,11 @@ impl IndexScheduler { // 5. Remove task datetimes progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); - remove_datetimes( - wtxn, - tasks_enqueued_to_remove, - self.queue.tasks.enqueued_at, - )?; + remove_datetimes(wtxn, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at)?; - remove_datetimes( - wtxn, - tasks_started_to_remove, - self.queue.tasks.started_at, - )?; + remove_datetimes(wtxn, tasks_started_to_remove, self.queue.tasks.started_at)?; - remove_datetimes( - wtxn, - tasks_finished_to_remove, - self.queue.tasks.finished_at, - )?; + remove_datetimes(wtxn, tasks_finished_to_remove, self.queue.tasks.finished_at)?; // 6. Delete batches datetimes progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime); @@ -773,23 +787,11 @@ impl IndexScheduler { )?; } - remove_datetimes( - wtxn, - batches_enqueued_to_remove, - self.queue.batches.enqueued_at, - )?; + remove_datetimes(wtxn, batches_enqueued_to_remove, self.queue.batches.enqueued_at)?; - remove_datetimes( - wtxn, - batches_started_to_remove, - self.queue.batches.started_at, - )?; + remove_datetimes(wtxn, batches_started_to_remove, self.queue.batches.started_at)?; - remove_datetimes( - wtxn, - batches_finished_to_remove, - self.queue.batches.finished_at, - )?; + remove_datetimes(wtxn, batches_finished_to_remove, self.queue.batches.finished_at)?; // 7. Remove tasks from indexes, statuses, and kinds progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);