Use PipedArchiveBuilder to process snapshots without compaction

Louis Dureuil
2025-10-02 11:18:54 +02:00
parent 467e15d9c0
commit 43a6505435


@@ -76,6 +76,22 @@ unsafe fn remove_tasks(
impl IndexScheduler {
pub(super) fn process_snapshot(
&self,
progress: Progress,
tasks: Vec<Task>,
) -> Result<Vec<Task>> {
let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
CompactionOption::Disabled
} else {
CompactionOption::Enabled
};
match compaction_option {
CompactionOption::Enabled => self.process_snapshot_with_temp(progress, tasks),
CompactionOption::Disabled => self.process_snapshot_with_pipe(progress, tasks),
}
}
fn process_snapshot_with_temp(
&self,
progress: Progress,
mut tasks: Vec<Task>,
@@ -105,12 +121,8 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
- let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
- CompactionOption::Disabled
- } else {
- CompactionOption::Enabled
- };
- self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
+ self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 2.2 Remove the current snapshot tasks
//
@@ -161,7 +173,7 @@ impl IndexScheduler {
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?; fs::create_dir_all(&dst)?;
index index
.copy_to_path(dst.join("data.mdb"), compaction_option) .copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?; .map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
} }
@@ -171,7 +183,7 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let dst = temp_snapshot_dir.path().join("auth");
fs::create_dir_all(&dst)?;
- self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
+ self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 5. Copy and tarball the flat snapshot
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
@@ -206,4 +218,112 @@ impl IndexScheduler {
Ok(tasks)
}
fn process_snapshot_with_pipe(
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
fs::create_dir_all(&self.scheduler.snapshots_path)?;
// 1. Find the base path and original name of the database
// TODO find a better way to get this path
let mut base_path = self.env.path().to_owned();
base_path.pop();
let base_path = base_path;
let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");
// 2. Start the tarball builder. The tarball will be created on another thread from piped data.
let mut builder = compression::PipedArchiveBuilder::new(
self.scheduler.snapshots_path.clone(),
format!("{db_name}.snapshot"),
base_path,
);
// 3. Snapshot the VERSION file
builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
// 4. Snapshot the index-scheduler LMDB env
//
// When we call copy_to_path, LMDB opens a read transaction by itself;
// we can't provide our own. This is an issue because we would like to know
// which update files to copy, but new ones can be enqueued between the copy
// of the env and the new transaction we open to retrieve the enqueued tasks.
// So we prefer to open a new transaction after copying the env and to copy
// too many update files rather than not enough.
//
// Note that there cannot be any update files deleted between those
// two read operations as the task processing is synchronous.
// 4.1 First copy the LMDB env of the index-scheduler
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
builder.add_env_to_archive(&self.env)?;
// 4.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
// 4.3 Only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
builder.add_dir_to_archive(self.queue.file_store.path().to_path_buf())?;
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
for task_id in enqueued {
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.queue.file_store.get_update_path(content_uuid);
builder.add_file_to_archive(src)?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}
// 5. Snapshot every index
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
builder.add_dir_to_archive(self.index_mapper.base_path().to_path_buf())?;
let index_mapping = self.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, _) = result?;
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
));
let index = self.index_mapper.index(&rtxn, name)?;
builder.add_env_to_archive(index.raw_env())?;
}
drop(rtxn);
// 6. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
builder.add_env_to_archive(&self.scheduler.auth_env)?;
// 7. Finalize the tarball
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
let file = builder.finish()?;
// 8. Change the permission to make the snapshot readonly
let mut permissions = file.metadata()?.permissions();
permissions.set_readonly(true);
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
#[allow(clippy::non_octal_unix_permissions)]
// r--r--r-- (0o444)
permissions.set_mode(0b100100100);
}
file.set_permissions(permissions)?;
for task in &mut tasks {
task.status = Status::Succeeded;
}
Ok(tasks)
}
}
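
The compression::PipedArchiveBuilder used above is not part of this diff. As a rough orientation only, the sketch below shows one shape such a builder could take: a background thread fed over a channel that streams entries straight into a gzipped tarball (here via the tar and flate2 crates). The names, the channel-based design, and the relative-path handling are assumptions for illustration, not the actual compression module, and the piece that makes the real builder interesting, add_env_to_archive (streaming an LMDB environment copy into the archive without a temporary data.mdb on disk), is left out.

// Minimal sketch only; assumes the tar and flate2 crates and a channel-fed
// background thread. Not the real compression::PipedArchiveBuilder, and
// add_env_to_archive is omitted entirely.
use std::fs::File;
use std::path::PathBuf;
use std::sync::mpsc::{channel, Sender};
use std::thread::JoinHandle;

use flate2::write::GzEncoder;
use flate2::Compression;
use tar::Builder;

enum Entry {
    File(PathBuf),
    Dir(PathBuf),
}

pub struct PipedArchiveBuilder {
    sender: Sender<Entry>,
    handle: JoinHandle<std::io::Result<File>>,
}

impl PipedArchiveBuilder {
    pub fn new(dst_dir: PathBuf, archive_name: String, base_path: PathBuf) -> Self {
        let (sender, receiver) = channel::<Entry>();
        let handle = std::thread::spawn(move || {
            // The archive is written as entries arrive; nothing is staged in a temp dir.
            let file = File::create(dst_dir.join(archive_name))?;
            let gz = GzEncoder::new(file, Compression::default());
            let mut tar = Builder::new(gz);
            for entry in receiver {
                match entry {
                    Entry::File(path) => {
                        // Store the file under its path relative to the database directory.
                        let name = path.strip_prefix(&base_path).unwrap_or(&path).to_path_buf();
                        tar.append_path_with_name(&path, name)?;
                    }
                    Entry::Dir(path) => {
                        // Record the directory entry itself; its files are appended one by one.
                        let name = path.strip_prefix(&base_path).unwrap_or(&path).to_path_buf();
                        tar.append_dir(name, &path)?;
                    }
                }
            }
            // All senders are gone: flush the tarball and hand back the finished file.
            let gz = tar.into_inner()?;
            let file = gz.finish()?;
            Ok(file)
        });
        PipedArchiveBuilder { sender, handle }
    }

    pub fn add_file_to_archive(&mut self, path: PathBuf) -> std::io::Result<()> {
        self.sender.send(Entry::File(path)).expect("archive thread stopped early");
        Ok(())
    }

    pub fn add_dir_to_archive(&mut self, path: PathBuf) -> std::io::Result<()> {
        self.sender.send(Entry::Dir(path)).expect("archive thread stopped early");
        Ok(())
    }

    pub fn finish(self) -> std::io::Result<File> {
        // Close the channel so the writer thread drains its queue and finishes the tarball.
        drop(self.sender);
        self.handle.join().expect("archive thread panicked")
    }
}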