mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-10 13:46:28 +00:00
Greatly simplify implementation by eschewing the pipe
This commit is contained in:
@@ -244,8 +244,7 @@ impl IndexScheduler {
|
|||||||
let mut builder = compression::PipedArchiveBuilder::new(
|
let mut builder = compression::PipedArchiveBuilder::new(
|
||||||
self.scheduler.snapshots_path.clone(),
|
self.scheduler.snapshots_path.clone(),
|
||||||
base_path.clone(),
|
base_path.clone(),
|
||||||
must_stop_processing.as_lambda(),
|
)?;
|
||||||
);
|
|
||||||
|
|
||||||
// 3. Snapshot the VERSION file
|
// 3. Snapshot the VERSION file
|
||||||
builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
|
builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
|
||||||
|
@@ -1,17 +1,12 @@
|
|||||||
use std::fs::{create_dir_all, File};
|
use std::fs::{create_dir_all, File};
|
||||||
use std::io::{PipeWriter, Read, Write};
|
use std::io::Write;
|
||||||
use std::mem::ManuallyDrop;
|
|
||||||
use std::ops::DerefMut;
|
|
||||||
use std::os::fd::{AsRawFd, FromRawFd};
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
|
||||||
use std::thread::JoinHandle;
|
|
||||||
|
|
||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use flate2::write::GzEncoder;
|
use flate2::write::GzEncoder;
|
||||||
use flate2::Compression;
|
use flate2::Compression;
|
||||||
use milli::heed::Env;
|
use milli::heed::Env;
|
||||||
use tar::{Archive, Builder, Header};
|
use tar::{Archive, Builder};
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
|
|
||||||
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
|
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||||
@@ -35,37 +30,20 @@ pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Res
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct PipedArchiveBuilder {
|
pub struct PipedArchiveBuilder {
|
||||||
send_compression: Sender<CompressionMessage>,
|
base_path: PathBuf,
|
||||||
send_cancellation: Sender<CancellationMessage>,
|
tar_encoder: tar::Builder<GzEncoder<NamedTempFile>>,
|
||||||
processing_thread: JoinHandle<anyhow::Result<NamedTempFile>>,
|
|
||||||
cancellation_thread: JoinHandle<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
enum CompressionMessage {
|
|
||||||
Env { path: PathBuf, reader: std::io::PipeReader },
|
|
||||||
File { path: PathBuf },
|
|
||||||
Dir { path: PathBuf },
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PipedArchiveBuilder {
|
impl PipedArchiveBuilder {
|
||||||
pub fn new<F>(dest_dir: PathBuf, base_path: PathBuf, must_stop_processing: F) -> Self
|
pub fn new(dest_dir: PathBuf, base_path: PathBuf) -> anyhow::Result<Self> {
|
||||||
where
|
let temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?;
|
||||||
F: Fn() -> bool + Send + 'static,
|
|
||||||
{
|
|
||||||
let (send_compression, recv) = std::sync::mpsc::channel();
|
|
||||||
let processing_thread = std::thread::Builder::new()
|
|
||||||
.name("piped-archive-builder".into())
|
|
||||||
.spawn(|| Self::run_processing(dest_dir, recv, base_path))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let (send_cancellation, recv) = std::sync::mpsc::channel();
|
let gz_encoder = GzEncoder::new(temp_archive, Compression::default());
|
||||||
|
let mut tar_encoder = Builder::new(gz_encoder);
|
||||||
|
let base_path_in_archive = PathInArchive::from_absolute_and_base(&base_path, &base_path);
|
||||||
|
tar_encoder.append_dir(base_path_in_archive.as_path(), &base_path)?;
|
||||||
|
|
||||||
let cancellation_thread = std::thread::Builder::new()
|
Ok(Self { base_path, tar_encoder })
|
||||||
.name("piped-archive-builder-cancellation".into())
|
|
||||||
.spawn(|| Self::run_cancellation(must_stop_processing, recv))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Self { send_compression, send_cancellation, processing_thread, cancellation_thread }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a heed environment to the archive.
|
/// Add a heed environment to the archive.
|
||||||
@@ -80,29 +58,19 @@ impl PipedArchiveBuilder {
|
|||||||
/// - If the cancellation thread panicked or otherwise dropped its receiver.
|
/// - If the cancellation thread panicked or otherwise dropped its receiver.
|
||||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||||
pub fn add_env_to_archive<T>(&mut self, env: &Env<T>) -> anyhow::Result<()> {
|
pub fn add_env_to_archive<T>(&mut self, env: &Env<T>) -> anyhow::Result<()> {
|
||||||
let (reader, writer) = std::io::pipe()?;
|
|
||||||
let path = env.path().to_path_buf();
|
let path = env.path().to_path_buf();
|
||||||
// make sure that the environment cannot change while it is being added to the archive,
|
// make sure that the environment cannot change while it is being added to the archive,
|
||||||
// as any concurrent change would corrupt the copy.
|
// as any concurrent change would corrupt the copy.
|
||||||
let env_wtxn = env.write_txn()?;
|
let env_wtxn = env.write_txn()?;
|
||||||
|
|
||||||
// SAFETY: only the cancellation thread has the actual responsibility of closing the pipe since
|
let dir_path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path);
|
||||||
// the clone is `ManuallyDrop`.
|
|
||||||
let mut cloned_writer = unsafe {
|
|
||||||
let writer_raw_fd = writer.as_raw_fd();
|
|
||||||
ManuallyDrop::new(PipeWriter::from_raw_fd(writer_raw_fd))
|
|
||||||
};
|
|
||||||
|
|
||||||
self.send_cancellation.send(CancellationMessage::OpenedPipe { pipe: writer });
|
self.tar_encoder.append_dir(dir_path_in_archive.as_path(), &path)?;
|
||||||
|
|
||||||
self.send_compression.send(CompressionMessage::Env { path, reader });
|
let path = path.join("data.mdb");
|
||||||
|
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path);
|
||||||
|
|
||||||
let mdb_path = env.path().join("data.mdb");
|
self.tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?;
|
||||||
let mut file = std::fs::File::open(&mdb_path)?;
|
|
||||||
let mut file = std::io::BufReader::with_capacity(16 * 4096, &mut file);
|
|
||||||
std::io::copy(&mut file, cloned_writer.deref_mut())?;
|
|
||||||
|
|
||||||
self.send_cancellation.send(CancellationMessage::ClosingPipe);
|
|
||||||
|
|
||||||
// no change we might want to commit
|
// no change we might want to commit
|
||||||
env_wtxn.abort();
|
env_wtxn.abort();
|
||||||
@@ -115,7 +83,8 @@ impl PipedArchiveBuilder {
|
|||||||
///
|
///
|
||||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||||
pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||||
self.send_compression.send(CompressionMessage::File { path });
|
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path);
|
||||||
|
self.tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,7 +94,9 @@ impl PipedArchiveBuilder {
|
|||||||
///
|
///
|
||||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||||
pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||||
self.send_compression.send(CompressionMessage::Dir { path });
|
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &self.base_path);
|
||||||
|
|
||||||
|
self.tar_encoder.append_dir(path_in_archive.as_path(), &path)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,99 +110,13 @@ impl PipedArchiveBuilder {
|
|||||||
/// - If the cancellation thread panicked.
|
/// - If the cancellation thread panicked.
|
||||||
/// - If the processing thread panicked or otherwise terminated in error.
|
/// - If the processing thread panicked or otherwise terminated in error.
|
||||||
pub fn finish(self, dest_path: &Path) -> anyhow::Result<File> {
|
pub fn finish(self, dest_path: &Path) -> anyhow::Result<File> {
|
||||||
drop(self.send_cancellation);
|
let gz_encoder = self.tar_encoder.into_inner()?;
|
||||||
drop(self.send_compression);
|
let mut temp_archive = gz_encoder.finish()?;
|
||||||
/// FIXME catch panics
|
temp_archive.flush()?;
|
||||||
let temp_archive = self.processing_thread.join().unwrap()?;
|
|
||||||
self.cancellation_thread.join().unwrap();
|
|
||||||
let archive = temp_archive.persist(dest_path)?;
|
let archive = temp_archive.persist(dest_path)?;
|
||||||
Ok(archive)
|
Ok(archive)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_processing(
|
|
||||||
dest_dir: PathBuf,
|
|
||||||
recv: Receiver<CompressionMessage>,
|
|
||||||
base_path: PathBuf,
|
|
||||||
) -> anyhow::Result<NamedTempFile> {
|
|
||||||
let mut temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?;
|
|
||||||
|
|
||||||
let gz_encoder = GzEncoder::new(&mut temp_archive, Compression::default());
|
|
||||||
let mut tar_encoder = Builder::new(gz_encoder);
|
|
||||||
let base_path_in_archive = PathInArchive::from_absolute_and_base(&base_path, &base_path);
|
|
||||||
// add the root
|
|
||||||
tar_encoder.append_dir(base_path_in_archive.as_path(), &base_path)?;
|
|
||||||
while let Ok(message) = recv.recv() {
|
|
||||||
match message {
|
|
||||||
CompressionMessage::Env { path, reader } => {
|
|
||||||
let dir_path_in_archive =
|
|
||||||
PathInArchive::from_absolute_and_base(&path, &base_path);
|
|
||||||
|
|
||||||
tar_encoder.append_dir(dir_path_in_archive.as_path(), &path)?;
|
|
||||||
|
|
||||||
let path = path.join("data.mdb");
|
|
||||||
Self::add_to_archive(&mut tar_encoder, &path, &base_path, reader)?;
|
|
||||||
}
|
|
||||||
CompressionMessage::File { path } => {
|
|
||||||
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
|
|
||||||
tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?;
|
|
||||||
}
|
|
||||||
CompressionMessage::Dir { path } => {
|
|
||||||
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
|
|
||||||
|
|
||||||
tar_encoder.append_dir(path_in_archive.as_path(), &path)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let gz_encoder = tar_encoder.into_inner()?;
|
|
||||||
gz_encoder.finish()?;
|
|
||||||
temp_archive.flush()?;
|
|
||||||
Ok(temp_archive)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn run_cancellation<F>(must_stop_processing: F, recv: Receiver<CancellationMessage>)
|
|
||||||
where
|
|
||||||
F: Fn() -> bool + Send + 'static,
|
|
||||||
{
|
|
||||||
let mut current_pipe = None;
|
|
||||||
loop {
|
|
||||||
let next_message = match recv.recv_timeout(std::time::Duration::from_secs(60)) {
|
|
||||||
Ok(message) => message,
|
|
||||||
Err(RecvTimeoutError::Disconnected) => break,
|
|
||||||
Err(RecvTimeoutError::Timeout) => {
|
|
||||||
if must_stop_processing() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
match next_message {
|
|
||||||
CancellationMessage::OpenedPipe { pipe } => current_pipe = Some(pipe),
|
|
||||||
CancellationMessage::ClosingPipe => current_pipe = None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
drop(current_pipe);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add_to_archive(
|
|
||||||
tar_encoder: &mut Builder<impl Write>,
|
|
||||||
path: &Path,
|
|
||||||
base: &Path,
|
|
||||||
reader: impl Read,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let stats = path.metadata()?;
|
|
||||||
let mut header = Header::new_gnu();
|
|
||||||
header.set_metadata_in_mode(&stats, tar::HeaderMode::Complete);
|
|
||||||
let path_in_archive = PathInArchive::from_absolute_and_base(path, base);
|
|
||||||
|
|
||||||
tar_encoder.append_data(&mut header, path_in_archive.as_path(), reader)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum CancellationMessage {
|
|
||||||
OpenedPipe { pipe: PipeWriter },
|
|
||||||
ClosingPipe,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PathInArchive(PathBuf);
|
struct PathInArchive(PathBuf);
|
||||||
|
Reference in New Issue
Block a user