Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-09-05 20:26:31 +00:00)
Optim E
@@ -19,7 +19,7 @@ use crate::processing::{
     UpdateIndexProgress,
 };
 use crate::utils::{
-    self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
+    remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
     ProcessingBatch,
 };
 use crate::{Error, IndexScheduler, Result, TaskId};
@@ -566,58 +566,65 @@ impl IndexScheduler {
         let mut enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
         let mut started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
         let mut finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        for task_id in to_delete_tasks.iter() {
-            let task =
-                self.queue.tasks.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
-
-            affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
-            affected_statuses.insert(task.status);
-            affected_kinds.insert(task.kind.as_kind());
-            // Note: don't delete the persisted task data since
-            // we can only delete succeeded, failed, and canceled tasks.
-            // In each of those cases, the persisted data is supposed to
-            // have been deleted already.
-
-            let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
-            if enqueued_at < min_enqueued {
-                min_enqueued = enqueued_at;
-            }
-            if enqueued_at > max_enqueued {
-                max_enqueued = enqueued_at;
-            }
-            enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
-
-            if let Some(started_at) = task.started_at {
-                let started_at = started_at.unix_timestamp_nanos();
-                if started_at < min_enqueued {
-                    min_enqueued = started_at;
-                }
-                if started_at > max_enqueued {
-                    max_enqueued = started_at;
-                }
-                started_to_remove.entry(started_at).or_default().insert(task_id);
-            }
-
-            if let Some(finished_at) = task.finished_at {
-                let finished_at = finished_at.unix_timestamp_nanos();
-                if finished_at < min_enqueued {
-                    min_enqueued = finished_at;
-                }
-                if finished_at > max_enqueued {
-                    max_enqueued = finished_at;
-                }
-                finished_to_remove.entry(finished_at).or_default().insert(task_id);
-            }
-
-            if let Some(canceled_by) = task.canceled_by {
-                affected_canceled_by.insert(canceled_by);
-            }
-            if let Some(batch_uid) = task.batch_uid {
-                affected_batches.entry(batch_uid).or_default().insert(task_id);
-            }
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }
+        for range in consecutive_ranges(to_delete_tasks.iter()) {
+            let iter = self
+                .queue
+                .tasks
+                .all_tasks
+                .range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
+            for task in iter {
+                let (task_id, task) = task?;
+
+                affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
+                affected_statuses.insert(task.status);
+                affected_kinds.insert(task.kind.as_kind());
+
+                let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
+                if enqueued_at < min_enqueued {
+                    min_enqueued = enqueued_at;
+                }
+                if enqueued_at > max_enqueued {
+                    max_enqueued = enqueued_at;
+                }
+                enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
+
+                if let Some(started_at) = task.started_at {
+                    let started_at = started_at.unix_timestamp_nanos();
+                    if started_at < min_enqueued {
+                        min_enqueued = started_at;
+                    }
+                    if started_at > max_enqueued {
+                        max_enqueued = started_at;
+                    }
+                    started_to_remove.entry(started_at).or_default().insert(task_id);
+                }
+
+                if let Some(finished_at) = task.finished_at {
+                    let finished_at = finished_at.unix_timestamp_nanos();
+                    if finished_at < min_enqueued {
+                        min_enqueued = finished_at;
+                    }
+                    if finished_at > max_enqueued {
+                        max_enqueued = finished_at;
+                    }
+                    finished_to_remove.entry(finished_at).or_default().insert(task_id);
+                }
+
+                if let Some(canceled_by) = task.canceled_by {
+                    affected_canceled_by.insert(canceled_by);
+                }
+                if let Some(batch_uid) = task.batch_uid {
+                    affected_batches.entry(batch_uid).or_default().insert(task_id);
+                }
+                atomic_progress.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+
+        // Note: don't delete the persisted task data since
+        // we can only delete succeeded, failed, and canceled tasks.
+        // In each of those cases, the persisted data is supposed to
+        // have been deleted already.
 
         progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
         for (mut to_remove, db) in [
             (enqueued_to_remove, &self.queue.tasks.enqueued_at),
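
The rewritten loop relies on a consecutive_ranges helper whose implementation is not part of this hunk. As a rough sketch only (the name comes from the diff, but the signature, return type, and body below are assumptions), such a helper would group an ascending iterator of task IDs into inclusive (start, end) pairs, so that each run of adjacent IDs becomes a single all_tasks.range(..) read instead of one get_task lookup per ID:

// Sketch only: groups a sorted, deduplicated iterator of u32 task IDs into
// inclusive (start, end) ranges of consecutive values. The real helper in the
// repository may differ in name, signature, and return type.
fn consecutive_ranges(ids: impl IntoIterator<Item = u32>) -> Vec<(u32, u32)> {
    let mut ranges = Vec::new();
    let mut iter = ids.into_iter();
    let Some(first) = iter.next() else { return ranges };
    let (mut start, mut end) = (first, first);
    for id in iter {
        if id == end + 1 {
            // Still consecutive: extend the current range.
            end = id;
        } else {
            // Gap found: close the current range and start a new one.
            ranges.push((start, end));
            start = id;
            end = id;
        }
    }
    ranges.push((start, end));
    ranges
}

fn main() {
    // 1..=3 and 7..=8 are consecutive runs, 10 stands alone.
    let ids = [1, 2, 3, 7, 8, 10];
    assert_eq!(consecutive_ranges(ids), vec![(1, 3), (7, 8), (10, 10)]);
}

Each returned pair maps directly onto the Bound::Included(range.0) / Bound::Included(range.1) bounds passed to all_tasks.range, which is where the hunk's savings come from: one LMDB cursor walk per run of consecutive task IDs rather than a point lookup per task.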
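
The grouping above only works if the IDs arrive in ascending order. to_delete_tasks appears to be a RoaringBitmap (matching the HashMap<i128, RoaringBitmap> declarations in the hunk), and the roaring crate's iterator does yield values in sorted order; a minimal illustration of that property, assuming the roaring crate as a dependency:

use roaring::RoaringBitmap;

fn main() {
    // RoaringBitmap::iter yields the stored u32 values in ascending order,
    // regardless of insertion order, which is what lets runs of adjacent
    // task IDs collapse into (start, end) ranges.
    let mut ids = RoaringBitmap::new();
    for id in [10, 2, 3, 1, 8, 7] {
        ids.insert(id);
    }
    let sorted: Vec<u32> = ids.iter().collect();
    assert_eq!(sorted, vec![1, 2, 3, 7, 8, 10]);
}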