Optim M: Completely stop reading batches

This commit is contained in:
Mubelotix
2025-08-12 10:18:17 +02:00
parent 60a1188a58
commit a54d7d2269
2 changed files with 53 additions and 52 deletions

View File

@ -83,7 +83,6 @@ make_enum_progress! {
pub enum TaskDeletionProgress {
RetrievingTasks,
RetrievingBatchTasks,
RetrievingBatches,
DeletingTasksDateTime,
DeletingBatchesDateTime,
DeletingTasksMetadata,

View File

@ -3,7 +3,7 @@ use std::ops::RangeInclusive;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
@ -18,7 +18,7 @@ use crate::processing::{
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress,
};
use crate::utils::{remove_n_tasks_datetime_earlier_than, swap_index_uid_in_task, ProcessingBatch};
use crate::utils::{swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, Result, TaskId, BEI128};
#[derive(Debug, Default)]
@ -587,6 +587,54 @@ impl IndexScheduler {
Ok(())
}
/// Removes every id contained in `to_remove` from all the bitmaps stored in `db`.
///
/// `db` maps a timestamp key to a bitmap of ids. The whole database is scanned once:
/// - entries whose bitmap becomes empty are scheduled for deletion, grouped into key
///   ranges so that a run of emptied entries is removed with a single `delete_range`;
/// - entries that only lose part of their ids are rewritten with the remaining ids.
///
/// All mutations are buffered during the scan and applied once it is over.
/// Does nothing when `to_remove` is empty.
///
/// NOTE(review): building ranges this way assumes `db.iter` yields the keys in
/// ascending order — confirm against the key codec.
///
/// # Errors
///
/// Propagates any error returned while iterating over, deleting from or writing to `db`.
fn remove_batch_datetimes(
    wtxn: &mut RwTxn<'_>,
    to_remove: &RoaringBitmap,
    db: Database<BEI128, CboRoaringBitmapCodec>,
) -> Result<()> {
    if to_remove.is_empty() {
        return Ok(());
    }

    // We iterate over the time database to see which ranges of timestamps need to be removed
    let iter = db.iter(wtxn)?;
    let mut delete_range_start = None;
    let mut delete_ranges = Vec::new();
    let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
    for entry in iter {
        let (timestamp, mut current) = entry?;

        if !current.is_disjoint(to_remove) {
            current -= to_remove;

            if current.is_empty() {
                // Only open a new range when none is pending: overwriting the start of an
                // already open range would leave the earlier emptied entries undeleted.
                if delete_range_start.is_none() {
                    delete_range_start = Some(timestamp);
                }
            } else {
                // We could close the deletion range but it's not necessary because the
                // new value will get reinserted anyway
                to_put.insert(timestamp, current);
            }
        } else if let Some(start) = delete_range_start.take() {
            // The current entry is untouched and must be kept, so the pending range
            // ends right before it (the upper bound is exclusive).
            delete_ranges.push(start..timestamp);
        }
    }

    // A range still open at the end of the scan extends to the end of the database.
    // NOTE(review): the upper bound is exclusive, so an entry stored exactly at
    // `i128::MAX` would not be covered by this range.
    if let Some(start) = delete_range_start.take() {
        delete_ranges.push(start..i128::MAX);
    }

    // Deletions go first so that the partially emptied entries caught inside a deleted
    // range are written back afterwards.
    for range in delete_ranges {
        db.delete_range(wtxn, &range)?;
    }
    for (timestamp, data) in to_put {
        db.put(wtxn, &timestamp, &data)?;
    }

    Ok(())
}
let instant = std::time::Instant::now();
progress.update_progress(TaskDeletionProgress::RetrievingTasks);
let rtxn = self.env.read_txn()?;
@ -667,11 +715,7 @@ impl IndexScheduler {
let mut to_remove_from_kinds = HashMap::new();
// 4. Read affected batches' tasks
let mut batches_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
let mut batches_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
let mut to_delete_batches = RoaringBitmap::new();
let mut tasks_to_remove_earlier = Vec::new();
let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys());
progress.update_progress(TaskDeletionProgress::RetrievingBatchTasks);
let (atomic_progress, task_progress) =
@ -727,40 +771,6 @@ impl IndexScheduler {
}
}
// 5. Read batches metadata
progress.update_progress(TaskDeletionProgress::RetrievingBatches);
let (atomic_progress, task_progress) = AtomicBatchStep::new(to_delete_batches.len() as u32);
progress.update_progress(task_progress);
for range in consecutive_ranges(to_delete_batches.iter()) {
let iter = self.queue.batches.all_batches.range(&rtxn, &range)?;
for batch in iter {
let (batch_id, batch) = batch?;
if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
let earliest = earliest.unix_timestamp_nanos();
let oldest = oldest.unix_timestamp_nanos();
batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id);
batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
} else {
// If we don't have the enqueued at in the batch it means the database comes from the v1.12
// and we still need to find the date by scrolling the database
tasks_to_remove_earlier.push((
batch.started_at,
batch.stats.total_nb_tasks.clamp(1, 2) as usize,
batch_id,
));
}
let started_at = batch.started_at.unix_timestamp_nanos();
batches_started_to_remove.entry(started_at).or_default().insert(batch_id);
if let Some(finished_at) = batch.finished_at {
let finished_at = finished_at.unix_timestamp_nanos();
batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id);
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
}
drop(rtxn);
let mut owned_wtxn = self.env.write_txn()?;
let wtxn = &mut owned_wtxn;
@ -777,19 +787,11 @@ impl IndexScheduler {
// 6. Delete batches datetimes
progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
remove_n_tasks_datetime_earlier_than(
wtxn,
self.queue.batches.enqueued_at,
started_at,
nb_tasks,
batch_id,
)?;
}
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.enqueued_at)?;
remove_datetimes(wtxn, batches_enqueued_to_remove, self.queue.batches.enqueued_at)?;
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.started_at)?;
remove_datetimes(wtxn, batches_started_to_remove, self.queue.batches.started_at)?;
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.finished_at)?;
remove_datetimes(wtxn, batches_finished_to_remove, self.queue.batches.finished_at)?;