mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-11-22 04:36:32 +00:00)
Generate an async tarball
@@ -12,6 +12,8 @@ license.workspace = true

[dependencies]
anyhow = "1.0.98"
+astral-tokio-tar = { version = "0.5.5", default-features = false }
+async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
bincode = "1.3.3"
byte-unit = "5.1.6"
bytes = "1.10.1"
@@ -109,6 +109,9 @@ impl IndexScheduler {
                Ok(secret_key),
            ) => {
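                // The batch loop itself is synchronous, so the async snapshot
                // upload is driven to completion on the scheduler's runtime.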
                let runtime = self.runtime.as_ref().expect("Runtime not initialized");
                #[cfg(not(unix))]
                panic!("Non-unix platform does not support S3 snapshotting");
                #[cfg(unix)]
                runtime.block_on(self.process_snapshot_to_s3(
                    progress,
                    bucket_url,
@@ -200,7 +203,7 @@ impl IndexScheduler {
            let task =
                self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
            if let Some(content_uuid) = task.content_uuid() {
-               let src = self.queue.file_store.get_update_path(content_uuid);
+               let src = self.queue.file_store.update_path(content_uuid);
                let dst = update_files_dir.join(content_uuid.to_string());
                fs::copy(src, dst)?;
            }
@@ -267,6 +270,7 @@ impl IndexScheduler {
        Ok(tasks)
    }

+   #[cfg(unix)]
    pub(super) async fn process_snapshot_to_s3(
        &self,
        progress: Progress,
@@ -277,6 +281,12 @@ impl IndexScheduler {
        secret_key: String,
        mut tasks: Vec<Task>,
    ) -> Result<Vec<Task>> {
        use std::path::Path;

+       use async_compression::tokio::write::GzipEncoder;
+       use async_compression::Level;
+       use tokio::fs::File;

+       // The maximum number of parts that can be uploaded in parallel.
+       const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
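+       // For example, `MEILI_S3_MAX_IN_FLIGHT_PARTS=8` caps concurrent part
+       // uploads at eight (an illustrative value; the fallback is handled by
+       // the match below).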
+       let max_in_flight_parts: usize = match std::env::var(S3_MAX_IN_FLIGHT_PARTS) {
@@ -290,43 +300,69 @@ impl IndexScheduler {
        let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
        let credential = Credentials::new(access_key, secret_key);

        let rtxn = self.read_txn()?;
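        // A unix pipe decouples producing the tarball from consuming it: the
        // gzip-compressed tar stream is written into `writer`, while `reader`
        // exposes the bytes as they are produced, so the archive never needs
        // to be materialized in full on disk or in memory.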
        let (writer, reader) = tokio::net::unix::pipe::pipe()?;
        let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
        let mut tarball = tokio_tar::Builder::new(compressed_writer);

        // Every part must be between 5 MB and 5 GB in size, except for the last part.
        // A maximum of 10,000 parts can be uploaded to a single multipart upload.
        //
        // Part numbers can be any number from 1 to 10,000, inclusive.
        // A part number uniquely identifies a part and also defines its position within
        // the object being created. If you upload a new part using the same part number
        // that was used with a previous part, the previously uploaded part is overwritten.
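        //
        // For example, a fixed 375 MiB part size satisfies both limits for
        // objects up to 10,000 * 375 MiB (roughly 3.6 TiB), since
        // 5 MiB <= 375 MiB <= 5 GiB (illustrative arithmetic, not a value
        // taken from this code).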
        // 1. Snapshot the version file
        tarball.append_path_with_name(&self.scheduler.version_file_path, VERSION_FILE_NAME).await?;

        // 2. Snapshot the index scheduler LMDB env
        progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
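        // Cloning the env's underlying file handle lets data.mdb be read into
        // the archive without closing the LMDB environment.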
        let mut tasks_env_file = self.env.try_clone_inner_file().map(File::from_std)?;
        let path = Path::new("tasks").join("data.mdb");
        tarball.append_file(path, &mut tasks_env_file).await?;
        drop(tasks_env_file);

        // 2.3 Create a read transaction on the index-scheduler
        let rtxn = self.env.read_txn()?;

        // 2.4 Create the update files directory
        // And only copy the update files of the enqueued tasks
        progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
        let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
        let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
        progress.update_progress(update_file_progress);
        let update_files_dir = Path::new("update_files");
        for task_id in enqueued {
            let task =
                self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
            if let Some(content_uuid) = task.content_uuid() {
                let src = self.queue.file_store.update_path(content_uuid);
                let mut update_file = File::open(src).await?;
                let path = update_files_dir.join(content_uuid.to_string());
                tarball.append_file(path, &mut update_file).await?;
            }
            atomic.fetch_add(1, Ordering::Relaxed);
        }

        // 3. Snapshot every index
        progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
        let index_mapping = self.index_mapper.index_mapping;
        let nb_indexes = index_mapping.len(&rtxn)? as u32;
        let indexes_dir = Path::new("indexes");
        for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
            let (name, uuid) = result?;
            progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
                name, i as u32, nb_indexes,
            ));
            let index = self.index_mapper.index(&rtxn, name)?;
            let file = index
                .try_clone_inner_file()
                .map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
            let mmap = unsafe { memmap2::Mmap::map(&file)? };
            mmap.advise(memmap2::Advice::Sequential)?;
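            // `Bytes::from_owner` wraps the mmap without copying it, so the
            // uploader can slice parts out of it cheaply.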
            let mmap = bytes::Bytes::from_owner(mmap);

            let object = format!("indexes/{uuid}");
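            // Upload the raw index file as its own S3 object, with at most
            // `max_in_flight_parts` part uploads in flight at a time.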
            multipart_upload(
                &bucket,
                &client,
                Some(&credential),
                max_in_flight_parts,
                mmap,
                &object,
            )
            .await?;
            let path = indexes_dir.join(uuid.to_string());
            let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
            tarball.append_file(path, &mut index_file).await?;
        }

        drop(rtxn);

        // 4. Snapshot the auth LMDB env
        progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
        let mut auth_env_file =
            self.scheduler.auth_env.try_clone_inner_file().map(File::from_std).unwrap();
        let path = Path::new("auth").join("data.mdb");
        tarball.append_file(path, &mut auth_env_file).await?;

        tarball.finish().await?;
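        // `finish` writes the tar end-of-archive blocks; the gzip encoder and
        // pipe writer presumably still get flushed and shut down after this so
        // the reader side sees EOF (not shown in this excerpt).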

        for task in &mut tasks {
            task.status = Status::Succeeded;
        }