From 418730ef73aebd71678b07b4bc8d393611ae0c81 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Fri, 8 Aug 2025 12:24:47 +0200 Subject: [PATCH] Optim D --- .../src/scheduler/process_batch.rs | 83 +++++++++++++++---- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 2f856f55c..40f3ea427 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -561,6 +561,11 @@ impl IndexScheduler { let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); progress.update_progress(task_progress); + let mut min_enqueued = i128::MAX; + let mut max_enqueued = i128::MIN; + let mut enqueued_to_remove = HashMap::new(); + let mut started_to_remove = HashMap::new(); + let mut finished_to_remove = HashMap::new(); for task_id in to_delete_tasks.iter() { let task = self.queue.tasks.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; @@ -572,28 +577,47 @@ impl IndexScheduler { // we can only delete succeeded, failed, and canceled tasks. // In each of those cases, the persisted data is supposed to // have been deleted already. 
- utils::remove_task_datetime( - wtxn, - self.queue.tasks.enqueued_at, - task.enqueued_at, - task.uid, - )?; + + let enqueued_at = task.enqueued_at.unix_timestamp_nanos(); + if enqueued_at < min_enqueued { + min_enqueued = enqueued_at; + } + if enqueued_at > max_enqueued { + max_enqueued = enqueued_at; + } + enqueued_to_remove + .entry(enqueued_at) + .or_insert_with(RoaringBitmap::new) + .insert(task_id); + if let Some(started_at) = task.started_at { - utils::remove_task_datetime( - wtxn, - self.queue.tasks.started_at, - started_at, - task.uid, - )?; + let started_at = started_at.unix_timestamp_nanos(); + if started_at < min_enqueued { + min_enqueued = started_at; + } + if started_at > max_enqueued { + max_enqueued = started_at; + } + started_to_remove + .entry(started_at) + .or_insert_with(RoaringBitmap::new) + .insert(task_id); } + if let Some(finished_at) = task.finished_at { - utils::remove_task_datetime( - wtxn, - self.queue.tasks.finished_at, - finished_at, - task.uid, - )?; + let finished_at = finished_at.unix_timestamp_nanos(); + if finished_at < min_enqueued { + min_enqueued = finished_at; + } + if finished_at > max_enqueued { + max_enqueued = finished_at; + } + finished_to_remove + .entry(finished_at) + .or_insert_with(RoaringBitmap::new) + .insert(task_id); } + if let Some(canceled_by) = task.canceled_by { affected_canceled_by.insert(canceled_by); } @@ -603,6 +627,29 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } + for (mut to_remove, db) in [ + (enqueued_to_remove, &self.queue.tasks.enqueued_at), + (started_to_remove, &self.queue.tasks.started_at), + (finished_to_remove, &self.queue.tasks.finished_at), + ] { + if !to_remove.is_empty() { + let mut iter = db.rev_range_mut(wtxn, &(min_enqueued..=max_enqueued))?; + while let Some(i) = iter.next() { + let (timestamp, mut tasks) = i?; + if let Some(to_remove_tasks) = to_remove.remove(&timestamp) { + tasks -= &to_remove_tasks; + if tasks.is_empty() { + // safety: We don't keep 
references to the database + unsafe { iter.del_current()? }; + } else { + // safety: We don't keep references to the database + unsafe { iter.put_current(&timestamp, &tasks)? }; + } + } + } + } + } + progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); let (atomic_progress, task_progress) = AtomicTaskStep::new( (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,