From 14cb1bbbfb2894235a3661fee0bdf4fcf9cff9d3 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Fri, 8 Aug 2025 11:32:17 +0200 Subject: [PATCH] Optimization B Speeds up the subtask "deleting batches" by 9% --- .../src/scheduler/process_batch.rs | 77 ++++++++++++++----- 1 file changed, 56 insertions(+), 21 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 43a28fe13..2f856f55c 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -646,6 +646,26 @@ impl IndexScheduler { progress.update_progress(TaskDeletionProgress::DeletingBatches); let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32); progress.update_progress(batch_progress); + + let mut affected_indexes_tasks = HashMap::new(); + for index in &affected_indexes { + affected_indexes_tasks + .insert(index.as_str(), self.queue.tasks.index_tasks(wtxn, index)?); + } + let mut to_remove_from_indexes = HashMap::new(); + + let mut affected_statuses_tasks = HashMap::new(); + for status in &affected_statuses { + affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(wtxn, *status)?); + } + let mut to_remove_from_statuses = HashMap::new(); + + let mut affected_kinds_tasks = HashMap::new(); + for kind in &affected_kinds { + affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(wtxn, *kind)?); + } + let mut to_remove_from_kinds = HashMap::new(); + for (batch_id, to_delete_tasks) in affected_batches { if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? { tasks -= &to_delete_tasks; @@ -699,39 +719,54 @@ impl IndexScheduler { // Anyway, we must remove the batch from all its reverse indexes. // The only way to do that is to check - for index in affected_indexes.iter() { - let index_tasks = self.queue.tasks.index_tasks(wtxn, index)?; - let remaining_index_tasks = index_tasks & &tasks; - if remaining_index_tasks.is_empty() { - self.queue.batches.update_index(wtxn, index, |bitmap| { - bitmap.remove(batch_id); - })?; + for (index, index_tasks) in affected_indexes_tasks.iter() { + if index_tasks.intersection_len(&tasks) == 0 { + to_remove_from_indexes + .entry(index) + .or_insert_with(RoaringBitmap::new) + .insert(batch_id); } } - for status in affected_statuses.iter() { - let status_tasks = self.queue.tasks.get_status(wtxn, *status)?; - let remaining_status_tasks = status_tasks & &tasks; - if remaining_status_tasks.is_empty() { - self.queue.batches.update_status(wtxn, *status, |bitmap| { - bitmap.remove(batch_id); - })?; + for (status, status_tasks) in affected_statuses_tasks.iter() { + if status_tasks.intersection_len(&tasks) == 0 { + to_remove_from_statuses + .entry(*status) + .or_insert_with(RoaringBitmap::new) + .insert(batch_id); } } - for kind in affected_kinds.iter() { - let kind_tasks = self.queue.tasks.get_kind(wtxn, *kind)?; - let remaining_kind_tasks = kind_tasks & &tasks; - if remaining_kind_tasks.is_empty() { - self.queue.batches.update_kind(wtxn, *kind, |bitmap| { - bitmap.remove(batch_id); - })?; + for (kind, kind_tasks) in affected_kinds_tasks.iter() { + if kind_tasks.intersection_len(&tasks) == 0 { + to_remove_from_kinds + .entry(*kind) + .or_insert_with(RoaringBitmap::new) + .insert(batch_id); } } } atomic_progress.fetch_add(1, Ordering::Relaxed); } + for (index, batches) in to_remove_from_indexes { + self.queue.tasks.update_index(wtxn, index, |index_tasks| { + *index_tasks -= &batches; + })?; + } + + for (status, batches) in to_remove_from_statuses { + self.queue.tasks.update_status(wtxn, status, |status_tasks| { + *status_tasks -= &batches; + })?; + } + + for (kind, batches) in to_remove_from_kinds { + self.queue.tasks.update_kind(wtxn, kind, |kind_tasks| { + *kind_tasks -= &batches; + })?; + } + Ok(to_delete_tasks) }