diff --git a/Cargo.lock b/Cargo.lock index c197ae7f0..6450b7f84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3259,7 +3259,9 @@ dependencies = [ "memmap2", "page_size", "rayon", + "reqwest", "roaring 0.10.12", + "rusty-s3", "serde", "serde_json", "synchronoise", @@ -3466,6 +3468,30 @@ dependencies = [ "regex", ] +[[package]] +name = "jiff" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde", +] + +[[package]] +name = "jiff-static" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "jobserver" version = "0.1.34" @@ -3989,6 +4015,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "md5" version = "0.7.0" @@ -4963,6 +4999,15 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "potential_utf" version = "0.1.3" @@ -5140,6 +5185,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "quick-xml" +version = "0.38.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.9" @@ -5415,6 +5470,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2 0.4.12", "http 1.3.1", "http-body", "http-body-util", @@ -5705,6 +5761,25 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "rusty-s3" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fac2edd2f0b56bd79a7343f49afc01c2d41010df480538a510e0abc56044f66c" +dependencies = [ + "base64 0.22.1", + "hmac", + "jiff", + "md-5", + "percent-encoding", + "quick-xml", + "serde", + "serde_json", + "sha2", + "url", + "zeroize", +] + [[package]] name = "ryu" version = "1.0.20" diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index a96e669d2..9d01df3e5 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -45,6 +45,8 @@ tracing = "0.1.41" ureq = "2.12.1" uuid = { version = "1.17.0", features = ["serde", "v4"] } backoff = "0.4.0" +reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false } +rusty-s3 = "0.8.1" tokio = { version = "1.47.1", features = ["full"] } [dev-dependencies] diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 4a7a9e074..8e6f6954e 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -1,11 +1,17 @@ +use std::env::VarError; use std::ffi::OsStr; use std::fs; use std::sync::atomic::Ordering; +use std::time::Duration; use meilisearch_types::heed::CompactionOption; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::{compression, VERSION_FILE_NAME}; +use reqwest::header::ETAG; +use reqwest::Client; +use rusty_s3::actions::{CreateMultipartUpload, S3Action as _}; +use rusty_s3::{Bucket, Credentials, UrlStyle}; use crate::heed::EnvOpenOptions; use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress}; @@ -82,6 +88,54 @@ impl IndexScheduler { ) -> Result> { progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation); + const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL"; + const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION"; + const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME"; + const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY"; + const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY"; + + let bucket_url = std::env::var(S3_BUCKET_URL); + let bucket_region = std::env::var(S3_BUCKET_REGION); + let bucket_name = std::env::var(S3_BUCKET_NAME); + let access_key = std::env::var(S3_ACCESS_KEY); + let secret_key = std::env::var(S3_SECRET_KEY); + match (bucket_url, bucket_region, bucket_name, access_key, secret_key) { + ( + Ok(bucket_url), + Ok(bucket_region), + Ok(bucket_name), + Ok(access_key), + Ok(secret_key), + ) => { + let runtime = self.runtime.as_ref().expect("Runtime not initialized"); + return runtime.block_on(self.process_snapshot_to_s3( + progress, + bucket_url, + bucket_region, + bucket_name, + access_key, + secret_key, + tasks, + )); + } + ( + Err(VarError::NotPresent), + Err(VarError::NotPresent), + Err(VarError::NotPresent), + Err(VarError::NotPresent), + Err(VarError::NotPresent), + ) => (), + (Err(e), _, _, _, _) + | (_, Err(e), _, _, _) + | (_, _, Err(e), _, _) + | (_, _, _, Err(e), _) + | (_, _, _, _, Err(e)) => { + // TODO: Handle error gracefully + panic!("Error while reading environment variables: {}", e); + } + } + + // TODO move this code into a separate function fs::create_dir_all(&self.scheduler.snapshots_path)?; let temp_snapshot_dir = tempfile::tempdir()?; @@ -206,4 +260,68 @@ impl IndexScheduler { Ok(tasks) } + + pub(super) async fn process_snapshot_to_s3( + &self, + progress: Progress, + bucket_url: String, + bucket_region: String, + bucket_name: String, + access_key: String, + secret_key: String, + mut tasks: Vec, + ) -> Result> { + const ONE_HOUR: Duration = Duration::from_secs(3600); + + let client = Client::new(); + // TODO Remove this unwrap + let url = bucket_url.parse().unwrap(); + eprintln!("{url:?}"); + let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap(); + let credential = Credentials::new(access_key, secret_key); + + let object = "test.txt"; + let action = bucket.create_multipart_upload(Some(&credential), object); + let url = action.sign(ONE_HOUR); + let resp = client.post(url).send().await.unwrap().error_for_status().unwrap(); + let body = resp.text().await.unwrap(); + + let multipart = CreateMultipartUpload::parse_response(&body).unwrap(); + let mut etags = Vec::::new(); + + // Every part must be between 5 MB and 5 GB in size, except for the last part + // A maximum of 10,000 parts can be uploaded to a single multipart upload. + // + // Part numbers can be any number from 1 to 10,000, inclusive. + // A part number uniquely identifies a part and also defines its position within + // the object being created. If you upload a new part using the same part number + // that was used with a previous part, the previously uploaded part is overwritten. + let part_upload = bucket.upload_part(Some(&credential), object, 1, multipart.upload_id()); + let url = part_upload.sign(ONE_HOUR); + + let body = "123456789"; + let resp = client.put(url).body(body).send().await.unwrap().error_for_status().unwrap(); + let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag"); + // TODO use bumpalo to reduce the number of allocations + etags.push(etag.to_str().unwrap().to_owned()); + + let action = bucket.complete_multipart_upload( + Some(&credential), + object, + multipart.upload_id(), + etags.iter().map(AsRef::as_ref), + ); + let url = action.sign(ONE_HOUR); + + let resp = + client.post(url).body(action.body()).send().await.unwrap().error_for_status().unwrap(); + let body = resp.text().await.unwrap(); + println!("it worked! {body}"); + + for task in &mut tasks { + task.status = Status::Succeeded; + } + + Ok(tasks) + } }