Add the necessary batches and tasks in the process

This commit is contained in:
Clément Renault
2025-10-02 17:17:32 +02:00
committed by Kerollmops
parent 53905c1362
commit f95398420b
9 changed files with 131 additions and 16 deletions

View File

@@ -25,6 +25,7 @@ enum AutobatchKind {
IndexDeletion,
IndexUpdate,
IndexSwap,
CompactIndex,
}
impl AutobatchKind {
@@ -68,6 +69,7 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation,
KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate,
KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap,
KindWithContent::CompactIndex { .. } => AutobatchKind::CompactIndex,
KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
@@ -118,6 +120,9 @@ pub enum BatchKind {
IndexSwap {
id: TaskId,
},
CompactIndex {
id: TaskId,
},
}
impl BatchKind {
@@ -183,6 +188,13 @@ impl BatchKind {
)),
false,
),
K::CompactIndex => (
Break((
BatchKind::CompactIndex { id: task_id },
BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
false,
),
K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),
K::DocumentImport { allow_index_creation, primary_key: pk }
if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() =>
@@ -287,8 +299,10 @@ impl BatchKind {
};
match (self, autobatch_kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break((this, BatchStopReason::TaskCannotBeBatched { kind, id })),
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::CompactIndex) => {
Break((this, BatchStopReason::TaskCannotBeBatched { kind, id }))
},
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break((this, BatchStopReason::IndexCreationMismatch { id }))
@@ -483,6 +497,7 @@ impl BatchKind {
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexSwap { .. }
| BatchKind::CompactIndex { .. }
| BatchKind::DocumentEdition { .. },
_,
) => {

View File

@@ -55,6 +55,10 @@ pub(crate) enum Batch {
UpgradeDatabase {
tasks: Vec<Task>,
},
CompactIndex {
index_uid: String,
task: Task,
},
}
#[derive(Debug)]
@@ -110,7 +114,8 @@ impl Batch {
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
| Batch::Export { task }
| Batch::IndexUpdate { task, .. } => {
| Batch::IndexUpdate { task, .. }
| Batch::CompactIndex { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
Batch::SnapshotCreation(tasks)
@@ -155,7 +160,8 @@ impl Batch {
IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid, .. } => Some(index_uid),
| IndexDeletion { index_uid, .. }
| CompactIndex { index_uid, .. } => Some(index_uid),
}
}
}
@@ -175,6 +181,7 @@ impl fmt::Display for Batch {
Batch::IndexUpdate { .. } => f.write_str("IndexUpdate")?,
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
Batch::CompactIndex { .. } => f.write_str("CompactIndex")?,
Batch::Export { .. } => f.write_str("Export")?,
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
};
@@ -430,6 +437,12 @@ impl IndexScheduler {
current_batch.processing(Some(&mut task));
Ok(Some(Batch::IndexSwap { task }))
}
BatchKind::CompactIndex { id } => {
let mut task =
self.queue.tasks.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task));
Ok(Some(Batch::CompactIndex { index_uid, task }))
}
}
}

View File

@@ -418,6 +418,9 @@ impl IndexScheduler {
task.status = Status::Succeeded;
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::CompactIndex { index_uid, mut task } => {
todo!("Implement compact index")
}
Batch::Export { mut task } => {
let KindWithContent::Export { url, api_key, payload_size, indexes } = &task.kind
else {