From 0c06bdefacb545e8366d0a45c92e9239297e66b1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Thu, 16 Oct 2025 16:25:52 +0200
Subject: [PATCH] WIP

---
 .../scheduler/process_snapshot_creation.rs | 72 +++++++++++++------
 1 file changed, 50 insertions(+), 22 deletions(-)

diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
index 530ed243f..9e4bb2770 100644
--- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
@@ -92,19 +92,22 @@ impl IndexScheduler {
         const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
         const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
         const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
+        const S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
         const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
         const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
         let bucket_url = std::env::var(S3_BUCKET_URL);
         let bucket_region = std::env::var(S3_BUCKET_REGION);
         let bucket_name = std::env::var(S3_BUCKET_NAME);
+        let snapshot_prefix = std::env::var(S3_SNAPSHOT_PREFIX);
         let access_key = std::env::var(S3_ACCESS_KEY);
         let secret_key = std::env::var(S3_SECRET_KEY);

-        match (bucket_url, bucket_region, bucket_name, access_key, secret_key) {
+        match (bucket_url, bucket_region, bucket_name, snapshot_prefix, access_key, secret_key) {
             (
                 Ok(bucket_url),
                 Ok(bucket_region),
                 Ok(bucket_name),
+                Ok(snapshot_prefix),
                 Ok(access_key),
                 Ok(secret_key),
             ) => {
@@ -117,6 +120,7 @@ impl IndexScheduler {
                     bucket_url,
                     bucket_region,
                     bucket_name,
+                    snapshot_prefix,
                     access_key,
                     secret_key,
                     tasks,
@@ -128,12 +132,14 @@ impl IndexScheduler {
                 Err(VarError::NotPresent),
                 Err(VarError::NotPresent),
                 Err(VarError::NotPresent),
+                Err(VarError::NotPresent),
             ) => self.process_snapshots_to_disk(progress, tasks),
-            (Err(e), _, _, _, _)
-            | (_, Err(e), _, _, _)
-            | (_, _, Err(e), _, _)
-            | (_, _, _, Err(e), _)
-            | (_, _, _, _, Err(e)) => {
+            (Err(e), _, _, _, _, _)
+            | (_, Err(e), _, _, _, _)
+            | (_, _, Err(e), _, _, _)
+            | (_, _, _, Err(e), _, _)
+            | (_, _, _, _, Err(e), _)
+            | (_, _, _, _, _, Err(e)) => {
                 // TODO: Handle error gracefully
                 panic!("Error while reading environment variables: {}", e);
             }
@@ -277,15 +283,18 @@ impl IndexScheduler {
         bucket_url: String,
         bucket_region: String,
         bucket_name: String,
+        snapshot_prefix: String,
         access_key: String,
         secret_key: String,
         mut tasks: Vec<Task>,
     ) -> Result<Vec<Task>> {
+        use std::io;
         use std::path::Path;

         use async_compression::tokio::write::GzipEncoder;
         use async_compression::Level;
         use bytes::{Bytes, BytesMut};
+        use meilisearch_types::milli::update::new::StdResult;
         use tokio::fs::File;
         use tokio::io::AsyncReadExt;
         use tokio::task::JoinHandle;
@@ -293,7 +302,7 @@ impl IndexScheduler {
         const ONE_HOUR: Duration = Duration::from_secs(3600);
         // default part size is 250MiB
         // TODO use 375MiB
-        const PART_SIZE: usize = 500 * 1024 * 1024;
+        const PART_SIZE: usize = 5 * 1024 * 1024;

         // The maximum number of parts that can be uploaded in parallel.
         const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
@@ -308,7 +317,7 @@ impl IndexScheduler {
         let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
         let credential = Credentials::new(access_key, secret_key);
         // TODO change this and use the database name like in the original version
-        let object = "data.ms.snapshot";
+        let object = format!("{}/data.ms.snapshot", snapshot_prefix);

         // TODO implement exponential backoff on upload requests: https://docs.rs/backoff
         // TODO return a result with actual errors
@@ -316,9 +325,7 @@ impl IndexScheduler {
         // TODO Use a better thing than a String for the object path
         let (writer, mut reader) = tokio::net::unix::pipe::pipe()?;
         let uploader_task = tokio::spawn(async move {
-            use meilisearch_types::milli::update::new::StdResult;
-
-            let action = bucket.create_multipart_upload(Some(&credential), object);
+            let action = bucket.create_multipart_upload(Some(&credential), &object);
             // TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
             // If the part is deleted (like a TTL) we should sign it for at least 24 hours.
             let url = action.sign(ONE_HOUR);
@@ -327,15 +334,15 @@ impl IndexScheduler {
             let multipart = CreateMultipartUpload::parse_response(&body).unwrap();

             let mut etags = Vec::<String>::new();
-
             let mut in_flight = VecDeque::<(
                 JoinHandle<StdResult<reqwest::Response, reqwest::Error>>,
                 Bytes,
             )>::with_capacity(max_in_flight_parts);
+
             for part_number in 1u16.. {
                 let part_upload = bucket.upload_part(
                     Some(&credential),
-                    object,
+                    &object,
                     part_number,
                     multipart.upload_id(),
                 );
@@ -358,18 +365,39 @@ impl IndexScheduler {
                 };

                 while buffer.len() < (PART_SIZE / 2) {
-                    if reader.read_buf(&mut buffer).await? == 0 {
-                        eprintln!(
-                            "breaking because read returned 0. Buffer len {}; capacity {}",
-                            buffer.len(),
-                            buffer.capacity(),
-                        );
-                        break;
+                    eprintln!(
+                        "buffer is {:.2}% full, trying to read more",
+                        buffer.len() as f32 / buffer.capacity() as f32 * 100.0
+                    );
+
+                    // Wait for the pipe to be readable
+                    reader.readable().await?;
+
+                    match reader.try_read_buf(&mut buffer) {
+                        Ok(0) => break,
+                        // We read some bytes but maybe not enough
+                        Ok(n) => {
+                            eprintln!("Read {} bytes from pipe, continuing", n);
+                            continue;
+                        }
+                        // The readiness event is a false positive.
+                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+                            eprintln!("received a WouldBlock");
+                            continue;
+                        }
+                        Err(e) => return Err(e.into()),
                     }
                 }

+                eprintln!(
+                    "buffer is {:.2}% full",
+                    buffer.len() as f32 / buffer.capacity() as f32 * 100.0
+                );
+
                 if buffer.is_empty() {
                     eprintln!("buffer is empty, breaking part number loop");
+                    // Break the loop if the buffer is empty
+                    // after we tried to read bytes
                     break;
                 }

@@ -389,7 +417,7 @@ impl IndexScheduler {

                 let action = bucket.complete_multipart_upload(
                     Some(&credential),
-                    object,
+                    &object,
                     multipart.upload_id(),
                     etags.iter().map(AsRef::as_ref),
                 );
@@ -498,7 +526,7 @@ impl IndexScheduler {
             let path = Path::new("auth").join("data.mdb");
             tarball.append_file(path, &mut auth_env_file).await?;

-            tarball.finish().await?;
+            tarball.into_inner().await?;

             Result::<_, Error>::Ok(())
         });
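
Beyond the diff itself: the "// TODO implement exponential backoff on upload requests: https://docs.rs/backoff" comment above could be covered with the backoff crate. Below is a minimal sketch of that idea, assuming the backoff crate (with its async/tokio feature) and a reqwest client; neither dependency is confirmed by this patch, and upload_part_with_backoff, url, and body are illustrative names, not identifiers from this codebase.

    use backoff::{future::retry, Error as BackoffError, ExponentialBackoff};
    use bytes::Bytes;

    // Retry a single part upload until it succeeds or the backoff policy gives up.
    // `url` would be the presigned URL produced by something like
    // `part_upload.sign(ONE_HOUR)` and `body` the frozen part buffer; both are
    // placeholders here, not values taken from the patch.
    async fn upload_part_with_backoff(
        client: &reqwest::Client,
        url: reqwest::Url,
        body: Bytes,
    ) -> Result<String, reqwest::Error> {
        retry(ExponentialBackoff::default(), || {
            // Clone the cheaply clonable handles so each retry gets its own copies.
            let (client, url, body) = (client.clone(), url.clone(), body.clone());
            async move {
                let resp = client
                    .put(url)
                    .body(body)
                    .send()
                    .await
                    .map_err(BackoffError::transient)? // network error: retry
                    .error_for_status()
                    .map_err(BackoffError::transient)?; // error status: retry
                // S3 returns the part's ETag in a response header; it is needed
                // later when completing the multipart upload.
                Ok(resp
                    .headers()
                    .get("ETag")
                    .and_then(|v| v.to_str().ok())
                    .unwrap_or_default()
                    .to_owned())
            }
        })
        .await
    }

Under those assumptions, each spawned part-upload task could wrap its request in a call like this instead of sending once, so transient network failures and throttling responses are retried with exponentially increasing delays.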