mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-06 04:36:32 +00:00
Optimization A
Speeds up the "deleting tasks" subtask by 23%
This commit is contained in:
@ -1,4 +1,5 @@
|
|||||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||||
|
use std::ops::Bound;
|
||||||
use std::panic::{catch_unwind, AssertUnwindSafe};
|
use std::panic::{catch_unwind, AssertUnwindSafe};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
@ -510,6 +511,29 @@ impl IndexScheduler {
|
|||||||
matched_tasks: &RoaringBitmap,
|
matched_tasks: &RoaringBitmap,
|
||||||
progress: &Progress,
|
progress: &Progress,
|
||||||
) -> Result<RoaringBitmap> {
|
) -> Result<RoaringBitmap> {
|
||||||
|
fn consecutive_ranges<I>(iter: I) -> impl Iterator<Item = (u32, u32)>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = u32>,
|
||||||
|
{
|
||||||
|
let mut iter = iter.into_iter().peekable();
|
||||||
|
|
||||||
|
std::iter::from_fn(move || {
|
||||||
|
let start = iter.next()?;
|
||||||
|
|
||||||
|
let mut end = start;
|
||||||
|
|
||||||
|
while let Some(&next) = iter.peek() {
|
||||||
|
if next == end + 1 {
|
||||||
|
end = next;
|
||||||
|
iter.next();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Some((start, end))
|
||||||
|
})
|
||||||
|
}
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
|
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
|
||||||
|
|
||||||
// 1. Remove from this list the tasks that we are not allowed to delete
|
// 1. Remove from this list the tasks that we are not allowed to delete
|
||||||
@ -596,8 +620,11 @@ impl IndexScheduler {
|
|||||||
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
||||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
|
||||||
progress.update_progress(task_progress);
|
progress.update_progress(task_progress);
|
||||||
for task in to_delete_tasks.iter() {
|
for range in consecutive_ranges(to_delete_tasks.iter()) {
|
||||||
self.queue.tasks.all_tasks.delete(wtxn, &task)?;
|
self.queue
|
||||||
|
.tasks
|
||||||
|
.all_tasks
|
||||||
|
.delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
|
||||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
for canceled_by in affected_canceled_by {
|
for canceled_by in affected_canceled_by {
|
||||||
|
Reference in New Issue
Block a user