From 3870a374afd0e7694e53449e08e5f693b4ff0cee Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 2 Oct 2025 16:56:25 +0200 Subject: [PATCH] Compression: implement cancellation and change env copy method --- crates/meilisearch-types/src/compression.rs | 104 ++++++++++++++++---- 1 file changed, 84 insertions(+), 20 deletions(-) diff --git a/crates/meilisearch-types/src/compression.rs b/crates/meilisearch-types/src/compression.rs index de2145428..414c873f3 100644 --- a/crates/meilisearch-types/src/compression.rs +++ b/crates/meilisearch-types/src/compression.rs @@ -1,8 +1,10 @@ use std::fs::{create_dir_all, File}; -use std::io::{Read, Write}; -use std::os::fd::AsRawFd; +use std::io::{PipeWriter, Read, Write}; +use std::mem::ManuallyDrop; +use std::ops::DerefMut; +use std::os::fd::{AsRawFd, FromRawFd}; use std::path::{Path, PathBuf}; -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::thread::JoinHandle; use flate2::read::GzDecoder; @@ -32,8 +34,10 @@ pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Res } pub struct PipedArchiveBuilder { - send: Sender, - join_handle: JoinHandle>, + send_compression: Sender, + send_cancellation: Sender, + processing_thread: JoinHandle>, + cancellation_thread: JoinHandle<()>, } enum CompressionMessage { @@ -43,13 +47,29 @@ enum CompressionMessage { } impl PipedArchiveBuilder { - pub fn new(dest_dir: PathBuf, dest_filename: String, base_path: PathBuf) -> Self { - let (send, recv) = std::sync::mpsc::channel(); - let join_handle = std::thread::Builder::new() - .name("piped-archive-builer".into()) - .spawn(|| Self::run(dest_dir, dest_filename, recv, base_path)) + pub fn new( + dest_dir: PathBuf, + dest_filename: String, + base_path: PathBuf, + must_stop_processing: F, + ) -> Self + where + F: Fn() -> bool + Send + 'static, + { + let (send_compression, recv) = std::sync::mpsc::channel(); + let processing_thread = std::thread::Builder::new() + .name("piped-archive-builder".into()) + .spawn(|| Self::run_processing(dest_dir, dest_filename, recv, base_path)) .unwrap(); - Self { send, join_handle } + + let (send_cancellation, recv) = std::sync::mpsc::channel(); + + let cancellation_thread = std::thread::Builder::new() + .name("piped-archive-builder-cancellation".into()) + .spawn(|| Self::run_cancellation(must_stop_processing, recv)) + .unwrap(); + + Self { send_compression, send_cancellation, processing_thread, cancellation_thread } } pub fn add_env_to_archive(&mut self, env: &Env) -> anyhow::Result<()> { @@ -59,9 +79,22 @@ impl PipedArchiveBuilder { // as any concurrent change would corrupt the copy. let env_wtxn = env.write_txn()?; - self.send.send(CompressionMessage::Env { path, reader }); - // SAFETY: the writer end of the pipe is available for write access - unsafe { env.copy_to_fd(writer.as_raw_fd(), milli::heed::CompactionOption::Disabled)? } + // SAFETY: only the cancellation thread has the actual responsibility of closing the pipe since + // the clone is `ManuallyDrop`. + let mut cloned_writer = unsafe { + let writer_raw_fd = writer.as_raw_fd(); + ManuallyDrop::new(PipeWriter::from_raw_fd(writer_raw_fd)) + }; + + self.send_cancellation.send(CancellationMessage::OpenedPipe { pipe: writer }); + + self.send_compression.send(CompressionMessage::Env { path, reader }); + + let mdb_path = env.path().join("data.mdb"); + let mut file = std::fs::File::open(&mdb_path)?; + std::io::copy(&mut file, cloned_writer.deref_mut())?; + + self.send_cancellation.send(CancellationMessage::ClosingPipe); // no change we might want to commit env_wtxn.abort(); @@ -69,23 +102,25 @@ impl PipedArchiveBuilder { } pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> { - self.send.send(CompressionMessage::File { path }); + self.send_compression.send(CompressionMessage::File { path }); Ok(()) } pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> { - self.send.send(CompressionMessage::Dir { path }); + self.send_compression.send(CompressionMessage::Dir { path }); Ok(()) } pub fn finish(self) -> anyhow::Result { - drop(self.send); - /// FIXME catch panic - let file = self.join_handle.join().unwrap()?; + drop(self.send_cancellation); + drop(self.send_compression); + /// FIXME catch panics + let file = self.processing_thread.join().unwrap()?; + self.cancellation_thread.join().unwrap(); Ok(file) } - fn run( + fn run_processing( dest_dir: PathBuf, dest_filename: String, recv: Receiver, @@ -128,6 +163,30 @@ impl PipedArchiveBuilder { Ok(archive) } + fn run_cancellation(must_stop_processing: F, recv: Receiver) + where + F: Fn() -> bool + Send + 'static, + { + let mut current_pipe = None; + loop { + let next_message = match recv.recv_timeout(std::time::Duration::from_secs(60)) { + Ok(message) => message, + Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Timeout) => { + if must_stop_processing() { + break; + } + continue; + } + }; + match next_message { + CancellationMessage::OpenedPipe { pipe } => current_pipe = Some(pipe), + CancellationMessage::ClosingPipe => current_pipe = None, + } + } + drop(current_pipe); + } + fn add_to_archive( tar_encoder: &mut Builder, path: &Path, @@ -144,6 +203,11 @@ impl PipedArchiveBuilder { } } +enum CancellationMessage { + OpenedPipe { pipe: PipeWriter }, + ClosingPipe, +} + struct PathInArchive(PathBuf); impl PathInArchive {