This commit is contained in:
Mubelotix
2025-08-08 12:24:47 +02:00
parent 9a29a7790b
commit 418730ef73

View File

@ -561,6 +561,11 @@ impl IndexScheduler {
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
progress.update_progress(task_progress); progress.update_progress(task_progress);
let mut min_enqueued = i128::MAX;
let mut max_enqueued = i128::MIN;
let mut enqueued_to_remove = HashMap::new();
let mut started_to_remove = HashMap::new();
let mut finished_to_remove = HashMap::new();
for task_id in to_delete_tasks.iter() { for task_id in to_delete_tasks.iter() {
let task = let task =
self.queue.tasks.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; self.queue.tasks.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
@ -572,28 +577,47 @@ impl IndexScheduler {
// we can only delete succeeded, failed, and canceled tasks. // we can only delete succeeded, failed, and canceled tasks.
// In each of those cases, the persisted data is supposed to // In each of those cases, the persisted data is supposed to
// have been deleted already. // have been deleted already.
utils::remove_task_datetime(
wtxn, let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
self.queue.tasks.enqueued_at, if enqueued_at < min_enqueued {
task.enqueued_at, min_enqueued = enqueued_at;
task.uid, }
)?; if enqueued_at > max_enqueued {
max_enqueued = enqueued_at;
}
enqueued_to_remove
.entry(enqueued_at)
.or_insert_with(RoaringBitmap::new)
.insert(task_id);
if let Some(started_at) = task.started_at { if let Some(started_at) = task.started_at {
utils::remove_task_datetime( let started_at = started_at.unix_timestamp_nanos();
wtxn, if started_at < min_enqueued {
self.queue.tasks.started_at, min_enqueued = started_at;
started_at,
task.uid,
)?;
} }
if started_at > max_enqueued {
max_enqueued = started_at;
}
started_to_remove
.entry(started_at)
.or_insert_with(RoaringBitmap::new)
.insert(task_id);
}
if let Some(finished_at) = task.finished_at { if let Some(finished_at) = task.finished_at {
utils::remove_task_datetime( let finished_at = finished_at.unix_timestamp_nanos();
wtxn, if finished_at < min_enqueued {
self.queue.tasks.finished_at, min_enqueued = finished_at;
finished_at,
task.uid,
)?;
} }
if finished_at > max_enqueued {
max_enqueued = finished_at;
}
finished_to_remove
.entry(finished_at)
.or_insert_with(RoaringBitmap::new)
.insert(task_id);
}
if let Some(canceled_by) = task.canceled_by { if let Some(canceled_by) = task.canceled_by {
affected_canceled_by.insert(canceled_by); affected_canceled_by.insert(canceled_by);
} }
@ -603,6 +627,29 @@ impl IndexScheduler {
atomic_progress.fetch_add(1, Ordering::Relaxed); atomic_progress.fetch_add(1, Ordering::Relaxed);
} }
for (mut to_remove, db) in [
(enqueued_to_remove, &self.queue.tasks.enqueued_at),
(started_to_remove, &self.queue.tasks.started_at),
(finished_to_remove, &self.queue.tasks.finished_at),
] {
if !to_remove.is_empty() {
let mut iter = db.rev_range_mut(wtxn, &(min_enqueued..=max_enqueued))?;
while let Some(i) = iter.next() {
let (timestamp, mut tasks) = i?;
if let Some(to_remove_tasks) = to_remove.remove(&timestamp) {
tasks -= &to_remove_tasks;
if tasks.is_empty() {
// safety: We don't keep references to the database
unsafe { iter.del_current()? };
} else {
// safety: We don't keep references to the database
unsafe { iter.put_current(&timestamp, &tasks)? };
}
}
}
}
}
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
let (atomic_progress, task_progress) = AtomicTaskStep::new( let (atomic_progress, task_progress) = AtomicTaskStep::new(
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,