Merge pull request #5773 from meilisearch/snapshotception

Fix snapshotCreation task being included in snapshot
This commit is contained in:
Tamo
2025-08-04 13:53:43 +00:00
committed by GitHub
4 changed files with 149 additions and 9 deletions

View File

@ -71,7 +71,7 @@ pub struct IndexMapper {
/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
/// The map size an index is opened with on the first time.
index_base_map_size: usize,
pub(crate) index_base_map_size: usize,
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
index_growth_amount: usize,
/// Whether we open a meilisearch index with the MDB_WRITEMAP option or not.

View File

@ -7,9 +7,73 @@ use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
use crate::queue::TaskQueue;
use crate::{Error, IndexScheduler, Result};
/// # Safety
///
/// See [`EnvOpenOptions::open`].
unsafe fn remove_tasks(
tasks: &[Task],
dst: &std::path::Path,
index_base_map_size: usize,
) -> Result<()> {
let env_options = EnvOpenOptions::new();
let mut env_options = env_options.read_txn_without_tls();
let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?;
let mut wtxn = env.write_txn()?;
let task_queue = TaskQueue::new(&env, &mut wtxn)?;
// Destructuring to ensure the code below gets updated if a database gets added in the future.
let TaskQueue {
all_tasks,
status,
kind,
index_tasks: _, // snapshot creation tasks are not index tasks
canceled_by,
enqueued_at,
started_at,
finished_at,
} = task_queue;
for task in tasks {
all_tasks.delete(&mut wtxn, &task.uid)?;
let mut tasks = status.get(&wtxn, &task.status)?.unwrap_or_default();
tasks.remove(task.uid);
status.put(&mut wtxn, &task.status, &tasks)?;
let mut tasks = kind.get(&wtxn, &task.kind.as_kind())?.unwrap_or_default();
tasks.remove(task.uid);
kind.put(&mut wtxn, &task.kind.as_kind(), &tasks)?;
canceled_by.delete(&mut wtxn, &task.uid)?;
let timestamp = task.enqueued_at.unix_timestamp_nanos();
let mut tasks = enqueued_at.get(&wtxn, &timestamp)?.unwrap_or_default();
tasks.remove(task.uid);
enqueued_at.put(&mut wtxn, &timestamp, &tasks)?;
if let Some(task_started_at) = task.started_at {
let timestamp = task_started_at.unix_timestamp_nanos();
let mut tasks = started_at.get(&wtxn, &timestamp)?.unwrap_or_default();
tasks.remove(task.uid);
started_at.put(&mut wtxn, &timestamp, &tasks)?;
}
if let Some(task_finished_at) = task.finished_at {
let timestamp = task_finished_at.unix_timestamp_nanos();
let mut tasks = finished_at.get(&wtxn, &timestamp)?.unwrap_or_default();
tasks.remove(task.uid);
finished_at.put(&mut wtxn, &timestamp, &tasks)?;
}
}
wtxn.commit()?;
Ok(())
}
impl IndexScheduler {
pub(super) fn process_snapshot(
&self,
@ -48,14 +112,26 @@ impl IndexScheduler {
};
self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
// 2.2 Create a read transaction on the index-scheduler
// 2.2 Remove the current snapshot tasks
//
// This is done to ensure that the tasks are not processed again when the snapshot is imported
//
// # Safety
//
// This is safe because we open the env file we just created in a temporary directory.
// We are sure it's not being used by any other process nor thread.
unsafe {
remove_tasks(&tasks, &dst, self.index_mapper.index_base_map_size)?;
}
// 2.3 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
// 2.3 Create the update files directory
// 2.4 Create the update files directory
let update_files_dir = temp_snapshot_dir.path().join("update_files");
fs::create_dir_all(&update_files_dir)?;
// 2.4 Only copy the update files of the enqueued tasks
// 2.5 Only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);

View File

@ -122,11 +122,7 @@ async fn perform_on_demand_snapshot() {
let server = Server::new_with_options(options).await.unwrap();
let index = server.index("catto");
index
.update_settings(json! ({
"searchableAttributes": [],
}))
.await;
index.update_settings(json! ({ "searchableAttributes": [] })).await;
index.load_test_set(&server).await;
@ -203,3 +199,70 @@ async fn perform_on_demand_snapshot() {
server.index("doggo").settings(),
);
}
#[actix_rt::test]
#[cfg_attr(target_os = "windows", ignore)]
async fn snapshotception_issue_4653() {
let temp = tempfile::tempdir().unwrap();
let snapshot_dir = tempfile::tempdir().unwrap();
let options =
Opt { snapshot_dir: snapshot_dir.path().to_owned(), ..default_settings(temp.path()) };
let server = Server::new_with_options(options).await.unwrap();
let (task, code) = server.create_snapshot().await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(task, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 0,
"indexUid": null,
"status": "enqueued",
"type": "snapshotCreation",
"enqueuedAt": "[date]"
}
"###);
server.wait_task(task.uid()).await.succeeded();
let temp = tempfile::tempdir().unwrap();
let snapshot_path = snapshot_dir.path().to_owned().join("db.snapshot");
let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) };
let snapshot_server = Server::new_with_options(options).await.unwrap();
// The snapshot should have been taken without the snapshot creation task
let (tasks, code) = snapshot_server.tasks().await;
snapshot!(code, @"200 OK");
snapshot!(tasks, @r#"
{
"results": [],
"total": 0,
"limit": 20,
"from": null,
"next": null
}
"#);
// Ensure the task is not present in the snapshot
let (task, code) = snapshot_server.get_task(0).await;
snapshot!(code, @"404 Not Found");
snapshot!(task, @r#"
{
"message": "Task `0` not found.",
"code": "task_not_found",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#task_not_found"
}
"#);
// Ensure the batch is also not present
let (batch, code) = snapshot_server.get_batch(0).await;
snapshot!(code, @"404 Not Found");
snapshot!(batch, @r#"
{
"message": "Batch `0` not found.",
"code": "batch_not_found",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#batch_not_found"
}
"#);
}

View File

@ -119,6 +119,7 @@ pub struct FacetsUpdate<'i> {
min_level_size: u8,
data_size: u64,
}
impl<'i> FacetsUpdate<'i> {
pub fn new(
index: &'i Index,