Turn dirty fix into beautiful fix

This commit is contained in:
Mubelotix
2025-07-22 15:17:26 +02:00
parent 9716834380
commit 6394efc4c2
2 changed files with 45 additions and 22 deletions

View File

@ -71,7 +71,7 @@ pub struct IndexMapper {
/// Path to the folder where the LMDB environments of each index are. /// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf, base_path: PathBuf,
/// The map size an index is opened with on the first time. /// The map size an index is opened with on the first time.
index_base_map_size: usize, pub(crate) index_base_map_size: usize,
/// The quantity by which the map size of an index is incremented upon reopening, in bytes. /// The quantity by which the map size of an index is incremented upon reopening, in bytes.
index_growth_amount: usize, index_growth_amount: usize,
/// Whether we open a meilisearch index with the MDB_WRITEMAP option or not. /// Whether we open a meilisearch index with the MDB_WRITEMAP option or not.

View File

@ -6,11 +6,34 @@ use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME}; use meilisearch_types::{compression, VERSION_FILE_NAME};
use roaring::RoaringBitmap;
use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress}; use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
use crate::queue::TaskQueue;
use crate::{Error, IndexScheduler, Result}; use crate::{Error, IndexScheduler, Result};
/// # Safety
///
/// See [`EnvOpenOptions::open`].
unsafe fn mark_tasks_as_succeeded(
tasks: &[Task],
dst: &std::path::Path,
nb_db: u32,
index_base_map_size: usize,
) -> Result<()> {
let env_options = EnvOpenOptions::new();
let mut env_options = env_options.read_txn_without_tls();
let env = env_options.max_dbs(nb_db).map_size(index_base_map_size).open(dst)?;
let mut wtxn = env.write_txn()?;
let task_queue = TaskQueue::new(&env, &mut wtxn)?;
for mut task in tasks.iter().cloned() {
task.status = Status::Succeeded;
task_queue.update_task(&mut wtxn, &task)?;
}
wtxn.commit()?;
Ok(())
}
impl IndexScheduler { impl IndexScheduler {
pub(super) fn process_snapshot( pub(super) fn process_snapshot(
&self, &self,
@ -39,10 +62,6 @@ impl IndexScheduler {
// two read operations as the task processing is synchronous. // two read operations as the task processing is synchronous.
// 2.1 First copy the LMDB env of the index-scheduler // 2.1 First copy the LMDB env of the index-scheduler
//
// Note that just before we copy it, we set the status of the current tasks to Succeeded.
// This is because when the snapshot is loaded in the future, we don't want these tasks to rerun.
// In any case, if the snapshot can be loaded, it means that the tasks did succeed.
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler); progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks"); let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?; fs::create_dir_all(&dst)?;
@ -51,29 +70,33 @@ impl IndexScheduler {
} else { } else {
CompactionOption::Enabled CompactionOption::Enabled
}; };
let mut wtxn = self.env.write_txn()?;
for task in &mut tasks {
task.status = Status::Succeeded;
self.queue.tasks.update_task(&mut wtxn, task)?;
}
wtxn.commit()?;
self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?; self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
let mut wtxn = self.scheduler.auth_env.write_txn()?;
for task in &mut tasks {
task.status = Status::Enqueued;
self.queue.tasks.update_task(&mut wtxn, task)?;
}
wtxn.commit()?;
// 2.2 Create a read transaction on the index-scheduler // 2.2 Mark the current snapshot tasks as succeeded in the newly created env
//
// This is done to ensure that the tasks are not processed again when the snapshot is imported
//
// # Safety
//
// This is safe because we open the env file we just created in a temporary directory.
// We are sure it's not being used by any other process nor thread.
unsafe {
mark_tasks_as_succeeded(
&tasks,
&dst,
Self::nb_db(),
self.index_mapper.index_base_map_size,
)?;
}
// 2.3 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
// 2.3 Create the update files directory // 2.4 Create the update files directory
let update_files_dir = temp_snapshot_dir.path().join("update_files"); let update_files_dir = temp_snapshot_dir.path().join("update_files");
fs::create_dir_all(&update_files_dir)?; fs::create_dir_all(&update_files_dir)?;
// 2.4 Only copy the update files of the enqueued tasks // 2.5 Only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles); progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?; let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32); let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);