From ddc76ad0dcbdf8fed3f08e69e6ad59910146a896 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 15 Oct 2025 16:46:20 +0200 Subject: [PATCH] Delete the leftover compaction files from canceled operations --- crates/index-scheduler/src/processing.rs | 1 + .../src/scheduler/process_batch.rs | 40 ++++++++++++++++--- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index cf6c8c686..16de63244 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -75,6 +75,7 @@ make_enum_progress! { pub enum TaskCancelationProgress { RetrievingTasks, CancelingUpgrade, + CleaningCompactionLeftover, UpdatingTasks, } } diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index ab17847ef..36ca68ed9 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeSet, HashMap, HashSet}; -use std::fs::File; -use std::io::{Seek, SeekFrom}; +use std::fs::{remove_file, File}; +use std::io::{ErrorKind, Seek, SeekFrom}; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::sync::atomic::Ordering; @@ -29,6 +29,9 @@ use crate::utils::{ }; use crate::{Error, IndexScheduler, Result, TaskId}; +/// The name of the copy of the data.mdb file used during compaction. +const DATA_MDB_COPY_NAME: &str = "data.mdb.cpy"; + #[derive(Debug, Default)] pub struct ProcessBatchInfo { /// The write channel congestion. None when unavailable: settings update. @@ -562,7 +565,7 @@ impl IndexScheduler { let src_path = index.path().join("data.mdb"); let pre_size = std::fs::metadata(&src_path)?.len(); - let dst_path = TempPath::from_path(index.path().join("data.mdb.cpy")); + let dst_path = TempPath::from_path(index.path().join(DATA_MDB_COPY_NAME)); let file = File::create(&dst_path)?; let mut file = tempfile::NamedTempFile::from_parts(file, dst_path); @@ -912,9 +915,10 @@ impl IndexScheduler { let enqueued_tasks = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; - // 0. Check if any upgrade task was matched. + // 0. Check if any upgrade or compaction tasks were matched. // If so, we cancel all the failed or enqueued upgrade tasks. let upgrade_tasks = &self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)?; + let compaction_tasks = &self.queue.tasks.get_kind(rtxn, Kind::IndexCompaction)?; let is_canceling_upgrade = !matched_tasks.is_disjoint(upgrade_tasks); if is_canceling_upgrade { let failed_tasks = self.queue.tasks.get_status(rtxn, Status::Failed)?; @@ -979,7 +983,33 @@ impl IndexScheduler { } } - // 3. We now have a list of tasks to cancel, cancel them + // 3. If we are cancelling a compaction task, remove the tempfiles after incomplete compactions + for compaction_task in &tasks_to_cancel & compaction_tasks { + progress.update_progress(TaskCancelationProgress::CleaningCompactionLeftover); + let task = self.queue.tasks.get_task(rtxn, compaction_task)?.unwrap(); + let Some(Details::IndexCompaction { + index_uid, + pre_compaction_size: _, + post_compaction_size: _, + }) = task.details + else { + unreachable!("wrong details for compaction task {compaction_task}") + }; + + let index_path = match self.index_mapper.index_mapping.get(rtxn, &index_uid)? { + Some(index_uuid) => self.index_mapper.index_path(index_uuid), + None => return Err(Error::IndexNotFound(index_uid)), + }; + + if let Err(e) = remove_file(index_path.join(DATA_MDB_COPY_NAME)) { + match e.kind() { + ErrorKind::NotFound => (), + _ => return Err(Error::IoError(e)), + } + } + } + + // 4. We now have a list of tasks to cancel, cancel them let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32); progress.update_progress(progress_obj);