Optim I: read batches before writing
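This commit restructures `delete_matched_tasks` so that every read against the queues (`batch_to_tasks_mapping.get`, `get_batch`, `index_tasks`, `get_status`, `get_kind`) happens under the read transaction, and the write transaction is only opened after all deletion work has been planned. Writes discovered during the read phase (the v1.12 datetime fallback) are recorded and replayed once the write transaction is open.

The sketch below illustrates the read-then-write locking pattern the commit moves toward. It is a minimal, self-contained toy: the `Env` type, `plan_deletion`, and `apply_deletion` are illustrative stand-ins, not the scheduler's heed/LMDB API.

use std::collections::BTreeMap;
use std::sync::RwLock;

// Toy LMDB-like environment: many cheap readers, one exclusive writer
// (illustrative stand-in for the heed `Env` used by the scheduler).
struct Env {
    tasks: RwLock<BTreeMap<u32, String>>,
}

// Read phase: plan the deletion while holding only the shared lock.
fn plan_deletion(env: &Env, matched: &[u32]) -> Vec<u32> {
    let tasks = env.tasks.read().unwrap();
    matched.iter().copied().filter(|id| tasks.contains_key(id)).collect()
}

// Write phase: the exclusive lock is held only while applying the plan.
fn apply_deletion(env: &Env, plan: &[u32]) {
    let mut tasks = env.tasks.write().unwrap();
    for id in plan {
        tasks.remove(id);
    }
}

fn main() {
    let env = Env { tasks: RwLock::new(BTreeMap::new()) };
    env.tasks.write().unwrap().extend([(1, "a".into()), (2, "b".into())]);
    let plan = plan_deletion(&env, &[1, 3]); // read everything first...
    apply_deletion(&env, &plan); // ...then write in one short burst
    assert_eq!(env.tasks.read().unwrap().len(), 1);
}

The payoff of this shape is that the single-writer lock is held only for the final, already-computed mutations instead of for the whole scan.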
@@ -7,7 +7,7 @@ use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
 use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
-use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
+use meilisearch_types::tasks::{self, Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
 use milli::update::Settings as MilliSettings;
 use roaring::RoaringBitmap;
@@ -571,9 +571,9 @@ impl IndexScheduler {
         let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
 
         let all_task_ids = self.queue.tasks.all_task_ids(&rtxn)?;
-        let mut tasks_to_remove = all_task_ids & matched_tasks;
-        tasks_to_remove -= &**processing_tasks;
-        tasks_to_remove -= &enqueued_tasks;
+        let mut to_delete_tasks = all_task_ids & matched_tasks;
+        to_delete_tasks -= &**processing_tasks;
+        to_delete_tasks -= &enqueued_tasks;
 
         // 2. We now have a list of tasks to delete, delete them
         let mut affected_indexes = HashSet::new();
@@ -583,7 +583,7 @@ impl IndexScheduler {
         // The tasks that have been removed *per batches*.
         let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
 
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(tasks_to_remove.len() as u32);
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
         progress.update_progress(task_progress);
         let mut min_enqueued = i128::MAX;
         let mut max_enqueued = i128::MIN;
@@ -591,10 +591,10 @@ impl IndexScheduler {
         let mut max_started = i128::MIN;
         let mut min_finished = i128::MAX;
         let mut max_finished = i128::MIN;
-        let mut enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        for range in consecutive_ranges(tasks_to_remove.iter()) {
+        let mut tasks_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let mut tasks_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let mut tasks_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        for range in consecutive_ranges(to_delete_tasks.iter()) {
             let iter = self
                 .queue
                 .tasks
@@ -614,7 +614,7 @@ impl IndexScheduler {
                 if enqueued_at > max_enqueued {
                     max_enqueued = enqueued_at;
                 }
-                enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
+                tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
 
                 if let Some(started_at) = task.started_at {
                     let started_at = started_at.unix_timestamp_nanos();
@@ -624,7 +624,7 @@ impl IndexScheduler {
                     if started_at > max_started {
                         max_started = started_at;
                     }
-                    started_to_remove.entry(started_at).or_default().insert(task_id);
+                    tasks_started_to_remove.entry(started_at).or_default().insert(task_id);
                 }
 
                 if let Some(finished_at) = task.finished_at {
@@ -635,7 +635,7 @@ impl IndexScheduler {
                     if finished_at > max_finished {
                         max_finished = finished_at;
                     }
-                    finished_to_remove.entry(finished_at).or_default().insert(task_id);
+                    tasks_finished_to_remove.entry(finished_at).or_default().insert(task_id);
                 }
 
                 if let Some(canceled_by) = task.canceled_by {
@@ -648,158 +648,87 @@ impl IndexScheduler {
             }
         }
 
-        // Note: don't delete the persisted task data since
-        // we can only delete succeeded, failed, and canceled tasks.
-        // In each of those cases, the persisted data is supposed to
-        // have been deleted already.
-
-        drop(rtxn);
-        let mut wtxn_owned = self.env.write_txn()?;
-        let wtxn = &mut wtxn_owned;
-
-        progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
-        remove_datetimes(
-            wtxn,
-            min_enqueued..=max_enqueued,
-            enqueued_to_remove,
-            self.queue.batches.enqueued_at,
-        )?;
-        remove_datetimes(
-            wtxn,
-            min_started..=max_started,
-            started_to_remove,
-            self.queue.batches.started_at,
-        )?;
-        remove_datetimes(
-            wtxn,
-            min_finished..=max_finished,
-            finished_to_remove,
-            self.queue.batches.finished_at,
-        )?;
-
-        progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(
-            (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
-        );
-        progress.update_progress(task_progress);
-        for index in affected_indexes.iter() {
-            self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &tasks_to_remove)?;
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }
-
-        for status in affected_statuses.iter() {
-            self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &tasks_to_remove)?;
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }
-
-        for kind in affected_kinds.iter() {
-            self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &tasks_to_remove)?;
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }
-
-        progress.update_progress(TaskDeletionProgress::DeletingTasks);
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(tasks_to_remove.len() as u32);
-        progress.update_progress(task_progress);
-        for range in consecutive_ranges(tasks_to_remove.iter()) {
-            self.queue
-                .tasks
-                .all_tasks
-                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }
-        for canceled_by in affected_canceled_by {
-            if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
-                tasks -= &tasks_to_remove;
-                if tasks.is_empty() {
-                    self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?;
-                } else {
-                    self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
-                }
-            }
-        }
-        progress.update_progress(TaskDeletionProgress::DeletingBatches);
-        let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
-        progress.update_progress(batch_progress);
-
         let mut affected_indexes_tasks = HashMap::new();
         for index in &affected_indexes {
             affected_indexes_tasks
-                .insert(index.as_str(), self.queue.tasks.index_tasks(wtxn, index)?);
+                .insert(index.as_str(), self.queue.tasks.index_tasks(&rtxn, index)?);
         }
         let mut to_remove_from_indexes = HashMap::new();
 
         let mut affected_statuses_tasks = HashMap::new();
         for status in &affected_statuses {
-            affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(wtxn, *status)?);
+            affected_statuses_tasks.insert(*status, self.queue.tasks.get_status(&rtxn, *status)?);
         }
         let mut to_remove_from_statuses = HashMap::new();
 
         let mut affected_kinds_tasks = HashMap::new();
         for kind in &affected_kinds {
-            affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(wtxn, *kind)?);
+            affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(&rtxn, *kind)?);
         }
         let mut to_remove_from_kinds = HashMap::new();
 
-        let mut enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut min_enqueued = i128::MAX;
-        let mut max_enqueued = i128::MIN;
-        let mut started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut min_started = i128::MAX;
-        let mut max_started = i128::MIN;
-        let mut finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
-        let mut min_finished = i128::MAX;
-        let mut max_finished = i128::MIN;
+        let mut batches_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let mut batches_min_enqueued = i128::MAX;
+        let mut batches_max_enqueued = i128::MIN;
+        let mut batches_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let mut batches_min_started = i128::MAX;
+        let mut batches_max_started = i128::MIN;
+        let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let mut batches_min_finished = i128::MAX;
+        let mut batches_max_finished = i128::MIN;
 
-        let mut batches_to_remove = RoaringBitmap::new();
+        let mut to_delete_batches = RoaringBitmap::new();
+        let mut tasks_to_remove_earlier = Vec::new();
 
+        progress.update_progress(TaskDeletionProgress::RetrievingBatches);
+        let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
+        progress.update_progress(batch_progress);
         for (batch_id, to_delete_tasks) in affected_batches {
-            if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
+            if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(&rtxn, &batch_id)? {
                 tasks -= &to_delete_tasks;
                 // We must remove the batch entirely
                 if tasks.is_empty() {
-                    if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? {
+                    if let Some(batch) = self.queue.batches.get_batch(&rtxn, batch_id)? {
                         if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
                             let earliest = earliest.unix_timestamp_nanos();
                             let oldest = oldest.unix_timestamp_nanos();
-                            if earliest < min_enqueued {
-                                min_enqueued = earliest;
+                            if earliest < batches_min_enqueued {
+                                batches_min_enqueued = earliest;
                             }
-                            if oldest > max_enqueued {
-                                max_enqueued = oldest;
+                            if oldest > batches_max_enqueued {
+                                batches_max_enqueued = oldest;
                             }
-                            enqueued_to_remove.entry(earliest).or_default().insert(batch_id);
-                            enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
+                            batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id);
+                            batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
                         } else {
                             // If we don't have the enqueued at in the batch it means the database comes from the v1.12
                             // and we still need to find the date by scrolling the database
-                            remove_n_tasks_datetime_earlier_than(
-                                wtxn,
-                                self.queue.batches.enqueued_at,
+                            tasks_to_remove_earlier.push((
                                 batch.started_at,
                                 batch.stats.total_nb_tasks.clamp(1, 2) as usize,
                                 batch_id,
-                            )?;
+                            ));
                         }
                         let started_at = batch.started_at.unix_timestamp_nanos();
-                        if started_at < min_started {
-                            min_started = started_at;
+                        if started_at < batches_min_started {
+                            batches_min_started = started_at;
                        }
-                        if started_at > max_started {
-                            max_started = started_at;
+                        if started_at > batches_max_started {
+                            batches_max_started = started_at;
                         }
-                        started_to_remove.entry(started_at).or_default().insert(batch_id);
+                        batches_started_to_remove.entry(started_at).or_default().insert(batch_id);
                         if let Some(finished_at) = batch.finished_at {
                             let finished_at = finished_at.unix_timestamp_nanos();
-                            if finished_at < min_finished {
-                                min_finished = finished_at;
+                            if finished_at < batches_min_finished {
+                                batches_min_finished = finished_at;
                             }
-                            if finished_at > max_finished {
-                                max_finished = finished_at;
+                            if finished_at > batches_max_finished {
+                                batches_max_finished = finished_at;
                             }
-                            finished_to_remove.entry(finished_at).or_default().insert(batch_id);
+                            batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id);
                         }
 
-                        batches_to_remove.insert(batch_id);
+                        to_delete_batches.insert(batch_id);
                     }
                 }
             }
 
@@ -836,7 +765,79 @@ impl IndexScheduler {
             atomic_progress.fetch_add(1, Ordering::Relaxed);
         }
 
-        for range in consecutive_ranges(batches_to_remove.iter()) {
+        // Note: don't delete the persisted task data since
+        // we can only delete succeeded, failed, and canceled tasks.
+        // In each of those cases, the persisted data is supposed to
+        // have been deleted already.
+
+        drop(rtxn);
+        let mut wtxn_owned = self.env.write_txn()?;
+        let wtxn = &mut wtxn_owned;
+
+        progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
+        remove_datetimes(
+            wtxn,
+            min_enqueued..=max_enqueued,
+            tasks_enqueued_to_remove,
+            self.queue.batches.enqueued_at,
+        )?;
+        remove_datetimes(
+            wtxn,
+            min_started..=max_started,
+            tasks_started_to_remove,
+            self.queue.batches.started_at,
+        )?;
+        remove_datetimes(
+            wtxn,
+            min_finished..=max_finished,
+            tasks_finished_to_remove,
+            self.queue.batches.finished_at,
+        )?;
+
+        progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(
+            (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
+        );
+        progress.update_progress(task_progress);
+        for index in affected_indexes.iter() {
+            self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
+            atomic_progress.fetch_add(1, Ordering::Relaxed);
+        }
+
+        for status in affected_statuses.iter() {
+            self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?;
+            atomic_progress.fetch_add(1, Ordering::Relaxed);
+        }
+
+        for kind in affected_kinds.iter() {
+            self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?;
+            atomic_progress.fetch_add(1, Ordering::Relaxed);
+        }
+
+        progress.update_progress(TaskDeletionProgress::DeletingTasks);
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
+        progress.update_progress(task_progress);
+        for range in consecutive_ranges(to_delete_tasks.iter()) {
+            self.queue
+                .tasks
+                .all_tasks
+                .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
+            atomic_progress.fetch_add(1, Ordering::Relaxed);
+        }
+        for canceled_by in affected_canceled_by {
+            if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
+                tasks -= &to_delete_tasks;
+                if tasks.is_empty() {
+                    self.queue.tasks.canceled_by.delete(wtxn, &canceled_by)?;
+                } else {
+                    self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
+                }
+            }
+        }
+
+        progress.update_progress(TaskDeletionProgress::DeletingBatches);
+
+        for range in consecutive_ranges(to_delete_batches.iter()) {
             self.queue
                 .batches
                 .all_batches
@@ -846,25 +847,39 @@ impl IndexScheduler {
                 .delete_range(wtxn, &(Bound::Included(range.0), Bound::Included(range.1)))?;
         }
 
+        progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
+
+        for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
+            remove_n_tasks_datetime_earlier_than(
+                wtxn,
+                self.queue.batches.enqueued_at,
+                started_at,
+                nb_tasks,
+                batch_id,
+            )?;
+        }
+
         remove_datetimes(
             wtxn,
-            min_enqueued..=max_enqueued,
-            enqueued_to_remove,
+            batches_min_enqueued..=batches_max_enqueued,
+            batches_enqueued_to_remove,
             self.queue.batches.enqueued_at,
         )?;
         remove_datetimes(
             wtxn,
-            min_started..=max_started,
-            started_to_remove,
+            batches_min_started..=batches_max_started,
+            batches_started_to_remove,
             self.queue.batches.started_at,
         )?;
         remove_datetimes(
             wtxn,
-            min_finished..=max_finished,
-            finished_to_remove,
+            batches_min_finished..=batches_max_finished,
+            batches_finished_to_remove,
             self.queue.batches.finished_at,
         )?;
 
+        progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);
+
         for (index, batches) in to_remove_from_indexes {
             self.queue.tasks.update_index(wtxn, index, |index_tasks| {
                 *index_tasks -= &batches;
@@ -885,7 +900,7 @@ impl IndexScheduler {
 
         wtxn_owned.commit()?;
 
-        Ok(tasks_to_remove)
+        Ok(to_delete_tasks)
     }
 
     /// Cancel each given task from all the databases (if it is cancelable).
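One detail worth calling out: the v1.12 fallback used to call `remove_n_tasks_datetime_earlier_than` (a write) in the middle of the batch scan, which is no longer possible once that scan runs under the read transaction. The diff instead records the call's arguments in `tasks_to_remove_earlier` and replays them after `DeletingBatchesDateTime`, once the write transaction is open. A minimal sketch of that record-and-replay shape follows; `WriteTxn` is a stand-in and the helper's signature is simplified (the real one also takes the target database handle and returns a Result).

// Stand-in for the real write transaction; illustrative only.
struct WriteTxn;

// Simplified stand-in for the scheduler's helper.
fn remove_n_tasks_datetime_earlier_than(
    _wtxn: &mut WriteTxn,
    started_at: i128,
    nb_tasks: usize,
    batch_id: u32,
) {
    println!("pruning {nb_tasks} task(s) enqueued before {started_at} for batch {batch_id}");
}

fn main() {
    // Read phase: only record the arguments of each deferred write.
    let tasks_to_remove_earlier: Vec<(i128, usize, u32)> = vec![(1_000, 2, 7)];

    // Write phase: replay every recorded call under the write transaction.
    let mut wtxn = WriteTxn;
    for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
        remove_n_tasks_datetime_earlier_than(&mut wtxn, started_at, nb_tasks, batch_id);
    }
}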