diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 0ed970ab9..1cadd1037 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -19,7 +19,7 @@ use crate::processing::{ UpdateIndexProgress, }; use crate::utils::{ - self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task, + remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task, ProcessingBatch, }; use crate::{Error, IndexScheduler, Result, TaskId}; @@ -566,58 +566,65 @@ impl IndexScheduler { let mut enqueued_to_remove: HashMap = HashMap::new(); let mut started_to_remove: HashMap = HashMap::new(); let mut finished_to_remove: HashMap = HashMap::new(); - for task_id in to_delete_tasks.iter() { - let task = - self.queue.tasks.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; + for range in consecutive_ranges(to_delete_tasks.iter()) { + let iter = self + .queue + .tasks + .all_tasks + .range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; + for task in iter { + let (task_id, task) = task?; - affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned())); - affected_statuses.insert(task.status); - affected_kinds.insert(task.kind.as_kind()); - // Note: don't delete the persisted task data since - // we can only delete succeeded, failed, and canceled tasks. - // In each of those cases, the persisted data is supposed to - // have been deleted already. + affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned())); + affected_statuses.insert(task.status); + affected_kinds.insert(task.kind.as_kind()); - let enqueued_at = task.enqueued_at.unix_timestamp_nanos(); - if enqueued_at < min_enqueued { - min_enqueued = enqueued_at; - } - if enqueued_at > max_enqueued { - max_enqueued = enqueued_at; - } - enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id); - - if let Some(started_at) = task.started_at { - let started_at = started_at.unix_timestamp_nanos(); - if started_at < min_enqueued { - min_enqueued = started_at; + let enqueued_at = task.enqueued_at.unix_timestamp_nanos(); + if enqueued_at < min_enqueued { + min_enqueued = enqueued_at; } - if started_at > max_enqueued { - max_enqueued = started_at; + if enqueued_at > max_enqueued { + max_enqueued = enqueued_at; } - started_to_remove.entry(started_at).or_default().insert(task_id); - } + enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id); - if let Some(finished_at) = task.finished_at { - let finished_at = finished_at.unix_timestamp_nanos(); - if finished_at < min_enqueued { - min_enqueued = finished_at; + if let Some(started_at) = task.started_at { + let started_at = started_at.unix_timestamp_nanos(); + if started_at < min_enqueued { + min_enqueued = started_at; + } + if started_at > max_enqueued { + max_enqueued = started_at; + } + started_to_remove.entry(started_at).or_default().insert(task_id); } - if finished_at > max_enqueued { - max_enqueued = finished_at; - } - finished_to_remove.entry(finished_at).or_default().insert(task_id); - } - if let Some(canceled_by) = task.canceled_by { - affected_canceled_by.insert(canceled_by); + if let Some(finished_at) = task.finished_at { + let finished_at = finished_at.unix_timestamp_nanos(); + if finished_at < min_enqueued { + min_enqueued = finished_at; + } + if finished_at > max_enqueued { + max_enqueued = finished_at; + } + finished_to_remove.entry(finished_at).or_default().insert(task_id); + } + + if let Some(canceled_by) = task.canceled_by { + affected_canceled_by.insert(canceled_by); + } + if let Some(batch_uid) = task.batch_uid { + affected_batches.entry(batch_uid).or_default().insert(task_id); + } + atomic_progress.fetch_add(1, Ordering::Relaxed); } - if let Some(batch_uid) = task.batch_uid { - affected_batches.entry(batch_uid).or_default().insert(task_id); - } - atomic_progress.fetch_add(1, Ordering::Relaxed); } + // Note: don't delete the persisted task data since + // we can only delete succeeded, failed, and canceled tasks. + // In each of those cases, the persisted data is supposed to + // have been deleted already. + progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); for (mut to_remove, db) in [ (enqueued_to_remove, &self.queue.tasks.enqueued_at),