From b64b083003a3a7ad40bcc2fc4f1e5f13088b6d60 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 4 Nov 2025 11:13:09 +0100
Subject: [PATCH] Support clean CLI options

---
 crates/index-scheduler/src/lib.rs             |   1 +
 crates/index-scheduler/src/scheduler/mod.rs   |  53 ++++--
 .../scheduler/process_snapshot_creation.rs    | 124 ++++---------
 .../src/analytics/segment_analytics.rs        |   3 +
 crates/meilisearch/src/lib.rs                 |  12 +-
 crates/meilisearch/src/option.rs              | 164 ++++++++++++++++--
 crates/milli/src/update/indexer_config.rs     |  16 ++
 crates/milli/src/update/mod.rs                |   2 +-
 8 files changed, 258 insertions(+), 117 deletions(-)

diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs
index 8b3e71d25..abfdc7eb8 100644
--- a/crates/index-scheduler/src/lib.rs
+++ b/crates/index-scheduler/src/lib.rs
@@ -217,6 +217,7 @@ pub struct IndexScheduler {
     #[cfg(test)]
     run_loop_iteration: Arc<RwLock<usize>>,
 
+    /// The tokio runtime used for asynchronous tasks.
     runtime: Option,
 }
 
diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs
index c57bbf70d..bfbab3869 100644
--- a/crates/index-scheduler/src/scheduler/mod.rs
+++ b/crates/index-scheduler/src/scheduler/mod.rs
@@ -25,6 +25,7 @@ use convert_case::{Case, Casing as _};
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::heed::{Env, WithoutTls};
 use meilisearch_types::milli;
+use meilisearch_types::milli::update::S3SnapshotOptions;
 use meilisearch_types::tasks::Status;
 use process_batch::ProcessBatchInfo;
 use rayon::current_num_threads;
@@ -87,11 +88,14 @@ pub struct Scheduler {
 
     /// Snapshot compaction status.
     pub(crate) experimental_no_snapshot_compaction: bool,
+
+    /// S3 Snapshot options.
+    pub(crate) s3_snapshot_options: Option<S3SnapshotOptions>,
 }
 
 impl Scheduler {
-    pub(crate) fn private_clone(&self) -> Scheduler {
-        Scheduler {
+    pub(crate) fn private_clone(&self) -> Self {
+        Self {
             must_stop_processing: self.must_stop_processing.clone(),
             wake_up: self.wake_up.clone(),
             autobatching_enabled: self.autobatching_enabled,
@@ -103,23 +107,52 @@ impl Scheduler {
             version_file_path: self.version_file_path.clone(),
             embedding_cache_cap: self.embedding_cache_cap,
             experimental_no_snapshot_compaction: self.experimental_no_snapshot_compaction,
+            s3_snapshot_options: self.s3_snapshot_options.clone(),
         }
     }
 
     pub fn new(options: &IndexSchedulerOptions, auth_env: Env<WithoutTls>) -> Scheduler {
+        let IndexSchedulerOptions {
+            version_file_path,
+            auth_path: _,
+            tasks_path: _,
+            update_file_path: _,
+            indexes_path: _,
+            snapshots_path,
+            dumps_path,
+            cli_webhook_url: _,
+            cli_webhook_authorization: _,
+            task_db_size: _,
+            index_base_map_size: _,
+            enable_mdb_writemap: _,
+            index_growth_amount: _,
+            index_count: _,
+            indexer_config,
+            autobatching_enabled,
+            cleanup_enabled: _,
+            max_number_of_tasks: _,
+            max_number_of_batched_tasks,
+            batched_tasks_size_limit,
+            instance_features: _,
+            auto_upgrade: _,
+            embedding_cache_cap,
+            experimental_no_snapshot_compaction,
+        } = options;
+
         Scheduler {
             must_stop_processing: MustStopProcessing::default(),
             // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
             wake_up: Arc::new(SignalEvent::auto(true)),
-            autobatching_enabled: options.autobatching_enabled,
-            max_number_of_batched_tasks: options.max_number_of_batched_tasks,
-            batched_tasks_size_limit: options.batched_tasks_size_limit,
-            dumps_path: options.dumps_path.clone(),
-            snapshots_path: options.snapshots_path.clone(),
+            autobatching_enabled: *autobatching_enabled,
+            max_number_of_batched_tasks: *max_number_of_batched_tasks,
+            batched_tasks_size_limit: *batched_tasks_size_limit,
+            dumps_path: dumps_path.clone(),
+            snapshots_path: snapshots_path.clone(),
             auth_env,
-            version_file_path: options.version_file_path.clone(),
-            embedding_cache_cap: options.embedding_cache_cap,
-            experimental_no_snapshot_compaction: options.experimental_no_snapshot_compaction,
+            version_file_path: version_file_path.clone(),
+            embedding_cache_cap: *embedding_cache_cap,
+            experimental_no_snapshot_compaction: *experimental_no_snapshot_compaction,
+            s3_snapshot_options: indexer_config.s3_snapshot_options.clone(),
         }
     }
 }
diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
index 9d3c2516f..278245861 100644
--- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
@@ -1,16 +1,13 @@
-use std::env::VarError;
 use std::ffi::OsStr;
 use std::fs;
-#[cfg(unix)]
-use std::path::PathBuf;
 use std::sync::atomic::Ordering;
 
 use meilisearch_types::heed::CompactionOption;
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
+#[cfg(unix)]
+use meilisearch_types::milli::update::S3SnapshotOptions;
 use meilisearch_types::tasks::{Status, Task};
 use meilisearch_types::{compression, VERSION_FILE_NAME};
-#[cfg(unix)]
-use path_slash::PathBufExt as _;
 
 use crate::heed::EnvOpenOptions;
 use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
@@ -87,72 +84,20 @@ impl IndexScheduler {
     ) -> Result<Vec<Task>> {
         progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
 
-        const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
-        const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
-        const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
-        const S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
-        const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
-        const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
-
-        let bucket_url = std::env::var(S3_BUCKET_URL).map_err(|e| (S3_BUCKET_URL, e));
-        let bucket_region = std::env::var(S3_BUCKET_REGION).map_err(|e| (S3_BUCKET_REGION, e));
-        let bucket_name = std::env::var(S3_BUCKET_NAME).map_err(|e| (S3_BUCKET_NAME, e));
-        let snapshot_prefix =
-            std::env::var(S3_SNAPSHOT_PREFIX).map_err(|e| (S3_SNAPSHOT_PREFIX, e));
-        let access_key = std::env::var(S3_ACCESS_KEY).map_err(|e| (S3_ACCESS_KEY, e));
-        let secret_key = std::env::var(S3_SECRET_KEY).map_err(|e| (S3_SECRET_KEY, e));
-        match (bucket_url, bucket_region, bucket_name, snapshot_prefix, access_key, secret_key) {
-            (
-                Ok(bucket_url),
-                Ok(bucket_region),
-                Ok(bucket_name),
-                Ok(snapshot_prefix),
-                Ok(access_key),
-                Ok(secret_key),
-            ) => {
+        match self.scheduler.s3_snapshot_options.clone() {
+            Some(options) => {
                 #[cfg(not(unix))]
                 {
-                    let _ = (
-                        bucket_url,
-                        bucket_region,
-                        bucket_name,
-                        snapshot_prefix,
-                        access_key,
-                        secret_key,
-                    );
+                    let _ = options;
                     panic!("Non-unix platform does not support S3 snapshotting");
                 }
 
                 #[cfg(unix)]
-                self.runtime.as_ref().expect("Runtime not initialized").block_on(
-                    self.process_snapshot_to_s3(
-                        progress,
-                        bucket_url,
-                        bucket_region,
-                        bucket_name,
-                        PathBuf::from_slash(snapshot_prefix),
-                        access_key,
-                        secret_key,
-                        tasks,
-                    ),
-                )
-            }
-            (
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-            ) => self.process_snapshots_to_disk(progress, tasks),
-            (Err((var, e)), _, _, _, _, _)
-            | (_, Err((var, e)), _, _, _, _)
-            | (_, _, Err((var, e)), _, _, _)
-            | (_, _, _, Err((var, e)), _, _)
-            | (_, _, _, _, Err((var, e)), _)
-            | (_, _, _, _, _, Err((var, e))) => {
-                // TODO: Handle error gracefully
-                panic!("Error while reading environment variables: {}: {}", var, e);
+                self.runtime
+                    .as_ref()
+                    .expect("Runtime not initialized")
+                    .block_on(self.process_snapshot_to_s3(progress, options, tasks))
             }
+            None => self.process_snapshots_to_disk(progress, tasks),
         }
     }
 
@@ -291,12 +236,7 @@ impl IndexScheduler {
     pub(super) async fn process_snapshot_to_s3(
         &self,
         progress: Progress,
-        bucket_url: String,
-        bucket_region: String,
-        bucket_name: String,
-        snapshot_prefix: PathBuf,
-        access_key: String,
-        secret_key: String,
+        opts: S3SnapshotOptions,
         mut tasks: Vec<Task>,
     ) -> Result<Vec<Task>> {
         use std::collections::VecDeque;
@@ -304,9 +244,11 @@ impl IndexScheduler {
         use std::io::{self, Seek as _, SeekFrom, Write as _};
         use std::os::fd::OwnedFd;
         use std::path::Path;
+        use std::path::PathBuf;
         use std::time::Duration;
 
         use bytes::{Bytes, BytesMut};
+        use path_slash::PathBufExt as _;
         use reqwest::header::ETAG;
         use reqwest::Client;
         use reqwest::Response;
@@ -314,33 +256,29 @@ impl IndexScheduler {
         use rusty_s3::{Bucket, Credentials, UrlStyle};
         use tokio::task::JoinHandle;
 
+        let S3SnapshotOptions {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level: level,
+        } = opts;
+
         const ONE_HOUR: Duration = Duration::from_secs(3600);
         // Parts are 375MiB which enables databases of up to 3.5TiB. Must be at least 2x5MiB.
         const PART_SIZE: usize = 375 * 1024 * 1024;
 
-        // The maximum number of parts that can be uploaded in parallel.
-        const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
-        let max_in_flight_parts: usize = match std::env::var(S3_MAX_IN_FLIGHT_PARTS) {
-            Ok(val) => val.parse().expect("Failed to parse MEILI_S3_MAX_IN_FLIGHT_PARTS"),
-            Err(VarError::NotPresent) => 10,
-            Err(e) => panic!("Failed to read {}: {}", S3_MAX_IN_FLIGHT_PARTS, e),
-        };
-
-        // The compression level, defaults to no compression (0)
-        const S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
-        let level: u32 = match std::env::var(S3_COMPRESSION_LEVEL) {
-            Ok(val) => val.parse().expect("Failed to parse MEILI_S3_COMPRESSION_LEVEL"),
-            Err(VarError::NotPresent) => 0,
-            Err(e) => panic!("Failed to read {}: {}", S3_COMPRESSION_LEVEL, e),
-        };
-
         let client = Client::new();
         // TODO Remove this unwrap
-        let url = bucket_url.parse().unwrap();
-        let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
-        let credential = Credentials::new(access_key, secret_key);
+        let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
+        let url = s3_bucket_url.parse().unwrap();
+        let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region).unwrap();
+        let credential = Credentials::new(s3_access_key, s3_secret_key);
 
         // TODO change this and use the database name like in the original version
-        let object_path = snapshot_prefix.join("data.ms.snapshot");
+        let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
         let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
 
         eprintln!("Starting the upload of the snapshot to {object}");
@@ -365,7 +303,7 @@ impl IndexScheduler {
         let mut etags = Vec::<String>::new();
         let mut in_flight = VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
-            max_in_flight_parts,
+            s3_max_in_flight_parts,
         );
         for part_number in 1u16.. {
@@ -378,7 +316,7 @@ impl IndexScheduler {
             let url = part_upload.sign(ONE_HOUR);
 
             // Wait for a buffer to be ready if there are in-flight parts that landed
-            let mut buffer = if in_flight.len() >= max_in_flight_parts {
+            let mut buffer = if in_flight.len() >= s3_max_in_flight_parts {
                 let (request, buffer) = in_flight.pop_front().unwrap();
                 let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
                 let etag =
diff --git a/crates/meilisearch/src/analytics/segment_analytics.rs b/crates/meilisearch/src/analytics/segment_analytics.rs
index 4635c572f..aec71d431 100644
--- a/crates/meilisearch/src/analytics/segment_analytics.rs
+++ b/crates/meilisearch/src/analytics/segment_analytics.rs
@@ -217,6 +217,7 @@ struct Infos {
     import_snapshot: bool,
     schedule_snapshot: Option<u64>,
     snapshot_dir: bool,
+    uses_s3_snapshots: bool,
     ignore_missing_snapshot: bool,
     ignore_snapshot_if_db_exists: bool,
     http_addr: bool,
@@ -285,6 +286,7 @@ impl Infos {
             indexer_options,
             config_file_path,
             no_analytics: _,
+            s3_snapshot_options,
         } = options;
 
         let schedule_snapshot = match schedule_snapshot {
@@ -348,6 +350,7 @@ impl Infos {
             import_snapshot: import_snapshot.is_some(),
             schedule_snapshot,
             snapshot_dir: snapshot_dir != PathBuf::from("snapshots/"),
+            uses_s3_snapshots: s3_snapshot_options.is_some(),
             ignore_missing_snapshot,
             ignore_snapshot_if_db_exists,
             http_addr: http_addr != default_http_addr(),
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index ec507de7f..8a2d2c0a1 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -233,7 +233,11 @@ pub fn setup_meilisearch(
         task_db_size: opt.max_task_db_size.as_u64() as usize,
         index_base_map_size: opt.max_index_size.as_u64() as usize,
         enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
-        indexer_config: Arc::new((&opt.indexer_options).try_into()?),
+        indexer_config: Arc::new({
+            let s3_snapshot_options =
+                opt.s3_snapshot_options.clone().map(|opt| opt.try_into()).transpose()?;
+            IndexerConfig { s3_snapshot_options, ..(&opt.indexer_options).try_into()? }
+        }),
         autobatching_enabled: true,
         cleanup_enabled: !opt.experimental_replication_parameters,
         max_number_of_tasks: 1_000_000,
@@ -534,7 +538,11 @@ fn import_dump(
     let indexer_config = if base_config.max_threads.is_none() {
         let (thread_pool, _) = default_thread_pool_and_threads();
 
-        let _config = IndexerConfig { thread_pool, ..*base_config };
+        let _config = IndexerConfig {
+            thread_pool,
+            s3_snapshot_options: base_config.s3_snapshot_options.clone(),
+            ..*base_config
+        };
         backup_config = _config;
         &backup_config
     } else {
diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs
index 17522e493..1f4173399 100644
--- a/crates/meilisearch/src/option.rs
+++ b/crates/meilisearch/src/option.rs
@@ -12,7 +12,7 @@ use std::{env, fmt, fs};
 use byte_unit::{Byte, ParseError, UnitType};
 use clap::Parser;
 use meilisearch_types::features::InstanceTogglableFeatures;
-use meilisearch_types::milli::update::IndexerConfig;
+use meilisearch_types::milli::update::{IndexerConfig, S3SnapshotOptions};
 use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
 use rustls::server::{ServerSessionMemoryCache, WebPkiClientVerifier};
 use rustls::RootCertStore;
@@ -74,6 +74,17 @@ const MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES: &str =
 const MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION: &str =
     "MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION";
 const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS: &str =
     "MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS";
+
+// Related to S3 snapshots
+const MEILI_S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
+const MEILI_S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
+const MEILI_S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
+const MEILI_S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
+const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
+const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
+const MEILI_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
+const MEILI_S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
+
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
 const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
@@ -83,6 +94,8 @@ const DEFAULT_SNAPSHOT_DIR: &str = "snapshots/";
 const DEFAULT_SNAPSHOT_INTERVAL_SEC: u64 = 86400;
 const DEFAULT_SNAPSHOT_INTERVAL_SEC_STR: &str = "86400";
 const DEFAULT_DUMP_DIR: &str = "dumps/";
+const DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS: NonZeroUsize = NonZeroUsize::new(10).unwrap();
+const DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL: u32 = 0;
 
 const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
 const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
@@ -479,6 +492,10 @@ pub struct Opt {
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
 
+    #[serde(flatten)]
+    #[clap(flatten)]
+    pub s3_snapshot_options: Option<S3SnapshotOpts>,
+
     /// Set the path to a configuration file that should be used to setup the engine.
     /// Format must be TOML.
     #[clap(long)]
@@ -580,6 +597,7 @@ impl Opt {
             experimental_limit_batched_tasks_total_size,
             experimental_embedding_cache_entries,
             experimental_no_snapshot_compaction,
+            s3_snapshot_options,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -681,6 +699,9 @@ impl Opt {
             experimental_no_snapshot_compaction.to_string(),
         );
         indexer_options.export_to_env();
+        if let Some(s3_snapshot_options) = s3_snapshot_options {
+            s3_snapshot_options.export_to_env();
+        }
     }
 
     pub fn get_ssl_config(&self) -> anyhow::Result<Option<rustls::ServerConfig>> {
@@ -849,6 +870,16 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
     type Error = anyhow::Error;
 
     fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
+        let IndexerOpts {
+            max_indexing_memory,
+            max_indexing_threads,
+            skip_index_budget,
+            experimental_no_edition_2024_for_settings,
+            experimental_no_edition_2024_for_dumps,
+            experimental_no_edition_2024_for_prefix_post_processing,
+            experimental_no_edition_2024_for_facet_post_processing,
+        } = other;
+
         let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
             .num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2))
             .build()?;
@@ -856,21 +887,124 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
         Ok(Self {
             thread_pool,
             log_every_n: Some(DEFAULT_LOG_EVERY_N),
-            max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
-            max_threads: *other.max_indexing_threads,
+            max_memory: max_indexing_memory.map(|b| b.as_u64() as usize),
+            max_threads: max_indexing_threads.0,
             max_positions_per_attributes: None,
-            skip_index_budget: other.skip_index_budget,
-            experimental_no_edition_2024_for_settings: other
-                .experimental_no_edition_2024_for_settings,
-            experimental_no_edition_2024_for_dumps: other.experimental_no_edition_2024_for_dumps,
+            skip_index_budget: *skip_index_budget,
+            experimental_no_edition_2024_for_settings: *experimental_no_edition_2024_for_settings,
+            experimental_no_edition_2024_for_dumps: *experimental_no_edition_2024_for_dumps,
             chunk_compression_type: Default::default(),
             chunk_compression_level: Default::default(),
             documents_chunk_size: Default::default(),
             max_nb_chunks: Default::default(),
-            experimental_no_edition_2024_for_prefix_post_processing: other
-                .experimental_no_edition_2024_for_prefix_post_processing,
-            experimental_no_edition_2024_for_facet_post_processing: other
-                .experimental_no_edition_2024_for_facet_post_processing,
+            experimental_no_edition_2024_for_prefix_post_processing:
+                *experimental_no_edition_2024_for_prefix_post_processing,
+            experimental_no_edition_2024_for_facet_post_processing:
+                *experimental_no_edition_2024_for_facet_post_processing,
+            s3_snapshot_options: None,
+        })
+    }
+}
+
+#[derive(Debug, Clone, Parser, Deserialize)]
+// This group is a bit tricky but makes it possible to require all listed fields if one of them
+// is specified. It lets us keep an Option for the S3SnapshotOpts configuration.
+//
+#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix", "s3_access_key", "s3_secret_key"])]
+pub struct S3SnapshotOpts {
+    /// The S3 bucket URL in the format https://s3.<region>.amazonaws.com.
+    #[clap(long, env = MEILI_S3_BUCKET_URL, required = false)]
+    #[serde(default)]
+    pub s3_bucket_url: String,
+
+    /// The region in the format us-east-1.
+    #[clap(long, env = MEILI_S3_BUCKET_REGION, required = false)]
+    #[serde(default)]
+    pub s3_bucket_region: String,
+
+    /// The bucket name.
+    #[clap(long, env = MEILI_S3_BUCKET_NAME, required = false)]
+    #[serde(default)]
+    pub s3_bucket_name: String,
+
+    /// The prefix path where to put the snapshot, uses normal slashes (/).
+    #[clap(long, env = MEILI_S3_SNAPSHOT_PREFIX, required = false)]
+    #[serde(default)]
+    pub s3_snapshot_prefix: String,
+
+    /// The S3 access key.
+    #[clap(long, env = MEILI_S3_ACCESS_KEY, required = false)]
+    #[serde(default)]
+    pub s3_access_key: String,
+
+    /// The S3 secret key.
+    #[clap(long, env = MEILI_S3_SECRET_KEY, required = false)]
+    #[serde(default)]
+    pub s3_secret_key: String,
+
+    /// The maximum number of parts that can be uploaded in parallel.
+    #[clap(long, env = MEILI_S3_MAX_IN_FLIGHT_PARTS, default_value_t = default_s3_snapshot_max_in_flight_parts())]
+    #[serde(default = "default_s3_snapshot_max_in_flight_parts")]
+    pub s3_max_in_flight_parts: NonZeroUsize,
+
+    /// The compression level. Defaults to no compression (0).
+    #[clap(long, env = MEILI_S3_COMPRESSION_LEVEL, default_value_t = default_s3_snapshot_compression_level())]
+    #[serde(default = "default_s3_snapshot_compression_level")]
+    pub s3_compression_level: u32,
+}
+
+impl S3SnapshotOpts {
+    /// Exports the values to their corresponding env vars if they are not set.
+    pub fn export_to_env(self) {
+        let S3SnapshotOpts {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level,
+        } = self;
+
+        export_to_env_if_not_present(MEILI_S3_BUCKET_URL, s3_bucket_url);
+        export_to_env_if_not_present(MEILI_S3_BUCKET_REGION, s3_bucket_region);
+        export_to_env_if_not_present(MEILI_S3_BUCKET_NAME, s3_bucket_name);
+        export_to_env_if_not_present(MEILI_S3_SNAPSHOT_PREFIX, s3_snapshot_prefix);
+        export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
+        export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
+        export_to_env_if_not_present(
+            MEILI_S3_MAX_IN_FLIGHT_PARTS,
+            s3_max_in_flight_parts.to_string(),
+        );
+        export_to_env_if_not_present(MEILI_S3_COMPRESSION_LEVEL, s3_compression_level.to_string());
+    }
+}
+
+impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
+    type Error = anyhow::Error;
+
+    fn try_from(other: S3SnapshotOpts) -> Result<Self, Self::Error> {
+        let S3SnapshotOpts {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level,
+        } = other;
+
+        Ok(S3SnapshotOptions {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level,
         })
     }
 }
@@ -1089,6 +1223,14 @@ fn default_snapshot_interval_sec() -> &'static str {
     DEFAULT_SNAPSHOT_INTERVAL_SEC_STR
 }
 
+fn default_s3_snapshot_max_in_flight_parts() -> NonZeroUsize {
+    DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS
+}
+
+fn default_s3_snapshot_compression_level() -> u32 {
+    DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL
+}
+
 fn default_dump_dir() -> PathBuf {
     PathBuf::from(DEFAULT_DUMP_DIR)
 }
diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs
index 65e32483d..a3aecb9a2 100644
--- a/crates/milli/src/update/indexer_config.rs
+++ b/crates/milli/src/update/indexer_config.rs
@@ -1,3 +1,5 @@
+use std::num::NonZeroUsize;
+
 use grenad::CompressionType;
 
 use super::GrenadParameters;
@@ -20,6 +22,7 @@ pub struct IndexerConfig {
     pub experimental_no_edition_2024_for_dumps: bool,
     pub experimental_no_edition_2024_for_prefix_post_processing: bool,
     pub experimental_no_edition_2024_for_facet_post_processing: bool,
+    pub s3_snapshot_options: Option<S3SnapshotOptions>,
 }
 
 impl IndexerConfig {
@@ -37,6 +40,18 @@ impl IndexerConfig {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct S3SnapshotOptions {
+    pub s3_bucket_url: String,
+    pub s3_bucket_region: String,
+    pub s3_bucket_name: String,
+    pub s3_snapshot_prefix: String,
+    pub s3_access_key: String,
+    pub s3_secret_key: String,
+    pub s3_max_in_flight_parts: NonZeroUsize,
+    pub s3_compression_level: u32,
+}
+
 /// By default use only 1 thread for indexing in tests
 #[cfg(test)]
 pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<NonZeroUsize>) {
@@ -76,6 +91,7 @@ impl Default for IndexerConfig {
             experimental_no_edition_2024_for_dumps: false,
             experimental_no_edition_2024_for_prefix_post_processing: false,
             experimental_no_edition_2024_for_facet_post_processing: false,
+            s3_snapshot_options: None,
         }
     }
 }
diff --git a/crates/milli/src/update/mod.rs b/crates/milli/src/update/mod.rs
index 64eb9f1d3..6fb2d0248 100644
--- a/crates/milli/src/update/mod.rs
+++ b/crates/milli/src/update/mod.rs
@@ -5,7 +5,7 @@ pub use self::concurrent_available_ids::ConcurrentAvailableIds;
 pub use self::facet::bulk::FacetsUpdateBulk;
 pub use self::facet::incremental::FacetsUpdateIncrementalInner;
 pub use self::index_documents::{request_threads, *};
-pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
+pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig, S3SnapshotOptions};
 pub use self::new::ChannelCongestion;
 pub use self::settings::{validate_embedding_settings, Setting, Settings};
 pub use self::update_step::UpdateIndexingStep;
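
Usage sketch: with this patch the S3 snapshot settings come from the CLI, the
MEILI_S3_* environment variables, or the config file instead of being read ad
hoc at snapshot time. Assuming clap's default kebab-case renaming of the new
field names for the long flags (bucket, prefix, and key values below are
placeholders), a launch could look like this:

    meilisearch \
      --s3-bucket-url https://s3.eu-central-1.amazonaws.com \
      --s3-bucket-region eu-central-1 \
      --s3-bucket-name my-meili-snapshots \
      --s3-snapshot-prefix prod/data.ms \
      --s3-access-key <access-key> \
      --s3-secret-key <secret-key> \
      --s3-max-in-flight-parts 10 \
      --s3-compression-level 0

The same six bucket/credential settings can come from MEILI_S3_BUCKET_URL,
MEILI_S3_BUCKET_REGION, MEILI_S3_BUCKET_NAME, MEILI_S3_SNAPSHOT_PREFIX,
MEILI_S3_ACCESS_KEY, and MEILI_S3_SECRET_KEY. Because of the requires_all clap
group, setting any one of them requires setting all six, while
--s3-max-in-flight-parts (default 10) and --s3-compression-level (default 0,
i.e. no compression) remain optional.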
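
Since S3SnapshotOpts is flattened into Opt with serde(flatten), the same
settings should also be accepted at the top level of the TOML configuration
file; a hypothetical snippet with the same placeholder values:

    s3_bucket_url = "https://s3.eu-central-1.amazonaws.com"
    s3_bucket_region = "eu-central-1"
    s3_bucket_name = "my-meili-snapshots"
    s3_snapshot_prefix = "prod/data.ms"
    s3_access_key = "<access-key>"
    s3_secret_key = "<secret-key>"
    s3_max_in_flight_parts = 10
    s3_compression_level = 0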