Add the necessary batches and tasks in the process

This commit is contained in:
Clément Renault
2025-10-02 17:17:32 +02:00
parent 511cb0ff82
commit 4a84f1cd1a
9 changed files with 131 additions and 16 deletions

View File

@@ -234,6 +234,7 @@ impl<'a> Dump<'a> {
}
}
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
KindDump::CompactIndex { index_uid } => KindWithContent::CompactIndex { index_uid },
},
};

View File

@@ -317,6 +317,9 @@ fn snapshot_details(d: &Details) -> String {
Details::UpgradeDatabase { from, to } => {
format!("{{ from: {from:?}, to: {to:?} }}")
}
Details::CompactIndex { index_uid, pre_compaction_size, post_compaction_size } => {
format!("{{ index_uid: {index_uid:?}, pre_compaction_size: {pre_compaction_size:?}, post_compaction_size: {post_compaction_size:?} }}")
}
}
}

View File

@@ -25,6 +25,7 @@ enum AutobatchKind {
IndexDeletion,
IndexUpdate,
IndexSwap,
CompactIndex,
}
impl AutobatchKind {
@@ -68,6 +69,7 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation,
KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate,
KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap,
KindWithContent::CompactIndex { .. } => AutobatchKind::CompactIndex,
KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
@@ -118,6 +120,9 @@ pub enum BatchKind {
IndexSwap {
id: TaskId,
},
CompactIndex {
id: TaskId,
},
}
impl BatchKind {
@@ -183,6 +188,13 @@ impl BatchKind {
)),
false,
),
K::CompactIndex => (
Break((
BatchKind::CompactIndex { id: task_id },
BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
false,
),
K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),
K::DocumentImport { allow_index_creation, primary_key: pk }
if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() =>
@@ -287,8 +299,10 @@ impl BatchKind {
};
match (self, autobatch_kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break((this, BatchStopReason::TaskCannotBeBatched { kind, id })),
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::CompactIndex) => {
Break((this, BatchStopReason::TaskCannotBeBatched { kind, id }))
},
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break((this, BatchStopReason::IndexCreationMismatch { id }))
@@ -483,6 +497,7 @@ impl BatchKind {
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexSwap { .. }
| BatchKind::CompactIndex { .. }
| BatchKind::DocumentEdition { .. },
_,
) => {

View File

@@ -55,6 +55,10 @@ pub(crate) enum Batch {
UpgradeDatabase {
tasks: Vec<Task>,
},
CompactIndex {
index_uid: String,
task: Task,
},
}
#[derive(Debug)]
@@ -110,7 +114,8 @@ impl Batch {
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
| Batch::Export { task }
| Batch::IndexUpdate { task, .. } => {
| Batch::IndexUpdate { task, .. }
| Batch::CompactIndex { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
Batch::SnapshotCreation(tasks)
@@ -155,7 +160,8 @@ impl Batch {
IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid, .. } => Some(index_uid),
| IndexDeletion { index_uid, .. }
| CompactIndex { index_uid, .. } => Some(index_uid),
}
}
}
@@ -175,6 +181,7 @@ impl fmt::Display for Batch {
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
Batch::CompactIndex { .. } => f.write_str("CompactIndex")?,
Batch::Export { .. } => f.write_str("Export")?,
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
};
@@ -430,6 +437,12 @@ impl IndexScheduler {
current_batch.processing(Some(&mut task));
Ok(Some(Batch::IndexSwap { task }))
}
BatchKind::CompactIndex { id } => {
let mut task =
self.queue.tasks.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task));
Ok(Some(Batch::CompactIndex { index_uid, task }))
}
}
}

View File

@@ -418,6 +418,9 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::CompactIndex { index_uid, mut task } => {
todo!("Implement compact index")
}
Batch::Export { mut task } => {
let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind
else {

View File

@@ -256,14 +256,15 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
use KindWithContent as K;
let mut index_uids = vec![];
match &mut task.kind {
K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid),
K::DocumentEdition { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid),
K::DocumentClear { index_uid } => index_uids.push(index_uid),
K::SettingsUpdate { index_uid, .. } => index_uids.push(index_uid),
K::IndexDeletion { index_uid } => index_uids.push(index_uid),
K::IndexCreation { index_uid, .. } => index_uids.push(index_uid),
K::DocumentAdditionOrUpdate { index_uid, .. }
| K::DocumentEdition { index_uid, .. }
| K::DocumentDeletion { index_uid, .. }
| K::DocumentDeletionByFilter { index_uid, .. }
| K::DocumentClear { index_uid }
| K::SettingsUpdate { index_uid, .. }
| K::IndexDeletion { index_uid }
| K::IndexCreation { index_uid, .. }
| K::CompactIndex { index_uid, .. } => index_uids.push(index_uid),
K::IndexUpdate { index_uid, new_index_uid, .. } => {
index_uids.push(index_uid);
if let Some(new_uid) = new_index_uid {
@@ -618,6 +619,13 @@ impl crate::IndexScheduler {
Details::UpgradeDatabase { from: _, to: _ } => {
assert_eq!(kind.as_kind(), Kind::UpgradeDatabase);
}
Details::CompactIndex {
index_uid: _,
pre_compaction_size: _,
post_compaction_size: _,
} => {
assert_eq!(kind.as_kind(), Kind::CompactIndex);
}
}
}