From e1fa79e92acbd4397474ddbb09ff388b11b05bd1 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 12 Aug 2025 13:47:40 +0200 Subject: [PATCH] Optim O: Put instead of update --- .../src/scheduler/process_batch.rs | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 745009ce2..7a43be763 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,5 +1,5 @@ use std::collections::{BTreeSet, HashMap, HashSet}; -use std::ops::{Range, RangeInclusive}; +use std::ops::RangeInclusive; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; @@ -805,29 +805,7 @@ impl IndexScheduler { self.queue.batches.finished_at, )?; - // 9. Remove tasks from indexes, statuses, and kinds - progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); - let (atomic_progress, task_progress) = AtomicTaskStep::new( - (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, - ); - progress.update_progress(task_progress); - - for index in affected_indexes.iter() { - self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?; - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - - for status in affected_statuses.iter() { - self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?; - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - - for kind in affected_kinds.iter() { - self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?; - atomic_progress.fetch_add(1, Ordering::Relaxed); - } - - // 10. Remove batches metadata from indexes, statuses, and kinds + // 9. Remove batches metadata from indexes, statuses, and kinds progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata); for (index, batches) in to_remove_from_indexes { @@ -848,6 +826,31 @@ impl IndexScheduler { })?; } + // 10. Remove tasks from indexes, statuses, and kinds + progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); + let (atomic_progress, task_progress) = AtomicTaskStep::new( + (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, + ); + progress.update_progress(task_progress); + + for (index, mut previous_tasks) in affected_indexes_tasks.into_iter() { + previous_tasks -= &to_delete_tasks; + self.queue.tasks.index_tasks.put(wtxn, index, &previous_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + + for (status, mut previous_tasks) in affected_statuses_tasks.into_iter() { + previous_tasks -= &to_delete_tasks; + self.queue.tasks.status.put(wtxn, &status, &previous_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + + for (kind, mut previous_tasks) in affected_kinds_tasks.into_iter() { + previous_tasks -= &to_delete_tasks; + self.queue.tasks.kind.put(wtxn, &kind, &previous_tasks)?; + atomic_progress.fetch_add(1, Ordering::Relaxed); + } + // 11. Delete tasks progress.update_progress(TaskDeletionProgress::DeletingTasks); let (atomic_progress, task_progress) =