From 4492bbb5f92baf08f80be1b7557a406a3dee8ac0 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 6 Oct 2025 21:36:54 +0200 Subject: [PATCH] Greatly simplify implementation by eschewing the pipe --- .../scheduler/process_snapshot_creation.rs | 3 +- crates/meilisearch-types/src/compression.rs | 165 +++--------------- 2 files changed, 26 insertions(+), 142 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 2f62f988b..9259f8d16 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -244,8 +244,7 @@ impl IndexScheduler { let mut builder = compression::PipedArchiveBuilder::new( self.scheduler.snapshots_path.clone(), base_path.clone(), - must_stop_processing.as_lambda(), - ); + )?; // 3. Snapshot the VERSION file builder.add_file_to_archive(self.scheduler.version_file_path.clone())?; diff --git a/crates/meilisearch-types/src/compression.rs b/crates/meilisearch-types/src/compression.rs index 6c8177a27..40de31579 100644 --- a/crates/meilisearch-types/src/compression.rs +++ b/crates/meilisearch-types/src/compression.rs @@ -1,17 +1,12 @@ use std::fs::{create_dir_all, File}; -use std::io::{PipeWriter, Read, Write}; -use std::mem::ManuallyDrop; -use std::ops::DerefMut; -use std::os::fd::{AsRawFd, FromRawFd}; +use std::io::Write; use std::path::{Path, PathBuf}; -use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; -use std::thread::JoinHandle; use flate2::read::GzDecoder; use flate2::write::GzEncoder; use flate2::Compression; use milli::heed::Env; -use tar::{Archive, Builder, Header}; +use tar::{Archive, Builder}; use tempfile::NamedTempFile; pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { @@ -35,37 +30,20 @@ pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Res } pub struct PipedArchiveBuilder { - send_compression: Sender, - send_cancellation: Sender, - processing_thread: JoinHandle>, - cancellation_thread: JoinHandle<()>, -} - -enum CompressionMessage { - Env { path: PathBuf, reader: std::io::PipeReader }, - File { path: PathBuf }, - Dir { path: PathBuf }, + base_path: PathBuf, + tar_encoder: tar::Builder>, } impl PipedArchiveBuilder { - pub fn new(dest_dir: PathBuf, base_path: PathBuf, must_stop_processing: F) -> Self - where - F: Fn() -> bool + Send + 'static, - { - let (send_compression, recv) = std::sync::mpsc::channel(); - let processing_thread = std::thread::Builder::new() - .name("piped-archive-builder".into()) - .spawn(|| Self::run_processing(dest_dir, recv, base_path)) - .unwrap(); + pub fn new(dest_dir: PathBuf, base_path: PathBuf) -> anyhow::Result { + let temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?; - let (send_cancellation, recv) = std::sync::mpsc::channel(); + let gz_encoder = GzEncoder::new(temp_archive, Compression::default()); + let mut tar_encoder = Builder::new(gz_encoder); + let base_path_in_archive = PathInArchive::from_absolute_and_base(&base_path, &base_path); + tar_encoder.append_dir(base_path_in_archive.as_path(), &base_path)?; - let cancellation_thread = std::thread::Builder::new() - .name("piped-archive-builder-cancellation".into()) - .spawn(|| Self::run_cancellation(must_stop_processing, recv)) - .unwrap(); - - Self { send_compression, send_cancellation, processing_thread, cancellation_thread } + Ok(Self { base_path, tar_encoder }) } /// Add a heed environment to the archive. @@ -80,29 +58,19 @@ impl PipedArchiveBuilder { /// - If the cancellation thread panicked or otherwise dropped its receiver. /// - If the processing thread panicked or otherwise dropped its receiver. pub fn add_env_to_archive(&mut self, env: &Env) -> anyhow::Result<()> { - let (reader, writer) = std::io::pipe()?; let path = env.path().to_path_buf(); // make sure that the environment cannot change while it is being added to the archive, // as any concurrent change would corrupt the copy. let env_wtxn = env.write_txn()?; - // SAFETY: only the cancellation thread has the actual responsibility of closing the pipe since - // the clone is `ManuallyDrop`. - let mut cloned_writer = unsafe { - let writer_raw_fd = writer.as_raw_fd(); - ManuallyDrop::new(PipeWriter::from_raw_fd(writer_raw_fd)) - }; + let dir_path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path); - self.send_cancellation.send(CancellationMessage::OpenedPipe { pipe: writer }); + self.tar_encoder.append_dir(dir_path_in_archive.as_path(), &path)?; - self.send_compression.send(CompressionMessage::Env { path, reader }); + let path = path.join("data.mdb"); + let path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path); - let mdb_path = env.path().join("data.mdb"); - let mut file = std::fs::File::open(&mdb_path)?; - let mut file = std::io::BufReader::with_capacity(16 * 4096, &mut file); - std::io::copy(&mut file, cloned_writer.deref_mut())?; - - self.send_cancellation.send(CancellationMessage::ClosingPipe); + self.tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?; // no change we might want to commit env_wtxn.abort(); @@ -115,7 +83,8 @@ impl PipedArchiveBuilder { /// /// - If the processing thread panicked or otherwise dropped its receiver. pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> { - self.send_compression.send(CompressionMessage::File { path }); + let path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path); + self.tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?; Ok(()) } @@ -125,7 +94,9 @@ impl PipedArchiveBuilder { /// /// - If the processing thread panicked or otherwise dropped its receiver. pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> { - self.send_compression.send(CompressionMessage::Dir { path }); + let path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path); + + self.tar_encoder.append_dir(path_in_archive.as_path(), &path)?; Ok(()) } @@ -139,99 +110,13 @@ impl PipedArchiveBuilder { /// - If the cancellation thread panicked. /// - If the processing thread panicked or otherwise terminated in error. pub fn finish(self, dest_path: &Path) -> anyhow::Result { - drop(self.send_cancellation); - drop(self.send_compression); - /// FIXME catch panics - let temp_archive = self.processing_thread.join().unwrap()?; - self.cancellation_thread.join().unwrap(); + let gz_encoder = self.tar_encoder.into_inner()?; + let mut temp_archive = gz_encoder.finish()?; + temp_archive.flush()?; + let archive = temp_archive.persist(dest_path)?; Ok(archive) } - - fn run_processing( - dest_dir: PathBuf, - recv: Receiver, - base_path: PathBuf, - ) -> anyhow::Result { - let mut temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?; - - let gz_encoder = GzEncoder::new(&mut temp_archive, Compression::default()); - let mut tar_encoder = Builder::new(gz_encoder); - let base_path_in_archive = PathInArchive::from_absolute_and_base(&base_path, &base_path); - // add the root - tar_encoder.append_dir(base_path_in_archive.as_path(), &base_path)?; - while let Ok(message) = recv.recv() { - match message { - CompressionMessage::Env { path, reader } => { - let dir_path_in_archive = - PathInArchive::from_absolute_and_base(&path, &base_path); - - tar_encoder.append_dir(dir_path_in_archive.as_path(), &path)?; - - let path = path.join("data.mdb"); - Self::add_to_archive(&mut tar_encoder, &path, &base_path, reader)?; - } - CompressionMessage::File { path } => { - let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path); - tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?; - } - CompressionMessage::Dir { path } => { - let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path); - - tar_encoder.append_dir(path_in_archive.as_path(), &path)?; - } - } - } - - let gz_encoder = tar_encoder.into_inner()?; - gz_encoder.finish()?; - temp_archive.flush()?; - Ok(temp_archive) - } - - fn run_cancellation(must_stop_processing: F, recv: Receiver) - where - F: Fn() -> bool + Send + 'static, - { - let mut current_pipe = None; - loop { - let next_message = match recv.recv_timeout(std::time::Duration::from_secs(60)) { - Ok(message) => message, - Err(RecvTimeoutError::Disconnected) => break, - Err(RecvTimeoutError::Timeout) => { - if must_stop_processing() { - break; - } - continue; - } - }; - match next_message { - CancellationMessage::OpenedPipe { pipe } => current_pipe = Some(pipe), - CancellationMessage::ClosingPipe => current_pipe = None, - } - } - drop(current_pipe); - } - - fn add_to_archive( - tar_encoder: &mut Builder, - path: &Path, - base: &Path, - reader: impl Read, - ) -> anyhow::Result<()> { - let stats = path.metadata()?; - let mut header = Header::new_gnu(); - header.set_metadata_in_mode(&stats, tar::HeaderMode::Complete); - let path_in_archive = PathInArchive::from_absolute_and_base(path, base); - - tar_encoder.append_data(&mut header, path_in_archive.as_path(), reader)?; - Ok(()) - } -} - -enum CancellationMessage { - OpenedPipe { pipe: PipeWriter }, - ClosingPipe, } struct PathInArchive(PathBuf);