Optim H: Remove batches in iterator

This commit is contained in:
Mubelotix
2025-08-08 15:07:48 +02:00
parent cb36257537
commit 6a99c5b2f3

View File

@ -683,24 +683,24 @@ impl IndexScheduler {
);
progress.update_progress(task_progress);
for index in affected_indexes.iter() {
self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &tasks_to_remove)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for status in affected_statuses.iter() {
self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?;
self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &tasks_to_remove)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for kind in affected_kinds.iter() {
self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?;
self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &tasks_to_remove)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
progress.update_progress(TaskDeletionProgress::DeletingTasks);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
let (atomic_progress, task_progress) = AtomicTaskStep::new(tasks_to_remove.len() as u32);
progress.update_progress(task_progress);
for range in consecutive_ranges(to_delete_tasks.iter()) {
for range in consecutive_ranges(tasks_to_remove.iter()) {
self.queue
.tasks
.all_tasks
@ -709,7 +709,7 @@ impl IndexScheduler {
}
for canceled_by in affected_canceled_by {
if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
tasks -= &to_delete_tasks;
tasks -= &tasks_to_remove;
if tasks.is_empty() {
self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?;
} else {
@ -750,6 +750,8 @@ impl IndexScheduler {
let mut min_finished = i128::MAX;
let mut max_finished = i128::MIN;
let mut batches_to_remove = RoaringBitmap::new();
for (batch_id, to_delete_tasks) in affected_batches {
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
tasks -= &to_delete_tasks;
@ -797,8 +799,7 @@ impl IndexScheduler {
finished_to_remove.entry(finished_at).or_default().insert(batch_id);
}
self.queue.batches.all_batches.delete(wtxn, &batch_id)?;
self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?;
batches_to_remove.insert(batch_id);
}
}
@ -835,6 +836,16 @@ impl IndexScheduler {
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for range in consecutive_ranges(batches_to_remove.iter()) {
self.queue
.batches
.all_batches
.delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
self.queue
.batch_to_tasks_mapping
.delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
}
remove_datetimes(
wtxn,
min_enqueued..=max_enqueued,
@ -874,7 +885,7 @@ impl IndexScheduler {
wtxn_owned.commit()?;
Ok(to_delete_tasks)
Ok(tasks_to_remove)
}
/// Cancel each given task from all the databases (if it is cancelable).