mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-05 20:26:31 +00:00
Don't split read and write, it's bad for the cache
This commit is contained in:
@ -83,8 +83,6 @@ make_enum_progress! {
|
||||
pub enum TaskDeletionProgress {
|
||||
RetrievingTasks,
|
||||
RetrievingBatchTasks,
|
||||
RetrievingTaskDateTimes,
|
||||
RetrievingBatchDateTimes,
|
||||
DeletingTasksDateTime,
|
||||
DeletingBatchesDateTime,
|
||||
DeletingTasksMetadata,
|
||||
|
@ -535,15 +535,13 @@ impl IndexScheduler {
|
||||
})
|
||||
}
|
||||
|
||||
type DateTimeChanges = (Vec<Range<i128>>, HashMap<i128, RoaringBitmap>);
|
||||
|
||||
fn prepare_remove_datetimes(
|
||||
rtxn: &RoTxn<'_>,
|
||||
fn remove_task_datetimes(
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
mut to_remove: HashMap<i128, RoaringBitmap>,
|
||||
db: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
) -> Result<DateTimeChanges> {
|
||||
) -> Result<()> {
|
||||
if to_remove.is_empty() {
|
||||
return Ok((Vec::new(), HashMap::new()));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let min = to_remove.keys().min().cloned().unwrap_or(i128::MAX);
|
||||
@ -552,7 +550,7 @@ impl IndexScheduler {
|
||||
|
||||
// We iterate over the time database to see which ranges of timestamps need to be removed
|
||||
let lazy_db = db.lazily_decode_data();
|
||||
let iter = lazy_db.range(rtxn, &range)?;
|
||||
let iter = lazy_db.range(wtxn, &range)?;
|
||||
let mut delete_range_start = None;
|
||||
let mut delete_ranges = Vec::new();
|
||||
let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
@ -578,20 +576,28 @@ impl IndexScheduler {
|
||||
delete_ranges.push(delete_range_start..i128::MAX);
|
||||
}
|
||||
|
||||
Ok((delete_ranges, to_put))
|
||||
for range in delete_ranges {
|
||||
db.delete_range(wtxn, &range)?;
|
||||
}
|
||||
|
||||
for (timestamp, data) in to_put {
|
||||
db.put(wtxn, ×tamp, &data)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_remove_batch_datetimes(
|
||||
rtxn: &RoTxn<'_>,
|
||||
fn remove_batch_datetimes(
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
to_remove: &RoaringBitmap,
|
||||
db: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
) -> Result<DateTimeChanges> {
|
||||
) -> Result<()> {
|
||||
if to_remove.is_empty() {
|
||||
return Ok((Vec::new(), HashMap::new()));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We iterate over the time database to see which ranges of timestamps need to be removed
|
||||
let iter = db.iter(rtxn)?;
|
||||
let iter = db.iter(wtxn)?;
|
||||
let mut delete_range_start = None;
|
||||
let mut delete_ranges = Vec::new();
|
||||
let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
@ -616,14 +622,6 @@ impl IndexScheduler {
|
||||
delete_ranges.push(delete_range_start..i128::MAX);
|
||||
}
|
||||
|
||||
Ok((delete_ranges, to_put))
|
||||
}
|
||||
|
||||
fn remove_datetimes(
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
(delete_ranges, to_put): DateTimeChanges,
|
||||
db: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
) -> Result<()> {
|
||||
for range in delete_ranges {
|
||||
db.delete_range(wtxn, &range)?;
|
||||
}
|
||||
@ -771,46 +769,6 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Prepare to remove the datetimes from the tasks
|
||||
progress.update_progress(TaskDeletionProgress::RetrievingTaskDateTimes);
|
||||
let task_enqueued_changes = prepare_remove_datetimes(
|
||||
&rtxn,
|
||||
tasks_enqueued_to_remove,
|
||||
self.queue.tasks.enqueued_at,
|
||||
)?;
|
||||
|
||||
let task_started_changes = prepare_remove_datetimes(
|
||||
&rtxn,
|
||||
tasks_started_to_remove,
|
||||
self.queue.tasks.started_at,
|
||||
)?;
|
||||
|
||||
let task_finished_changes = prepare_remove_datetimes(
|
||||
&rtxn,
|
||||
tasks_finished_to_remove,
|
||||
self.queue.tasks.finished_at,
|
||||
)?;
|
||||
|
||||
// 6. Prepare to remove the datetimes from the batches
|
||||
progress.update_progress(TaskDeletionProgress::RetrievingBatchDateTimes);
|
||||
let batch_enqueued_changes = prepare_remove_batch_datetimes(
|
||||
&rtxn,
|
||||
&to_delete_batches,
|
||||
self.queue.batches.enqueued_at,
|
||||
)?;
|
||||
|
||||
let batch_started_changes = prepare_remove_batch_datetimes(
|
||||
&rtxn,
|
||||
&to_delete_batches,
|
||||
self.queue.batches.started_at,
|
||||
)?;
|
||||
|
||||
let batch_finished_changes = prepare_remove_batch_datetimes(
|
||||
&rtxn,
|
||||
&to_delete_batches,
|
||||
self.queue.batches.finished_at,
|
||||
)?;
|
||||
|
||||
drop(rtxn);
|
||||
let mut owned_wtxn = self.env.write_txn()?;
|
||||
let wtxn = &mut owned_wtxn;
|
||||
@ -818,30 +776,28 @@ impl IndexScheduler {
|
||||
// 7. Remove task datetimes
|
||||
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
|
||||
|
||||
remove_datetimes(wtxn, task_enqueued_changes, self.queue.tasks.enqueued_at)?;
|
||||
|
||||
remove_datetimes(wtxn, task_started_changes, self.queue.tasks.started_at)?;
|
||||
|
||||
remove_datetimes(wtxn, task_finished_changes, self.queue.tasks.finished_at)?;
|
||||
remove_task_datetimes(wtxn, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at)?;
|
||||
remove_task_datetimes(wtxn, tasks_started_to_remove, self.queue.tasks.started_at)?;
|
||||
remove_task_datetimes(wtxn, tasks_finished_to_remove, self.queue.tasks.finished_at)?;
|
||||
|
||||
// 8. Delete batches datetimes
|
||||
progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
|
||||
|
||||
remove_datetimes(
|
||||
remove_batch_datetimes(
|
||||
wtxn,
|
||||
batch_enqueued_changes,
|
||||
&to_delete_batches,
|
||||
self.queue.batches.enqueued_at,
|
||||
)?;
|
||||
|
||||
remove_datetimes(
|
||||
remove_batch_datetimes(
|
||||
wtxn,
|
||||
batch_started_changes,
|
||||
&to_delete_batches,
|
||||
self.queue.batches.started_at,
|
||||
)?;
|
||||
|
||||
remove_datetimes(
|
||||
remove_batch_datetimes(
|
||||
wtxn,
|
||||
batch_finished_changes,
|
||||
&to_delete_batches,
|
||||
self.queue.batches.finished_at,
|
||||
)?;
|
||||
|
||||
|
Reference in New Issue
Block a user