Support clean CLI options

Kerollmops
2025-11-04 11:13:09 +01:00
parent 4fc506f267
commit d01bbbccde
8 changed files with 258 additions and 117 deletions
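For context: this commit moves the ad-hoc MEILI_S3_* environment-variable parsing out of the snapshot scheduler and replaces it with a single S3SnapshotOptions value built by the CLI layer and handed to the scheduler. A minimal sketch of what such an options struct could look like, inferred from the fields destructured in process_snapshot_to_s3 below (the field names come from the diff; the concrete types and derives are assumptions, not the actual definition):

// Hypothetical shape of the options passed from the CLI into the scheduler.
// Field names match the destructuring in process_snapshot_to_s3 below;
// the types and derives are assumptions.
#[derive(Debug, Clone)]
pub struct S3SnapshotOptions {
    pub s3_bucket_url: String,
    pub s3_bucket_region: String,
    pub s3_bucket_name: String,
    pub s3_snapshot_prefix: String,
    pub s3_access_key: String,
    pub s3_secret_key: String,
    /// Maximum number of multipart-upload parts kept in flight at once.
    pub s3_max_in_flight_parts: usize,
    /// Compression level applied to the snapshot (0 = no compression).
    pub s3_compression_level: u32,
}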


@@ -1,16 +1,13 @@
use std::env::VarError;
use std::ffi::OsStr;
use std::fs;
#[cfg(unix)]
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
#[cfg(unix)]
use meilisearch_types::milli::update::S3SnapshotOptions;
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
#[cfg(unix)]
use path_slash::PathBufExt as _;
use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
@@ -87,72 +84,20 @@ impl IndexScheduler {
) -> Result<Vec<Task>> {
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
const S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
let bucket_url = std::env::var(S3_BUCKET_URL).map_err(|e| (S3_BUCKET_URL, e));
let bucket_region = std::env::var(S3_BUCKET_REGION).map_err(|e| (S3_BUCKET_REGION, e));
let bucket_name = std::env::var(S3_BUCKET_NAME).map_err(|e| (S3_BUCKET_NAME, e));
let snapshot_prefix =
std::env::var(S3_SNAPSHOT_PREFIX).map_err(|e| (S3_SNAPSHOT_PREFIX, e));
let access_key = std::env::var(S3_ACCESS_KEY).map_err(|e| (S3_ACCESS_KEY, e));
let secret_key = std::env::var(S3_SECRET_KEY).map_err(|e| (S3_SECRET_KEY, e));
match (bucket_url, bucket_region, bucket_name, snapshot_prefix, access_key, secret_key) {
(
Ok(bucket_url),
Ok(bucket_region),
Ok(bucket_name),
Ok(snapshot_prefix),
Ok(access_key),
Ok(secret_key),
) => {
match self.scheduler.s3_snapshot_options.clone() {
Some(options) => {
#[cfg(not(unix))]
{
let _ = (
bucket_url,
bucket_region,
bucket_name,
snapshot_prefix,
access_key,
secret_key,
);
let _ = options;
panic!("Non-unix platform does not support S3 snapshotting");
}
#[cfg(unix)]
self.runtime.as_ref().expect("Runtime not initialized").block_on(
self.process_snapshot_to_s3(
progress,
bucket_url,
bucket_region,
bucket_name,
PathBuf::from_slash(snapshot_prefix),
access_key,
secret_key,
tasks,
),
)
}
(
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
) => self.process_snapshots_to_disk(progress, tasks),
(Err((var, e)), _, _, _, _, _)
| (_, Err((var, e)), _, _, _, _)
| (_, _, Err((var, e)), _, _, _)
| (_, _, _, Err((var, e)), _, _)
| (_, _, _, _, Err((var, e)), _)
| (_, _, _, _, _, Err((var, e))) => {
// TODO: Handle error gracefully
panic!("Error while reading environment variables: {}: {}", var, e);
self.runtime
.as_ref()
.expect("Runtime not initialized")
.block_on(self.process_snapshot_to_s3(progress, options, tasks))
}
None => self.process_snapshots_to_disk(progress, tasks),
}
}
@@ -291,12 +236,7 @@ impl IndexScheduler {
pub(super) async fn process_snapshot_to_s3(
&self,
progress: Progress,
bucket_url: String,
bucket_region: String,
bucket_name: String,
snapshot_prefix: PathBuf,
access_key: String,
secret_key: String,
opts: S3SnapshotOptions,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
use std::collections::VecDeque;
@@ -304,9 +244,11 @@ impl IndexScheduler {
use std::io::{self, Seek as _, SeekFrom, Write as _};
use std::os::fd::OwnedFd;
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use bytes::{Bytes, BytesMut};
use path_slash::PathBufExt as _;
use reqwest::header::ETAG;
use reqwest::Client;
use reqwest::Response;
@@ -314,33 +256,29 @@ impl IndexScheduler {
use rusty_s3::{Bucket, Credentials, UrlStyle};
use tokio::task::JoinHandle;
let S3SnapshotOptions {
s3_bucket_url,
s3_bucket_region,
s3_bucket_name,
s3_snapshot_prefix,
s3_access_key,
s3_secret_key,
s3_max_in_flight_parts,
s3_compression_level: level,
} = opts;
const ONE_HOUR: Duration = Duration::from_secs(3600);
// Parts are 375MiB which enables databases of up to 3.5TiB. Must be at least 2x5MiB.
const PART_SIZE: usize = 375 * 1024 * 1024;
// The maximum number of parts that can be uploaded in parallel.
const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
let max_in_flight_parts: usize = match std::env::var(S3_MAX_IN_FLIGHT_PARTS) {
Ok(val) => val.parse().expect("Failed to parse MEILI_S3_MAX_IN_FLIGHT_PARTS"),
Err(VarError::NotPresent) => 10,
Err(e) => panic!("Failed to read {}: {}", S3_MAX_IN_FLIGHT_PARTS, e),
};
// The compression level, defaults to no compression (0)
const S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
let level: u32 = match std::env::var(S3_COMPRESSION_LEVEL) {
Ok(val) => val.parse().expect("Failed to parse MEILI_S3_COMPRESSION_LEVEL"),
Err(VarError::NotPresent) => 0,
Err(e) => panic!("Failed to read {}: {}", S3_COMPRESSION_LEVEL, e),
};
let client = Client::new();
// TODO Remove this unwrap
let url = bucket_url.parse().unwrap();
let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
let credential = Credentials::new(access_key, secret_key);
let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
let url = s3_bucket_url.parse().unwrap();
let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region).unwrap();
let credential = Credentials::new(s3_access_key, s3_secret_key);
// TODO change this and use the database name like in the original version
let object_path = snapshot_prefix.join("data.ms.snapshot");
let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
eprintln!("Starting the upload of the snapshot to {object}");
@@ -365,7 +303,7 @@ impl IndexScheduler {
let mut etags = Vec::<String>::new();
let mut in_flight =
VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
max_in_flight_parts,
s3_max_in_flight_parts,
);
for part_number in 1u16.. {
@@ -378,7 +316,7 @@ impl IndexScheduler {
let url = part_upload.sign(ONE_HOUR);
// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= max_in_flight_parts {
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts {
let (request, buffer) = in_flight.pop_front().unwrap();
let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
let etag =
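The loop above applies back-pressure by keeping at most s3_max_in_flight_parts upload requests in flight: once the queue is full, the oldest request is awaited (and its buffer reclaimed) before a new part is started. A stripped-down, self-contained sketch of that pattern, with plain tokio tasks standing in for the S3 part uploads (all names here are illustrative):

use std::collections::VecDeque;
use std::time::Duration;

use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Stand-in for s3_max_in_flight_parts.
    let max_in_flight: usize = 3;
    let mut in_flight: VecDeque<JoinHandle<u16>> = VecDeque::with_capacity(max_in_flight);

    for part_number in 1u16..=10 {
        // Back-pressure: wait for the oldest task before starting a new part.
        if in_flight.len() >= max_in_flight {
            let finished = in_flight.pop_front().unwrap().await.unwrap();
            eprintln!("part {finished} uploaded");
        }
        in_flight.push_back(tokio::spawn(async move {
            // Pretend to upload the part.
            tokio::time::sleep(Duration::from_millis(10)).await;
            part_number
        }));
    }

    // Drain the remaining in-flight parts.
    while let Some(handle) = in_flight.pop_front() {
        let finished = handle.await.unwrap();
        eprintln!("part {finished} uploaded");
    }
}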