commit 0c06bdefac
parent 62a8133bcd
Author: Clément Renault
Date: 2025-10-16 16:25:52 +02:00


@@ -92,19 +92,22 @@ impl IndexScheduler {
const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
+const S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
let bucket_url = std::env::var(S3_BUCKET_URL);
let bucket_region = std::env::var(S3_BUCKET_REGION);
let bucket_name = std::env::var(S3_BUCKET_NAME);
+let snapshot_prefix = std::env::var(S3_SNAPSHOT_PREFIX);
let access_key = std::env::var(S3_ACCESS_KEY);
let secret_key = std::env::var(S3_SECRET_KEY);
-match (bucket_url, bucket_region, bucket_name, access_key, secret_key) {
+match (bucket_url, bucket_region, bucket_name, snapshot_prefix, access_key, secret_key) {
(
Ok(bucket_url),
Ok(bucket_region),
Ok(bucket_name),
+Ok(snapshot_prefix),
Ok(access_key),
Ok(secret_key),
) => {
@@ -117,6 +120,7 @@ impl IndexScheduler {
bucket_url,
bucket_region,
bucket_name,
+snapshot_prefix,
access_key,
secret_key,
tasks,
@@ -128,12 +132,14 @@ impl IndexScheduler {
Err(VarError::NotPresent),
Err(VarError::NotPresent),
Err(VarError::NotPresent),
+Err(VarError::NotPresent),
) => self.process_snapshots_to_disk(progress, tasks),
-(Err(e), _, _, _, _)
-| (_, Err(e), _, _, _)
-| (_, _, Err(e), _, _)
-| (_, _, _, Err(e), _)
-| (_, _, _, _, Err(e)) => {
+(Err(e), _, _, _, _, _)
+| (_, Err(e), _, _, _, _)
+| (_, _, Err(e), _, _, _)
+| (_, _, _, Err(e), _, _)
+| (_, _, _, _, Err(e), _)
+| (_, _, _, _, _, Err(e)) => {
// TODO: Handle error gracefully
panic!("Error while reading environment variables: {}", e);
}
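
Taken together, the three hunks above keep the configuration check all-or-nothing: with every MEILI_S3_* variable set, snapshots go to S3; with none set, they fall back to disk; a partial set still hits the panic. Reduced to two variables, the shape of that match is roughly the following (a sketch, not the actual Meilisearch code):

use std::env::{self, VarError};

// Sketch of the all-or-nothing pattern above, reduced to two variables.
fn s3_config_from_env() -> Option<(String, String)> {
    let url = env::var("MEILI_S3_BUCKET_URL");
    let name = env::var("MEILI_S3_BUCKET_NAME");
    match (url, name) {
        // Fully configured: snapshots go to S3.
        (Ok(url), Ok(name)) => Some((url, name)),
        // Not configured at all: snapshots go to disk.
        (Err(VarError::NotPresent), Err(VarError::NotPresent)) => None,
        // Partially or invalidly configured: refuse to guess.
        (url, name) => panic!("incomplete MEILI_S3_* configuration: {url:?} / {name:?}"),
    }
}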
@@ -277,15 +283,18 @@ impl IndexScheduler {
bucket_url: String,
bucket_region: String,
bucket_name: String,
+snapshot_prefix: String,
access_key: String,
secret_key: String,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
+use std::io;
use std::path::Path;
use async_compression::tokio::write::GzipEncoder;
use async_compression::Level;
use bytes::{Bytes, BytesMut};
+use meilisearch_types::milli::update::new::StdResult;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;
@@ -293,7 +302,7 @@ impl IndexScheduler {
const ONE_HOUR: Duration = Duration::from_secs(3600);
// default part size is 250MiB
// TODO use 375MiB
-const PART_SIZE: usize = 500 * 1024 * 1024;
+const PART_SIZE: usize = 5 * 1024 * 1024;
// The maximum number of parts that can be uploaded in parallel.
const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
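
The drop from 500 MiB to 5 MiB lands exactly on S3's documented minimum part size (every part except the last must be at least 5 MiB), and S3 caps a multipart upload at 10,000 parts, so the part size also caps the snapshot size; the TODO above about 375 MiB is presumably about raising that cap again. The arithmetic, as a quick sketch:

// Quick sanity check against S3's documented multipart limits: each part
// except the last must be >= 5 MiB, and one upload may contain at most
// 10,000 parts.
const PART_SIZE: usize = 5 * 1024 * 1024;
const S3_MAX_PARTS: usize = 10_000;

fn main() {
    let max_upload = PART_SIZE * S3_MAX_PARTS;
    // 5 MiB parts cap a single upload at ~48.8 GiB; memory held by the
    // uploader is roughly PART_SIZE times the number of in-flight parts.
    println!("max upload: {:.1} GiB", max_upload as f64 / (1024.0 * 1024.0 * 1024.0));
}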
@@ -308,7 +317,7 @@ impl IndexScheduler {
let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
let credential = Credentials::new(access_key, secret_key);
// TODO change this and use the database name like in the original version
let object = "data.ms.snapshot";
let object = format!("{}/data.ms.snapshot", snapshot_prefix);
// TODO implement exponential backoff on upload requests: https://docs.rs/backoff
// TODO return a result with actual errors
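
For the backoff TODO, the linked crate would do, but the idea also fits in a few lines; a hand-rolled sketch, assuming a hypothetical upload_part_once that performs one signed PUT:

use std::time::Duration;

// Hypothetical single attempt at uploading one part (stand-in for the
// signed PUT request issued in the part loop).
async fn upload_part_once(_part_number: u16) -> Result<(), reqwest::Error> {
    Ok(())
}

// Exponential backoff: wait 1s, 2s, 4s, ... capped at 32s; give up after
// five retries and surface the last error.
async fn upload_part_with_backoff(part_number: u16) -> Result<(), reqwest::Error> {
    let mut delay = Duration::from_secs(1);
    let mut retries = 0;
    loop {
        match upload_part_once(part_number).await {
            Ok(()) => return Ok(()),
            Err(e) if retries < 5 => {
                retries += 1;
                eprintln!("part {part_number} failed ({e}), retrying in {delay:?}");
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(32));
            }
            Err(e) => return Err(e),
        }
    }
}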
@@ -316,9 +325,7 @@ impl IndexScheduler {
// TODO Use a better thing than a String for the object path
let (writer, mut reader) = tokio::net::unix::pipe::pipe()?;
let uploader_task = tokio::spawn(async move {
-use meilisearch_types::milli::update::new::StdResult;
-let action = bucket.create_multipart_upload(Some(&credential), object);
+let action = bucket.create_multipart_upload(Some(&credential), &object);
// TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
// If the part is deleted (like a TTL) we should sign it for at least 24 hours.
let url = action.sign(ONE_HOUR);
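
On the signing question above: S3-compatible stores validate a presigned URL when the request is received, so a transfer that starts inside the window is normally allowed to run past it; what ONE_HOUR really has to cover is the delay before each request begins, which the part loop already handles by signing every part URL right before spawning the upload. A standalone sketch of that signing step, with rusty_s3-style calls (an assumption inferred from the API shape in this diff) and placeholder values throughout:

use std::time::Duration;
use rusty_s3::{Bucket, Credentials, S3Action, UrlStyle};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder values; the real ones come from the MEILI_S3_* variables.
    let endpoint = "http://localhost:9000".parse()?;
    let bucket = Bucket::new(endpoint, UrlStyle::Path, "snapshots", "us-east-1")?;
    let credential = Credentials::new("access-key", "secret-key");
    // Sign immediately before sending so the one-hour window only has to
    // cover the wait until the request starts, not the whole transfer.
    let part = bucket.upload_part(Some(&credential), "prefix/data.ms.snapshot", 1, "an-upload-id");
    let presigned_url = part.sign(Duration::from_secs(3600));
    println!("{presigned_url}");
    Ok(())
}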
@@ -327,15 +334,15 @@ impl IndexScheduler {
let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
let mut etags = Vec::<String>::new();
let mut in_flight = VecDeque::<(
JoinHandle<StdResult<reqwest::Response, reqwest::Error>>,
Bytes,
)>::with_capacity(max_in_flight_parts);
for part_number in 1u16.. {
let part_upload = bucket.upload_part(
Some(&credential),
-object,
+&object,
part_number,
multipart.upload_id(),
);
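
The VecDeque above gives the uploader simple backpressure: each spawned part upload is pushed to the back together with its Bytes (keeping the part's buffer alive until the task is done), and once max_in_flight_parts uploads are pending, the oldest is awaited before a new one starts. The pattern in isolation, as a runnable sketch:

use std::collections::VecDeque;
use std::time::Duration;
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() -> Result<(), tokio::task::JoinError> {
    let max_in_flight = 4;
    let mut in_flight: VecDeque<JoinHandle<usize>> = VecDeque::with_capacity(max_in_flight);
    for part_number in 1usize..=20 {
        // Backpressure: once the queue is full, wait for the oldest task.
        if in_flight.len() >= max_in_flight {
            let uploaded = in_flight.pop_front().unwrap().await?;
            eprintln!("oldest part finished ({uploaded} bytes)");
        }
        in_flight.push_back(tokio::spawn(async move {
            // Stand-in for one presigned part upload.
            tokio::time::sleep(Duration::from_millis(10)).await;
            part_number * 5 * 1024 * 1024
        }));
    }
    // Drain whatever is still running.
    while let Some(handle) = in_flight.pop_front() {
        handle.await?;
    }
    Ok(())
}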
@@ -358,18 +365,39 @@ impl IndexScheduler {
};
while buffer.len() < (PART_SIZE / 2) {
if reader.read_buf(&mut buffer).await? == 0 {
-eprintln!(
-"breaking because read returned 0. Buffer len {}; capacity {}",
-buffer.len(),
-buffer.capacity(),
-);
-break;
+eprintln!(
+"buffer is {:.2}% full, trying to read more",
+buffer.len() as f32 / buffer.capacity() as f32 * 100.0
+);
+// Wait for the pipe to be readable
+reader.readable().await?;
+match reader.try_read_buf(&mut buffer) {
+Ok(0) => break,
+// We read some bytes but maybe not enough
+Ok(n) => {
+eprintln!("Read {} bytes from pipe, continuing", n);
+continue;
+}
+// The readiness event is a false positive.
+Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+eprintln!("received a WouldBlock");
+continue;
+}
+Err(e) => return Err(e.into()),
+}
}
+eprintln!(
+"buffer is {:.2}% full",
+buffer.len() as f32 / buffer.capacity() as f32 * 100.0
+);
if buffer.is_empty() {
eprintln!("buffer is empty, breaking part number loop");
// Break the loop if the buffer is empty
// after we tried to read bytes
break;
}
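
This hunk is the heart of the fix. The old code treated any 0-byte result from read_buf as end-of-stream and cut the part short; judging by the debug output, that could fire before the writer actually closed. The rewrite double-checks: it waits for the pipe to report readiness and retries with try_read_buf, so Ok(0) now means the writer really hung up, while WouldBlock marks the readiness event as a false positive. The readiness-then-try_read pattern in isolation (a simplification of the loop above):

use std::io;
use bytes::BytesMut;
use tokio::net::unix::pipe;

// Sketch: fill `buffer` up to `target` bytes, treating Ok(0) from
// try_read_buf as real EOF and WouldBlock as a spurious wake-up.
async fn fill_from_pipe(
    reader: &mut pipe::Receiver,
    buffer: &mut BytesMut,
    target: usize,
) -> io::Result<()> {
    while buffer.len() < target {
        // Wait until the kernel reports the pipe readable (data or EOF).
        reader.readable().await?;
        match reader.try_read_buf(buffer) {
            // The writer closed its end: genuine end of stream.
            Ok(0) => break,
            // Got some bytes; loop to check whether it is enough.
            Ok(_) => continue,
            // Readiness was a false positive; wait again.
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}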
@@ -389,7 +417,7 @@ impl IndexScheduler {
let action = bucket.complete_multipart_upload(
Some(&credential),
-object,
+&object,
multipart.upload_id(),
etags.iter().map(AsRef::as_ref),
);
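
Completing a multipart upload requires each part's ETag in part-number order, which is what the etags vector collects; each value comes out of the ETag response header of a finished part request. A sketch of that extraction (hypothetical helper, not in the diff):

// Hypothetical helper: pull the ETag header from a finished part upload
// so it can be pushed onto `etags` in part-number order.
fn etag_of(response: &reqwest::Response) -> Option<String> {
    response
        .headers()
        .get(reqwest::header::ETAG)?
        .to_str()
        .ok()
        .map(ToOwned::to_owned)
}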
@@ -498,7 +526,7 @@ impl IndexScheduler {
let path = Path::new("auth").join("data.mdb");
tarball.append_file(path, &mut auth_env_file).await?;
-tarball.finish().await?;
+tarball.into_inner().await?;
Result::<_, Error>::Ok(())
});
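
The finish() to into_inner() swap matters because the builder writes into a GzipEncoder: finish() only emits the tar termination blocks, while into_inner() also finishes the archive if needed and hands back the underlying writer so it can be shut down and the gzip trailer flushed. This commit discards the returned writer; a fuller sketch of the pattern (the shutdown step is an assumption, not part of this diff, and the bounds follow tokio-tar's Builder API shape):

use async_compression::tokio::write::GzipEncoder;
use tokio::io::{AsyncWrite, AsyncWriteExt};

// Sketch (assumption, not part of this commit): recover the encoder from
// the tar builder and shut it down so the gzip trailer is written too.
async fn close_tarball<W>(tarball: tokio_tar::Builder<GzipEncoder<W>>) -> std::io::Result<()>
where
    W: AsyncWrite + Unpin + Send + Sync + 'static,
{
    // into_inner() finishes the archive (trailing zero blocks) if needed
    // and returns the underlying GzipEncoder.
    let mut encoder = tarball.into_inner().await?;
    // shutdown() flushes buffered compressed data and the gzip trailer.
    encoder.shutdown().await?;
    Ok(())
}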