diff --git a/Cargo.lock b/Cargo.lock
index 86a77d4ee..f3355e9cd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1123,7 +1123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "159fa412eae48a1d94d0b9ecdb85c97ce56eb2a347c62394d3fdbf221adabc1a"
 dependencies = [
  "path-matchers",
- "path-slash",
+ "path-slash 0.1.5",
 ]
 
 [[package]]
@@ -3259,6 +3259,7 @@ dependencies = [
  "meilisearch-types",
  "memmap2",
  "page_size",
+ "path-slash 0.2.1",
  "rayon",
  "reqwest",
  "roaring 0.10.12",
@@ -4784,6 +4785,12 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "498a099351efa4becc6a19c72aa9270598e8fd274ca47052e37455241c88b696"
 
+[[package]]
+name = "path-slash"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
+
 [[package]]
 name = "pbkdf2"
 version = "0.12.2"
@@ -6260,7 +6267,7 @@ checksum = "f9c425c07353535ef55b45420f5a8b0a397cd9bc3d7e5236497ca0d90604aa9b"
 dependencies = [
  "change-detection",
  "mime_guess",
- "path-slash",
+ "path-slash 0.1.5",
 ]
 
 [[package]]
diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml
index fb8b6ff7b..454f0063a 100644
--- a/crates/index-scheduler/Cargo.toml
+++ b/crates/index-scheduler/Cargo.toml
@@ -29,6 +29,7 @@ meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
 memmap2 = "0.9.7"
 page_size = "0.6.0"
+path-slash = "0.2.1"
 rayon = "1.10.0"
 roaring = { version = "0.10.12", features = ["serde"] }
 serde = { version = "1.0.219", features = ["derive"] }
diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
index 6971a2a76..f5190860d 100644
--- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
 use std::env::VarError;
 use std::ffi::OsStr;
 use std::fs;
+use std::path::PathBuf;
 use std::sync::atomic::Ordering;
 use std::time::Duration;
 
@@ -9,6 +10,7 @@ use meilisearch_types::heed::CompactionOption;
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
 use meilisearch_types::tasks::{Status, Task};
 use meilisearch_types::{compression, VERSION_FILE_NAME};
+use path_slash::PathBufExt;
 use reqwest::header::ETAG;
 use reqwest::Client;
 use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
@@ -121,7 +123,7 @@ impl IndexScheduler {
                 bucket_url,
                 bucket_region,
                 bucket_name,
-                snapshot_prefix,
+                PathBuf::from_slash(snapshot_prefix),
                 access_key,
                 secret_key,
                 tasks,
@@ -284,7 +286,7 @@ impl IndexScheduler {
         bucket_url: String,
         bucket_region: String,
         bucket_name: String,
-        snapshot_prefix: String,
+        snapshot_prefix: PathBuf,
         access_key: String,
         secret_key: String,
         mut tasks: Vec<Task>,
@@ -295,13 +297,13 @@
         use std::path::Path;
 
         use bytes::{Bytes, BytesMut};
-        use meilisearch_types::milli::update::new::StdResult;
+        use reqwest::Response;
         use tokio::task::JoinHandle;
 
         const ONE_HOUR: Duration = Duration::from_secs(3600);
         // default part size is 250MiB
         // TODO use 375MiB
-        // It must be at least twice 5MiB
+        // It must be at least 2x5MiB
         const PART_SIZE: usize = 10 * 1024 * 1024;
 
         // The maximum number of parts that can be uploaded in parallel.
@@ -317,14 +319,16 @@ impl IndexScheduler {
         let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
         let credential = Credentials::new(access_key, secret_key);
         // TODO change this and use the database name like in the original version
-        let object = format!("{}/data.ms.snapshot", snapshot_prefix);
+        let object_path = snapshot_prefix.join("data.ms.snapshot");
+        let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
+
+        eprintln!("Starting the upload of the snapshot to {object}");
 
         // TODO implement exponential backoff on upload requests: https://docs.rs/backoff
         // TODO return a result with actual errors
         // TODO sign for longer than an hour?
-        // TODO Use a better thing than a String for the object path
-        // NOTE to make it work on Windows we could try using std::io::pipe instead
-        // let (writer, reader) = tokio::net::unix::pipe::pipe()?;
+        // NOTE to make it work on Windows we could try using std::io::pipe instead.
+        // However, we are still using the tokio unix pipe in the async upload loop.
         let (reader, writer) = std::io::pipe()?;
         let uploader_task = tokio::spawn(async move {
             let reader = OwnedFd::from(reader);
@@ -338,10 +342,10 @@
             let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
 
             let mut etags = Vec::<String>::new();
-            let mut in_flight = VecDeque::<(
-                JoinHandle<StdResult<reqwest::Response, reqwest::Error>>,
-                Bytes,
-            )>::with_capacity(max_in_flight_parts);
+            let mut in_flight =
+                VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
+                    max_in_flight_parts,
+                );
 
             for part_number in 1u16.. {
                 let part_upload = bucket.upload_part(
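A note for reviewers on the `path-slash` round-trip the patch relies on: the user-supplied prefix is parsed with `PathBuf::from_slash` so it behaves as a native path internally (including on Windows), and `to_slash` converts it back to a forward-slash string when the S3 object key is built. A minimal sketch, with a made-up prefix value:

```rust
use std::path::PathBuf;

use path_slash::PathBufExt;

fn main() {
    // Parse a slash-separated prefix into a platform-native PathBuf:
    // on Windows this yields `snapshots\2024`, on Unix `snapshots/2024`.
    let snapshot_prefix = PathBuf::from_slash("snapshots/2024");

    // `join` uses the platform separator...
    let object_path = snapshot_prefix.join("data.ms.snapshot");

    // ...and `to_slash` converts back to forward slashes for the object key.
    // It returns `None` when the path is not valid UTF-8.
    let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
    assert_eq!(object, "snapshots/2024/data.ms.snapshot");
}
```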
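The pipe change is the Windows-relevant part: `std::io::pipe` (stabilized in Rust 1.87) provides an anonymous pipe on every platform, unlike `tokio::net::unix::pipe`. A self-contained sketch of the producer/consumer handoff, independent of the snapshot code:

```rust
use std::io::{pipe, Read, Write};
use std::thread;

fn main() -> std::io::Result<()> {
    let (mut reader, mut writer) = pipe()?;

    // The producer streams data into the pipe from another thread,
    // much like the snapshot writer feeds the uploader in the PR.
    let producer = thread::spawn(move || {
        writer.write_all(b"snapshot bytes").unwrap();
        // Dropping the writer closes the pipe, which ends the read below.
    });

    let mut buf = Vec::new();
    reader.read_to_end(&mut buf)?;
    producer.join().unwrap();
    assert_eq!(buf, b"snapshot bytes");
    Ok(())
}
```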
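For context on the `rusty_s3` calls in the hunk above: the crate only builds and signs requests; the actual HTTP traffic stays in `reqwest`. The sketch below shows the presigning flow with a placeholder endpoint, bucket name, and keys (none of these values come from the PR):

```rust
use std::time::Duration;

use rusty_s3::actions::S3Action as _;
use rusty_s3::{Bucket, Credentials, UrlStyle};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder endpoint and credentials for illustration only.
    let endpoint = "http://localhost:9000".parse()?;
    let bucket = Bucket::new(endpoint, UrlStyle::Path, "snapshots".to_owned(), "us-east-1".to_owned())?;
    let credentials = Credentials::new("ACCESS_KEY".to_owned(), "SECRET_KEY".to_owned());

    // Every rusty_s3 action works the same way: build it, sign it for a
    // validity window, then hand the presigned URL to any HTTP client.
    // The snapshot code POSTs this URL with reqwest and feeds the XML
    // response to `CreateMultipartUpload::parse_response` to recover the
    // upload id that every subsequent `upload_part` request references.
    let action = bucket.create_multipart_upload(Some(&credentials), "data.ms.snapshot");
    let signed_url = action.sign(Duration::from_secs(3600));
    println!("POST {signed_url}");
    Ok(())
}
```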
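Finally, the reformatted `in_flight` declaration is the backpressure mechanism: a `VecDeque` holds at most `max_in_flight_parts` spawned part uploads, each kept alongside its buffer, and the loop awaits the oldest entry before spawning a new one. A simplified, runnable sketch of the pattern, assuming `tokio` with the `full` feature; the sleeping `upload_part` stands in for the signed PUT request and is not the PR's code:

```rust
use std::collections::VecDeque;
use std::time::Duration;

use tokio::task::JoinHandle;

// Placeholder for the signed part upload; the real code sends a presigned
// PUT with reqwest and reads the returned ETag header.
async fn upload_part(part_number: u16, body: Vec<u8>) -> String {
    tokio::time::sleep(Duration::from_millis(10)).await;
    format!("etag-{part_number}-{}", body.len())
}

#[tokio::main]
async fn main() {
    let max_in_flight_parts = 4;
    let parts: Vec<Vec<u8>> = (0u8..10).map(|i| vec![i; 16]).collect();

    let mut etags = Vec::<String>::new();
    let mut in_flight: VecDeque<(JoinHandle<String>, Vec<u8>)> =
        VecDeque::with_capacity(max_in_flight_parts);

    for (i, body) in parts.into_iter().enumerate() {
        // Bound the concurrency: once the queue is full, await the oldest
        // part before spawning the next one. Keeping the buffer in the
        // tuple pins it in memory until its upload has completed.
        if in_flight.len() == max_in_flight_parts {
            let (handle, _buffer) = in_flight.pop_front().unwrap();
            etags.push(handle.await.unwrap());
        }
        let part_number = (i + 1) as u16; // multipart part numbers start at 1
        in_flight.push_back((tokio::spawn(upload_part(part_number, body.clone())), body));
    }

    // Drain the parts that are still uploading; FIFO order keeps the
    // etags aligned with their part numbers.
    while let Some((handle, _buffer)) = in_flight.pop_front() {
        etags.push(handle.await.unwrap());
    }
    assert_eq!(etags.len(), 10);
}
```

In the PR the queued buffers are `bytes::Bytes`, whose clones are cheap reference-count bumps, so keeping them in the queue costs no extra copy.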