diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index 3da81f143..77f096e5f 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -81,6 +81,7 @@ make_enum_progress! { make_enum_progress! { pub enum TaskDeletionProgress { + RetrievingTasks, DeletingTasksDateTime, DeletingTasksMetadata, DeletingTasks, diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 1cadd1037..d42a6bad7 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -102,10 +102,8 @@ impl IndexScheduler { } } - let mut wtxn = self.env.write_txn()?; let mut deleted_tasks = - self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?; - wtxn.commit()?; + self.delete_matched_tasks(&matched_tasks, &progress)?; for task in tasks.iter_mut() { task.status = Status::Succeeded; @@ -513,7 +511,6 @@ impl IndexScheduler { /// Return the number of tasks that were actually deleted. fn delete_matched_tasks( &self, - wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap, progress: &Progress, ) -> Result { @@ -542,11 +539,13 @@ impl IndexScheduler { } progress.update_progress(TaskDeletionProgress::RetrievingTasks); + let rtxn = self.env.read_txn()?; + // 1. Remove from this list the tasks that we are not allowed to delete - let enqueued_tasks = self.queue.tasks.get_status(wtxn, Status::Enqueued)?; + let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); - let all_task_ids = self.queue.tasks.all_task_ids(wtxn)?; + let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?; let mut to_delete_tasks = all_task_ids & matched_tasks; to_delete_tasks -= &**processing_tasks; to_delete_tasks -= &enqueued_tasks; @@ -571,7 +570,7 @@ impl IndexScheduler { .queue .tasks .all_tasks - .range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; + .range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?; for task in iter { let (task_id, task) = task?; @@ -625,6 +624,10 @@ impl IndexScheduler { // In each of those cases, the persisted data is supposed to // have been deleted already. + drop(rtxn); + let mut wtxn_owned = self.env.write_txn()?; + let wtxn = &mut wtxn_owned; + progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); for (mut to_remove, db) in [ (enqueued_to_remove, &self.queue.tasks.enqueued_at), @@ -813,6 +816,8 @@ impl IndexScheduler { })?; } + wtxn_owned.commit()?; + Ok(to_delete_tasks) }