mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-06 04:36:32 +00:00
Optim J: Use iterator on batch reader
This commit is contained in:
@ -681,10 +681,16 @@ impl IndexScheduler {
|
|||||||
let mut tasks_to_remove_earlier = Vec::new();
|
let mut tasks_to_remove_earlier = Vec::new();
|
||||||
|
|
||||||
progress.update_progress(TaskDeletionProgress::RetrievingBatches);
|
progress.update_progress(TaskDeletionProgress::RetrievingBatches);
|
||||||
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
|
let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys());
|
||||||
progress.update_progress(batch_progress);
|
for range in consecutive_ranges(affected_batches_bitmap.iter()) {
|
||||||
for (batch_id, to_delete_tasks) in affected_batches {
|
let iter = self
|
||||||
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(&rtxn, &batch_id)? {
|
.queue
|
||||||
|
.batch_to_tasks_mapping
|
||||||
|
.range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
|
||||||
|
for i in iter {
|
||||||
|
let (batch_id, mut tasks) = i?;
|
||||||
|
let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
|
||||||
|
|
||||||
tasks -= &to_delete_tasks;
|
tasks -= &to_delete_tasks;
|
||||||
// We must remove the batch entirely
|
// We must remove the batch entirely
|
||||||
if tasks.is_empty() {
|
if tasks.is_empty() {
|
||||||
@ -698,7 +704,10 @@ impl IndexScheduler {
|
|||||||
if oldest > batches_max_enqueued {
|
if oldest > batches_max_enqueued {
|
||||||
batches_max_enqueued = oldest;
|
batches_max_enqueued = oldest;
|
||||||
}
|
}
|
||||||
batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id);
|
batches_enqueued_to_remove
|
||||||
|
.entry(earliest)
|
||||||
|
.or_default()
|
||||||
|
.insert(batch_id);
|
||||||
batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
|
batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
|
||||||
} else {
|
} else {
|
||||||
// If we don't have the enqueued at in the batch it means the database comes from the v1.12
|
// If we don't have the enqueued at in the batch it means the database comes from the v1.12
|
||||||
@ -725,7 +734,10 @@ impl IndexScheduler {
|
|||||||
if finished_at > batches_max_finished {
|
if finished_at > batches_max_finished {
|
||||||
batches_max_finished = finished_at;
|
batches_max_finished = finished_at;
|
||||||
}
|
}
|
||||||
batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id);
|
batches_finished_to_remove
|
||||||
|
.entry(finished_at)
|
||||||
|
.or_default()
|
||||||
|
.insert(batch_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
to_delete_batches.insert(batch_id);
|
to_delete_batches.insert(batch_id);
|
||||||
@ -762,7 +774,6 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: don't delete the persisted task data since
|
// Note: don't delete the persisted task data since
|
||||||
|
Reference in New Issue
Block a user