From 0e9ac3ca92420b1d5dd1029a81ffcb756b16660d Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 12 Aug 2025 10:48:37 +0200 Subject: [PATCH] Attempt --- crates/index-scheduler/src/processing.rs | 2 + .../src/scheduler/process_batch.rs | 130 ++++++++++++------ 2 files changed, 92 insertions(+), 40 deletions(-) diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index 7a222fd7c..26cc03b12 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -83,6 +83,8 @@ make_enum_progress! { pub enum TaskDeletionProgress { RetrievingTasks, RetrievingBatchTasks, + RetrievingTaskDateTimes, + RetrievingBatchDateTimes, DeletingTasksDateTime, DeletingBatchesDateTime, DeletingTasksMetadata, diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 6434299af..f4b5117cc 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,5 +1,5 @@ use std::collections::{BTreeSet, HashMap, HashSet}; -use std::ops::RangeInclusive; +use std::ops::{Range, RangeInclusive}; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; @@ -535,13 +535,15 @@ impl IndexScheduler { }) } - fn remove_datetimes( - wtxn: &mut RwTxn<'_>, + type DateTimeChanges = (Vec>, HashMap); + + fn prepare_remove_datetimes( + rtxn: &RoTxn<'_>, mut to_remove: HashMap, db: Database, - ) -> Result<()> { + ) -> Result { if to_remove.is_empty() { - return Ok(()); + return Ok((Vec::new(), HashMap::new())); } let min = to_remove.keys().min().cloned().unwrap_or(i128::MAX); @@ -550,7 +552,7 @@ impl IndexScheduler { // We iterate over the time database to see which ranges of timestamps need to be removed let lazy_db = db.lazily_decode_data(); - let iter = lazy_db.range(wtxn, &range)?; + let iter = lazy_db.range(rtxn, &range)?; let mut delete_range_start = None; let mut delete_ranges = Vec::new(); let mut to_put: HashMap = HashMap::new(); @@ -576,35 +578,27 @@ impl IndexScheduler { delete_ranges.push(delete_range_start..i128::MAX); } - for range in delete_ranges { - db.delete_range(wtxn, &range)?; - } - - for (timestamp, data) in to_put { - db.put(wtxn, ×tamp, &data)?; - } - - Ok(()) + Ok((delete_ranges, to_put)) } - fn remove_batch_datetimes( - wtxn: &mut RwTxn<'_>, + fn prepare_remove_batch_datetimes( + rtxn: &RoTxn<'_>, to_remove: &RoaringBitmap, db: Database, - ) -> Result<()> { + ) -> Result { if to_remove.is_empty() { - return Ok(()); + return Ok((Vec::new(), HashMap::new())); } // We iterate over the time database to see which ranges of timestamps need to be removed - let iter = db.iter(wtxn)?; + let iter = db.iter(rtxn)?; let mut delete_range_start = None; let mut delete_ranges = Vec::new(); let mut to_put: HashMap = HashMap::new(); for i in iter { let (timestamp, mut current) = i?; - if current.iter().any(|task_id| to_remove.contains(task_id)) { + if !current.is_disjoint(to_remove) { current -= to_remove; if current.is_empty() { @@ -622,6 +616,14 @@ impl IndexScheduler { delete_ranges.push(delete_range_start..i128::MAX); } + Ok((delete_ranges, to_put)) + } + + fn remove_datetimes( + wtxn: &mut RwTxn<'_>, + (delete_ranges, to_put): DateTimeChanges, + db: Database, + ) -> Result<()> { for range in delete_ranges { db.delete_range(wtxn, &range)?; } @@ -633,8 +635,6 @@ impl IndexScheduler { Ok(()) } - let instant = std::time::Instant::now(); - progress.update_progress(TaskDeletionProgress::RetrievingTasks); let rtxn = self.env.read_txn()?; @@ -736,7 +736,7 @@ impl IndexScheduler { // We must remove the batch from all the reverse indexes it no longer has tasks for. for (index, index_tasks) in affected_indexes_tasks.iter() { - if index_tasks.intersection_len(&tasks) == 0 { + if !index_tasks.is_disjoint(&tasks) { to_remove_from_indexes .entry(index) .or_insert_with(RoaringBitmap::new) @@ -745,7 +745,7 @@ impl IndexScheduler { } for (status, status_tasks) in affected_statuses_tasks.iter() { - if status_tasks.intersection_len(&tasks) == 0 { + if !status_tasks.is_disjoint(&tasks) { to_remove_from_statuses .entry(*status) .or_insert_with(RoaringBitmap::new) @@ -754,7 +754,7 @@ impl IndexScheduler { } for (kind, kind_tasks) in affected_kinds_tasks.iter() { - if kind_tasks.intersection_len(&tasks) == 0 { + if !kind_tasks.is_disjoint(&tasks) { to_remove_from_kinds .entry(*kind) .or_insert_with(RoaringBitmap::new) @@ -771,31 +771,81 @@ impl IndexScheduler { } } + // 5. Prepare to remove the datetimes from the tasks + progress.update_progress(TaskDeletionProgress::RetrievingTaskDateTimes); + let task_enqueued_changes = prepare_remove_datetimes( + &rtxn, + tasks_enqueued_to_remove, + self.queue.tasks.enqueued_at, + )?; + + let task_started_changes = prepare_remove_datetimes( + &rtxn, + tasks_started_to_remove, + self.queue.tasks.started_at, + )?; + + let task_finished_changes = prepare_remove_datetimes( + &rtxn, + tasks_finished_to_remove, + self.queue.tasks.finished_at, + )?; + + // 6. Prepare to remove the datetimes from the batches + progress.update_progress(TaskDeletionProgress::RetrievingBatchDateTimes); + let batch_enqueued_changes = prepare_remove_batch_datetimes( + &rtxn, + &to_delete_batches, + self.queue.batches.enqueued_at, + )?; + + let batch_started_changes = prepare_remove_batch_datetimes( + &rtxn, + &to_delete_batches, + self.queue.batches.started_at, + )?; + + let batch_finished_changes = prepare_remove_batch_datetimes( + &rtxn, + &to_delete_batches, + self.queue.batches.finished_at, + )?; + drop(rtxn); let mut owned_wtxn = self.env.write_txn()?; let wtxn = &mut owned_wtxn; - // 5. Remove task datetimes + // 7. Remove task datetimes progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); - remove_datetimes(wtxn, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at)?; + remove_datetimes(wtxn, task_enqueued_changes, self.queue.tasks.enqueued_at)?; - remove_datetimes(wtxn, tasks_started_to_remove, self.queue.tasks.started_at)?; + remove_datetimes(wtxn, task_started_changes, self.queue.tasks.started_at)?; - remove_datetimes(wtxn, tasks_finished_to_remove, self.queue.tasks.finished_at)?; + remove_datetimes(wtxn, task_finished_changes, self.queue.tasks.finished_at)?; - // 6. Delete batches datetimes + // 8. Delete batches datetimes progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime); - remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.enqueued_at)?; + remove_datetimes( + wtxn, + batch_enqueued_changes, + self.queue.batches.enqueued_at, + )?; - remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.started_at)?; + remove_datetimes( + wtxn, + batch_started_changes, + self.queue.batches.started_at, + )?; - remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.finished_at)?; + remove_datetimes( + wtxn, + batch_finished_changes, + self.queue.batches.finished_at, + )?; - remove_datetimes(wtxn, batches_finished_to_remove, self.queue.batches.finished_at)?; - - // 7. Remove tasks from indexes, statuses, and kinds + // 9. Remove tasks from indexes, statuses, and kinds progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); let (atomic_progress, task_progress) = AtomicTaskStep::new( (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, @@ -817,7 +867,7 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } - // 8. Remove batches metadata from indexes, statuses, and kinds + // 10. Remove batches metadata from indexes, statuses, and kinds progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata); for (index, batches) in to_remove_from_indexes { @@ -838,7 +888,7 @@ impl IndexScheduler { })?; } - // 9. Delete tasks + // 11. Delete tasks progress.update_progress(TaskDeletionProgress::DeletingTasks); let (atomic_progress, task_progress) = AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32); @@ -860,7 +910,7 @@ impl IndexScheduler { atomic_progress.fetch_add(1, Ordering::Relaxed); } - // 10. Delete batches + // 12. Delete batches progress.update_progress(TaskDeletionProgress::DeletingBatches); let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32); progress.update_progress(task_progress);