From 4a84f1cd1aa36e5d7209217d1e6993c881b660f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 2 Oct 2025 17:17:32 +0200 Subject: [PATCH] Add the necessary batches and tasks in the process --- crates/dump/src/lib.rs | 4 ++ crates/index-scheduler/src/dump.rs | 1 + crates/index-scheduler/src/insta_snapshot.rs | 3 ++ .../src/scheduler/autobatcher.rs | 19 +++++++- .../src/scheduler/create_batch.rs | 17 ++++++- .../src/scheduler/process_batch.rs | 3 ++ crates/index-scheduler/src/utils.rs | 24 ++++++---- crates/meilisearch-types/src/task_view.rs | 32 ++++++++++++++ crates/meilisearch-types/src/tasks.rs | 44 +++++++++++++++++-- 9 files changed, 131 insertions(+), 16 deletions(-) diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index fdbd701be..c33f02422 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -158,6 +158,9 @@ pub enum KindDump { UpgradeDatabase { from: (u32, u32, u32), }, + CompactIndex { + index_uid: String, + }, } impl From for TaskDump { @@ -240,6 +243,7 @@ impl From for KindDump { KindWithContent::UpgradeDatabase { from: version } => { KindDump::UpgradeDatabase { from: version } } + KindWithContent::CompactIndex { index_uid } => KindDump::CompactIndex { index_uid }, } } } diff --git a/crates/index-scheduler/src/dump.rs b/crates/index-scheduler/src/dump.rs index e5e7a5d8c..b44e62bc9 100644 --- a/crates/index-scheduler/src/dump.rs +++ b/crates/index-scheduler/src/dump.rs @@ -234,6 +234,7 @@ impl<'a> Dump<'a> { } } KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from }, + KindDump::CompactIndex { index_uid } => KindWithContent::CompactIndex { index_uid }, }, }; diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index df043ad87..d564d8ca8 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -317,6 +317,9 @@ fn snapshot_details(d: &Details) -> String { Details::UpgradeDatabase { from, to } => { format!("{{ from: {from:?}, to: {to:?} }}") } + Details::CompactIndex { index_uid, pre_compaction_size, post_compaction_size } => { + format!("{{ index_uid: {index_uid:?}, pre_compaction_size: {pre_compaction_size:?}, post_compaction_size: {post_compaction_size:?} }}") + } } } diff --git a/crates/index-scheduler/src/scheduler/autobatcher.rs b/crates/index-scheduler/src/scheduler/autobatcher.rs index a88a9f0bf..ad3ae24e8 100644 --- a/crates/index-scheduler/src/scheduler/autobatcher.rs +++ b/crates/index-scheduler/src/scheduler/autobatcher.rs @@ -25,6 +25,7 @@ enum AutobatchKind { IndexDeletion, IndexUpdate, IndexSwap, + CompactIndex, } impl AutobatchKind { @@ -68,6 +69,7 @@ impl From for AutobatchKind { KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation, KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate, KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap, + KindWithContent::CompactIndex { .. } => AutobatchKind::CompactIndex, KindWithContent::TaskCancelation { .. } | KindWithContent::TaskDeletion { .. } | KindWithContent::DumpCreation { .. } @@ -118,6 +120,9 @@ pub enum BatchKind { IndexSwap { id: TaskId, }, + CompactIndex { + id: TaskId, + }, } impl BatchKind { @@ -183,6 +188,13 @@ impl BatchKind { )), false, ), + K::CompactIndex => ( + Break(( + BatchKind::CompactIndex { id: task_id }, + BatchStopReason::TaskCannotBeBatched { kind, id: task_id }, + )), + false, + ), K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false), K::DocumentImport { allow_index_creation, primary_key: pk } if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() => @@ -287,8 +299,10 @@ impl BatchKind { }; match (self, autobatch_kind) { - // We don't batch any of these operations - (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break((this, BatchStopReason::TaskCannotBeBatched { kind, id })), + // We don't batch any of these operations + (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::CompactIndex) => { + Break((this, BatchStopReason::TaskCannotBeBatched { kind, id })) + }, // We must not batch tasks that don't have the same index creation rights if the index doesn't already exists. (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { Break((this, BatchStopReason::IndexCreationMismatch { id })) @@ -483,6 +497,7 @@ impl BatchKind { | BatchKind::IndexDeletion { .. } | BatchKind::IndexUpdate { .. } | BatchKind::IndexSwap { .. } + | BatchKind::CompactIndex { .. } | BatchKind::DocumentEdition { .. }, _, ) => { diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index c598ad405..ecfb2c779 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -55,6 +55,10 @@ pub(crate) enum Batch { UpgradeDatabase { tasks: Vec, }, + CompactIndex { + index_uid: String, + task: Task, + }, } #[derive(Debug)] @@ -110,7 +114,8 @@ impl Batch { | Batch::Dump(task) | Batch::IndexCreation { task, .. } | Batch::Export { task } - | Batch::IndexUpdate { task, .. } => { + | Batch::IndexUpdate { task, .. } + | Batch::CompactIndex { task, .. } => { RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap() } Batch::SnapshotCreation(tasks) @@ -155,7 +160,8 @@ impl Batch { IndexOperation { op, .. } => Some(op.index_uid()), IndexCreation { index_uid, .. } | IndexUpdate { index_uid, .. } - | IndexDeletion { index_uid, .. } => Some(index_uid), + | IndexDeletion { index_uid, .. } + | CompactIndex { index_uid, .. } => Some(index_uid), } } } @@ -175,6 +181,7 @@ impl fmt::Display for Batch { Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?, Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?, Batch::IndexSwap { .. } => f.write_str("IndexSwap")?, + Batch::CompactIndex { .. } => f.write_str("CompactIndex")?, Batch::Export { .. } => f.write_str("Export")?, Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?, }; @@ -430,6 +437,12 @@ impl IndexScheduler { current_batch.processing(Some(&mut task)); Ok(Some(Batch::IndexSwap { task })) } + BatchKind::CompactIndex { id } => { + let mut task = + self.queue.tasks.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); + Ok(Some(Batch::CompactIndex { index_uid, task })) + } } } diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index efa137cdb..b306d2bd4 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -418,6 +418,9 @@ impl IndexScheduler { task.status = Status::Succeeded; Ok((vec![task], ProcessBatchInfo::default())) } + Batch::CompactIndex { index_uid, mut task } => { + todo!("Implement compact index") + } Batch::Export { mut task } => { let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind else { diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 2617aba99..3dc74d0a3 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -256,14 +256,15 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) { use KindWithContent as K; let mut index_uids = vec![]; match &mut task.kind { - K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid), - K::DocumentEdition { index_uid, .. } => index_uids.push(index_uid), - K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid), - K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid), - K::DocumentClear { index_uid } => index_uids.push(index_uid), - K::SettingsUpdate { index_uid, .. } => index_uids.push(index_uid), - K::IndexDeletion { index_uid } => index_uids.push(index_uid), - K::IndexCreation { index_uid, .. } => index_uids.push(index_uid), + K::DocumentAdditionOrUpdate { index_uid, .. } + | K::DocumentEdition { index_uid, .. } + | K::DocumentDeletion { index_uid, .. } + | K::DocumentDeletionByFilter { index_uid, .. } + | K::DocumentClear { index_uid } + | K::SettingsUpdate { index_uid, .. } + | K::IndexDeletion { index_uid } + | K::IndexCreation { index_uid, .. } + | K::CompactIndex { index_uid, .. } => index_uids.push(index_uid), K::IndexUpdate { index_uid, new_index_uid, .. } => { index_uids.push(index_uid); if let Some(new_uid) = new_index_uid { @@ -618,6 +619,13 @@ impl crate::IndexScheduler { Details::UpgradeDatabase { from: _, to: _ } => { assert_eq!(kind.as_kind(), Kind::UpgradeDatabase); } + Details::CompactIndex { + index_uid: _, + pre_compaction_size: _, + post_compaction_size: _, + } => { + assert_eq!(kind.as_kind(), Kind::CompactIndex); + } } } diff --git a/crates/meilisearch-types/src/task_view.rs b/crates/meilisearch-types/src/task_view.rs index cbc29a11b..ea6dc4988 100644 --- a/crates/meilisearch-types/src/task_view.rs +++ b/crates/meilisearch-types/src/task_view.rs @@ -142,6 +142,11 @@ pub struct DetailsView { pub old_index_uid: Option, #[serde(skip_serializing_if = "Option::is_none")] pub new_index_uid: Option, + // index compaction + #[serde(skip_serializing_if = "Option::is_none")] + pub pre_compaction_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub post_compaction_size: Option, } impl DetailsView { @@ -314,6 +319,24 @@ impl DetailsView { // We should never be able to batch multiple renames at the same time. (Some(left), Some(_right)) => Some(left), }, + pre_compaction_size: match ( + self.pre_compaction_size.clone(), + other.pre_compaction_size.clone(), + ) { + (None, None) => None, + (None, Some(size)) | (Some(size), None) => Some(size), + // We should never be able to batch multiple renames at the same time. + (Some(left), Some(_right)) => Some(left), + }, + post_compaction_size: match ( + self.post_compaction_size.clone(), + other.post_compaction_size.clone(), + ) { + (None, None) => None, + (None, Some(size)) | (Some(size), None) => Some(size), + // We should never be able to batch multiple renames at the same time. + (Some(left), Some(_right)) => Some(left), + }, } } } @@ -415,6 +438,15 @@ impl From
for DetailsView { upgrade_to: Some(format!("v{}.{}.{}", to.0, to.1, to.2)), ..Default::default() }, + Details::CompactIndex { pre_compaction_size, post_compaction_size, .. } => { + DetailsView { + pre_compaction_size: pre_compaction_size + .map(|size| size.get_appropriate_unit(UnitType::Both).to_string()), + post_compaction_size: post_compaction_size + .map(|size| size.get_appropriate_unit(UnitType::Both).to_string()), + ..Default::default() + } + } } } } diff --git a/crates/meilisearch-types/src/tasks.rs b/crates/meilisearch-types/src/tasks.rs index d0f668255..c1794fe6a 100644 --- a/crates/meilisearch-types/src/tasks.rs +++ b/crates/meilisearch-types/src/tasks.rs @@ -67,7 +67,8 @@ impl Task { | SettingsUpdate { index_uid, .. } | IndexCreation { index_uid, .. } | IndexUpdate { index_uid, .. } - | IndexDeletion { index_uid } => Some(index_uid), + | IndexDeletion { index_uid } + | CompactIndex { index_uid } => Some(index_uid), } } @@ -94,7 +95,8 @@ impl Task { | KindWithContent::DumpCreation { .. } | KindWithContent::SnapshotCreation | KindWithContent::Export { .. } - | KindWithContent::UpgradeDatabase { .. } => None, + | KindWithContent::UpgradeDatabase { .. } + | KindWithContent::CompactIndex { .. } => None, } } } @@ -170,6 +172,9 @@ pub enum KindWithContent { UpgradeDatabase { from: (u32, u32, u32), }, + CompactIndex { + index_uid: String, + }, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] @@ -206,6 +211,7 @@ impl KindWithContent { KindWithContent::SnapshotCreation => Kind::SnapshotCreation, KindWithContent::Export { .. } => Kind::Export, KindWithContent::UpgradeDatabase { .. } => Kind::UpgradeDatabase, + KindWithContent::CompactIndex { .. } => Kind::CompactIndex, } } @@ -226,7 +232,8 @@ impl KindWithContent { | DocumentClear { index_uid } | SettingsUpdate { index_uid, .. } | IndexCreation { index_uid, .. } - | IndexDeletion { index_uid } => vec![index_uid], + | IndexDeletion { index_uid } + | CompactIndex { index_uid } => vec![index_uid], IndexUpdate { index_uid, new_index_uid, .. } => { let mut indexes = vec![index_uid.as_str()]; if let Some(new_uid) = new_index_uid { @@ -325,6 +332,11 @@ impl KindWithContent { versioning::VERSION_PATCH, ), }), + KindWithContent::CompactIndex { index_uid } => Some(Details::CompactIndex { + index_uid: index_uid.clone(), + pre_compaction_size: None, + post_compaction_size: None, + }), } } @@ -407,6 +419,11 @@ impl KindWithContent { versioning::VERSION_PATCH, ), }), + KindWithContent::CompactIndex { index_uid } => Some(Details::CompactIndex { + index_uid: index_uid.clone(), + pre_compaction_size: None, + post_compaction_size: None, + }), } } } @@ -469,6 +486,11 @@ impl From<&KindWithContent> for Option
{ versioning::VERSION_PATCH, ), }), + KindWithContent::CompactIndex { index_uid } => Some(Details::CompactIndex { + index_uid: index_uid.clone(), + pre_compaction_size: None, + post_compaction_size: None, + }), } } } @@ -579,6 +601,7 @@ pub enum Kind { SnapshotCreation, Export, UpgradeDatabase, + CompactIndex, } impl Kind { @@ -590,7 +613,8 @@ impl Kind { | Kind::SettingsUpdate | Kind::IndexCreation | Kind::IndexDeletion - | Kind::IndexUpdate => true, + | Kind::IndexUpdate + | Kind::CompactIndex => true, Kind::IndexSwap | Kind::TaskCancelation | Kind::TaskDeletion @@ -618,6 +642,7 @@ impl Display for Kind { Kind::SnapshotCreation => write!(f, "snapshotCreation"), Kind::Export => write!(f, "export"), Kind::UpgradeDatabase => write!(f, "upgradeDatabase"), + Kind::CompactIndex => write!(f, "compactIndex"), } } } @@ -653,6 +678,8 @@ impl FromStr for Kind { Ok(Kind::Export) } else if kind.eq_ignore_ascii_case("upgradeDatabase") { Ok(Kind::UpgradeDatabase) + } else if kind.eq_ignore_ascii_case("compactIndex") { + Ok(Kind::CompactIndex) } else { Err(ParseTaskKindError(kind.to_owned())) } @@ -738,6 +765,11 @@ pub enum Details { from: (u32, u32, u32), to: (u32, u32, u32), }, + CompactIndex { + index_uid: String, + pre_compaction_size: Option, + post_compaction_size: Option, + }, } #[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)] @@ -800,6 +832,10 @@ impl Details { Self::ClearAll { deleted_documents } => *deleted_documents = Some(0), Self::TaskCancelation { canceled_tasks, .. } => *canceled_tasks = Some(0), Self::TaskDeletion { deleted_tasks, .. } => *deleted_tasks = Some(0), + Self::CompactIndex { pre_compaction_size, post_compaction_size, .. } => { + *pre_compaction_size = None; + *post_compaction_size = None; + } Self::SettingsUpdate { .. } | Self::IndexInfo { .. } | Self::Dump { .. }