Mirror of https://github.com/meilisearch/meilisearch.git
Clean code
@@ -84,11 +84,11 @@ make_enum_progress! {
         RetrievingTasks,
         RetrievingBatches,
         DeletingTasksDateTime,
+        DeletingBatchesDateTime,
         DeletingTasksMetadata,
+        DeletingBatchesMetadata,
         DeletingTasks,
         DeletingBatches,
-        DeletingBatchesDateTime,
-        DeletingBatchesMetadata,
     }
 }
@@ -1,5 +1,5 @@
 use std::collections::{BTreeSet, HashMap, HashSet};
-use std::ops::{Bound, RangeInclusive};
+use std::ops::RangeInclusive;
 use std::panic::{catch_unwind, AssertUnwindSafe};
 use std::sync::atomic::Ordering;
@@ -7,10 +7,11 @@ use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
 use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
-use meilisearch_types::tasks::{self, Details, IndexSwap, Kind, KindWithContent, Status, Task};
+use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
 use milli::update::Settings as MilliSettings;
 use roaring::RoaringBitmap;
+use time::OffsetDateTime;

 use super::create_batch::Batch;
 use crate::processing::{
@@ -18,10 +19,7 @@ use crate::processing::{
     InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
     UpdateIndexProgress,
 };
-use crate::utils::{
-    remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
-    ProcessingBatch,
-};
+use crate::utils::{remove_n_tasks_datetime_earlier_than, swap_index_uid_in_task, ProcessingBatch};
 use crate::{Error, IndexScheduler, Result, TaskId, BEI128};

 #[derive(Debug, Default)]
@@ -508,15 +506,16 @@ impl IndexScheduler {
     /// Delete each given task from all the databases (if it is deleteable).
     ///
     /// Return the number of tasks that were actually deleted.
+    #[allow(clippy::reversed_empty_ranges)]
     fn delete_matched_tasks(
         &self,
         matched_tasks: &RoaringBitmap,
         progress: &Progress,
     ) -> Result<RoaringBitmap> {
-        fn consecutive_ranges<I>(iter: I) -> impl Iterator<Item = (u32, u32)>
-        where
-            I: IntoIterator<Item = u32>,
-        {
+        /// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains.
+        fn consecutive_ranges(
+            iter: impl IntoIterator<Item = u32>,
+        ) -> impl Iterator<Item = RangeInclusive<u32>> {
             let mut iter = iter.into_iter().peekable();

             std::iter::from_fn(move || {
@@ -533,7 +532,7 @@ impl IndexScheduler {
                     }
                 }

-                Some((start, end))
+                Some(start..=end)
             })
         }

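Note: the reworked `consecutive_ranges` above turns a sorted iterator of `u32` into inclusive ranges of consecutive values. A minimal, self-contained sketch of that behaviour (an illustration with made-up ids, not the crate's exact body):

use std::ops::RangeInclusive;

/// Collapse a sorted iterator of u32 into inclusive ranges of consecutive values.
fn consecutive_ranges(iter: impl IntoIterator<Item = u32>) -> impl Iterator<Item = RangeInclusive<u32>> {
    let mut iter = iter.into_iter().peekable();
    std::iter::from_fn(move || {
        let start = iter.next()?;
        let mut end = start;
        // Absorb values as long as they directly follow the current end of the range.
        while let Some(&next) = iter.peek() {
            if Some(next) == end.checked_add(1) {
                end = next;
                iter.next();
            } else {
                break;
            }
        }
        Some(start..=end)
    })
}

fn main() {
    let task_ids = [1u32, 2, 3, 7, 8, 10];
    let ranges: Vec<_> = consecutive_ranges(task_ids).collect();
    assert_eq!(ranges, vec![1..=3, 7..=8, 10..=10]);
}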
@@ -543,25 +542,53 @@ impl IndexScheduler {
             mut to_remove: HashMap<i128, RoaringBitmap>,
             db: Database<BEI128, CboRoaringBitmapCodec>,
         ) -> Result<()> {
-            if !to_remove.is_empty() {
-                let mut iter = db.rev_range_mut(wtxn, &range)?;
-                while let Some(i) = iter.next() {
-                    let (timestamp, mut tasks) = i?;
-                    if let Some(to_remove_tasks) = to_remove.remove(&timestamp) {
-                        tasks -= &to_remove_tasks;
-                        if tasks.is_empty() {
-                            // safety: We don't keep references to the database
-                            unsafe { iter.del_current()? };
-                        } else {
-                            // safety: We don't keep references to the database
-                            unsafe { iter.put_current(&timestamp, &tasks)? };
-                        }
-                    }
-                }
-            }
+            if to_remove.is_empty() {
+                return Ok(());
+            }
+
+            let mut iter = db.rev_range_mut(wtxn, &range)?;
+            while let Some(i) = iter.next() {
+                let (timestamp, mut tasks) = i?;
+                if let Some(to_remove_tasks) = to_remove.remove(&timestamp) {
+                    tasks -= &to_remove_tasks;
+                    if tasks.is_empty() {
+                        // safety: We don't keep references to the database
+                        unsafe { iter.del_current()? };
+                    } else {
+                        // safety: We don't keep references to the database
+                        unsafe { iter.put_current(&timestamp, &tasks)? };
+                    }
+                }
+            }

             Ok(())
         }

+        trait UnixTimestampNanosOpt {
+            fn unix_timestamp_nanos(self) -> Option<i128>;
+        }
+        impl UnixTimestampNanosOpt for Option<OffsetDateTime> {
+            fn unix_timestamp_nanos(self) -> Option<i128> {
+                self.map(|dt| dt.unix_timestamp_nanos())
+            }
+        }
+        impl UnixTimestampNanosOpt for i128 {
+            fn unix_timestamp_nanos(self) -> Option<i128> {
+                Some(self)
+            }
+        }
+
+        /// Extends a range to include the given value
+        fn extend_range(value: impl UnixTimestampNanosOpt, range: &mut RangeInclusive<i128>) {
+            let Some(value) = value.unix_timestamp_nanos() else { return };
+            if value < *range.start() {
+                *range = value..=*range.end();
+            }
+            if value > *range.end() {
+                *range = *range.start()..=value;
+            }
+        }
+
         progress.update_progress(TaskDeletionProgress::RetrievingTasks);

         let rtxn = self.env.read_txn()?;
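Note: the helpers added above keep accumulator ranges that start out as the intentionally reversed, hence empty, range `i128::MAX..=i128::MIN`, which is also why the method gains `#[allow(clippy::reversed_empty_ranges)]`. A standalone sketch of that trick, simplified to plain `i128` values (illustration only; the sample timestamps are invented):

use std::ops::RangeInclusive;

/// Widen `range` so that it also covers `value`.
fn extend_range(value: i128, range: &mut RangeInclusive<i128>) {
    if value < *range.start() {
        *range = value..=*range.end();
    }
    if value > *range.end() {
        *range = *range.start()..=value;
    }
}

fn main() {
    // Reversed bounds: the range stays empty until the first value is observed.
    let mut enqueued_range = i128::MAX..=i128::MIN;
    for timestamp in [42_i128, 7, 99] {
        extend_range(timestamp, &mut enqueued_range);
    }
    assert_eq!(enqueued_range, 7..=99);
}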
@@ -569,37 +596,27 @@ impl IndexScheduler {
         // 1. Remove from this list the tasks that we are not allowed to delete
         let enqueued_tasks = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
         let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();

         let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?;
         let mut to_delete_tasks = all_task_ids & matched_tasks;
         to_delete_tasks -= &**processing_tasks;
         to_delete_tasks -= &enqueued_tasks;

-        // 2. We now have a list of tasks to delete, delete them
+        // 2. We now have a list of tasks to delete. Read their metadata to list what needs to be updated.
         let mut affected_indexes = HashSet::new();
         let mut affected_statuses = HashSet::new();
         let mut affected_kinds = HashSet::new();
         let mut affected_canceled_by = RoaringBitmap::new();
-        // The tasks that have been removed *per batches*.
-        let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
-
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
-        progress.update_progress(task_progress);
-        let mut min_enqueued = i128::MAX;
-        let mut max_enqueued = i128::MIN;
-        let mut min_started = i128::MAX;
-        let mut max_started = i128::MIN;
-        let mut min_finished = i128::MAX;
-        let mut max_finished = i128::MIN;
+        let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new(); // The tasks that have been removed *per batches*.
+        let mut enqueued_range = i128::MAX..=i128::MIN;
+        let mut started_range = i128::MAX..=i128::MIN;
+        let mut finished_range = i128::MAX..=i128::MIN;
+
         let mut tasks_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
         let mut tasks_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
         let mut tasks_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
+        progress.update_progress(task_progress);
         for range in consecutive_ranges(to_delete_tasks.iter()) {
-            let iter = self
-                .queue
-                .tasks
-                .all_tasks
-                .range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
+            let iter = self.queue.tasks.all_tasks.range(&rtxn, &range)?;
             for task in iter {
                 let (task_id, task) = task?;

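Note: the `tasks_*_to_remove` maps declared above group task ids per timestamp so that one `RoaringBitmap` per datetime key can later be subtracted from the datetime databases in a single pass. A small sketch of that bookkeeping (illustration only; the timestamps and ids are made up, and it assumes the `roaring` crate is available):

use std::collections::HashMap;

use roaring::RoaringBitmap;

fn main() {
    let mut tasks_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
    // (enqueued_at in nanoseconds, task id) pairs, as they would be read from the task queue.
    let samples: &[(i128, u32)] = &[(1_000, 1), (1_000, 2), (2_000, 7)];
    for &(enqueued_at, task_id) in samples {
        tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
    }
    assert_eq!(tasks_enqueued_to_remove[&1_000i128].len(), 2);
    assert_eq!(tasks_enqueued_to_remove[&2_000i128].len(), 1);
}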
@@ -608,33 +625,16 @@ impl IndexScheduler {
                 affected_kinds.insert(task.kind.as_kind());

                 let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
-                if enqueued_at < min_enqueued {
-                    min_enqueued = enqueued_at;
-                }
-                if enqueued_at > max_enqueued {
-                    max_enqueued = enqueued_at;
-                }
+                extend_range(enqueued_at, &mut enqueued_range);
                 tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);

-                if let Some(started_at) = task.started_at {
-                    let started_at = started_at.unix_timestamp_nanos();
-                    if started_at < min_started {
-                        min_started = started_at;
-                    }
-                    if started_at > max_started {
-                        max_started = started_at;
-                    }
+                if let Some(started_at) = task.started_at.unix_timestamp_nanos() {
+                    extend_range(started_at, &mut started_range);
                     tasks_started_to_remove.entry(started_at).or_default().insert(task_id);
                 }

-                if let Some(finished_at) = task.finished_at {
-                    let finished_at = finished_at.unix_timestamp_nanos();
-                    if finished_at < min_finished {
-                        min_finished = finished_at;
-                    }
-                    if finished_at > max_finished {
-                        max_finished = finished_at;
-                    }
+                if let Some(finished_at) = task.finished_at.unix_timestamp_nanos() {
+                    extend_range(finished_at, &mut finished_range);
                     tasks_finished_to_remove.entry(finished_at).or_default().insert(task_id);
                 }

@@ -648,6 +648,7 @@ impl IndexScheduler {
             }
         }

+        // 3. Read the tasks by indexes, statuses and kinds
         let mut affected_indexes_tasks = HashMap::new();
         for index in &affected_indexes {
             affected_indexes_tasks
@@ -667,43 +668,35 @@ impl IndexScheduler {
         }
         let mut to_remove_from_kinds = HashMap::new();

+        // 3. Read affected batches to list metadata that needs to be updated.
+        let mut batches_enqueued_range = i128::MAX..=i128::MIN;
+        let mut batches_started_range = i128::MAX..=i128::MIN;
+        let mut batches_finished_range = i128::MAX..=i128::MIN;
         let mut batches_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut batches_min_enqueued = i128::MAX;
-        let mut batches_max_enqueued = i128::MIN;
         let mut batches_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut batches_min_started = i128::MAX;
-        let mut batches_max_started = i128::MIN;
         let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut batches_min_finished = i128::MAX;
-        let mut batches_max_finished = i128::MIN;

         let mut to_delete_batches = RoaringBitmap::new();
         let mut tasks_to_remove_earlier = Vec::new();

         progress.update_progress(TaskDeletionProgress::RetrievingBatches);
         let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys());
+        let (atomic_progress, task_progress) =
+            AtomicBatchStep::new(affected_batches_bitmap.len() as u32);
+        progress.update_progress(task_progress);
         for range in consecutive_ranges(affected_batches_bitmap.iter()) {
-            let iter = self
-                .queue
-                .batch_to_tasks_mapping
-                .range(&rtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-            for i in iter {
-                let (batch_id, mut tasks) = i?;
+            let iter = self.queue.batch_to_tasks_mapping.range(&rtxn, &range)?;
+            for batch in iter {
+                let (batch_id, mut tasks) = batch?;
                 let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();

                 tasks -= &to_delete_tasks;

                 // We must remove the batch entirely
                 if tasks.is_empty() {
                     if let Some(batch) = self.queue.batches.get_batch(&rtxn, batch_id)? {
                         if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
                             let earliest = earliest.unix_timestamp_nanos();
                             let oldest = oldest.unix_timestamp_nanos();
-                            if earliest < batches_min_enqueued {
-                                batches_min_enqueued = earliest;
-                            }
-                            if oldest > batches_max_enqueued {
-                                batches_max_enqueued = oldest;
-                            }
+                            extend_range(earliest, &mut batches_enqueued_range);
+                            extend_range(oldest, &mut batches_enqueued_range);
                             batches_enqueued_to_remove
                                 .entry(earliest)
                                 .or_default()
@@ -719,21 +712,11 @@ impl IndexScheduler {
                             ));
                         }
                         let started_at = batch.started_at.unix_timestamp_nanos();
-                        if started_at < batches_min_started {
-                            batches_min_started = started_at;
-                        }
-                        if started_at > batches_max_started {
-                            batches_max_started = started_at;
-                        }
+                        extend_range(started_at, &mut batches_started_range);
                         batches_started_to_remove.entry(started_at).or_default().insert(batch_id);
                         if let Some(finished_at) = batch.finished_at {
                             let finished_at = finished_at.unix_timestamp_nanos();
-                            if finished_at < batches_min_finished {
-                                batches_min_finished = finished_at;
-                            }
-                            if finished_at > batches_max_finished {
-                                batches_max_finished = finished_at;
-                            }
+                            extend_range(finished_at, &mut batches_finished_range);
                             batches_finished_to_remove
                                 .entry(finished_at)
                                 .or_default()
@@ -745,7 +728,7 @@ impl IndexScheduler {
                 }

                 // Anyway, we must remove the batch from all its reverse indexes.
-                // The only way to do that is to check
+                // Check if those are affected by the task deletion.

                 for (index, index_tasks) in affected_indexes_tasks.iter() {
                     if index_tasks.intersection_len(&tasks) == 0 {
@@ -773,43 +756,85 @@ impl IndexScheduler {
                             .insert(batch_id);
                     }
                 }

+                // Note: no need to delete the persisted task data since
+                // we can only delete succeeded, failed, and canceled tasks.
+                // In each of those cases, the persisted data is supposed to
+                // have been deleted already.
+
+                atomic_progress.fetch_add(1, Ordering::Relaxed);
             }
         }

-        // Note: don't delete the persisted task data since
-        // we can only delete succeeded, failed, and canceled tasks.
-        // In each of those cases, the persisted data is supposed to
-        // have been deleted already.
-
         drop(rtxn);
-        let mut wtxn_owned = self.env.write_txn()?;
-        let wtxn = &mut wtxn_owned;
+        let mut owned_wtxn = self.env.write_txn()?;
+        let wtxn = &mut owned_wtxn;

+        // 4. Remove task datetimes
         progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);

         remove_datetimes(
             wtxn,
-            min_enqueued..=max_enqueued,
+            enqueued_range,
             tasks_enqueued_to_remove,
+            self.queue.tasks.enqueued_at,
+        )?;
+
+        remove_datetimes(
+            wtxn,
+            started_range,
+            tasks_started_to_remove,
+            self.queue.tasks.started_at,
+        )?;
+
+        remove_datetimes(
+            wtxn,
+            finished_range,
+            tasks_finished_to_remove,
+            self.queue.tasks.finished_at,
+        )?;
+
+        // 8. Delete batches datetimes
+        progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
+
+        for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
+            remove_n_tasks_datetime_earlier_than(
+                wtxn,
+                self.queue.batches.enqueued_at,
+                started_at,
+                nb_tasks,
+                batch_id,
+            )?;
+        }
+
+        remove_datetimes(
+            wtxn,
+            batches_enqueued_range,
+            batches_enqueued_to_remove,
             self.queue.batches.enqueued_at,
         )?;

         remove_datetimes(
             wtxn,
-            min_started..=max_started,
-            tasks_started_to_remove,
+            batches_started_range,
+            batches_started_to_remove,
             self.queue.batches.started_at,
         )?;

         remove_datetimes(
             wtxn,
-            min_finished..=max_finished,
-            tasks_finished_to_remove,
+            batches_finished_range,
+            batches_finished_to_remove,
             self.queue.batches.finished_at,
         )?;

+        // 5. Remove tasks from indexes, statuses, and kinds
         progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
         let (atomic_progress, task_progress) = AtomicTaskStep::new(
             (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
         );
         progress.update_progress(task_progress);

         for index in affected_indexes.iter() {
             self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
             atomic_progress.fetch_add(1, Ordering::Relaxed);
@@ -825,70 +850,7 @@ impl IndexScheduler {
             atomic_progress.fetch_add(1, Ordering::Relaxed);
         }

-        progress.update_progress(TaskDeletionProgress::DeletingTasks);
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
-        progress.update_progress(task_progress);
-        for range in consecutive_ranges(to_delete_tasks.iter()) {
-            self.queue
-                .tasks
-                .all_tasks
-                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }
-        for canceled_by in affected_canceled_by {
-            if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
-                tasks -= &to_delete_tasks;
-                if tasks.is_empty() {
-                    self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?;
-                } else {
-                    self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
-                }
-            }
-        }
-
-        progress.update_progress(TaskDeletionProgress::DeletingBatches);
-
-        for range in consecutive_ranges(to_delete_batches.iter()) {
-            self.queue
-                .batches
-                .all_batches
-                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-            self.queue
-                .batch_to_tasks_mapping
-                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-        }
-
-        progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
-
-        for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
-            remove_n_tasks_datetime_earlier_than(
-                wtxn,
-                self.queue.batches.enqueued_at,
-                started_at,
-                nb_tasks,
-                batch_id,
-            )?;
-        }
-
-        remove_datetimes(
-            wtxn,
-            batches_min_enqueued..=batches_max_enqueued,
-            batches_enqueued_to_remove,
-            self.queue.batches.enqueued_at,
-        )?;
-        remove_datetimes(
-            wtxn,
-            batches_min_started..=batches_max_started,
-            batches_started_to_remove,
-            self.queue.batches.started_at,
-        )?;
-        remove_datetimes(
-            wtxn,
-            batches_min_finished..=batches_max_finished,
-            batches_finished_to_remove,
-            self.queue.batches.finished_at,
-        )?;
-
+        // 9. Remove batches metadata from indexes, statuses, and kinds
         progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);

         for (index, batches) in to_remove_from_indexes {
@@ -909,7 +871,39 @@ impl IndexScheduler {
             })?;
         }

-        wtxn_owned.commit()?;
+        // 6. Delete tasks
+        progress.update_progress(TaskDeletionProgress::DeletingTasks);
+        let (atomic_progress, task_progress) =
+            AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32);
+        progress.update_progress(task_progress);
+        for range in consecutive_ranges(to_delete_tasks.iter()) {
+            self.queue.tasks.all_tasks.delete_range(wtxn, &range)?;
+            atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
+        }
+
+        for canceled_by in affected_canceled_by {
+            if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
+                tasks -= &to_delete_tasks;
+                if tasks.is_empty() {
+                    self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?;
+                } else {
+                    self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
+                }
+            }
+            atomic_progress.fetch_add(1, Ordering::Relaxed);
+        }
+
+        // 7. Delete batches
+        progress.update_progress(TaskDeletionProgress::DeletingBatches);
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32);
+        progress.update_progress(task_progress);
+        for range in consecutive_ranges(to_delete_batches.iter()) {
+            self.queue.batches.all_batches.delete_range(wtxn, &range)?;
+            self.queue.batch_to_tasks_mapping.delete_range(wtxn, &range)?;
+            atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
+        }
+
+        owned_wtxn.commit()?;

         Ok(to_delete_tasks)
     }
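Note: the deletion loops above work on whole consecutive id ranges (`delete_range`) rather than on individual ids, so one range operation on the sorted store replaces many single-key deletes. A rough sketch of the idea with a `BTreeMap` standing in for the LMDB databases (illustration only, not the scheduler's API):

use std::collections::BTreeMap;
use std::ops::RangeInclusive;

/// Delete every entry whose key falls in one of the given ranges.
fn delete_ranges(store: &mut BTreeMap<u32, String>, ranges: impl IntoIterator<Item = RangeInclusive<u32>>) {
    for range in ranges {
        // BTreeMap has no delete_range, so collect the keys first; a real sorted store
        // can walk the range and delete the entries in one pass.
        let keys: Vec<u32> = store.range(range).map(|(key, _)| *key).collect();
        for key in keys {
            store.remove(&key);
        }
    }
}

fn main() {
    let mut store: BTreeMap<u32, String> = (0u32..10).map(|id| (id, format!("task {id}"))).collect();
    delete_ranges(&mut store, [2..=4, 7..=7]);
    assert_eq!(store.keys().copied().collect::<Vec<_>>(), vec![0, 1, 5, 6, 8, 9]);
}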
|
Reference in New Issue
Block a user