mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-05 20:26:31 +00:00
Optim F - Delay wtxn
This commit is contained in:
@ -81,6 +81,7 @@ make_enum_progress! {
|
||||
|
||||
make_enum_progress! {
|
||||
pub enum TaskDeletionProgress {
|
||||
RetrievingTasks,
|
||||
DeletingTasksDateTime,
|
||||
DeletingTasksMetadata,
|
||||
DeletingTasks,
|
||||
|
@ -102,10 +102,8 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let mut deleted_tasks =
|
||||
self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?;
|
||||
wtxn.commit()?;
|
||||
self.delete_matched_tasks(&matched_tasks, &progress)?;
|
||||
|
||||
for task in tasks.iter_mut() {
|
||||
task.status = Status::Succeeded;
|
||||
@ -513,7 +511,6 @@ impl IndexScheduler {
|
||||
/// Return the number of tasks that were actually deleted.
|
||||
fn delete_matched_tasks(
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
matched_tasks: &RoaringBitmap,
|
||||
progress: &Progress,
|
||||
) -> Result<RoaringBitmap> {
|
||||
@ -542,11 +539,13 @@ impl IndexScheduler {
|
||||
}
|
||||
progress.update_progress(TaskDeletionProgress::RetrievingTasks);
|
||||
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
||||
// 1. Remove from this list the tasks that we are not allowed to delete
|
||||
let enqueued_tasks = self.queue.tasks.get_status(wtxn, Status::Enqueued)?;
|
||||
let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
||||
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
|
||||
|
||||
let all_task_ids = self.queue.tasks.all_task_ids(wtxn)?;
|
||||
let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?;
|
||||
let mut to_delete_tasks = all_task_ids & matched_tasks;
|
||||
to_delete_tasks -= &**processing_tasks;
|
||||
to_delete_tasks -= &enqueued_tasks;
|
||||
@ -571,7 +570,7 @@ impl IndexScheduler {
|
||||
.queue
|
||||
.tasks
|
||||
.all_tasks
|
||||
.range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
|
||||
.range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
|
||||
for task in iter {
|
||||
let (task_id, task) = task?;
|
||||
|
||||
@ -625,6 +624,10 @@ impl IndexScheduler {
|
||||
// In each of those cases, the persisted data is supposed to
|
||||
// have been deleted already.
|
||||
|
||||
drop(rtxn);
|
||||
let mut wtxn_owned = self.env.write_txn()?;
|
||||
let wtxn = &mut wtxn_owned;
|
||||
|
||||
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
|
||||
for (mut to_remove, db) in [
|
||||
(enqueued_to_remove, &self.queue.tasks.enqueued_at),
|
||||
@ -813,6 +816,8 @@ impl IndexScheduler {
|
||||
})?;
|
||||
}
|
||||
|
||||
wtxn_owned.commit()?;
|
||||
|
||||
Ok(to_delete_tasks)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user