mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-10 05:36:35 +00:00
Only persist the archive in finish
This commit is contained in:
@@ -243,8 +243,7 @@ impl IndexScheduler {
|
||||
|
||||
let mut builder = compression::PipedArchiveBuilder::new(
|
||||
self.scheduler.snapshots_path.clone(),
|
||||
format!("{db_name}.snapshot"),
|
||||
base_path,
|
||||
base_path.clone(),
|
||||
must_stop_processing.as_lambda(),
|
||||
);
|
||||
|
||||
@@ -333,7 +332,8 @@ impl IndexScheduler {
|
||||
|
||||
// 7. Finalize the tarball
|
||||
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
|
||||
let file = builder.finish()?;
|
||||
let file =
|
||||
builder.finish(&self.scheduler.snapshots_path.join(format!("{db_name}.snapshot")))?;
|
||||
|
||||
// 8. Change the permission to make the snapshot readonly
|
||||
let mut permissions = file.metadata()?.permissions();
|
||||
|
@@ -12,6 +12,7 @@ use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use milli::heed::Env;
|
||||
use tar::{Archive, Builder, Header};
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||
let mut f = File::create(dest)?;
|
||||
@@ -36,7 +37,7 @@ pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Res
|
||||
pub struct PipedArchiveBuilder {
|
||||
send_compression: Sender<CompressionMessage>,
|
||||
send_cancellation: Sender<CancellationMessage>,
|
||||
processing_thread: JoinHandle<anyhow::Result<File>>,
|
||||
processing_thread: JoinHandle<anyhow::Result<NamedTempFile>>,
|
||||
cancellation_thread: JoinHandle<()>,
|
||||
}
|
||||
|
||||
@@ -47,19 +48,14 @@ enum CompressionMessage {
|
||||
}
|
||||
|
||||
impl PipedArchiveBuilder {
|
||||
pub fn new<F>(
|
||||
dest_dir: PathBuf,
|
||||
dest_filename: String,
|
||||
base_path: PathBuf,
|
||||
must_stop_processing: F,
|
||||
) -> Self
|
||||
pub fn new<F>(dest_dir: PathBuf, base_path: PathBuf, must_stop_processing: F) -> Self
|
||||
where
|
||||
F: Fn() -> bool + Send + 'static,
|
||||
{
|
||||
let (send_compression, recv) = std::sync::mpsc::channel();
|
||||
let processing_thread = std::thread::Builder::new()
|
||||
.name("piped-archive-builder".into())
|
||||
.spawn(|| Self::run_processing(dest_dir, dest_filename, recv, base_path))
|
||||
.spawn(|| Self::run_processing(dest_dir, recv, base_path))
|
||||
.unwrap();
|
||||
|
||||
let (send_cancellation, recv) = std::sync::mpsc::channel();
|
||||
@@ -72,6 +68,17 @@ impl PipedArchiveBuilder {
|
||||
Self { send_compression, send_cancellation, processing_thread, cancellation_thread }
|
||||
}
|
||||
|
||||
/// Add a heed environment to the archive.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - Errors originating with that thread:
|
||||
/// - Heed errors, if taking a write transaction fails
|
||||
/// - If the copy of the environment fails.
|
||||
/// - If there is an I/O error opening the database at the environment's path.
|
||||
/// - Errors originating with another thread:
|
||||
/// - If the cancellation thread panicked or otherwise dropped its receiver.
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_env_to_archive<T>(&mut self, env: &Env<T>) -> anyhow::Result<()> {
|
||||
let (reader, writer) = std::io::pipe()?;
|
||||
let path = env.path().to_path_buf();
|
||||
@@ -101,31 +108,50 @@ impl PipedArchiveBuilder {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a file to the archive
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||
self.send_compression.send(CompressionMessage::File { path });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a directory name (**without its contents**) to the archive.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||
self.send_compression.send(CompressionMessage::Dir { path });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finish(self) -> anyhow::Result<File> {
|
||||
/// Finalize the archive and persists it to disk.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - Originating with the current thread:
|
||||
/// - If persisting the archive fails
|
||||
/// - Originating with another thread:
|
||||
/// - If the cancellation thread panicked.
|
||||
/// - If the processing thread panicked or otherwise terminated in error.
|
||||
pub fn finish(self, dest_path: &Path) -> anyhow::Result<File> {
|
||||
drop(self.send_cancellation);
|
||||
drop(self.send_compression);
|
||||
/// FIXME catch panics
|
||||
let file = self.processing_thread.join().unwrap()?;
|
||||
let temp_archive = self.processing_thread.join().unwrap()?;
|
||||
self.cancellation_thread.join().unwrap();
|
||||
Ok(file)
|
||||
let archive = temp_archive.persist(dest_path)?;
|
||||
Ok(archive)
|
||||
}
|
||||
|
||||
fn run_processing(
|
||||
dest_dir: PathBuf,
|
||||
dest_filename: String,
|
||||
recv: Receiver<CompressionMessage>,
|
||||
base_path: PathBuf,
|
||||
) -> anyhow::Result<File> {
|
||||
) -> anyhow::Result<NamedTempFile> {
|
||||
let mut temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?;
|
||||
|
||||
let gz_encoder = GzEncoder::new(&mut temp_archive, Compression::default());
|
||||
@@ -159,8 +185,7 @@ impl PipedArchiveBuilder {
|
||||
let gz_encoder = tar_encoder.into_inner()?;
|
||||
gz_encoder.finish()?;
|
||||
temp_archive.flush()?;
|
||||
let archive = temp_archive.persist(dest_dir.join(dest_filename))?;
|
||||
Ok(archive)
|
||||
Ok(temp_archive)
|
||||
}
|
||||
|
||||
fn run_cancellation<F>(must_stop_processing: F, recv: Receiver<CancellationMessage>)
|
||||
|
Reference in New Issue
Block a user