Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-09-05 20:26:31 +00:00)
Optim G: Use the datetime delete function on batches
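This commit factors the "walk a datetime database and strip the deleted ids" loop into a `remove_datetimes` helper and applies it to the batch `enqueued_at`/`started_at`/`finished_at` databases as well: instead of calling `remove_task_datetime` once per batch and per timestamp, the scan now records min/max bounds plus a timestamp-to-RoaringBitmap map per database, and each database is then cleaned in a single bounded reverse pass. It also introduces separate `min_started`/`max_started` and `min_finished`/`max_finished` trackers where the task scan previously reused the enqueued bounds. As a rough illustration of that collect-then-sweep pattern, here is a minimal, std-only sketch; `BTreeMap`/`BTreeSet` stand in for the heed database and RoaringBitmap, and all names are illustrative rather than the scheduler's actual API:

use std::collections::{BTreeMap, BTreeSet, HashMap};

// BTreeMap<i128, BTreeSet<u32>> stands in for the LMDB datetime database
// (timestamp -> bitmap of ids); BTreeSet<u32> stands in for RoaringBitmap.
type DateIndex = BTreeMap<i128, BTreeSet<u32>>;

/// Remove the ids in `to_remove` from `index`, touching only keys inside `range`.
fn remove_datetimes(
    index: &mut DateIndex,
    range: std::ops::RangeInclusive<i128>,
    mut to_remove: HashMap<i128, BTreeSet<u32>>,
) {
    if to_remove.is_empty() {
        return;
    }
    // Collect the keys inside the bounded range first so the map can be mutated.
    let keys: Vec<i128> = index.range(range).map(|(k, _)| *k).collect();
    for timestamp in keys {
        if let Some(ids) = to_remove.remove(&timestamp) {
            let entry = index.get_mut(&timestamp).unwrap();
            entry.retain(|id| !ids.contains(id));
            if entry.is_empty() {
                // Drop the key entirely once no id remains, like `del_current`.
                index.remove(&timestamp);
            }
        }
    }
}

fn main() {
    let mut started_at: DateIndex = BTreeMap::new();
    started_at.entry(10).or_default().extend([1, 2]);
    started_at.entry(20).or_default().insert(3);

    // While scanning the entries to delete, track min/max and group ids by timestamp.
    let mut min = i128::MAX;
    let mut max = i128::MIN;
    let mut to_remove: HashMap<i128, BTreeSet<u32>> = HashMap::new();
    for (timestamp, id) in [(10, 1), (20, 3)] {
        min = min.min(timestamp);
        max = max.max(timestamp);
        to_remove.entry(timestamp).or_default().insert(id);
    }

    remove_datetimes(&mut started_at, min..=max, to_remove);
    assert_eq!(started_at.get(&10), Some(&BTreeSet::from([2u32])));
    assert!(started_at.get(&20).is_none()); // key deleted once empty
}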
@@ -1,12 +1,12 @@
 use std::collections::{BTreeSet, HashMap, HashSet};
-use std::ops::Bound;
+use std::ops::{Bound, RangeInclusive};
 use std::panic::{catch_unwind, AssertUnwindSafe};
 use std::sync::atomic::Ordering;
 
 use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
-use meilisearch_types::heed::{RoTxn, RwTxn};
+use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
-use meilisearch_types::milli::{self, ChannelCongestion};
+use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
 use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
 use milli::update::Settings as MilliSettings;
@@ -22,7 +22,7 @@ use crate::utils::{
 remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
 ProcessingBatch,
 };
-use crate::{Error, IndexScheduler, Result, TaskId};
+use crate::{Error, IndexScheduler, Result, TaskId, BEI128};
 
 #[derive(Debug, Default)]
 pub struct ProcessBatchInfo {
@@ -102,8 +102,7 @@ impl IndexScheduler {
 }
 }
 
-let mut deleted_tasks =
-self.delete_matched_tasks(&matched_tasks, &progress)?;
+let mut deleted_tasks = self.delete_matched_tasks(&matched_tasks, &progress)?;
 
 for task in tasks.iter_mut() {
 task.status = Status::Succeeded;
@@ -537,6 +536,32 @@ impl IndexScheduler {
 Some((start, end))
 })
 }
+
+fn remove_datetimes(
+wtxn: &mut RwTxn<'_>,
+range: RangeInclusive<i128>,
+mut to_remove: HashMap<i128, RoaringBitmap>,
+db: Database<BEI128, CboRoaringBitmapCodec>,
+) -> Result<()> {
+if !to_remove.is_empty() {
+let mut iter = db.rev_range_mut(wtxn, &range)?;
+while let Some(i) = iter.next() {
+let (timestamp, mut tasks) = i?;
+if let Some(to_remove_tasks) = to_remove.remove(&timestamp) {
+tasks -= &to_remove_tasks;
+if tasks.is_empty() {
+// safety: We don't keep references to the database
+unsafe { iter.del_current()? };
+} else {
+// safety: We don't keep references to the database
+unsafe { iter.put_current(&timestamp, &tasks)? };
+}
+}
+}
+}
+Ok(())
+}
+
 progress.update_progress(TaskDeletionProgress::RetrievingTasks);
 
 let rtxn = self.env.read_txn()?;
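The hunks below drop the inline per-database loop and the per-entry `remove_task_datetime` calls in favor of three calls to this helper, one per date database, each bounded by the min/max timestamps gathered while scanning. One call from the later hunks, repeated here with indentation for readability:

    remove_datetimes(
        wtxn,
        min_enqueued..=max_enqueued,
        enqueued_to_remove,
        self.queue.batches.enqueued_at,
    )?;

Consuming the timestamp-to-bitmap map and sweeping the database once in reverse over that range replaces a separate lookup and write per (id, timestamp) pair, which is what the new min_*/max_* trackers exist to support.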
@@ -562,6 +587,10 @@ impl IndexScheduler {
 progress.update_progress(task_progress);
 let mut min_enqueued = i128::MAX;
 let mut max_enqueued = i128::MIN;
+let mut min_started = i128::MAX;
+let mut max_started = i128::MIN;
+let mut min_finished = i128::MAX;
+let mut max_finished = i128::MIN;
 let mut enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
 let mut started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
 let mut finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
@@ -589,22 +618,22 @@ impl IndexScheduler {
 
 if let Some(started_at) = task.started_at {
 let started_at = started_at.unix_timestamp_nanos();
-if started_at < min_enqueued {
-min_enqueued = started_at;
+if started_at < min_started {
+min_started = started_at;
 }
-if started_at > max_enqueued {
-max_enqueued = started_at;
+if started_at > max_started {
+max_started = started_at;
 }
 started_to_remove.entry(started_at).or_default().insert(task_id);
 }
 
 if let Some(finished_at) = task.finished_at {
 let finished_at = finished_at.unix_timestamp_nanos();
-if finished_at < min_enqueued {
-min_enqueued = finished_at;
+if finished_at < min_finished {
+min_finished = finished_at;
 }
-if finished_at > max_enqueued {
-max_enqueued = finished_at;
+if finished_at > max_finished {
+max_finished = finished_at;
 }
 finished_to_remove.entry(finished_at).or_default().insert(task_id);
 }
@@ -629,28 +658,24 @@ impl IndexScheduler {
 let wtxn = &mut wtxn_owned;
 
 progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
-for (mut to_remove, db) in [
-(enqueued_to_remove, &self.queue.tasks.enqueued_at),
-(started_to_remove, &self.queue.tasks.started_at),
-(finished_to_remove, &self.queue.tasks.finished_at),
-] {
-if !to_remove.is_empty() {
-let mut iter = db.rev_range_mut(wtxn, &(min_enqueued..=max_enqueued))?;
-while let Some(i) = iter.next() {
-let (timestamp, mut tasks) = i?;
-if let Some(to_remove_tasks) = to_remove.remove(&timestamp) {
-tasks -= &to_remove_tasks;
-if tasks.is_empty() {
-// safety: We don't keep references to the database
-unsafe { iter.del_current()? };
-} else {
-// safety: We don't keep references to the database
-unsafe { iter.put_current(&timestamp, &tasks)? };
-}
-}
-}
-}
-}
+remove_datetimes(
+wtxn,
+min_enqueued..=max_enqueued,
+enqueued_to_remove,
+self.queue.batches.enqueued_at,
+)?;
+remove_datetimes(
+wtxn,
+min_started..=max_started,
+started_to_remove,
+self.queue.batches.started_at,
+)?;
+remove_datetimes(
+wtxn,
+min_finished..=max_finished,
+finished_to_remove,
+self.queue.batches.finished_at,
+)?;
 
 progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
 let (atomic_progress, task_progress) = AtomicTaskStep::new(
@@ -715,6 +740,16 @@ impl IndexScheduler {
 }
 let mut to_remove_from_kinds = HashMap::new();
 
+let mut enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+let mut min_enqueued = i128::MAX;
+let mut max_enqueued = i128::MIN;
+let mut started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+let mut min_started = i128::MAX;
+let mut max_started = i128::MIN;
+let mut finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+let mut min_finished = i128::MAX;
+let mut max_finished = i128::MIN;
+
 for (batch_id, to_delete_tasks) in affected_batches {
 if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
 tasks -= &to_delete_tasks;
@@ -722,18 +757,16 @@ impl IndexScheduler {
 if tasks.is_empty() {
 if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? {
 if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
-remove_task_datetime(
-wtxn,
-self.queue.batches.enqueued_at,
-earliest,
-batch_id,
-)?;
-remove_task_datetime(
-wtxn,
-self.queue.batches.enqueued_at,
-oldest,
-batch_id,
-)?;
+let earliest = earliest.unix_timestamp_nanos();
+let oldest = oldest.unix_timestamp_nanos();
+if earliest < min_enqueued {
+min_enqueued = earliest;
+}
+if oldest > max_enqueued {
+max_enqueued = oldest;
+}
+enqueued_to_remove.entry(earliest).or_default().insert(batch_id);
+enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
 } else {
 // If we don't have the enqueued at in the batch it means the database comes from the v1.12
 // and we still need to find the date by scrolling the database
@@ -745,19 +778,23 @@ impl IndexScheduler {
 batch_id,
 )?;
 }
-remove_task_datetime(
-wtxn,
-self.queue.batches.started_at,
-batch.started_at,
-batch_id,
-)?;
+let started_at = batch.started_at.unix_timestamp_nanos();
+if started_at < min_started {
+min_started = started_at;
+}
+if started_at > max_started {
+max_started = started_at;
+}
+started_to_remove.entry(started_at).or_default().insert(batch_id);
 if let Some(finished_at) = batch.finished_at {
-remove_task_datetime(
-wtxn,
-self.queue.batches.finished_at,
-finished_at,
-batch_id,
-)?;
+let finished_at = finished_at.unix_timestamp_nanos();
+if finished_at < min_finished {
+min_finished = finished_at;
+}
+if finished_at > max_finished {
+max_finished = finished_at;
+}
+finished_to_remove.entry(finished_at).or_default().insert(batch_id);
 }
 
 self.queue.batches.all_batches.delete(wtxn, &batch_id)?;
@@ -798,6 +835,25 @@ impl IndexScheduler {
 atomic_progress.fetch_add(1, Ordering::Relaxed);
 }
 
+remove_datetimes(
+wtxn,
+min_enqueued..=max_enqueued,
+enqueued_to_remove,
+self.queue.batches.enqueued_at,
+)?;
+remove_datetimes(
+wtxn,
+min_started..=max_started,
+started_to_remove,
+self.queue.batches.started_at,
+)?;
+remove_datetimes(
+wtxn,
+min_finished..=max_finished,
+finished_to_remove,
+self.queue.batches.finished_at,
+)?;
+
 for (index, batches) in to_remove_from_indexes {
 self.queue.tasks.update_index(wtxn, index, |index_tasks| {
 *index_tasks -= &batches;