From 38cbd546042ecc41e99894c6716d7a149f34930d Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 6 Oct 2025 18:17:59 +0200 Subject: [PATCH] Implement the index compaction task --- crates/index-scheduler/src/processing.rs | 10 ++ .../src/scheduler/process_batch.rs | 101 +++++++++++++++++- 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index 3da81f143..25c9a1539 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -138,6 +138,16 @@ make_enum_progress! { } } +make_enum_progress! { + pub enum IndexCompaction { + RetrieveTheIndex, + CreateTemporaryFile, + CopyAndCompactTheIndex, + PersistTheCompactedIndex, + CloseTheIndex, + } +} + make_enum_progress! { pub enum InnerSwappingTwoIndexes { RetrieveTheTasks, diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index b76fa250c..f6e8537e0 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,22 +1,26 @@ use std::collections::{BTreeSet, HashMap, HashSet}; +use std::io::{Seek, SeekFrom}; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; +use byte_unit::Byte; use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::heed::{RoTxn, RwTxn}; +use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::{self, ChannelCongestion}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use milli::update::Settings as MilliSettings; use roaring::RoaringBitmap; +use tempfile::PersistError; use time::OffsetDateTime; use super::create_batch::Batch; use crate::processing::{ 
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep, - InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, - UpdateIndexProgress, + IndexCompaction, InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, + TaskDeletionProgress, UpdateIndexProgress, }; use crate::utils::{ self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task, @@ -418,8 +422,45 @@ impl IndexScheduler { task.status = Status::Succeeded; Ok((vec![task], ProcessBatchInfo::default())) } - Batch::IndexCompaction { index_uid, mut task } => { - todo!("Implement index compaction") + Batch::IndexCompaction { index_uid: _, mut task } => { + let KindWithContent::IndexCompaction { index_uid } = &task.kind else { + unreachable!() + }; + + let rtxn = self.env.read_txn()?; + let ret = catch_unwind(AssertUnwindSafe(|| { + self.apply_compaction(&rtxn, &progress, index_uid) + })); + + let (pre_size, post_size) = match ret { + Ok(Ok(stats)) => stats, + Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask), + Ok(Err(e)) => return Err(e), + Err(e) => { + let msg = match e.downcast_ref::<&'static str>() { + Some(s) => *s, + None => match e.downcast_ref::<String>() { + Some(s) => &s[..], + None => "Box<dyn Any>", + }, + }; + return Err(Error::Export(Box::new(Error::ProcessBatchPanicked( + msg.to_string(), + )))); + } + }; + + task.status = Status::Succeeded; + if let Some(Details::IndexCompaction { + index_uid: _, + pre_compaction_size, + post_compaction_size, + }) = task.details.as_mut() + { + *pre_compaction_size = Some(Byte::from_u64(pre_size)); + *post_compaction_size = Some(Byte::from_u64(post_size)); + } + Ok((vec![task], ProcessBatchInfo::default())) } Batch::Export { mut task } => { let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind @@ -496,6 +537,58 @@ impl IndexScheduler { } } + fn apply_compaction( + &self, + rtxn: &RoTxn, + progress: &Progress, + index_uid: 
&str, + ) -> Result<(u64, u64)> { + // 1. Verify that the index exists + if !self.index_mapper.index_exists(rtxn, index_uid)? { + return Err(Error::IndexNotFound(index_uid.to_owned())); + } + + // 2. We retrieve the index and create a temporary file in the index directory + progress.update_progress(IndexCompaction::RetrieveTheIndex); + let index = self.index_mapper.index(rtxn, index_uid)?; + let pre_size = index.map_size() as u64; + progress.update_progress(IndexCompaction::CreateTemporaryFile); + let mut file = tempfile::Builder::new() + .suffix("data.") + .prefix(".mdb.cpy") + .tempfile_in(index.path())?; + + // 3. We copy the index data to the temporary file + progress.update_progress(IndexCompaction::CopyAndCompactTheIndex); + index + .copy_to_file(file.as_file_mut(), CompactionOption::Enabled) + .map_err(|error| Error::Milli { error, index_uid: Some(index_uid.to_string()) })?; + // ...and reset the file position as specified in the documentation + file.seek(SeekFrom::Start(0))?; + + // 4. We replace the index data file with the temporary file + progress.update_progress(IndexCompaction::PersistTheCompactedIndex); + let post_size = file.as_file().metadata()?.len(); + match file.persist(index.path().join("data.mdb")) { + Ok(file) => file.sync_all()?, + // TODO see if we have a _resource busy_ error and probably handle this by: + // 1. closing the index, 2. replacing and 3. reopening it + Err(PersistError { error, file: _ }) => return Err(Error::IoError(error)), + }; + + // 5. prepare to close the index and wait for it + // The next time the index is opened, it will use the new compacted data + let closing_event = self.index_mapper.close_index(rtxn, index_uid)?; + + if let Some(closing_event) = closing_event { + progress.update_progress(IndexCompaction::CloseTheIndex); + drop(index); + closing_event.wait(); + } + + Ok((pre_size, post_size)) + } + /// Swap the index `lhs` with the index `rhs`. fn apply_index_swap( &self,