Delete the leftover compaction files from canceled operations

This commit is contained in:
Kerollmops
2025-10-15 16:46:20 +02:00
parent ffacf1c002
commit ddc76ad0dc
2 changed files with 36 additions and 5 deletions

View File

@@ -75,6 +75,7 @@ make_enum_progress! {
pub enum TaskCancelationProgress {
RetrievingTasks,
CancelingUpgrade,
CleaningCompactionLeftover,
UpdatingTasks,
}
}

View File

@@ -1,6 +1,6 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::io::{Seek, SeekFrom};
use std::fs::{remove_file, File};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
@@ -29,6 +29,9 @@ use crate::utils::{
};
use crate::{Error, IndexScheduler, Result, TaskId};
/// The name of the copy of the data.mdb file used during compaction.
const DATA_MDB_COPY_NAME: &str = "data.mdb.cpy";
#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
/// The write channel congestion. None when unavailable: settings update.
@@ -562,7 +565,7 @@ impl IndexScheduler {
let src_path = index.path().join("data.mdb");
let pre_size = std::fs::metadata(&src_path)?.len();
let dst_path = TempPath::from_path(index.path().join("data.mdb.cpy"));
let dst_path = TempPath::from_path(index.path().join(DATA_MDB_COPY_NAME));
let file = File::create(&dst_path)?;
let mut file = tempfile::NamedTempFile::from_parts(file, dst_path);
@@ -912,9 +915,10 @@ impl IndexScheduler {
let enqueued_tasks = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
// 0. Check if any upgrade task was matched.
// 0. Check if any upgrade or compaction tasks were matched.
// If so, we cancel all the failed or enqueued upgrade tasks.
let upgrade_tasks = &self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)?;
let compaction_tasks = &self.queue.tasks.get_kind(rtxn, Kind::IndexCompaction)?;
let is_canceling_upgrade = !matched_tasks.is_disjoint(upgrade_tasks);
if is_canceling_upgrade {
let failed_tasks = self.queue.tasks.get_status(rtxn, Status::Failed)?;
@@ -979,7 +983,33 @@ impl IndexScheduler {
}
}
// 3. We now have a list of tasks to cancel, cancel them
// 3. If we are cancelling a compaction task, remove the tempfiles after incomplete compactions
for compaction_task in &tasks_to_cancel & compaction_tasks {
progress.update_progress(TaskCancelationProgress::CleaningCompactionLeftover);
let task = self.queue.tasks.get_task(rtxn, compaction_task)?.unwrap();
let Some(Details::IndexCompaction {
index_uid,
pre_compaction_size: _,
post_compaction_size: _,
}) = task.details
else {
unreachable!("wrong details for compaction task {compaction_task}")
};
let index_path = match self.index_mapper.index_mapping.get(rtxn, &index_uid)? {
Some(index_uuid) => self.index_mapper.index_path(index_uuid),
None => return Err(Error::IndexNotFound(index_uid)),
};
if let Err(e) = remove_file(index_path.join(DATA_MDB_COPY_NAME)) {
match e.kind() {
ErrorKind::NotFound => (),
_ => return Err(Error::IoError(e)),
}
}
}
// 4. We now have a list of tasks to cancel, cancel them
let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
progress.update_progress(progress_obj);