From 2f2e42e72db0b81ad31510fc703c4d216bb424d5 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 22 Jul 2025 12:33:18 +0200 Subject: [PATCH 1/8] Add test for issue #4653 --- crates/meilisearch/tests/snapshot/mod.rs | 49 +++++++++++++++++++++--- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/crates/meilisearch/tests/snapshot/mod.rs b/crates/meilisearch/tests/snapshot/mod.rs index 32946b06e..b4e3f152c 100644 --- a/crates/meilisearch/tests/snapshot/mod.rs +++ b/crates/meilisearch/tests/snapshot/mod.rs @@ -122,11 +122,7 @@ async fn perform_on_demand_snapshot() { let server = Server::new_with_options(options).await.unwrap(); let index = server.index("catto"); - index - .update_settings(json! ({ - "searchableAttributes": [], - })) - .await; + index.update_settings(json! ({ "searchableAttributes": [] })).await; index.load_test_set(&server).await; @@ -203,3 +199,46 @@ async fn perform_on_demand_snapshot() { server.index("doggo").settings(), ); } + +#[actix_rt::test] +#[cfg_attr(target_os = "windows", ignore)] +async fn snapshotception_issue_4653() { + let temp = tempfile::tempdir().unwrap(); + let snapshot_dir = tempfile::tempdir().unwrap(); + let options = + Opt { snapshot_dir: snapshot_dir.path().to_owned(), ..default_settings(temp.path()) }; + + let server = Server::new_with_options(options).await.unwrap(); + + let (task, code) = server.create_snapshot().await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(task, { ".enqueuedAt" => "[date]" }), @r###" + { + "taskUid": 0, + "indexUid": null, + "status": "enqueued", + "type": "snapshotCreation", + "enqueuedAt": "[date]" + } + "###); + let task = server.wait_task(task.uid()).await.succeeded(); + + let temp = tempfile::tempdir().unwrap(); + let snapshot_path = snapshot_dir.path().to_owned().join("db.snapshot"); + + let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) }; + let snapshot_server = Server::new_with_options(options).await.unwrap(); + + // The snapshot creation task should NOT be spawned => task queue is empty + let (tasks, code) = snapshot_server.tasks().await; + snapshot!(code, @"200 OK"); + snapshot!(tasks, @r#" + { + "results": [], + "total": 0, + "limit": 20, + "from": null, + "next": null + } + "#); +} From 971683438028e3ad4bdc99c0066481c9be70cb77 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 22 Jul 2025 14:31:42 +0200 Subject: [PATCH 2/8] Initial fix --- .../src/scheduler/process_snapshot_creation.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index d58157ae3..431555904 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -6,6 +6,7 @@ use meilisearch_types::heed::CompactionOption; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::{compression, VERSION_FILE_NAME}; +use roaring::RoaringBitmap; use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress}; use crate::{Error, IndexScheduler, Result}; @@ -38,6 +39,10 @@ impl IndexScheduler { // two read operations as the task processing is synchronous. // 2.1 First copy the LMDB env of the index-scheduler + // + // Note that just before we copy it, we set the status of the current tasks to Succeeded. + // This is because when the snapshot is loaded in the future, we don't want these tasks to rerun. + // In any case, if the snapshot can be loaded, it means that the tasks did succeed. progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler); let dst = temp_snapshot_dir.path().join("tasks"); fs::create_dir_all(&dst)?; @@ -46,7 +51,20 @@ impl IndexScheduler { } else { CompactionOption::Enabled }; + + let mut wtxn = self.env.write_txn()?; + for task in &mut tasks { + task.status = Status::Succeeded; + self.queue.tasks.update_task(&mut wtxn, task)?; + } + wtxn.commit()?; self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?; + let mut wtxn = self.scheduler.auth_env.write_txn()?; + for task in &mut tasks { + task.status = Status::Enqueued; + self.queue.tasks.update_task(&mut wtxn, task)?; + } + wtxn.commit()?; // 2.2 Create a read transaction on the index-scheduler let rtxn = self.env.read_txn()?; From 6394efc4c25f8ec0ab13f880687d03e18aeb9a5a Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 22 Jul 2025 15:17:26 +0200 Subject: [PATCH 3/8] Turn dirty fix into beautiful fix --- .../index-scheduler/src/index_mapper/mod.rs | 2 +- .../scheduler/process_snapshot_creation.rs | 65 +++++++++++++------ 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/crates/index-scheduler/src/index_mapper/mod.rs b/crates/index-scheduler/src/index_mapper/mod.rs index 86fb17ca7..e6bdccd41 100644 --- a/crates/index-scheduler/src/index_mapper/mod.rs +++ b/crates/index-scheduler/src/index_mapper/mod.rs @@ -71,7 +71,7 @@ pub struct IndexMapper { /// Path to the folder where the LMDB environments of each index are. base_path: PathBuf, /// The map size an index is opened with on the first time. - index_base_map_size: usize, + pub(crate) index_base_map_size: usize, /// The quantity by which the map size of an index is incremented upon reopening, in bytes. index_growth_amount: usize, /// Whether we open a meilisearch index with the MDB_WRITEMAP option or not. diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 431555904..b9c4329ff 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -6,11 +6,34 @@ use meilisearch_types::heed::CompactionOption; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::{compression, VERSION_FILE_NAME}; -use roaring::RoaringBitmap; +use crate::heed::EnvOpenOptions; use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress}; +use crate::queue::TaskQueue; use crate::{Error, IndexScheduler, Result}; +/// # Safety +/// +/// See [`EnvOpenOptions::open`]. +unsafe fn mark_tasks_as_succeeded( + tasks: &[Task], + dst: &std::path::Path, + nb_db: u32, + index_base_map_size: usize, +) -> Result<()> { + let env_options = EnvOpenOptions::new(); + let mut env_options = env_options.read_txn_without_tls(); + let env = env_options.max_dbs(nb_db).map_size(index_base_map_size).open(dst)?; + let mut wtxn = env.write_txn()?; + let task_queue = TaskQueue::new(&env, &mut wtxn)?; + for mut task in tasks.iter().cloned() { + task.status = Status::Succeeded; + task_queue.update_task(&mut wtxn, &task)?; + } + wtxn.commit()?; + Ok(()) +} + impl IndexScheduler { pub(super) fn process_snapshot( &self, @@ -39,10 +62,6 @@ impl IndexScheduler { // two read operations as the task processing is synchronous. // 2.1 First copy the LMDB env of the index-scheduler - // - // Note that just before we copy it, we set the status of the current tasks to Succeeded. - // This is because when the snapshot is loaded in the future, we don't want these tasks to rerun. - // In any case, if the snapshot can be loaded, it means that the tasks did succeed. progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler); let dst = temp_snapshot_dir.path().join("tasks"); fs::create_dir_all(&dst)?; @@ -51,29 +70,33 @@ impl IndexScheduler { } else { CompactionOption::Enabled }; - - let mut wtxn = self.env.write_txn()?; - for task in &mut tasks { - task.status = Status::Succeeded; - self.queue.tasks.update_task(&mut wtxn, task)?; - } - wtxn.commit()?; self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?; - let mut wtxn = self.scheduler.auth_env.write_txn()?; - for task in &mut tasks { - task.status = Status::Enqueued; - self.queue.tasks.update_task(&mut wtxn, task)?; - } - wtxn.commit()?; - // 2.2 Create a read transaction on the index-scheduler + // 2.2 Mark the current snapshot tasks as succeeded in the newly created env + // + // This is done to ensure that the tasks are not processed again when the snapshot is imported + // + // # Safety + // + // This is safe because we open the env file we just created in a temporary directory. + // We are sure it's not being used by any other process nor thread. + unsafe { + mark_tasks_as_succeeded( + &tasks, + &dst, + Self::nb_db(), + self.index_mapper.index_base_map_size, + )?; + } + + // 2.3 Create a read transaction on the index-scheduler let rtxn = self.env.read_txn()?; - // 2.3 Create the update files directory + // 2.4 Create the update files directory let update_files_dir = temp_snapshot_dir.path().join("update_files"); fs::create_dir_all(&update_files_dir)?; - // 2.4 Only copy the update files of the enqueued tasks + // 2.5 Only copy the update files of the enqueued tasks progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles); let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32); From c1aa4120ac63d319f28de56e98ff686762acbc1c Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 22 Jul 2025 15:18:13 +0200 Subject: [PATCH 4/8] Update test --- crates/meilisearch/tests/snapshot/mod.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/crates/meilisearch/tests/snapshot/mod.rs b/crates/meilisearch/tests/snapshot/mod.rs index b4e3f152c..08623f869 100644 --- a/crates/meilisearch/tests/snapshot/mod.rs +++ b/crates/meilisearch/tests/snapshot/mod.rs @@ -221,7 +221,7 @@ async fn snapshotception_issue_4653() { "enqueuedAt": "[date]" } "###); - let task = server.wait_task(task.uid()).await.succeeded(); + server.wait_task(task.uid()).await.succeeded(); let temp = tempfile::tempdir().unwrap(); let snapshot_path = snapshot_dir.path().to_owned().join("db.snapshot"); @@ -229,16 +229,22 @@ async fn snapshotception_issue_4653() { let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) }; let snapshot_server = Server::new_with_options(options).await.unwrap(); - // The snapshot creation task should NOT be spawned => task queue is empty - let (tasks, code) = snapshot_server.tasks().await; + // The snapshot creation task should NOT be spawned again => task is succeeded + let (task, code) = snapshot_server.get_task(task.uid()).await; snapshot!(code, @"200 OK"); - snapshot!(tasks, @r#" + snapshot!(json_string!(task, { ".enqueuedAt" => "[date]" }), @r#" { - "results": [], - "total": 0, - "limit": 20, - "from": null, - "next": null + "uid": 0, + "batchUid": 0, + "indexUid": null, + "status": "succeeded", + "type": "snapshotCreation", + "canceledBy": null, + "error": null, + "duration": null, + "enqueuedAt": "[date]", + "startedAt": null, + "finishedAt": null } "#); } From 846d27354bd69a967d00e3e5d4af5ed3324453df Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 22 Jul 2025 15:18:21 +0200 Subject: [PATCH 5/8] Format --- crates/meilisearch/tests/snapshot/mod.rs | 2 +- crates/milli/src/update/facet/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/meilisearch/tests/snapshot/mod.rs b/crates/meilisearch/tests/snapshot/mod.rs index 08623f869..a3c78bf28 100644 --- a/crates/meilisearch/tests/snapshot/mod.rs +++ b/crates/meilisearch/tests/snapshot/mod.rs @@ -228,7 +228,7 @@ async fn snapshotception_issue_4653() { let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) }; let snapshot_server = Server::new_with_options(options).await.unwrap(); - + // The snapshot creation task should NOT be spawned again => task is succeeded let (task, code) = snapshot_server.get_task(task.uid()).await; snapshot!(code, @"200 OK"); diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index c40916670..71596530e 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -119,6 +119,7 @@ pub struct FacetsUpdate<'i> { min_level_size: u8, data_size: u64, } + impl<'i> FacetsUpdate<'i> { pub fn new( index: &'i Index, From 5dcf79233e8dc6ceb7a3f7004a4a0c0e3d2c428c Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Wed, 23 Jul 2025 11:30:39 +0200 Subject: [PATCH 6/8] Remove useless parameter Co-Authored-By: Tamo --- .../src/scheduler/process_snapshot_creation.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index b9c4329ff..0859974cc 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -18,12 +18,11 @@ use crate::{Error, IndexScheduler, Result}; unsafe fn mark_tasks_as_succeeded( tasks: &[Task], dst: &std::path::Path, - nb_db: u32, index_base_map_size: usize, ) -> Result<()> { let env_options = EnvOpenOptions::new(); let mut env_options = env_options.read_txn_without_tls(); - let env = env_options.max_dbs(nb_db).map_size(index_base_map_size).open(dst)?; + let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?; let mut wtxn = env.write_txn()?; let task_queue = TaskQueue::new(&env, &mut wtxn)?; for mut task in tasks.iter().cloned() { @@ -84,7 +83,6 @@ impl IndexScheduler { mark_tasks_as_succeeded( &tasks, &dst, - Self::nb_db(), self.index_mapper.index_base_map_size, )?; } From 44b24652d272dadb676c1422cc8ab13a606b0052 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Wed, 23 Jul 2025 14:30:25 +0200 Subject: [PATCH 7/8] Change strategy to remove task instead of marking it succeeded --- .../scheduler/process_snapshot_creation.rs | 53 ++++++++++++++++--- crates/meilisearch/tests/snapshot/mod.rs | 46 +++++++++++----- 2 files changed, 77 insertions(+), 22 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 0859974cc..3b46d0359 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -25,9 +25,50 @@ unsafe fn mark_tasks_as_succeeded( let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?; let mut wtxn = env.write_txn()?; let task_queue = TaskQueue::new(&env, &mut wtxn)?; - for mut task in tasks.iter().cloned() { - task.status = Status::Succeeded; - task_queue.update_task(&mut wtxn, &task)?; + + // Destructuring to ensure the code below gets updated if a database gets added in the future. + let TaskQueue { + all_tasks, + status, + kind, + index_tasks: _, // snapshot creation tasks are not index tasks + canceled_by, + enqueued_at, + started_at, + finished_at, + } = task_queue; + + for task in tasks { + all_tasks.delete(&mut wtxn, &task.uid)?; + + let mut tasks = status.get(&wtxn, &task.status)?.unwrap_or_default(); + tasks.remove(task.uid); + status.put(&mut wtxn, &task.status, &tasks)?; + + let mut tasks = kind.get(&wtxn, &task.kind.as_kind())?.unwrap_or_default(); + tasks.remove(task.uid); + kind.put(&mut wtxn, &task.kind.as_kind(), &tasks)?; + + canceled_by.delete(&mut wtxn, &task.uid)?; + + let timestamp = task.enqueued_at.unix_timestamp_nanos(); + let mut tasks = enqueued_at.get(&wtxn, ×tamp)?.unwrap_or_default(); + tasks.remove(task.uid); + enqueued_at.put(&mut wtxn, ×tamp, &tasks)?; + + if let Some(task_started_at) = task.started_at { + let timestamp = task_started_at.unix_timestamp_nanos(); + let mut tasks = started_at.get(&wtxn, ×tamp)?.unwrap_or_default(); + tasks.remove(task.uid); + started_at.put(&mut wtxn, ×tamp, &tasks)?; + } + + if let Some(task_finished_at) = task.finished_at { + let timestamp = task_finished_at.unix_timestamp_nanos(); + let mut tasks = finished_at.get(&wtxn, ×tamp)?.unwrap_or_default(); + tasks.remove(task.uid); + finished_at.put(&mut wtxn, ×tamp, &tasks)?; + } } wtxn.commit()?; Ok(()) @@ -80,11 +121,7 @@ impl IndexScheduler { // This is safe because we open the env file we just created in a temporary directory. // We are sure it's not being used by any other process nor thread. unsafe { - mark_tasks_as_succeeded( - &tasks, - &dst, - self.index_mapper.index_base_map_size, - )?; + mark_tasks_as_succeeded(&tasks, &dst, self.index_mapper.index_base_map_size)?; } // 2.3 Create a read transaction on the index-scheduler diff --git a/crates/meilisearch/tests/snapshot/mod.rs b/crates/meilisearch/tests/snapshot/mod.rs index a3c78bf28..98ce17b80 100644 --- a/crates/meilisearch/tests/snapshot/mod.rs +++ b/crates/meilisearch/tests/snapshot/mod.rs @@ -229,22 +229,40 @@ async fn snapshotception_issue_4653() { let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) }; let snapshot_server = Server::new_with_options(options).await.unwrap(); - // The snapshot creation task should NOT be spawned again => task is succeeded - let (task, code) = snapshot_server.get_task(task.uid()).await; + // The snapshot should have been taken without the snapshot creation task + let (tasks, code) = snapshot_server.tasks().await; snapshot!(code, @"200 OK"); - snapshot!(json_string!(task, { ".enqueuedAt" => "[date]" }), @r#" + snapshot!(tasks, @r#" { - "uid": 0, - "batchUid": 0, - "indexUid": null, - "status": "succeeded", - "type": "snapshotCreation", - "canceledBy": null, - "error": null, - "duration": null, - "enqueuedAt": "[date]", - "startedAt": null, - "finishedAt": null + "results": [], + "total": 0, + "limit": 20, + "from": null, + "next": null + } + "#); + + // Ensure the task is not present in the snapshot + let (task, code) = snapshot_server.get_task(0).await; + snapshot!(code, @"404 Not Found"); + snapshot!(task, @r#" + { + "message": "Task `0` not found.", + "code": "task_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#task_not_found" + } + "#); + + // Ensure the batch is also not present + let (batch, code) = snapshot_server.get_batch(0).await; + snapshot!(code, @"404 Not Found"); + snapshot!(batch, @r#" + { + "message": "Batch `0` not found.", + "code": "batch_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#batch_not_found" } "#); } From 1f18f0ba77e6abb9224e506a8d823815b151a2ea Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Wed, 23 Jul 2025 14:33:58 +0200 Subject: [PATCH 8/8] Update little tiny comments --- .../src/scheduler/process_snapshot_creation.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 3b46d0359..4a7a9e074 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -15,7 +15,7 @@ use crate::{Error, IndexScheduler, Result}; /// # Safety /// /// See [`EnvOpenOptions::open`]. -unsafe fn mark_tasks_as_succeeded( +unsafe fn remove_tasks( tasks: &[Task], dst: &std::path::Path, index_base_map_size: usize, @@ -112,7 +112,7 @@ impl IndexScheduler { }; self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?; - // 2.2 Mark the current snapshot tasks as succeeded in the newly created env + // 2.2 Remove the current snapshot tasks // // This is done to ensure that the tasks are not processed again when the snapshot is imported // @@ -121,7 +121,7 @@ impl IndexScheduler { // This is safe because we open the env file we just created in a temporary directory. // We are sure it's not being used by any other process nor thread. unsafe { - mark_tasks_as_succeeded(&tasks, &dst, self.index_mapper.index_base_map_size)?; + remove_tasks(&tasks, &dst, self.index_mapper.index_base_map_size)?; } // 2.3 Create a read transaction on the index-scheduler