diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index 26cc03b12..7a222fd7c 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -83,8 +83,6 @@ make_enum_progress! { pub enum TaskDeletionProgress { RetrievingTasks, RetrievingBatchTasks, - RetrievingTaskDateTimes, - RetrievingBatchDateTimes, DeletingTasksDateTime, DeletingBatchesDateTime, DeletingTasksMetadata, diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index f4b5117cc..158d1b1ca 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -535,15 +535,13 @@ impl IndexScheduler { }) } - type DateTimeChanges = (Vec>, HashMap); - - fn prepare_remove_datetimes( - rtxn: &RoTxn<'_>, + fn remove_task_datetimes( + wtxn: &mut RwTxn<'_>, mut to_remove: HashMap, db: Database, - ) -> Result { + ) -> Result<()> { if to_remove.is_empty() { - return Ok((Vec::new(), HashMap::new())); + return Ok(()); } let min = to_remove.keys().min().cloned().unwrap_or(i128::MAX); @@ -552,7 +550,7 @@ impl IndexScheduler { // We iterate over the time database to see which ranges of timestamps need to be removed let lazy_db = db.lazily_decode_data(); - let iter = lazy_db.range(rtxn, &range)?; + let iter = lazy_db.range(wtxn, &range)?; let mut delete_range_start = None; let mut delete_ranges = Vec::new(); let mut to_put: HashMap = HashMap::new(); @@ -578,20 +576,28 @@ impl IndexScheduler { delete_ranges.push(delete_range_start..i128::MAX); } - Ok((delete_ranges, to_put)) + for range in delete_ranges { + db.delete_range(wtxn, &range)?; + } + + for (timestamp, data) in to_put { + db.put(wtxn, ×tamp, &data)?; + } + + Ok(()) } - fn prepare_remove_batch_datetimes( - rtxn: &RoTxn<'_>, + fn remove_batch_datetimes( + wtxn: &mut RwTxn<'_>, to_remove: &RoaringBitmap, db: Database, - ) -> Result { + ) -> Result<()> { if to_remove.is_empty() { - return Ok((Vec::new(), HashMap::new())); + return Ok(()); } // We iterate over the time database to see which ranges of timestamps need to be removed - let iter = db.iter(rtxn)?; + let iter = db.iter(wtxn)?; let mut delete_range_start = None; let mut delete_ranges = Vec::new(); let mut to_put: HashMap = HashMap::new(); @@ -616,14 +622,6 @@ impl IndexScheduler { delete_ranges.push(delete_range_start..i128::MAX); } - Ok((delete_ranges, to_put)) - } - - fn remove_datetimes( - wtxn: &mut RwTxn<'_>, - (delete_ranges, to_put): DateTimeChanges, - db: Database, - ) -> Result<()> { for range in delete_ranges { db.delete_range(wtxn, &range)?; } @@ -771,46 +769,6 @@ impl IndexScheduler { } } - // 5. Prepare to remove the datetimes from the tasks - progress.update_progress(TaskDeletionProgress::RetrievingTaskDateTimes); - let task_enqueued_changes = prepare_remove_datetimes( - &rtxn, - tasks_enqueued_to_remove, - self.queue.tasks.enqueued_at, - )?; - - let task_started_changes = prepare_remove_datetimes( - &rtxn, - tasks_started_to_remove, - self.queue.tasks.started_at, - )?; - - let task_finished_changes = prepare_remove_datetimes( - &rtxn, - tasks_finished_to_remove, - self.queue.tasks.finished_at, - )?; - - // 6. Prepare to remove the datetimes from the batches - progress.update_progress(TaskDeletionProgress::RetrievingBatchDateTimes); - let batch_enqueued_changes = prepare_remove_batch_datetimes( - &rtxn, - &to_delete_batches, - self.queue.batches.enqueued_at, - )?; - - let batch_started_changes = prepare_remove_batch_datetimes( - &rtxn, - &to_delete_batches, - self.queue.batches.started_at, - )?; - - let batch_finished_changes = prepare_remove_batch_datetimes( - &rtxn, - &to_delete_batches, - self.queue.batches.finished_at, - )?; - drop(rtxn); let mut owned_wtxn = self.env.write_txn()?; let wtxn = &mut owned_wtxn; @@ -818,30 +776,28 @@ impl IndexScheduler { // 7. Remove task datetimes progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); - remove_datetimes(wtxn, task_enqueued_changes, self.queue.tasks.enqueued_at)?; - - remove_datetimes(wtxn, task_started_changes, self.queue.tasks.started_at)?; - - remove_datetimes(wtxn, task_finished_changes, self.queue.tasks.finished_at)?; + remove_task_datetimes(wtxn, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at)?; + remove_task_datetimes(wtxn, tasks_started_to_remove, self.queue.tasks.started_at)?; + remove_task_datetimes(wtxn, tasks_finished_to_remove, self.queue.tasks.finished_at)?; // 8. Delete batches datetimes progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime); - remove_datetimes( + remove_batch_datetimes( wtxn, - batch_enqueued_changes, + &to_delete_batches, self.queue.batches.enqueued_at, )?; - remove_datetimes( + remove_batch_datetimes( wtxn, - batch_started_changes, + &to_delete_batches, self.queue.batches.started_at, )?; - remove_datetimes( + remove_batch_datetimes( wtxn, - batch_finished_changes, + &to_delete_batches, self.queue.batches.finished_at, )?;