Snapshot takes compression and compaction options

This commit is contained in:
Louis Dureuil
2025-04-07 08:31:39 +02:00
parent c3c5a928e4
commit 7115b29a8c
13 changed files with 267 additions and 66 deletions

View File

@@ -210,7 +210,10 @@ impl<'a> Dump<'a> {
KindDump::DumpCreation { keys, instance_uid } => {
KindWithContent::DumpCreation { keys, instance_uid }
}
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
KindDump::SnapshotCreation => KindWithContent::SnapshotCreationWithParams {
compaction: false,
compression: true,
},
KindDump::UpgradeDatabase { from } => KindWithContent::UpgradeDatabase { from },
},
};

View File

@@ -5,9 +5,10 @@ tasks affecting a single index into a [batch](crate::batch::Batch).
The main function of the autobatcher is [`next_autobatch`].
*/
use meilisearch_types::tasks::TaskId;
use std::ops::ControlFlow::{self, Break, Continue};
use meilisearch_types::tasks::TaskId;
use crate::KindWithContent;
/// Succinctly describes a task's [`Kind`](meilisearch_types::tasks::Kind)
@@ -71,7 +72,8 @@ impl From<KindWithContent> for AutobatchKind {
| KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpCreation { .. }
| KindWithContent::UpgradeDatabase { .. }
| KindWithContent::SnapshotCreation => {
| KindWithContent::SnapshotCreation
| KindWithContent::SnapshotCreationWithParams { .. } => {
panic!("The autobatcher should never be called with tasks that don't apply to an index.")
}
}

View File

@@ -23,7 +23,11 @@ pub(crate) enum Batch {
task: Task,
},
TaskDeletions(Vec<Task>),
SnapshotCreation(Vec<Task>),
SnapshotCreation {
tasks: Vec<Task>,
compression: bool,
compaction: bool,
},
Dump(Task),
IndexOperation {
op: IndexOperation,
@@ -106,7 +110,7 @@ impl Batch {
| Batch::IndexUpdate { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
Batch::SnapshotCreation(tasks)
Batch::SnapshotCreation { tasks, .. }
| Batch::TaskDeletions(tasks)
| Batch::UpgradeDatabase { tasks }
| Batch::IndexDeletion { tasks, .. } => {
@@ -140,7 +144,7 @@ impl Batch {
match self {
TaskCancelation { .. }
| TaskDeletions(_)
| SnapshotCreation(_)
| SnapshotCreation { .. }
| Dump(_)
| UpgradeDatabase { .. }
| IndexSwap { .. } => None,
@@ -160,7 +164,7 @@ impl fmt::Display for Batch {
match self {
Batch::TaskCancelation { .. } => f.write_str("TaskCancelation")?,
Batch::TaskDeletions(_) => f.write_str("TaskDeletion")?,
Batch::SnapshotCreation(_) => f.write_str("SnapshotCreation")?,
Batch::SnapshotCreation { .. } => f.write_str("SnapshotCreation")?,
Batch::Dump(_) => f.write_str("Dump")?,
Batch::IndexOperation { op, .. } => write!(f, "{op}")?,
Batch::IndexCreation { .. } => f.write_str("IndexCreation")?,
@@ -478,7 +482,17 @@ impl IndexScheduler {
if !to_snapshot.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?;
current_batch.processing(&mut tasks);
return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
let (compaction, compression) = match &tasks.last().unwrap().kind {
KindWithContent::SnapshotCreation => (false, true),
KindWithContent::SnapshotCreationWithParams { compaction, compression } => {
(*compaction, *compression)
}
_ => unreachable!(),
};
return Ok(Some((
Batch::SnapshotCreation { compaction, compression, tasks },
current_batch,
)));
}
// 4. we batch the dumps.

View File

@@ -117,9 +117,9 @@ impl IndexScheduler {
}
Ok((tasks, None))
}
Batch::SnapshotCreation(tasks) => {
self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
}
Batch::SnapshotCreation { tasks, compression, compaction } => self
.process_snapshot(progress, tasks, compaction, compression)
.map(|tasks| (tasks, None)),
Batch::Dump(task) => {
self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
}

View File

@@ -15,8 +15,13 @@ impl IndexScheduler {
&self,
progress: Progress,
mut tasks: Vec<Task>,
compaction: bool,
compression: bool,
) -> Result<Vec<Task>> {
tracing::debug!(compaction, compression, "process snapshot");
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
let compaction =
if compaction { CompactionOption::Enabled } else { CompactionOption::Disabled };
fs::create_dir_all(&self.scheduler.snapshots_path)?;
let temp_snapshot_dir = tempfile::tempdir()?;
@@ -41,7 +46,7 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
self.env.copy_to_path(dst.join("data.mdb"), compaction)?;
// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
@@ -80,7 +85,7 @@ impl IndexScheduler {
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?;
index
.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
.copy_to_path(dst.join("data.mdb"), compaction)
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
}
@@ -103,7 +108,7 @@ impl IndexScheduler {
// 5.2 Tarball the content of the snapshot in a tempfile with a .snapshot extension
let snapshot_path = self.scheduler.snapshots_path.join(format!("{}.snapshot", db_name));
let temp_snapshot_file = tempfile::NamedTempFile::new_in(&self.scheduler.snapshots_path)?;
compression::to_tar_gz(temp_snapshot_dir.path(), temp_snapshot_file.path())?;
compression::to_tar_gz(temp_snapshot_dir.path(), temp_snapshot_file.path(), compression)?;
let file = temp_snapshot_file.persist(snapshot_path)?;
// 5.3 Change the permission to make the snapshot readonly

View File

@@ -266,6 +266,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
| K::DumpCreation { .. }
| K::UpgradeDatabase { .. }
| K::SnapshotCreation => (),
K::SnapshotCreationWithParams { .. } => (),
};
if let Some(Details::IndexSwap { swaps }) = &mut task.details {
for IndexSwap { indexes: (lhs, rhs) } in swaps.iter_mut() {