Compression: implement cancellation and change env copy method

This commit is contained in:
Louis Dureuil
2025-10-02 16:56:25 +02:00
parent d41716d8f0
commit 3870a374af

View File

@@ -1,8 +1,10 @@
use std::fs::{create_dir_all, File};
use std::io::{Read, Write};
use std::os::fd::AsRawFd;
use std::io::{PipeWriter, Read, Write};
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use std::os::fd::{AsRawFd, FromRawFd};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::thread::JoinHandle;
use flate2::read::GzDecoder;
@@ -32,8 +34,10 @@ pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Res
}
pub struct PipedArchiveBuilder {
send: Sender<CompressionMessage>,
join_handle: JoinHandle<anyhow::Result<File>>,
send_compression: Sender<CompressionMessage>,
send_cancellation: Sender<CancellationMessage>,
processing_thread: JoinHandle<anyhow::Result<File>>,
cancellation_thread: JoinHandle<()>,
}
enum CompressionMessage {
@@ -43,13 +47,29 @@ enum CompressionMessage {
}
impl PipedArchiveBuilder {
pub fn new(dest_dir: PathBuf, dest_filename: String, base_path: PathBuf) -> Self {
let (send, recv) = std::sync::mpsc::channel();
let join_handle = std::thread::Builder::new()
.name("piped-archive-builer".into())
.spawn(|| Self::run(dest_dir, dest_filename, recv, base_path))
pub fn new<F>(
dest_dir: PathBuf,
dest_filename: String,
base_path: PathBuf,
must_stop_processing: F,
) -> Self
where
F: Fn() -> bool + Send + 'static,
{
let (send_compression, recv) = std::sync::mpsc::channel();
let processing_thread = std::thread::Builder::new()
.name("piped-archive-builder".into())
.spawn(|| Self::run_processing(dest_dir, dest_filename, recv, base_path))
.unwrap();
Self { send, join_handle }
let (send_cancellation, recv) = std::sync::mpsc::channel();
let cancellation_thread = std::thread::Builder::new()
.name("piped-archive-builder-cancellation".into())
.spawn(|| Self::run_cancellation(must_stop_processing, recv))
.unwrap();
Self { send_compression, send_cancellation, processing_thread, cancellation_thread }
}
pub fn add_env_to_archive<T>(&mut self, env: &Env<T>) -> anyhow::Result<()> {
@@ -59,9 +79,22 @@ impl PipedArchiveBuilder {
// as any concurrent change would corrupt the copy.
let env_wtxn = env.write_txn()?;
self.send.send(CompressionMessage::Env { path, reader });
// SAFETY: the writer end of the pipe is available for write access
unsafe { env.copy_to_fd(writer.as_raw_fd(), milli::heed::CompactionOption::Disabled)? }
// SAFETY: only the cancellation thread has the actual responsibility of closing the pipe since
// the clone is `ManuallyDrop`.
let mut cloned_writer = unsafe {
let writer_raw_fd = writer.as_raw_fd();
ManuallyDrop::new(PipeWriter::from_raw_fd(writer_raw_fd))
};
self.send_cancellation.send(CancellationMessage::OpenedPipe { pipe: writer });
self.send_compression.send(CompressionMessage::Env { path, reader });
let mdb_path = env.path().join("data.mdb");
let mut file = std::fs::File::open(&mdb_path)?;
std::io::copy(&mut file, cloned_writer.deref_mut())?;
self.send_cancellation.send(CancellationMessage::ClosingPipe);
// no change we might want to commit
env_wtxn.abort();
@@ -69,23 +102,25 @@ impl PipedArchiveBuilder {
}
pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
self.send.send(CompressionMessage::File { path });
self.send_compression.send(CompressionMessage::File { path });
Ok(())
}
pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
self.send.send(CompressionMessage::Dir { path });
self.send_compression.send(CompressionMessage::Dir { path });
Ok(())
}
pub fn finish(self) -> anyhow::Result<File> {
drop(self.send);
/// FIXME catch panic
let file = self.join_handle.join().unwrap()?;
drop(self.send_cancellation);
drop(self.send_compression);
/// FIXME catch panics
let file = self.processing_thread.join().unwrap()?;
self.cancellation_thread.join().unwrap();
Ok(file)
}
fn run(
fn run_processing(
dest_dir: PathBuf,
dest_filename: String,
recv: Receiver<CompressionMessage>,
@@ -128,6 +163,30 @@ impl PipedArchiveBuilder {
Ok(archive)
}
fn run_cancellation<F>(must_stop_processing: F, recv: Receiver<CancellationMessage>)
where
F: Fn() -> bool + Send + 'static,
{
let mut current_pipe = None;
loop {
let next_message = match recv.recv_timeout(std::time::Duration::from_secs(60)) {
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => {
if must_stop_processing() {
break;
}
continue;
}
};
match next_message {
CancellationMessage::OpenedPipe { pipe } => current_pipe = Some(pipe),
CancellationMessage::ClosingPipe => current_pipe = None,
}
}
drop(current_pipe);
}
fn add_to_archive(
tar_encoder: &mut Builder<impl Write>,
path: &Path,
@@ -144,6 +203,11 @@ impl PipedArchiveBuilder {
}
}
enum CancellationMessage {
OpenedPipe { pipe: PipeWriter },
ClosingPipe,
}
struct PathInArchive(PathBuf);
impl PathInArchive {