From 6394efc4c25f8ec0ab13f880687d03e18aeb9a5a Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Tue, 22 Jul 2025 15:17:26 +0200 Subject: [PATCH] Turn dirty fix into beautiful fix --- .../index-scheduler/src/index_mapper/mod.rs | 2 +- .../scheduler/process_snapshot_creation.rs | 65 +++++++++++++------ 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/crates/index-scheduler/src/index_mapper/mod.rs b/crates/index-scheduler/src/index_mapper/mod.rs index 86fb17ca7..e6bdccd41 100644 --- a/crates/index-scheduler/src/index_mapper/mod.rs +++ b/crates/index-scheduler/src/index_mapper/mod.rs @@ -71,7 +71,7 @@ pub struct IndexMapper { /// Path to the folder where the LMDB environments of each index are. base_path: PathBuf, /// The map size an index is opened with on the first time. - index_base_map_size: usize, + pub(crate) index_base_map_size: usize, /// The quantity by which the map size of an index is incremented upon reopening, in bytes. index_growth_amount: usize, /// Whether we open a meilisearch index with the MDB_WRITEMAP option or not. diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 431555904..b9c4329ff 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -6,11 +6,34 @@ use meilisearch_types::heed::CompactionOption; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::{compression, VERSION_FILE_NAME}; -use roaring::RoaringBitmap; +use crate::heed::EnvOpenOptions; use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress}; +use crate::queue::TaskQueue; use crate::{Error, IndexScheduler, Result}; +/// # Safety +/// +/// See [`EnvOpenOptions::open`]. +unsafe fn mark_tasks_as_succeeded( + tasks: &[Task], + dst: &std::path::Path, + nb_db: u32, + index_base_map_size: usize, +) -> Result<()> { + let env_options = EnvOpenOptions::new(); + let mut env_options = env_options.read_txn_without_tls(); + let env = env_options.max_dbs(nb_db).map_size(index_base_map_size).open(dst)?; + let mut wtxn = env.write_txn()?; + let task_queue = TaskQueue::new(&env, &mut wtxn)?; + for mut task in tasks.iter().cloned() { + task.status = Status::Succeeded; + task_queue.update_task(&mut wtxn, &task)?; + } + wtxn.commit()?; + Ok(()) +} + impl IndexScheduler { pub(super) fn process_snapshot( &self, @@ -39,10 +62,6 @@ impl IndexScheduler { // two read operations as the task processing is synchronous. // 2.1 First copy the LMDB env of the index-scheduler - // - // Note that just before we copy it, we set the status of the current tasks to Succeeded. - // This is because when the snapshot is loaded in the future, we don't want these tasks to rerun. - // In any case, if the snapshot can be loaded, it means that the tasks did succeed. progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler); let dst = temp_snapshot_dir.path().join("tasks"); fs::create_dir_all(&dst)?; @@ -51,29 +70,33 @@ impl IndexScheduler { } else { CompactionOption::Enabled }; - - let mut wtxn = self.env.write_txn()?; - for task in &mut tasks { - task.status = Status::Succeeded; - self.queue.tasks.update_task(&mut wtxn, task)?; - } - wtxn.commit()?; self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?; - let mut wtxn = self.scheduler.auth_env.write_txn()?; - for task in &mut tasks { - task.status = Status::Enqueued; - self.queue.tasks.update_task(&mut wtxn, task)?; - } - wtxn.commit()?; - // 2.2 Create a read transaction on the index-scheduler + // 2.2 Mark the current snapshot tasks as succeeded in the newly created env + // + // This is done to ensure that the tasks are not processed again when the snapshot is imported + // + // # Safety + // + // This is safe because we open the env file we just created in a temporary directory. + // We are sure it's not being used by any other process nor thread. + unsafe { + mark_tasks_as_succeeded( + &tasks, + &dst, + Self::nb_db(), + self.index_mapper.index_base_map_size, + )?; + } + + // 2.3 Create a read transaction on the index-scheduler let rtxn = self.env.read_txn()?; - // 2.3 Create the update files directory + // 2.4 Create the update files directory let update_files_dir = temp_snapshot_dir.path().join("update_files"); fs::create_dir_all(&update_files_dir)?; - // 2.4 Only copy the update files of the enqueued tasks + // 2.5 Only copy the update files of the enqueued tasks progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles); let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);