This commit is contained in:
Mubelotix
2025-08-12 10:48:37 +02:00
parent a54d7d2269
commit 0e9ac3ca92
2 changed files with 92 additions and 40 deletions

View File

@ -83,6 +83,8 @@ make_enum_progress! {
pub enum TaskDeletionProgress { pub enum TaskDeletionProgress {
RetrievingTasks, RetrievingTasks,
RetrievingBatchTasks, RetrievingBatchTasks,
RetrievingTaskDateTimes,
RetrievingBatchDateTimes,
DeletingTasksDateTime, DeletingTasksDateTime,
DeletingBatchesDateTime, DeletingBatchesDateTime,
DeletingTasksMetadata, DeletingTasksMetadata,

View File

@ -1,5 +1,5 @@
use std::collections::{BTreeSet, HashMap, HashSet}; use std::collections::{BTreeSet, HashMap, HashSet};
use std::ops::RangeInclusive; use std::ops::{Range, RangeInclusive};
use std::panic::{catch_unwind, AssertUnwindSafe}; use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -535,13 +535,15 @@ impl IndexScheduler {
}) })
} }
fn remove_datetimes( type DateTimeChanges = (Vec<Range<i128>>, HashMap<i128, RoaringBitmap>);
wtxn: &mut RwTxn<'_>,
fn prepare_remove_datetimes(
rtxn: &RoTxn<'_>,
mut to_remove: HashMap<i128, RoaringBitmap>, mut to_remove: HashMap<i128, RoaringBitmap>,
db: Database<BEI128, CboRoaringBitmapCodec>, db: Database<BEI128, CboRoaringBitmapCodec>,
) -> Result<()> { ) -> Result<DateTimeChanges> {
if to_remove.is_empty() { if to_remove.is_empty() {
return Ok(()); return Ok((Vec::new(), HashMap::new()));
} }
let min = to_remove.keys().min().cloned().unwrap_or(i128::MAX); let min = to_remove.keys().min().cloned().unwrap_or(i128::MAX);
@ -550,7 +552,7 @@ impl IndexScheduler {
// We iterate over the time database to see which ranges of timestamps need to be removed // We iterate over the time database to see which ranges of timestamps need to be removed
let lazy_db = db.lazily_decode_data(); let lazy_db = db.lazily_decode_data();
let iter = lazy_db.range(wtxn, &range)?; let iter = lazy_db.range(rtxn, &range)?;
let mut delete_range_start = None; let mut delete_range_start = None;
let mut delete_ranges = Vec::new(); let mut delete_ranges = Vec::new();
let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new(); let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
@ -576,35 +578,27 @@ impl IndexScheduler {
delete_ranges.push(delete_range_start..i128::MAX); delete_ranges.push(delete_range_start..i128::MAX);
} }
for range in delete_ranges { Ok((delete_ranges, to_put))
db.delete_range(wtxn, &range)?;
}
for (timestamp, data) in to_put {
db.put(wtxn, &timestamp, &data)?;
}
Ok(())
} }
fn remove_batch_datetimes( fn prepare_remove_batch_datetimes(
wtxn: &mut RwTxn<'_>, rtxn: &RoTxn<'_>,
to_remove: &RoaringBitmap, to_remove: &RoaringBitmap,
db: Database<BEI128, CboRoaringBitmapCodec>, db: Database<BEI128, CboRoaringBitmapCodec>,
) -> Result<()> { ) -> Result<DateTimeChanges> {
if to_remove.is_empty() { if to_remove.is_empty() {
return Ok(()); return Ok((Vec::new(), HashMap::new()));
} }
// We iterate over the time database to see which ranges of timestamps need to be removed // We iterate over the time database to see which ranges of timestamps need to be removed
let iter = db.iter(wtxn)?; let iter = db.iter(rtxn)?;
let mut delete_range_start = None; let mut delete_range_start = None;
let mut delete_ranges = Vec::new(); let mut delete_ranges = Vec::new();
let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new(); let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
for i in iter { for i in iter {
let (timestamp, mut current) = i?; let (timestamp, mut current) = i?;
if current.iter().any(|task_id| to_remove.contains(task_id)) { if !current.is_disjoint(to_remove) {
current -= to_remove; current -= to_remove;
if current.is_empty() { if current.is_empty() {
@ -622,6 +616,14 @@ impl IndexScheduler {
delete_ranges.push(delete_range_start..i128::MAX); delete_ranges.push(delete_range_start..i128::MAX);
} }
Ok((delete_ranges, to_put))
}
fn remove_datetimes(
wtxn: &mut RwTxn<'_>,
(delete_ranges, to_put): DateTimeChanges,
db: Database<BEI128, CboRoaringBitmapCodec>,
) -> Result<()> {
for range in delete_ranges { for range in delete_ranges {
db.delete_range(wtxn, &range)?; db.delete_range(wtxn, &range)?;
} }
@ -633,8 +635,6 @@ impl IndexScheduler {
Ok(()) Ok(())
} }
let instant = std::time::Instant::now();
progress.update_progress(TaskDeletionProgress::RetrievingTasks); progress.update_progress(TaskDeletionProgress::RetrievingTasks);
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
@ -736,7 +736,7 @@ impl IndexScheduler {
// We must remove the batch from all the reverse indexes it no longer has tasks for. // We must remove the batch from all the reverse indexes it no longer has tasks for.
for (index, index_tasks) in affected_indexes_tasks.iter() { for (index, index_tasks) in affected_indexes_tasks.iter() {
if index_tasks.intersection_len(&tasks) == 0 { if !index_tasks.is_disjoint(&tasks) {
to_remove_from_indexes to_remove_from_indexes
.entry(index) .entry(index)
.or_insert_with(RoaringBitmap::new) .or_insert_with(RoaringBitmap::new)
@ -745,7 +745,7 @@ impl IndexScheduler {
} }
for (status, status_tasks) in affected_statuses_tasks.iter() { for (status, status_tasks) in affected_statuses_tasks.iter() {
if status_tasks.intersection_len(&tasks) == 0 { if !status_tasks.is_disjoint(&tasks) {
to_remove_from_statuses to_remove_from_statuses
.entry(*status) .entry(*status)
.or_insert_with(RoaringBitmap::new) .or_insert_with(RoaringBitmap::new)
@ -754,7 +754,7 @@ impl IndexScheduler {
} }
for (kind, kind_tasks) in affected_kinds_tasks.iter() { for (kind, kind_tasks) in affected_kinds_tasks.iter() {
if kind_tasks.intersection_len(&tasks) == 0 { if !kind_tasks.is_disjoint(&tasks) {
to_remove_from_kinds to_remove_from_kinds
.entry(*kind) .entry(*kind)
.or_insert_with(RoaringBitmap::new) .or_insert_with(RoaringBitmap::new)
@ -771,31 +771,81 @@ impl IndexScheduler {
} }
} }
// 5. Prepare to remove the datetimes from the tasks
progress.update_progress(TaskDeletionProgress::RetrievingTaskDateTimes);
let task_enqueued_changes = prepare_remove_datetimes(
&rtxn,
tasks_enqueued_to_remove,
self.queue.tasks.enqueued_at,
)?;
let task_started_changes = prepare_remove_datetimes(
&rtxn,
tasks_started_to_remove,
self.queue.tasks.started_at,
)?;
let task_finished_changes = prepare_remove_datetimes(
&rtxn,
tasks_finished_to_remove,
self.queue.tasks.finished_at,
)?;
// 6. Prepare to remove the datetimes from the batches
progress.update_progress(TaskDeletionProgress::RetrievingBatchDateTimes);
let batch_enqueued_changes = prepare_remove_batch_datetimes(
&rtxn,
&to_delete_batches,
self.queue.batches.enqueued_at,
)?;
let batch_started_changes = prepare_remove_batch_datetimes(
&rtxn,
&to_delete_batches,
self.queue.batches.started_at,
)?;
let batch_finished_changes = prepare_remove_batch_datetimes(
&rtxn,
&to_delete_batches,
self.queue.batches.finished_at,
)?;
drop(rtxn); drop(rtxn);
let mut owned_wtxn = self.env.write_txn()?; let mut owned_wtxn = self.env.write_txn()?;
let wtxn = &mut owned_wtxn; let wtxn = &mut owned_wtxn;
// 5. Remove task datetimes // 7. Remove task datetimes
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
remove_datetimes(wtxn, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at)?; remove_datetimes(wtxn, task_enqueued_changes, self.queue.tasks.enqueued_at)?;
remove_datetimes(wtxn, tasks_started_to_remove, self.queue.tasks.started_at)?; remove_datetimes(wtxn, task_started_changes, self.queue.tasks.started_at)?;
remove_datetimes(wtxn, tasks_finished_to_remove, self.queue.tasks.finished_at)?; remove_datetimes(wtxn, task_finished_changes, self.queue.tasks.finished_at)?;
// 6. Delete batches datetimes // 8. Delete batches datetimes
progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime); progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.enqueued_at)?; remove_datetimes(
wtxn,
batch_enqueued_changes,
self.queue.batches.enqueued_at,
)?;
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.started_at)?; remove_datetimes(
wtxn,
batch_started_changes,
self.queue.batches.started_at,
)?;
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.finished_at)?; remove_datetimes(
wtxn,
batch_finished_changes,
self.queue.batches.finished_at,
)?;
remove_datetimes(wtxn, batches_finished_to_remove, self.queue.batches.finished_at)?; // 9. Remove tasks from indexes, statuses, and kinds
// 7. Remove tasks from indexes, statuses, and kinds
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
let (atomic_progress, task_progress) = AtomicTaskStep::new( let (atomic_progress, task_progress) = AtomicTaskStep::new(
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
@ -817,7 +867,7 @@ impl IndexScheduler {
atomic_progress.fetch_add(1, Ordering::Relaxed); atomic_progress.fetch_add(1, Ordering::Relaxed);
} }
// 8. Remove batches metadata from indexes, statuses, and kinds // 10. Remove batches metadata from indexes, statuses, and kinds
progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata); progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);
for (index, batches) in to_remove_from_indexes { for (index, batches) in to_remove_from_indexes {
@ -838,7 +888,7 @@ impl IndexScheduler {
})?; })?;
} }
// 9. Delete tasks // 11. Delete tasks
progress.update_progress(TaskDeletionProgress::DeletingTasks); progress.update_progress(TaskDeletionProgress::DeletingTasks);
let (atomic_progress, task_progress) = let (atomic_progress, task_progress) =
AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32); AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32);
@ -860,7 +910,7 @@ impl IndexScheduler {
atomic_progress.fetch_add(1, Ordering::Relaxed); atomic_progress.fetch_add(1, Ordering::Relaxed);
} }
// 10. Delete batches // 12. Delete batches
progress.update_progress(TaskDeletionProgress::DeletingBatches); progress.update_progress(TaskDeletionProgress::DeletingBatches);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32); let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32);
progress.update_progress(task_progress); progress.update_progress(task_progress);