Optimization B

Speeds up the subtask "deleting batches" by 9%
This commit is contained in:
Mubelotix
2025-08-08 11:32:17 +02:00
parent 31c4215ad2
commit 14cb1bbbfb

View File

@ -646,6 +646,26 @@ impl IndexScheduler {
progress.update_progress(TaskDeletionProgress::DeletingBatches); progress.update_progress(TaskDeletionProgress::DeletingBatches);
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32); let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
progress.update_progress(batch_progress); progress.update_progress(batch_progress);
// Read the task bitmap of every affected index once, up front, so the batch
// loop below can test against it without another database read per batch.
let mut affected_indexes_tasks = HashMap::new();
for index in &affected_indexes {
affected_indexes_tasks
.insert(index.as_str(), self.queue.tasks.index_tasks(wtxn, index)?);
}
// Batch ids collected per index by the batch loop and applied in bulk after it.
let mut to_remove_from_indexes = HashMap::new();
// Same up-front read, keyed by task status.
let mut affected_statuses_tasks = HashMap::new();
for status in &affected_statuses {
affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(wtxn, *status)?);
}
// Batch ids collected per status by the batch loop and applied in bulk after it.
let mut to_remove_from_statuses = HashMap::new();
// Same up-front read, keyed by task kind.
let mut affected_kinds_tasks = HashMap::new();
for kind in &affected_kinds {
affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(wtxn, *kind)?);
}
// Batch ids collected per kind by the batch loop and applied in bulk after it.
let mut to_remove_from_kinds = HashMap::new();
for (batch_id, to_delete_tasks) in affected_batches { for (batch_id, to_delete_tasks) in affected_batches {
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? { if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
tasks -= &to_delete_tasks; tasks -= &to_delete_tasks;
@ -699,39 +719,54 @@ impl IndexScheduler {
// Anyway, we must remove the batch from all its reverse indexes. // Anyway, we must remove the batch from all its reverse indexes.
// The only way to do that is to check // The only way to do that is to check
for index in affected_indexes.iter() { for (index, index_tasks) in affected_indexes_tasks.iter() {
let index_tasks = self.queue.tasks.index_tasks(wtxn, index)?; if index_tasks.intersection_len(&tasks) == 0 {
let remaining_index_tasks = index_tasks & &tasks; to_remove_from_indexes
if remaining_index_tasks.is_empty() { .entry(index)
self.queue.batches.update_index(wtxn, index, |bitmap| { .or_insert_with(RoaringBitmap::new)
bitmap.remove(batch_id); .insert(batch_id);
})?;
} }
} }
for status in affected_statuses.iter() { for (status, status_tasks) in affected_statuses_tasks.iter() {
let status_tasks = self.queue.tasks.get_status(wtxn, *status)?; if status_tasks.intersection_len(&tasks) == 0 {
let remaining_status_tasks = status_tasks & &tasks; to_remove_from_statuses
if remaining_status_tasks.is_empty() { .entry(*status)
self.queue.batches.update_status(wtxn, *status, |bitmap| { .or_insert_with(RoaringBitmap::new)
bitmap.remove(batch_id); .insert(batch_id);
})?;
} }
} }
for kind in affected_kinds.iter() { for (kind, kind_tasks) in affected_kinds_tasks.iter() {
let kind_tasks = self.queue.tasks.get_kind(wtxn, *kind)?; if kind_tasks.intersection_len(&tasks) == 0 {
let remaining_kind_tasks = kind_tasks & &tasks; to_remove_from_kinds
if remaining_kind_tasks.is_empty() { .entry(*kind)
self.queue.batches.update_kind(wtxn, *kind, |bitmap| { .or_insert_with(RoaringBitmap::new)
bitmap.remove(batch_id); .insert(batch_id);
})?;
} }
} }
} }
atomic_progress.fetch_add(1, Ordering::Relaxed); atomic_progress.fetch_add(1, Ordering::Relaxed);
} }
// Apply the removals collected by the batch loop, one write per key rather than
// one write per (key, batch) pair.
//
// Each bitmap holds *batch* ids, so it must be subtracted from the batch
// reverse indexes (`self.queue.batches`). Subtracting it from the task indexes
// (`self.queue.tasks`) would drop unrelated tasks whose ids happen to equal a
// batch id, and would leave the emptied batches referenced by their index,
// status and kind entries.
for (index, batches) in to_remove_from_indexes {
    self.queue.batches.update_index(wtxn, index, |index_batches| {
        *index_batches -= &batches;
    })?;
}
for (status, batches) in to_remove_from_statuses {
    self.queue.batches.update_status(wtxn, status, |status_batches| {
        *status_batches -= &batches;
    })?;
}
for (kind, batches) in to_remove_from_kinds {
    self.queue.batches.update_kind(wtxn, kind, |kind_batches| {
        *kind_batches -= &batches;
    })?;
}
Ok(to_delete_tasks) Ok(to_delete_tasks)
} }