mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-06 04:36:32 +00:00
Optim K: Make iterations sequential
This commit is contained in:
@ -1,7 +1,7 @@
|
|||||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||||
use std::ops::RangeInclusive;
|
use std::ops::RangeInclusive;
|
||||||
use std::panic::{catch_unwind, AssertUnwindSafe};
|
use std::panic::{catch_unwind, AssertUnwindSafe};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::{self, Ordering};
|
||||||
|
|
||||||
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
|
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
|
||||||
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
|
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
|
||||||
@ -668,7 +668,7 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
let mut to_remove_from_kinds = HashMap::new();
|
let mut to_remove_from_kinds = HashMap::new();
|
||||||
|
|
||||||
// 3. Read affected batches to list metadata that needs to be updated.
|
// 4. Read affected batches to list metadata that needs to be updated.
|
||||||
let mut batches_enqueued_range = i128::MAX..=i128::MIN;
|
let mut batches_enqueued_range = i128::MAX..=i128::MIN;
|
||||||
let mut batches_started_range = i128::MAX..=i128::MIN;
|
let mut batches_started_range = i128::MAX..=i128::MIN;
|
||||||
let mut batches_finished_range = i128::MAX..=i128::MIN;
|
let mut batches_finished_range = i128::MAX..=i128::MIN;
|
||||||
@ -689,46 +689,7 @@ impl IndexScheduler {
|
|||||||
let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
|
let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
|
||||||
tasks -= &to_delete_tasks;
|
tasks -= &to_delete_tasks;
|
||||||
|
|
||||||
// We must remove the batch entirely
|
// We must remove the batch from all the reverse indexes it no longer has tasks for.
|
||||||
if tasks.is_empty() {
|
|
||||||
if let Some(batch) = self.queue.batches.get_batch(&rtxn, batch_id)? {
|
|
||||||
if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
|
|
||||||
let earliest = earliest.unix_timestamp_nanos();
|
|
||||||
let oldest = oldest.unix_timestamp_nanos();
|
|
||||||
extend_range(earliest, &mut batches_enqueued_range);
|
|
||||||
extend_range(oldest, &mut batches_enqueued_range);
|
|
||||||
batches_enqueued_to_remove
|
|
||||||
.entry(earliest)
|
|
||||||
.or_default()
|
|
||||||
.insert(batch_id);
|
|
||||||
batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
|
|
||||||
} else {
|
|
||||||
// If we don't have the enqueued at in the batch it means the database comes from the v1.12
|
|
||||||
// and we still need to find the date by scrolling the database
|
|
||||||
tasks_to_remove_earlier.push((
|
|
||||||
batch.started_at,
|
|
||||||
batch.stats.total_nb_tasks.clamp(1, 2) as usize,
|
|
||||||
batch_id,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
let started_at = batch.started_at.unix_timestamp_nanos();
|
|
||||||
extend_range(started_at, &mut batches_started_range);
|
|
||||||
batches_started_to_remove.entry(started_at).or_default().insert(batch_id);
|
|
||||||
if let Some(finished_at) = batch.finished_at {
|
|
||||||
let finished_at = finished_at.unix_timestamp_nanos();
|
|
||||||
extend_range(finished_at, &mut batches_finished_range);
|
|
||||||
batches_finished_to_remove
|
|
||||||
.entry(finished_at)
|
|
||||||
.or_default()
|
|
||||||
.insert(batch_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
to_delete_batches.insert(batch_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Anyway, we must remove the batch from all its reverse indexes.
|
|
||||||
// Check if those are affected by the task deletion.
|
|
||||||
|
|
||||||
for (index, index_tasks) in affected_indexes_tasks.iter() {
|
for (index, index_tasks) in affected_indexes_tasks.iter() {
|
||||||
if index_tasks.intersection_len(&tasks) == 0 {
|
if index_tasks.intersection_len(&tasks) == 0 {
|
||||||
@ -762,6 +723,44 @@ impl IndexScheduler {
|
|||||||
// In each of those cases, the persisted data is supposed to
|
// In each of those cases, the persisted data is supposed to
|
||||||
// have been deleted already.
|
// have been deleted already.
|
||||||
|
|
||||||
|
// We must remove the batch entirely
|
||||||
|
if tasks.is_empty() {
|
||||||
|
to_delete_batches.insert(batch_id);
|
||||||
|
} else {
|
||||||
|
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for range in consecutive_ranges(to_delete_batches.iter()) {
|
||||||
|
let iter = self.queue.batches.all_batches.range(&rtxn, &range)?;
|
||||||
|
for batch in iter {
|
||||||
|
let (batch_id, batch) = batch?;
|
||||||
|
|
||||||
|
if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
|
||||||
|
let earliest = earliest.unix_timestamp_nanos();
|
||||||
|
let oldest = oldest.unix_timestamp_nanos();
|
||||||
|
extend_range(earliest, &mut batches_enqueued_range);
|
||||||
|
extend_range(oldest, &mut batches_enqueued_range);
|
||||||
|
batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id);
|
||||||
|
batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
|
||||||
|
} else {
|
||||||
|
// If we don't have the enqueued at in the batch it means the database comes from the v1.12
|
||||||
|
// and we still need to find the date by scrolling the database
|
||||||
|
tasks_to_remove_earlier.push((
|
||||||
|
batch.started_at,
|
||||||
|
batch.stats.total_nb_tasks.clamp(1, 2) as usize,
|
||||||
|
batch_id,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let started_at = batch.started_at.unix_timestamp_nanos();
|
||||||
|
extend_range(started_at, &mut batches_started_range);
|
||||||
|
batches_started_to_remove.entry(started_at).or_default().insert(batch_id);
|
||||||
|
if let Some(finished_at) = batch.finished_at {
|
||||||
|
let finished_at = finished_at.unix_timestamp_nanos();
|
||||||
|
extend_range(finished_at, &mut batches_finished_range);
|
||||||
|
batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id);
|
||||||
|
}
|
||||||
|
|
||||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -770,7 +769,7 @@ impl IndexScheduler {
|
|||||||
let mut owned_wtxn = self.env.write_txn()?;
|
let mut owned_wtxn = self.env.write_txn()?;
|
||||||
let wtxn = &mut owned_wtxn;
|
let wtxn = &mut owned_wtxn;
|
||||||
|
|
||||||
// 4. Remove task datetimes
|
// 5. Remove task datetimes
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
|
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
|
||||||
|
|
||||||
remove_datetimes(
|
remove_datetimes(
|
||||||
@ -794,7 +793,7 @@ impl IndexScheduler {
|
|||||||
self.queue.tasks.finished_at,
|
self.queue.tasks.finished_at,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// 8. Delete batches datetimes
|
// 6. Delete batches datetimes
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
|
progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
|
||||||
|
|
||||||
for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
|
for (started_at, nb_tasks, batch_id) in tasks_to_remove_earlier {
|
||||||
@ -828,7 +827,7 @@ impl IndexScheduler {
|
|||||||
self.queue.batches.finished_at,
|
self.queue.batches.finished_at,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// 5. Remove tasks from indexes, statuses, and kinds
|
// 7. Remove tasks from indexes, statuses, and kinds
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
|
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
|
||||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(
|
let (atomic_progress, task_progress) = AtomicTaskStep::new(
|
||||||
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
|
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
|
||||||
@ -850,7 +849,7 @@ impl IndexScheduler {
|
|||||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 9. Remove batches metadata from indexes, statuses, and kinds
|
// 8. Remove batches metadata from indexes, statuses, and kinds
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);
|
progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);
|
||||||
|
|
||||||
for (index, batches) in to_remove_from_indexes {
|
for (index, batches) in to_remove_from_indexes {
|
||||||
@ -871,7 +870,7 @@ impl IndexScheduler {
|
|||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Delete tasks
|
// 9. Delete tasks
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
progress.update_progress(TaskDeletionProgress::DeletingTasks);
|
||||||
let (atomic_progress, task_progress) =
|
let (atomic_progress, task_progress) =
|
||||||
AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32);
|
AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32);
|
||||||
@ -893,7 +892,7 @@ impl IndexScheduler {
|
|||||||
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
atomic_progress.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 7. Delete batches
|
// 10. Delete batches
|
||||||
progress.update_progress(TaskDeletionProgress::DeletingBatches);
|
progress.update_progress(TaskDeletionProgress::DeletingBatches);
|
||||||
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32);
|
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32);
|
||||||
progress.update_progress(task_progress);
|
progress.update_progress(task_progress);
|
||||||
|
Reference in New Issue
Block a user