From 1d188a7ad3f71fcc367e846b04e84a00a5244ae1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 16 Oct 2025 13:00:20 +0200 Subject: [PATCH] Make the compaction tasks a priority over the export ones --- .../src/scheduler/create_batch.rs | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index 06d796548..86023c07e 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -525,7 +525,20 @@ impl IndexScheduler { return Ok(Some((Batch::TaskDeletions(tasks), current_batch))); } - // 3. we batch the export. + // 3. we get the next task to compact + let to_compact = self.queue.tasks.get_kind(rtxn, Kind::IndexCompaction)? & enqueued; + if let Some(task_id) = to_compact.min() { + let mut task = + self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; + current_batch.processing(Some(&mut task)); + current_batch + .reason(BatchStopReason::TaskKindCannotBeBatched { kind: Kind::IndexCompaction }); + let index_uid = + task.index_uid().expect("Compaction task must have an index uid").to_owned(); + return Ok(Some((Batch::IndexCompaction { index_uid, task }, current_batch))); + } + + // 4. we batch the export. let to_export = self.queue.tasks.get_kind(rtxn, Kind::Export)? & enqueued; if !to_export.is_empty() { let task_id = to_export.iter().next().expect("There must be at least one export task"); @@ -535,7 +548,7 @@ impl IndexScheduler { return Ok(Some((Batch::Export { task }, current_batch))); } - // 4. we batch the snapshot. + // 5. we batch the snapshot. let to_snapshot = self.queue.tasks.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued; if !to_snapshot.is_empty() { let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?; @@ -545,7 +558,7 @@ impl IndexScheduler { return Ok(Some((Batch::SnapshotCreation(tasks), current_batch))); } - // 5. we batch the dumps. + // 6. we batch the dumps. let to_dump = self.queue.tasks.get_kind(rtxn, Kind::DumpCreation)? & enqueued; if let Some(to_dump) = to_dump.min() { let mut task = @@ -558,7 +571,7 @@ impl IndexScheduler { return Ok(Some((Batch::Dump(task), current_batch))); } - // 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. + // 7. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) }; let mut task = self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;