diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
index 4a7a9e074..3fa9a51c1 100644
--- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
@@ -76,6 +76,22 @@ unsafe fn remove_tasks(
 impl IndexScheduler {
     pub(super) fn process_snapshot(
+        &self,
+        progress: Progress,
+        tasks: Vec<Task>,
+    ) -> Result<Vec<Task>> {
+        let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
+            CompactionOption::Disabled
+        } else {
+            CompactionOption::Enabled
+        };
+        match compaction_option {
+            CompactionOption::Enabled => self.process_snapshot_with_temp(progress, tasks),
+            CompactionOption::Disabled => self.process_snapshot_with_pipe(progress, tasks),
+        }
+    }
+
+    fn process_snapshot_with_temp(
         &self,
         progress: Progress,
         mut tasks: Vec<Task>,
     ) -> Result<Vec<Task>> {
@@ -105,12 +121,8 @@ impl IndexScheduler {
         progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
         let dst = temp_snapshot_dir.path().join("tasks");
         fs::create_dir_all(&dst)?;
-        let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
-            CompactionOption::Disabled
-        } else {
-            CompactionOption::Enabled
-        };
-        self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
+
+        self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
 
         // 2.2 Remove the current snapshot tasks
         //
@@ -161,7 +173,7 @@ impl IndexScheduler {
             let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
             fs::create_dir_all(&dst)?;
             index
-                .copy_to_path(dst.join("data.mdb"), compaction_option)
+                .copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
                 .map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
         }
 
@@ -171,7 +183,7 @@ impl IndexScheduler {
         progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
         let dst = temp_snapshot_dir.path().join("auth");
         fs::create_dir_all(&dst)?;
-        self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
+        self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
 
         // 5. Copy and tarball the flat snapshot
         progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
@@ -206,4 +218,112 @@ impl IndexScheduler {
 
         Ok(tasks)
     }
+
+    fn process_snapshot_with_pipe(
+        &self,
+        progress: Progress,
+        mut tasks: Vec<Task>,
+    ) -> Result<Vec<Task>> {
+        progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
+
+        fs::create_dir_all(&self.scheduler.snapshots_path)?;
+
+        // 1. Find the base path and original name of the database
+
+        // TODO find a better way to get this path
+        let mut base_path = self.env.path().to_owned();
+        base_path.pop();
+        let base_path = base_path;
+        let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");
+
+        // 2. Start the tarball builder. The tarball will be created on another thread from piped data.
+
+        let mut builder = compression::PipedArchiveBuilder::new(
+            self.scheduler.snapshots_path.clone(),
+            format!("{db_name}.snapshot"),
+            base_path,
+        );
+
+        // 3. Snapshot the VERSION file
+        builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
+
+        // 4. Snapshot the index-scheduler LMDB env
+        //
+        // When we call copy_to_path, LMDB opens a read transaction by itself,
+        // we can't provide our own. It is an issue as we would like to know
+        // the update files to copy but new ones can be enqueued between the copy
+        // of the env and the new transaction we open to retrieve the enqueued tasks.
+        // So we prefer opening a new transaction after copying the env and copy more
+        // update files than not enough.
+        //
+        // Note that there cannot be any update files deleted between those
+        // two read operations as the task processing is synchronous.
+
+        // 4.1 First copy the LMDB env of the index-scheduler
+        progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
+        builder.add_env_to_archive(&self.env)?;
+
+        // 4.2 Create a read transaction on the index-scheduler
+        let rtxn = self.env.read_txn()?;
+
+        // 4.3 Only copy the update files of the enqueued tasks
+        progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
+        builder.add_dir_to_archive(self.queue.file_store.path().to_path_buf())?;
+        let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
+        let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
+        progress.update_progress(update_file_progress);
+        for task_id in enqueued {
+            let task =
+                self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
+            if let Some(content_uuid) = task.content_uuid() {
+                let src = self.queue.file_store.get_update_path(content_uuid);
+                builder.add_file_to_archive(src)?;
+            }
+            atomic.fetch_add(1, Ordering::Relaxed);
+        }
+
+        // 5. Snapshot every index
+        progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
+        builder.add_dir_to_archive(self.index_mapper.base_path().to_path_buf())?;
+        let index_mapping = self.index_mapper.index_mapping;
+        let nb_indexes = index_mapping.len(&rtxn)? as u32;
+
+        for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
+            let (name, _) = result?;
+            progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
+                name, i as u32, nb_indexes,
+            ));
+            let index = self.index_mapper.index(&rtxn, name)?;
+            builder.add_env_to_archive(index.raw_env())?;
+        }
+
+        drop(rtxn);
+
+        // 6. Snapshot the auth LMDB env
+        progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
+        builder.add_env_to_archive(&self.scheduler.auth_env)?;
+
+        // 7. Finalize the tarball
+        progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
+        let file = builder.finish()?;
+
+        // 8. Change the permission to make the snapshot readonly
+        let mut permissions = file.metadata()?.permissions();
+        permissions.set_readonly(true);
+        #[cfg(unix)]
+        {
+            use std::os::unix::fs::PermissionsExt;
+            #[allow(clippy::non_octal_unix_permissions)]
+            //                     rwxrwxrwx
+            permissions.set_mode(0b100100100);
+        }
+
+        file.set_permissions(permissions)?;
+
+        for task in &mut tasks {
+            task.status = Status::Succeeded;
+        }
+
+        Ok(tasks)
+    }
 }
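The diff only shows the call sites of compression::PipedArchiveBuilder (new, add_file_to_archive, add_dir_to_archive, add_env_to_archive, finish), not its implementation. As a rough illustration of the "tarball created on another thread from piped data" idea mentioned in the comments, here is a minimal sketch. Everything in it is an assumption made for illustration only: the type name PipedArchiveBuilderSketch is hypothetical, the choice of std::io::pipe (Rust 1.87+), tar, and flate2 is a guess, and the real builder also has to archive live LMDB environments (add_env_to_archive) and whole directories (add_dir_to_archive), which this sketch omits.

use std::fs::File;
use std::io;
use std::path::PathBuf;
use std::thread::{self, JoinHandle};

use flate2::write::GzEncoder;
use flate2::Compression;
use tar::Builder;

/// Hypothetical stand-in for `compression::PipedArchiveBuilder` (illustration only).
struct PipedArchiveBuilderSketch {
    /// Tar entries are gzip-compressed and written into the pipe as they are appended.
    tar: Builder<GzEncoder<io::PipeWriter>>,
    /// Background thread draining the read end of the pipe into the snapshot file.
    writer_thread: JoinHandle<io::Result<File>>,
    /// Entries are stored relative to this directory (the database path in the diff).
    base_path: PathBuf,
}

impl PipedArchiveBuilderSketch {
    fn new(dst_dir: PathBuf, archive_name: String, base_path: PathBuf) -> io::Result<Self> {
        let (reader, writer) = io::pipe()?;
        // The archive file is written by another thread, fed through the pipe.
        let writer_thread = thread::spawn(move || -> io::Result<File> {
            let mut reader = reader;
            let mut file = File::create(dst_dir.join(archive_name))?;
            io::copy(&mut reader, &mut file)?;
            Ok(file)
        });
        let tar = Builder::new(GzEncoder::new(writer, Compression::default()));
        Ok(Self { tar, writer_thread, base_path })
    }

    fn add_file_to_archive(&mut self, src: PathBuf) -> io::Result<()> {
        // Store the entry under its path relative to the database directory.
        let name = src.strip_prefix(&self.base_path).unwrap_or(&src).to_owned();
        self.tar.append_path_with_name(&src, name)
    }

    fn finish(self) -> io::Result<File> {
        // Flush the tar and gzip streams, drop the write end of the pipe so the
        // background thread sees EOF, then wait for it to hand back the file.
        self.tar.into_inner()?.finish()?;
        self.writer_thread.join().expect("snapshot writer thread panicked")
    }
}

Compared with process_snapshot_with_temp, which stages a full copy of every environment in a temporary directory before tarballing it, streaming entries straight into the archive avoids holding a second on-disk copy of the database; per the dispatch in process_snapshot, this path appears to be taken only when snapshot compaction is disabled.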