Implement the index compaction task

This commit is contained in:
Kerollmops
2025-10-06 18:17:59 +02:00
committed by Clément Renault
parent cf55420d25
commit c8b481e86f
2 changed files with 107 additions and 4 deletions

View File

@@ -138,6 +138,16 @@ make_enum_progress! {
}
}
make_enum_progress! {
pub enum IndexCompaction {
RetrieveTheIndex,
CreateTemporaryFile,
CopyAndCompactTheIndex,
PersistTheCompactedIndex,
CloseTheIndex,
}
}
make_enum_progress! {
pub enum InnerSwappingTwoIndexes {
RetrieveTheTasks,

View File

@@ -1,22 +1,26 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::io::{Seek, SeekFrom};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use byte_unit::Byte;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap;
use tempfile::PersistError;
use time::OffsetDateTime;
use super::create_batch::Batch;
use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress,
IndexCompaction, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress,
TaskDeletionProgress, UpdateIndexProgress,
};
use crate::utils::{
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
@@ -418,8 +422,45 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::IndexCompaction { index_uid, mut task } => {
todo!("Implement index compaction")
Batch::IndexCompaction { index_uid: _, mut task } => {
let KindWithContent::IndexCompaction { index_uid } = &task.kind else {
unreachable!()
};
let rtxn = self.env.read_txn()?;
let ret = catch_unwind(AssertUnwindSafe(|| {
self.apply_compaction(&rtxn, &progress, index_uid)
}));
let (pre_size, post_size) = match ret {
Ok(Ok(stats)) => stats,
Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask),
Ok(Err(e)) => return Err(e),
Err(e) => {
let msg = match e.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match e.downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
return Err(Error::Export(Box::new(Error::ProcessBatchPanicked(
msg.to_string(),
))));
}
};
task.status = Status::Succeeded;
if let Some(Details::IndexCompaction {
index_uid: _,
pre_compaction_size,
post_compaction_size,
}) = task.details.as_mut()
{
*pre_compaction_size = Some(Byte::from_u64(pre_size));
*post_compaction_size = Some(Byte::from_u64(post_size));
}
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::Export { mut task } => {
let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind
@@ -496,6 +537,58 @@ impl IndexScheduler {
}
}
fn apply_compaction(
&self,
rtxn: &RoTxn,
progress: &Progress,
index_uid: &str,
) -> Result<(u64, u64)> {
// 1. Verify that the index exists
if !self.index_mapper.index_exists(rtxn, index_uid)? {
return Err(Error::IndexNotFound(index_uid.to_owned()));
}
// 2. We retrieve the index and create a temporary file in the index directory
progress.update_progress(IndexCompaction::RetrieveTheIndex);
let index = self.index_mapper.index(rtxn, index_uid)?;
let pre_size = index.map_size() as u64;
progress.update_progress(IndexCompaction::CreateTemporaryFile);
let mut file = tempfile::Builder::new()
.suffix("data.")
.prefix(".mdb.cpy")
.tempfile_in(index.path())?;
// 3. We copy the index data to the temporary file
progress.update_progress(IndexCompaction::CopyAndCompactTheIndex);
index
.copy_to_file(file.as_file_mut(), CompactionOption::Enabled)
.map_err(|error| Error::Milli { error, index_uid: Some(index_uid.to_string()) })?;
// ...and reset the file position as specified in the documentation
file.seek(SeekFrom::Start(0))?;
// 4. We replace the index data file with the temporary file
progress.update_progress(IndexCompaction::PersistTheCompactedIndex);
let post_size = file.as_file().metadata()?.len();
match file.persist(index.path().join("data.mdb")) {
Ok(file) => file.sync_all()?,
// TODO see if we have a _resource busy_ error and probably handle this by:
// 1. closing the index, 2. replacing and 3. reopening it
Err(PersistError { error, file: _ }) => return Err(Error::IoError(error)),
};
// 5. prepare to close the index and wait for it
// The next time the index is opened, it will use the new compacted data
let closing_event = self.index_mapper.close_index(rtxn, index_uid)?;
if let Some(closing_event) = closing_event {
progress.update_progress(IndexCompaction::CloseTheIndex);
drop(index);
closing_event.wait();
}
Ok((pre_size, post_size))
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(
&self,