Initial working S3 uploads to RustFS

Clément Renault
2025-10-09 16:02:44 +02:00
parent 18edd88a60
commit 94e12af45a
3 changed files with 195 additions and 0 deletions
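This change wires an opt-in S3 upload path into snapshot creation: when the five MEILI_S3_* environment variables are all set, the scheduler bypasses the regular snapshot flow and performs a presigned multipart upload of a small test object to the configured bucket, exercised here against RustFS.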


@@ -45,6 +45,8 @@ tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
backoff = "0.4.0"
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
rusty-s3 = "0.8.1"
tokio = { version = "1.47.1", features = ["full"] }

[dev-dependencies]

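The two new dependencies split the work: rusty-s3 only builds and signs S3 requests (it performs no I/O of its own), while reqwest actually sends them over rustls. A minimal sketch of that division, assuming a hypothetical local endpoint, bucket name, and credentials:

use std::time::Duration;

use reqwest::Client;
use rusty_s3::{Bucket, Credentials, S3Action, UrlStyle};

async fn upload_one_object() -> Result<(), Box<dyn std::error::Error>> {
    // rusty-s3 is sans-IO: it only builds and signs URLs; reqwest executes them.
    let url = "http://localhost:9000".parse()?; // hypothetical RustFS endpoint
    let bucket = Bucket::new(url, UrlStyle::Path, "snapshots", "us-east-1")?;
    let credentials = Credentials::new("access_key", "secret_key");

    // Presign a single-shot PutObject for one hour, then execute it over HTTP.
    let action = bucket.put_object(Some(&credentials), "hello.txt");
    let presigned = action.sign(Duration::from_secs(3600));
    Client::new().put(presigned).body("hello").send().await?.error_for_status()?;
    Ok(())
}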

@@ -1,11 +1,17 @@
use std::env::VarError;
use std::ffi::OsStr;
use std::fs;
use std::sync::atomic::Ordering;
use std::time::Duration;

use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
use reqwest::header::ETAG;
use reqwest::Client;
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
use rusty_s3::{Bucket, Credentials, UrlStyle};

use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
@@ -82,6 +88,54 @@ impl IndexScheduler {
    ) -> Result<Vec<Task>> {
        progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);

        const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
        const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
        const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
        const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
        const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";

        let bucket_url = std::env::var(S3_BUCKET_URL);
        let bucket_region = std::env::var(S3_BUCKET_REGION);
        let bucket_name = std::env::var(S3_BUCKET_NAME);
        let access_key = std::env::var(S3_ACCESS_KEY);
        let secret_key = std::env::var(S3_SECRET_KEY);
        match (bucket_url, bucket_region, bucket_name, access_key, secret_key) {
            (
                Ok(bucket_url),
                Ok(bucket_region),
                Ok(bucket_name),
                Ok(access_key),
                Ok(secret_key),
            ) => {
                let runtime = self.runtime.as_ref().expect("Runtime not initialized");
                return runtime.block_on(self.process_snapshot_to_s3(
                    progress,
                    bucket_url,
                    bucket_region,
                    bucket_name,
                    access_key,
                    secret_key,
                    tasks,
                ));
            }
            (
                Err(VarError::NotPresent),
                Err(VarError::NotPresent),
                Err(VarError::NotPresent),
                Err(VarError::NotPresent),
                Err(VarError::NotPresent),
            ) => (),
            (Err(e), _, _, _, _)
            | (_, Err(e), _, _, _)
            | (_, _, Err(e), _, _)
            | (_, _, _, Err(e), _)
            | (_, _, _, _, Err(e)) => {
                // TODO: Handle error gracefully
                panic!("Error while reading environment variables: {}", e);
            }
        }

        // TODO move this code into a separate function
        fs::create_dir_all(&self.scheduler.snapshots_path)?;
        let temp_snapshot_dir = tempfile::tempdir()?;
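The five-way match above treats the variables as all-or-nothing: all set triggers the S3 path (bridged from the synchronous scheduler into async code via runtime.block_on), all absent falls through to the regular snapshot flow, and anything in between panics, which the first TODO flags for graceful handling. The second TODO suggests extracting this into its own function; a sketch of one possible extraction, using a hypothetical S3SnapshotOptions struct, where a partial configuration becomes an error instead of a panic:

use std::env::{self, VarError};

/// Hypothetical container for the five MEILI_S3_* settings.
struct S3SnapshotOptions {
    bucket_url: String,
    bucket_region: String,
    bucket_name: String,
    access_key: String,
    secret_key: String,
}

/// Ok(Some(..)) when all five variables are set, Ok(None) when all five are
/// absent, and Err(..) for partial or non-unicode configurations.
fn s3_options_from_env() -> Result<Option<S3SnapshotOptions>, VarError> {
    let vars = [
        "MEILI_S3_BUCKET_URL",
        "MEILI_S3_BUCKET_REGION",
        "MEILI_S3_BUCKET_NAME",
        "MEILI_S3_ACCESS_KEY",
        "MEILI_S3_SECRET_KEY",
    ];
    let values: Vec<_> = vars.iter().map(env::var).collect();
    if values.iter().all(|v| matches!(v, Err(VarError::NotPresent))) {
        return Ok(None);
    }
    let mut it = values.into_iter();
    // Any error at this point is either a missing or a non-unicode variable.
    Ok(Some(S3SnapshotOptions {
        bucket_url: it.next().unwrap()?,
        bucket_region: it.next().unwrap()?,
        bucket_name: it.next().unwrap()?,
        access_key: it.next().unwrap()?,
        secret_key: it.next().unwrap()?,
    }))
}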
@@ -206,4 +260,68 @@ impl IndexScheduler {
        Ok(tasks)
    }

    pub(super) async fn process_snapshot_to_s3(
        &self,
        progress: Progress,
        bucket_url: String,
        bucket_region: String,
        bucket_name: String,
        access_key: String,
        secret_key: String,
        mut tasks: Vec<Task>,
    ) -> Result<Vec<Task>> {
        const ONE_HOUR: Duration = Duration::from_secs(3600);

        let client = Client::new();
        // TODO Remove this unwrap
        let url = bucket_url.parse().unwrap();
        eprintln!("{url:?}");
        let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
        let credential = Credentials::new(access_key, secret_key);

        let object = "test.txt";
        let action = bucket.create_multipart_upload(Some(&credential), object);
        let url = action.sign(ONE_HOUR);
        let resp = client.post(url).send().await.unwrap().error_for_status().unwrap();
        let body = resp.text().await.unwrap();
        let multipart = CreateMultipartUpload::parse_response(&body).unwrap();

        let mut etags = Vec::<String>::new();

        // Every part must be between 5 MB and 5 GB in size, except for the last part.
        // A maximum of 10,000 parts can be uploaded to a single multipart upload.
        //
        // Part numbers can be any number from 1 to 10,000, inclusive.
        // A part number uniquely identifies a part and also defines its position within
        // the object being created. If you upload a new part using the same part number
        // that was used with a previous part, the previously uploaded part is overwritten.
        let part_upload = bucket.upload_part(Some(&credential), object, 1, multipart.upload_id());
        let url = part_upload.sign(ONE_HOUR);
        let body = "123456789";
        let resp = client.put(url).body(body).send().await.unwrap().error_for_status().unwrap();
        let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
        // TODO use bumpalo to reduce the number of allocations
        etags.push(etag.to_str().unwrap().to_owned());

        let action = bucket.complete_multipart_upload(
            Some(&credential),
            object,
            multipart.upload_id(),
            etags.iter().map(AsRef::as_ref),
        );
        let url = action.sign(ONE_HOUR);
        let resp =
            client.post(url).body(action.body()).send().await.unwrap().error_for_status().unwrap();
        let body = resp.text().await.unwrap();
        println!("it worked! {body}");

        for task in &mut tasks {
            task.status = Status::Succeeded;
        }

        Ok(tasks)
    }
}
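The single hard-coded 9-byte part is enough to prove the flow against RustFS, but the comment above upload_part spells out the real constraints: every part except the last must be at least 5 MiB, and part numbers 1 through 10,000 define ordering. A sketch of how the same three calls might loop over a streamed snapshot file, with a hypothetical upload_in_parts helper that is not part of this commit:

use std::time::Duration;

use reqwest::{header::ETAG, Client};
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
use rusty_s3::{Bucket, Credentials};
use tokio::io::{AsyncRead, AsyncReadExt};

const PART_SIZE: usize = 50 * 1024 * 1024; // well above the 5 MiB minimum
const ONE_HOUR: Duration = Duration::from_secs(3600);

/// Hypothetical chunked variant of the upload in this commit.
async fn upload_in_parts<R: AsyncRead + Unpin>(
    client: &Client,
    bucket: &Bucket,
    credential: &Credentials,
    object: &str,
    mut reader: R,
) -> Result<(), Box<dyn std::error::Error>> {
    let url = bucket.create_multipart_upload(Some(credential), object).sign(ONE_HOUR);
    let body = client.post(url).send().await?.error_for_status()?.text().await?;
    let multipart = CreateMultipartUpload::parse_response(&body)?;

    let mut etags = Vec::new();
    // Part numbers start at 1; at most 10,000 parts are allowed.
    for part_number in 1..=10_000u16 {
        let mut buffer = vec![0u8; PART_SIZE];
        let mut filled = 0;
        // Fill the buffer completely unless the reader ends first.
        while filled < PART_SIZE {
            let n = reader.read(&mut buffer[filled..]).await?;
            if n == 0 {
                break;
            }
            filled += n;
        }
        if filled == 0 {
            break; // nothing left to upload
        }
        buffer.truncate(filled);

        let url = bucket
            .upload_part(Some(credential), object, part_number, multipart.upload_id())
            .sign(ONE_HOUR);
        let resp = client.put(url).body(buffer).send().await?.error_for_status()?;
        let etag = resp.headers().get(ETAG).ok_or("UploadPart must return an ETag")?;
        etags.push(etag.to_str()?.to_owned());

        if filled < PART_SIZE {
            break; // short read: that was the last part
        }
    }

    let action = bucket.complete_multipart_upload(
        Some(credential),
        object,
        multipart.upload_id(),
        etags.iter().map(AsRef::as_ref),
    );
    let url = action.sign(ONE_HOUR);
    client.post(url).body(action.body()).send().await?.error_for_status()?;
    Ok(())
}

As in the commit, the object only materializes once CompleteMultipartUpload is posted with the collected ETags in part-number order.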