Move the tarball streaming into a dedicated function

Kerollmops
2025-11-06 11:11:26 +01:00
parent 32b997d817
commit 375b5600cd
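
The shape being extracted here is "a blocking producer writes into a pipe while a concurrent consumer drains it". Below is a minimal, std-only sketch of that pattern under stated assumptions: State and stream_into_pipe are hypothetical stand-ins for the real IndexScheduler and stream_tarball_into_pipe, a plain thread stands in for tokio::task::spawn_blocking, and a raw byte payload replaces the gzip'd tarball.

use std::io::{self, PipeWriter, Read as _, Write as _};
use std::thread;

// Hypothetical stand-in for the scheduler state the real function borrows.
struct State {
    payload: Vec<u8>,
}

// The blocking half: it owns the PipeWriter and produces the archive bytes.
// The real function wraps the writer in a flate2 GzEncoder and a
// tar::Builder before appending each file; a raw write stands in here.
fn stream_into_pipe(mut writer: PipeWriter, state: State) -> io::Result<()> {
    writer.write_all(&state.payload)?;
    writer.flush()?;
    Ok(()) // dropping the writer closes the pipe and unblocks the reader
}

fn main() -> io::Result<()> {
    let (mut reader, writer) = io::pipe()?;
    let state = State { payload: b"tarball bytes".to_vec() };

    // Stands in for tokio::task::spawn_blocking in the real code.
    let builder = thread::spawn(move || stream_into_pipe(writer, state));

    // The "uploader" half consumes the stream as it is produced.
    let mut received = Vec::new();
    reader.read_to_end(&mut received)?;

    builder.join().expect("builder thread panicked")?;
    assert_eq!(received, b"tarball bytes");
    Ok(())
}

Keeping the producer in a dedicated function reduces the spawn_blocking closure to a single call, which is exactly what the diff below does.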


@@ -4,8 +4,6 @@ use std::sync::atomic::Ordering;
use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
#[cfg(unix)]
use meilisearch_types::milli::update::S3SnapshotOptions;
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
@@ -235,12 +233,10 @@ impl IndexScheduler {
pub(super) async fn process_snapshot_to_s3(
&self,
progress: Progress,
opts: S3SnapshotOptions,
opts: meilisearch_types::milli::update::S3SnapshotOptions,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
use std::fs::File;
use std::io::{Seek as _, SeekFrom, Write as _};
use std::path::Path;
use meilisearch_types::milli::update::S3SnapshotOptions;
let S3SnapshotOptions {
s3_bucket_url,
@@ -282,100 +278,7 @@ impl IndexScheduler {
let index_scheduler = IndexScheduler::private_clone(self);
let builder_task = tokio::task::spawn_blocking(move || {
let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
let mut tarball = tar::Builder::new(writer);
// 1. Snapshot the version file
tarball.append_path_with_name(
&index_scheduler.scheduler.version_file_path,
VERSION_FILE_NAME,
)?;
// 2. Snapshot the index scheduler LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
// Note: A previous snapshot operation may have left the cursor
// at the end of the file so we need to seek to the start.
tasks_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("tasks").join("data.mdb");
tarball.append_file(path, &mut tasks_env_file)?;
drop(tasks_env_file);
// 2.3 Create a read transaction on the index-scheduler
let rtxn = index_scheduler.env.read_txn()?;
// 2.4 Create the update files directory
// And only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
// TODO I need to create the update files directory (even if empty)
// I should probably simply use the append_dir_all method
// but I'll lose the progress reporting.
let update_files_dir = Path::new("update_files");
for task_id in enqueued {
let task = index_scheduler
.queue
.tasks
.get_task(&rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = index_scheduler.queue.file_store.update_path(content_uuid);
let mut update_file = File::open(src)?;
let path = update_files_dir.join(content_uuid.to_string());
tarball.append_file(path, &mut update_file)?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}
// 3. Snapshot every index
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
let index_mapping = index_scheduler.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let indexes_dir = Path::new("indexes");
let indexes_references: Vec<_> = index_scheduler
.index_mapper
.index_mapping
.iter(&rtxn)?
.map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
.collect::<Result<_, Error>>()?;
// It's prettier to use a for loop instead of the IndexMapper::try_for_each_index
// method, especially when we need to access the UUID, local path and index number.
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
&name, i as u32, nb_indexes,
));
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
let mut index_file = index.try_clone_inner_file().unwrap();
// Note: A previous snapshot operation may have left the cursor
// at the end of the file so we need to seek to the start.
index_file.seek(SeekFrom::Start(0))?;
tracing::trace!("Appending index file for {name} in {}", path.display());
tarball.append_file(path, &mut index_file)?;
}
drop(rtxn);
// 4. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let mut auth_env_file =
index_scheduler.scheduler.auth_env.try_clone_inner_file().unwrap();
// Note: A previous snapshot operation may have left the cursor
// at the end of the file so we need to seek to the start.
auth_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("auth").join("data.mdb");
tarball.append_file(path, &mut auth_env_file)?;
let mut gzencoder = tarball.into_inner()?;
gzencoder.flush()?;
gzencoder.try_finish()?;
let mut writer = gzencoder.finish()?;
writer.flush()?;
Result::<_, Error>::Ok(())
stream_tarball_into_pipe(progress, level, writer, index_scheduler)
});
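// Wait on both halves: the uploader completes once the pipe reader hits EOF,
// which happens when the builder task drops its end of the pipe.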
let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);
@@ -392,6 +295,112 @@ impl IndexScheduler {
}
}
/// Streams a tarball of the database content into a pipe.
#[cfg(unix)]
fn stream_tarball_into_pipe(
progress: Progress,
level: u32,
writer: std::io::PipeWriter,
index_scheduler: IndexScheduler,
) -> std::result::Result<(), Error> {
use std::io::{Seek as _, SeekFrom, Write as _};
use std::path::Path;
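// Everything appended to the tarball below is gzip-compressed on the fly
// and streamed through the pipe to the uploader task on the other end.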
let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
let mut tarball = tar::Builder::new(writer);
// 1. Snapshot the version file
tarball
.append_path_with_name(&index_scheduler.scheduler.version_file_path, VERSION_FILE_NAME)?;
// 2. Snapshot the index scheduler LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
// Note: A previous snapshot operation may have left the cursor
// at the end of the file so we need to seek to the start.
tasks_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("tasks").join("data.mdb");
tarball.append_file(path, &mut tasks_env_file)?;
drop(tasks_env_file);
// 2.3 Create a read transaction on the index-scheduler
let rtxn = index_scheduler.env.read_txn()?;
// 2.4 Create the update files directory
// And only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
// TODO I need to create the update files directory (even if empty)
// I should probably simply use the append_dir_all method
// but I'll lose the progress reporting.
let update_files_dir = Path::new("update_files");
for task_id in enqueued {
let task = index_scheduler
.queue
.tasks
.get_task(&rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
use std::fs::File;
let src = index_scheduler.queue.file_store.update_path(content_uuid);
let mut update_file = File::open(src)?;
let path = update_files_dir.join(content_uuid.to_string());
tarball.append_file(path, &mut update_file)?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}
// 3. Snapshot every index
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
let index_mapping = index_scheduler.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let indexes_dir = Path::new("indexes");
let indexes_references: Vec<_> = index_scheduler
.index_mapper
.index_mapping
.iter(&rtxn)?
.map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
.collect::<Result<_, Error>>()?;
// It's prettier to use a for loop instead of the IndexMapper::try_for_each_index
// method, especially when we need to access the UUID, local path and index number.
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
&name, i as u32, nb_indexes,
));
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
let mut index_file = index.try_clone_inner_file().unwrap();
// Note: A previous snapshot operation may have left the cursor
// at the end of the file so we need to seek to the start.
index_file.seek(SeekFrom::Start(0))?;
tracing::trace!("Appending index file for {name} in {}", path.display());
tarball.append_file(path, &mut index_file)?;
}
drop(rtxn);
// 4. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let mut auth_env_file = index_scheduler.scheduler.auth_env.try_clone_inner_file().unwrap();
// Note: A previous snapshot operation may have left the cursor
// at the end of the file so we need to seek to the start.
auth_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("auth").join("data.mdb");
tarball.append_file(path, &mut auth_env_file)?;
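// Shut the streams down in order: into_inner finishes the tar archive,
// the encoder flush/try_finish writes the gzip trailer, and the final
// flush pushes any buffered bytes into the pipe before the writer drops.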
let mut gzencoder = tarball.into_inner()?;
gzencoder.flush()?;
gzencoder.try_finish()?;
let mut writer = gzencoder.finish()?;
writer.flush()?;
Result::<_, Error>::Ok(())
}
/// Streams the content read from the given reader to S3.
#[cfg(unix)]
async fn multipart_stream_to_s3(