diff --git a/crates/index-scheduler/src/index_mapper/mod.rs b/crates/index-scheduler/src/index_mapper/mod.rs index 86fb17ca7..e6bdccd41 100644 --- a/crates/index-scheduler/src/index_mapper/mod.rs +++ b/crates/index-scheduler/src/index_mapper/mod.rs @@ -71,7 +71,7 @@ pub struct IndexMapper { /// Path to the folder where the LMDB environments of each index are. base_path: PathBuf, /// The map size an index is opened with on the first time. - index_base_map_size: usize, + pub(crate) index_base_map_size: usize, /// The quantity by which the map size of an index is incremented upon reopening, in bytes. index_growth_amount: usize, /// Whether we open a meilisearch index with the MDB_WRITEMAP option or not. diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index d58157ae3..4a7a9e074 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -7,9 +7,73 @@ use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::{compression, VERSION_FILE_NAME}; +use crate::heed::EnvOpenOptions; use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress}; +use crate::queue::TaskQueue; use crate::{Error, IndexScheduler, Result}; +/// # Safety +/// +/// See [`EnvOpenOptions::open`]. +unsafe fn remove_tasks( + tasks: &[Task], + dst: &std::path::Path, + index_base_map_size: usize, +) -> Result<()> { + let env_options = EnvOpenOptions::new(); + let mut env_options = env_options.read_txn_without_tls(); + let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?; + let mut wtxn = env.write_txn()?; + let task_queue = TaskQueue::new(&env, &mut wtxn)?; + + // Destructuring to ensure the code below gets updated if a database gets added in the future. 
+ let TaskQueue { + all_tasks, + status, + kind, + index_tasks: _, // snapshot creation tasks are not index tasks + canceled_by, + enqueued_at, + started_at, + finished_at, + } = task_queue; + + for task in tasks { + all_tasks.delete(&mut wtxn, &task.uid)?; + + let mut tasks = status.get(&wtxn, &task.status)?.unwrap_or_default(); + tasks.remove(task.uid); + status.put(&mut wtxn, &task.status, &tasks)?; + + let mut tasks = kind.get(&wtxn, &task.kind.as_kind())?.unwrap_or_default(); + tasks.remove(task.uid); + kind.put(&mut wtxn, &task.kind.as_kind(), &tasks)?; + + canceled_by.delete(&mut wtxn, &task.uid)?; + + let timestamp = task.enqueued_at.unix_timestamp_nanos(); + let mut tasks = enqueued_at.get(&wtxn, &timestamp)?.unwrap_or_default(); + tasks.remove(task.uid); + enqueued_at.put(&mut wtxn, &timestamp, &tasks)?; + + if let Some(task_started_at) = task.started_at { + let timestamp = task_started_at.unix_timestamp_nanos(); + let mut tasks = started_at.get(&wtxn, &timestamp)?.unwrap_or_default(); + tasks.remove(task.uid); + started_at.put(&mut wtxn, &timestamp, &tasks)?; + } + + if let Some(task_finished_at) = task.finished_at { + let timestamp = task_finished_at.unix_timestamp_nanos(); + let mut tasks = finished_at.get(&wtxn, &timestamp)?.unwrap_or_default(); + tasks.remove(task.uid); + finished_at.put(&mut wtxn, &timestamp, &tasks)?; + } + } + wtxn.commit()?; + Ok(()) +} + impl IndexScheduler { pub(super) fn process_snapshot( &self, @@ -48,14 +112,26 @@ impl IndexScheduler { }; self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?; - // 2.2 Create a read transaction on the index-scheduler + // 2.2 Remove the current snapshot tasks + // + // This is done to ensure that the tasks are not processed again when the snapshot is imported + // + // # Safety + // + // This is safe because we open the env file we just created in a temporary directory. + // We are sure it's not being used by any other process nor thread. 
+ unsafe { + remove_tasks(&tasks, &dst, self.index_mapper.index_base_map_size)?; + } + + // 2.3 Create a read transaction on the index-scheduler let rtxn = self.env.read_txn()?; - // 2.3 Create the update files directory + // 2.4 Create the update files directory let update_files_dir = temp_snapshot_dir.path().join("update_files"); fs::create_dir_all(&update_files_dir)?; - // 2.4 Only copy the update files of the enqueued tasks + // 2.5 Only copy the update files of the enqueued tasks progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles); let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32); diff --git a/crates/meilisearch/tests/snapshot/mod.rs b/crates/meilisearch/tests/snapshot/mod.rs index 32946b06e..98ce17b80 100644 --- a/crates/meilisearch/tests/snapshot/mod.rs +++ b/crates/meilisearch/tests/snapshot/mod.rs @@ -122,11 +122,7 @@ async fn perform_on_demand_snapshot() { let server = Server::new_with_options(options).await.unwrap(); let index = server.index("catto"); - index - .update_settings(json! ({ - "searchableAttributes": [], - })) - .await; + index.update_settings(json! 
({ "searchableAttributes": [] })).await; index.load_test_set(&server).await; @@ -203,3 +199,70 @@ async fn perform_on_demand_snapshot() { server.index("doggo").settings(), ); } + +#[actix_rt::test] +#[cfg_attr(target_os = "windows", ignore)] +async fn snapshotception_issue_4653() { + let temp = tempfile::tempdir().unwrap(); + let snapshot_dir = tempfile::tempdir().unwrap(); + let options = + Opt { snapshot_dir: snapshot_dir.path().to_owned(), ..default_settings(temp.path()) }; + + let server = Server::new_with_options(options).await.unwrap(); + + let (task, code) = server.create_snapshot().await; + snapshot!(code, @"202 Accepted"); + snapshot!(json_string!(task, { ".enqueuedAt" => "[date]" }), @r###" + { + "taskUid": 0, + "indexUid": null, + "status": "enqueued", + "type": "snapshotCreation", + "enqueuedAt": "[date]" + } + "###); + server.wait_task(task.uid()).await.succeeded(); + + let temp = tempfile::tempdir().unwrap(); + let snapshot_path = snapshot_dir.path().to_owned().join("db.snapshot"); + + let options = Opt { import_snapshot: Some(snapshot_path), ..default_settings(temp.path()) }; + let snapshot_server = Server::new_with_options(options).await.unwrap(); + + // The snapshot should have been taken without the snapshot creation task + let (tasks, code) = snapshot_server.tasks().await; + snapshot!(code, @"200 OK"); + snapshot!(tasks, @r#" + { + "results": [], + "total": 0, + "limit": 20, + "from": null, + "next": null + } + "#); + + // Ensure the task is not present in the snapshot + let (task, code) = snapshot_server.get_task(0).await; + snapshot!(code, @"404 Not Found"); + snapshot!(task, @r#" + { + "message": "Task `0` not found.", + "code": "task_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#task_not_found" + } + "#); + + // Ensure the batch is also not present + let (batch, code) = snapshot_server.get_batch(0).await; + snapshot!(code, @"404 Not Found"); + snapshot!(batch, @r#" + { + "message": "Batch `0` not 
found.", + "code": "batch_not_found", + "type": "invalid_request", + "link": "https://docs.meilisearch.com/errors#batch_not_found" + } + "#); +} diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index c40916670..71596530e 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -119,6 +119,7 @@ pub struct FacetsUpdate<'i> { min_level_size: u8, data_size: u64, } + impl<'i> FacetsUpdate<'i> { pub fn new( index: &'i Index,