Split progress step in half

This commit is contained in:
Mubelotix
2025-08-11 17:52:59 +02:00
parent e23bf92502
commit 16c57414ad
2 changed files with 16 additions and 9 deletions

View File

@ -82,6 +82,7 @@ make_enum_progress! {
make_enum_progress! { make_enum_progress! {
pub enum TaskDeletionProgress { pub enum TaskDeletionProgress {
RetrievingTasks, RetrievingTasks,
RetrievingBatchTasks,
RetrievingBatches, RetrievingBatches,
DeletingTasksDateTime, DeletingTasksDateTime,
DeletingBatchesDateTime, DeletingBatchesDateTime,

View File

@ -1,7 +1,7 @@
use std::collections::{BTreeSet, HashMap, HashSet}; use std::collections::{BTreeSet, HashMap, HashSet};
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use std::panic::{catch_unwind, AssertUnwindSafe}; use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{self, Ordering}; use std::sync::atomic::Ordering;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::heed::{Database, RoTxn, RwTxn};
@ -668,7 +668,7 @@ impl IndexScheduler {
} }
let mut to_remove_from_kinds = HashMap::new(); let mut to_remove_from_kinds = HashMap::new();
// 4. Read affected batches to list metadata that needs to be updated. // 4. Read affected batches' tasks
let mut batches_enqueued_range = i128::MAX..=i128::MIN; let mut batches_enqueued_range = i128::MAX..=i128::MIN;
let mut batches_started_range = i128::MAX..=i128::MIN; let mut batches_started_range = i128::MAX..=i128::MIN;
let mut batches_finished_range = i128::MAX..=i128::MIN; let mut batches_finished_range = i128::MAX..=i128::MIN;
@ -677,8 +677,8 @@ impl IndexScheduler {
let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new(); let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
let mut to_delete_batches = RoaringBitmap::new(); let mut to_delete_batches = RoaringBitmap::new();
let mut tasks_to_remove_earlier = Vec::new(); let mut tasks_to_remove_earlier = Vec::new();
progress.update_progress(TaskDeletionProgress::RetrievingBatches);
let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys()); let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys());
progress.update_progress(TaskDeletionProgress::RetrievingBatchTasks);
let (atomic_progress, task_progress) = let (atomic_progress, task_progress) =
AtomicBatchStep::new(affected_batches_bitmap.len() as u32); AtomicBatchStep::new(affected_batches_bitmap.len() as u32);
progress.update_progress(task_progress); progress.update_progress(task_progress);
@ -689,6 +689,11 @@ impl IndexScheduler {
let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default(); let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
tasks -= &to_delete_tasks; tasks -= &to_delete_tasks;
// We must remove the batch entirely
if tasks.is_empty() {
to_delete_batches.insert(batch_id);
}
// We must remove the batch from all the reverse indexes it no longer has tasks for. // We must remove the batch from all the reverse indexes it no longer has tasks for.
for (index, index_tasks) in affected_indexes_tasks.iter() { for (index, index_tasks) in affected_indexes_tasks.iter() {
@ -723,14 +728,15 @@ impl IndexScheduler {
// In each of those cases, the persisted data is supposed to // In each of those cases, the persisted data is supposed to
// have been deleted already. // have been deleted already.
// We must remove the batch entirely
if tasks.is_empty() {
to_delete_batches.insert(batch_id);
} else {
atomic_progress.fetch_add(1, Ordering::Relaxed); atomic_progress.fetch_add(1, Ordering::Relaxed);
} }
} }
}
// 5. Read batches metadata
progress.update_progress(TaskDeletionProgress::RetrievingBatches);
let (atomic_progress, task_progress) =
AtomicBatchStep::new(to_delete_batches.len() as u32);
progress.update_progress(task_progress);
for range in consecutive_ranges(to_delete_batches.iter()) { for range in consecutive_ranges(to_delete_batches.iter()) {
let iter = self.queue.batches.all_batches.range(&rtxn, &range)?; let iter = self.queue.batches.all_batches.range(&rtxn, &range)?;
for batch in iter { for batch in iter {