Generate an async tarball

This commit is contained in:
Clément Renault
2025-10-14 15:15:47 +02:00
committed by Kerollmops
parent 982babdb74
commit 76e4ec2168
3 changed files with 111 additions and 26 deletions

View File

@@ -12,6 +12,8 @@ license.workspace = true
[dependencies]
anyhow = "1.0.98"
astral-tokio-tar = { version = "0.5.5", default-features = false }
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
bincode = "1.3.3"
byte-unit = "5.1.6"
bytes = "1.10.1"

View File

@@ -109,6 +109,9 @@ impl IndexScheduler {
Ok(secret_key),
) => {
let runtime = self.runtime.as_ref().expect("Runtime not initialized");
#[cfg(not(unix))]
panic!("Non-unix platform does not support S3 snapshotting");
#[cfg(unix)]
runtime.block_on(self.process_snapshot_to_s3(
progress,
bucket_url,
@@ -200,7 +203,7 @@ impl IndexScheduler {
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.queue.file_store.get_update_path(content_uuid);
let src = self.queue.file_store.update_path(content_uuid);
let dst = update_files_dir.join(content_uuid.to_string());
fs::copy(src, dst)?;
}
@@ -267,6 +270,7 @@ impl IndexScheduler {
Ok(tasks)
}
#[cfg(unix)]
pub(super) async fn process_snapshot_to_s3(
&self,
progress: Progress,
@@ -277,6 +281,12 @@ impl IndexScheduler {
secret_key: String,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
use std::path::Path;
use async_compression::tokio::write::GzipEncoder;
use async_compression::Level;
use tokio::fs::File;
// The maximum number of parts that can be uploaded in parallel.
const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
let max_in_flight_parts: usize = match std::env::var(S3_MAX_IN_FLIGHT_PARTS) {
@@ -290,43 +300,69 @@ impl IndexScheduler {
let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
let credential = Credentials::new(access_key, secret_key);
let rtxn = self.read_txn()?;
let (writer, reader) = tokio::net::unix::pipe::pipe()?;
let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
let mut tarball = tokio_tar::Builder::new(compressed_writer);
// Every part must be between 5 MB and 5 GB in size, except for the last part
// A maximum of 10,000 parts can be uploaded to a single multipart upload.
//
// Part numbers can be any number from 1 to 10,000, inclusive.
// A part number uniquely identifies a part and also defines its position within
// the object being created. If you upload a new part using the same part number
// that was used with a previous part, the previously uploaded part is overwritten.
// 1. Snapshot the version file
tarball.append_path_with_name(&self.scheduler.version_file_path, VERSION_FILE_NAME).await?;
// 2. Snapshot the index scheduler LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let mut tasks_env_file = self.env.try_clone_inner_file().map(File::from_std)?;
let path = Path::new("tasks").join("data.mdb");
tarball.append_file(path, &mut tasks_env_file).await?;
drop(tasks_env_file);
// 2.3 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
// 2.4 Create the update files directory
// And only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
let update_files_dir = Path::new("update_files");
for task_id in enqueued {
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.queue.file_store.update_path(content_uuid);
let mut update_file = File::open(src).await?;
let path = update_files_dir.join(content_uuid.to_string());
tarball.append_file(path, &mut update_file).await?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}
// 3. Snapshot every indexes
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
let index_mapping = self.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let indexes_dir = Path::new("indexes");
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, uuid) = result?;
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
));
let index = self.index_mapper.index(&rtxn, name)?;
let file = index
.try_clone_inner_file()
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
let mmap = unsafe { memmap2::Mmap::map(&file)? };
mmap.advise(memmap2::Advice::Sequential)?;
let mmap = bytes::Bytes::from_owner(mmap);
let object = format!("indexes/{uuid}");
multipart_upload(
&bucket,
&client,
Some(&credential),
max_in_flight_parts,
mmap,
&object,
)
.await?;
let path = indexes_dir.join(uuid.to_string());
let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
tarball.append_file(path, &mut index_file).await?;
}
drop(rtxn);
// 4. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let mut auth_env_file =
self.scheduler.auth_env.try_clone_inner_file().map(File::from_std).unwrap();
let path = Path::new("auth").join("data.mdb");
tarball.append_file(path, &mut auth_env_file).await?;
tarball.finish().await?;
for task in &mut tasks {
task.status = Status::Succeeded;
}