Improve the way we create the snapshot path

This commit is contained in:
Clément Renault
2025-10-20 16:46:24 +02:00
parent d298b21a95
commit 5e1af30b42
3 changed files with 26 additions and 14 deletions

View File

@@ -29,6 +29,7 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.7"
page_size = "0.6.0"
path-slash = "0.2.1"
rayon = "1.10.0"
roaring = { version = "0.10.12", features = ["serde"] }
serde = { version = "1.0.219", features = ["derive"] }

View File

@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::env::VarError;
use std::ffi::OsStr;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::time::Duration;
@@ -9,6 +10,7 @@ use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
use path_slash::PathBufExt;
use reqwest::header::ETAG;
use reqwest::Client;
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
@@ -121,7 +123,7 @@ impl IndexScheduler {
bucket_url,
bucket_region,
bucket_name,
snapshot_prefix,
PathBuf::from_slash(snapshot_prefix),
access_key,
secret_key,
tasks,
@@ -284,7 +286,7 @@ impl IndexScheduler {
bucket_url: String,
bucket_region: String,
bucket_name: String,
snapshot_prefix: String,
snapshot_prefix: PathBuf,
access_key: String,
secret_key: String,
mut tasks: Vec<Task>,
@@ -295,13 +297,13 @@ impl IndexScheduler {
use std::path::Path;
use bytes::{Bytes, BytesMut};
use meilisearch_types::milli::update::new::StdResult;
use reqwest::Response;
use tokio::task::JoinHandle;
const ONE_HOUR: Duration = Duration::from_secs(3600);
// default part size is 250MiB
// TODO use 375MiB
// It must be at least twice 5MiB
// It must be at least 2x5MiB
const PART_SIZE: usize = 10 * 1024 * 1024;
// The maximum number of parts that can be uploaded in parallel.
@@ -317,14 +319,16 @@ impl IndexScheduler {
let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
let credential = Credentials::new(access_key, secret_key);
// TODO change this and use the database name like in the original version
let object = format!("{}/data.ms.snapshot", snapshot_prefix);
let object_path = snapshot_prefix.join("data.ms.snapshot");
let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
eprintln!("Starting the upload of the snapshot to {object}");
// TODO implement exponential backoff on upload requests: https://docs.rs/backoff
// TODO return a result with actual errors
// TODO sign for longer than an hour?
// TODO Use a better thing than a String for the object path
// NOTE to make it work on Windows we could try using std::io::pipe instead
// let (writer, reader) = tokio::net::unix::pipe::pipe()?;
// NOTE to make it work on Windows we could try using std::io::pipe instead.
// However, we are still using the tokio unix pipe in the async upload loop.
let (reader, writer) = std::io::pipe()?;
let uploader_task = tokio::spawn(async move {
let reader = OwnedFd::from(reader);
@@ -338,10 +342,10 @@ impl IndexScheduler {
let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
let mut etags = Vec::<String>::new();
let mut in_flight = VecDeque::<(
JoinHandle<StdResult<reqwest::Response, reqwest::Error>>,
Bytes,
)>::with_capacity(max_in_flight_parts);
let mut in_flight =
VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
max_in_flight_parts,
);
for part_number in 1u16.. {
let part_upload = bucket.upload_part(