process: add cancelling points in process snapshot

This commit is contained in:
Louis Dureuil
2025-10-02 17:16:05 +02:00
parent 3870a374af
commit c885171029

View File

@@ -4,6 +4,7 @@ use std::sync::atomic::Ordering;
use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::InternalError;
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
@@ -225,6 +226,8 @@ impl IndexScheduler {
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
let must_stop_processing = &self.scheduler.must_stop_processing;
let abort_no_index = Err(Error::from_milli(InternalError::AbortedIndexation.into(), None));
fs::create_dir_all(&self.scheduler.snapshots_path)?;
@@ -242,10 +245,14 @@ impl IndexScheduler {
self.scheduler.snapshots_path.clone(),
format!("{db_name}.snapshot"),
base_path,
must_stop_processing.as_lambda(),
);
// 3. Snapshot the VERSION file
builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
if must_stop_processing.get() {
return abort_no_index;
}
// 4. Snapshot the index-scheduler LMDB env
//
@@ -262,6 +269,9 @@ impl IndexScheduler {
// 4.1 First copy the LMDB env of the index-scheduler
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
builder.add_env_to_archive(&self.env)?;
if must_stop_processing.get() {
return abort_no_index;
}
// 4.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
@@ -273,6 +283,9 @@ impl IndexScheduler {
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
for task_id in enqueued {
if must_stop_processing.get() {
return abort_no_index;
}
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
@@ -290,6 +303,17 @@ impl IndexScheduler {
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, _) = result?;
let abort_index = || {
Err(Error::from_milli(
InternalError::AbortedIndexation.into(),
Some(name.to_string()), // defer the `to_string`
))
};
if must_stop_processing.get() {
return abort_index();
}
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
));
@@ -299,6 +323,10 @@ impl IndexScheduler {
drop(rtxn);
if must_stop_processing.get() {
return abort_no_index;
}
// 6. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
builder.add_env_to_archive(&self.scheduler.auth_env)?;