From 1cb9816f4442fc4daf0ad0cb0e1adf9759502592 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Fri, 8 Aug 2025 15:43:34 +0200 Subject: [PATCH] Optim J: Use iterator on batch reader --- .../src/scheduler/process_batch.rs | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 17c33d477..8f784053f 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -681,10 +681,16 @@ impl IndexScheduler { let mut tasks_to_remove_earlier = Vec::new(); progress.update_progress(TaskDeletionProgress::RetrievingBatches); - let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32); - progress.update_progress(batch_progress); - for (batch_id, to_delete_tasks) in affected_batches { - if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(&rtxn, &batch_id)? { + let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys()); + for range in consecutive_ranges(affected_batches_bitmap.iter()) { + let iter = self + .queue + .batch_to_tasks_mapping + .range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; + for i in iter { + let (batch_id, mut tasks) = i?; + let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default(); + tasks -= &to_delete_tasks; // We must remove the batch entirely if tasks.is_empty() { @@ -698,7 +704,10 @@ impl IndexScheduler { if oldest > batches_max_enqueued { batches_max_enqueued = oldest; } - batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id); + batches_enqueued_to_remove + .entry(earliest) + .or_default() + .insert(batch_id); batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id); } else { // If we don't have the enqueued at in the batch it means the database comes from the v1.12 @@ -725,7 +734,10 @@ impl IndexScheduler { if finished_at > batches_max_finished { batches_max_finished = finished_at; } - batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id); + batches_finished_to_remove + .entry(finished_at) + .or_default() + .insert(batch_id); } to_delete_batches.insert(batch_id); @@ -762,7 +774,6 @@ impl IndexScheduler { } } } - atomic_progress.fetch_add(1, Ordering::Relaxed); } // Note: don't delete the persisted task data since