Support clean CLI options

@@ -217,6 +217,7 @@ pub struct IndexScheduler {
     #[cfg(test)]
     run_loop_iteration: Arc<RwLock<usize>>,
 
+    /// The tokio runtime used for asynchronous tasks.
     runtime: Option<tokio::runtime::Handle>,
 }
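
Note: the scheduler keeps an Option<tokio::runtime::Handle> so that synchronous scheduler code can drive async upload code with block_on. A minimal sketch of the pattern follows; how Meilisearch actually acquires the handle is not shown in this diff, and try_current() is only one plausible way:

// Assumes the tokio crate with the "rt-multi-thread" feature.
use tokio::runtime::Handle;

struct Scheduler {
    runtime: Option<Handle>,
}

impl Scheduler {
    fn new() -> Self {
        // try_current() fails outside a runtime, hence the Option.
        Self { runtime: Handle::try_current().ok() }
    }

    fn upload(&self) {
        let handle = self.runtime.as_ref().expect("Runtime not initialized");
        // Panics if called from within the runtime's own async context.
        handle.block_on(async { /* async upload work */ });
    }
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let scheduler = rt.block_on(async { Scheduler::new() });
    scheduler.upload();
}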

@@ -25,6 +25,7 @@ use convert_case::{Case, Casing as _};
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::heed::{Env, WithoutTls};
 use meilisearch_types::milli;
+use meilisearch_types::milli::update::S3SnapshotOptions;
 use meilisearch_types::tasks::Status;
 use process_batch::ProcessBatchInfo;
 use rayon::current_num_threads;

@@ -87,11 +88,14 @@ pub struct Scheduler {
     /// Snapshot compaction status.
     pub(crate) experimental_no_snapshot_compaction: bool,
+
+    /// S3 Snapshot options.
+    pub(crate) s3_snapshot_options: Option<S3SnapshotOptions>,
 }
 
 impl Scheduler {
-    pub(crate) fn private_clone(&self) -> Scheduler {
-        Scheduler {
+    pub(crate) fn private_clone(&self) -> Self {
+        Self {
             must_stop_processing: self.must_stop_processing.clone(),
             wake_up: self.wake_up.clone(),
             autobatching_enabled: self.autobatching_enabled,

@@ -103,23 +107,52 @@ impl Scheduler {
             version_file_path: self.version_file_path.clone(),
             embedding_cache_cap: self.embedding_cache_cap,
             experimental_no_snapshot_compaction: self.experimental_no_snapshot_compaction,
+            s3_snapshot_options: self.s3_snapshot_options.clone(),
         }
     }
 
     pub fn new(options: &IndexSchedulerOptions, auth_env: Env<WithoutTls>) -> Scheduler {
+        let IndexSchedulerOptions {
+            version_file_path,
+            auth_path: _,
+            tasks_path: _,
+            update_file_path: _,
+            indexes_path: _,
+            snapshots_path,
+            dumps_path,
+            cli_webhook_url: _,
+            cli_webhook_authorization: _,
+            task_db_size: _,
+            index_base_map_size: _,
+            enable_mdb_writemap: _,
+            index_growth_amount: _,
+            index_count: _,
+            indexer_config,
+            autobatching_enabled,
+            cleanup_enabled: _,
+            max_number_of_tasks: _,
+            max_number_of_batched_tasks,
+            batched_tasks_size_limit,
+            instance_features: _,
+            auto_upgrade: _,
+            embedding_cache_cap,
+            experimental_no_snapshot_compaction,
+        } = options;
+
         Scheduler {
             must_stop_processing: MustStopProcessing::default(),
             // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
             wake_up: Arc::new(SignalEvent::auto(true)),
-            autobatching_enabled: options.autobatching_enabled,
-            max_number_of_batched_tasks: options.max_number_of_batched_tasks,
-            batched_tasks_size_limit: options.batched_tasks_size_limit,
-            dumps_path: options.dumps_path.clone(),
-            snapshots_path: options.snapshots_path.clone(),
+            autobatching_enabled: *autobatching_enabled,
+            max_number_of_batched_tasks: *max_number_of_batched_tasks,
+            batched_tasks_size_limit: *batched_tasks_size_limit,
+            dumps_path: dumps_path.clone(),
+            snapshots_path: snapshots_path.clone(),
             auth_env,
-            version_file_path: options.version_file_path.clone(),
-            embedding_cache_cap: options.embedding_cache_cap,
-            experimental_no_snapshot_compaction: options.experimental_no_snapshot_compaction,
+            version_file_path: version_file_path.clone(),
+            embedding_cache_cap: *embedding_cache_cap,
+            experimental_no_snapshot_compaction: *experimental_no_snapshot_compaction,
+            s3_snapshot_options: indexer_config.s3_snapshot_options.clone(),
        }
    }
}
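
Note: Scheduler::new now destructures IndexSchedulerOptions exhaustively, acknowledging unused fields with `: _`. The payoff is compile-time safety: adding an option without deciding how the scheduler handles it no longer compiles. A minimal illustration with invented names:

// Exhaustive destructuring as a "did you forget this option?" guard.
struct Options {
    snapshots_path: String,
    dumps_path: String,
    auto_upgrade: bool,
}

fn build(options: &Options) -> String {
    // Unused fields must be acknowledged with `: _`; a brand-new field
    // would make this pattern non-exhaustive and fail to compile.
    let Options { snapshots_path, dumps_path, auto_upgrade: _ } = options;
    format!("{snapshots_path}:{dumps_path}")
}

fn main() {
    let opts = Options {
        snapshots_path: "snapshots/".into(),
        dumps_path: "dumps/".into(),
        auto_upgrade: true,
    };
    println!("{}", build(&opts));
}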

@@ -1,16 +1,13 @@
-use std::env::VarError;
 use std::ffi::OsStr;
 use std::fs;
-#[cfg(unix)]
-use std::path::PathBuf;
 use std::sync::atomic::Ordering;
 
 use meilisearch_types::heed::CompactionOption;
 use meilisearch_types::milli::progress::{Progress, VariableNameStep};
+#[cfg(unix)]
+use meilisearch_types::milli::update::S3SnapshotOptions;
 use meilisearch_types::tasks::{Status, Task};
 use meilisearch_types::{compression, VERSION_FILE_NAME};
-#[cfg(unix)]
-use path_slash::PathBufExt as _;
 
 use crate::heed::EnvOpenOptions;
 use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};

@@ -87,72 +84,20 @@ impl IndexScheduler {
     ) -> Result<Vec<Task>> {
         progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
 
-        const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
-        const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
-        const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
-        const S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
-        const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
-        const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
-
-        let bucket_url = std::env::var(S3_BUCKET_URL).map_err(|e| (S3_BUCKET_URL, e));
-        let bucket_region = std::env::var(S3_BUCKET_REGION).map_err(|e| (S3_BUCKET_REGION, e));
-        let bucket_name = std::env::var(S3_BUCKET_NAME).map_err(|e| (S3_BUCKET_NAME, e));
-        let snapshot_prefix =
-            std::env::var(S3_SNAPSHOT_PREFIX).map_err(|e| (S3_SNAPSHOT_PREFIX, e));
-        let access_key = std::env::var(S3_ACCESS_KEY).map_err(|e| (S3_ACCESS_KEY, e));
-        let secret_key = std::env::var(S3_SECRET_KEY).map_err(|e| (S3_SECRET_KEY, e));
-        match (bucket_url, bucket_region, bucket_name, snapshot_prefix, access_key, secret_key) {
-            (
-                Ok(bucket_url),
-                Ok(bucket_region),
-                Ok(bucket_name),
-                Ok(snapshot_prefix),
-                Ok(access_key),
-                Ok(secret_key),
-            ) => {
+        match self.scheduler.s3_snapshot_options.clone() {
+            Some(options) => {
                 #[cfg(not(unix))]
                 {
-                    let _ = (
-                        bucket_url,
-                        bucket_region,
-                        bucket_name,
-                        snapshot_prefix,
-                        access_key,
-                        secret_key,
-                    );
+                    let _ = options;
                     panic!("Non-unix platform does not support S3 snapshotting");
                 }
                 #[cfg(unix)]
-                self.runtime.as_ref().expect("Runtime not initialized").block_on(
-                    self.process_snapshot_to_s3(
-                        progress,
-                        bucket_url,
-                        bucket_region,
-                        bucket_name,
-                        PathBuf::from_slash(snapshot_prefix),
-                        access_key,
-                        secret_key,
-                        tasks,
-                    ),
-                )
-            }
-            (
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-                Err((_, VarError::NotPresent)),
-            ) => self.process_snapshots_to_disk(progress, tasks),
-            (Err((var, e)), _, _, _, _, _)
-            | (_, Err((var, e)), _, _, _, _)
-            | (_, _, Err((var, e)), _, _, _)
-            | (_, _, _, Err((var, e)), _, _)
-            | (_, _, _, _, Err((var, e)), _)
-            | (_, _, _, _, _, Err((var, e))) => {
-                // TODO: Handle error gracefully
-                panic!("Error while reading environment variables: {}: {}", var, e);
+                self.runtime
+                    .as_ref()
+                    .expect("Runtime not initialized")
+                    .block_on(self.process_snapshot_to_s3(progress, options, tasks))
             }
+            None => self.process_snapshots_to_disk(progress, tasks),
        }
    }
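
Note: the six ad-hoc environment reads, and their runtime panics on partial configuration, collapse into a single match on the pre-parsed Option<S3SnapshotOptions>; partial configuration is now rejected by clap at startup (see the requires_all group further down). A condensed sketch of the resulting control flow, with illustrative names:

#[derive(Clone)]
struct S3Config {
    bucket_url: String,
}

fn snapshot(s3: Option<&S3Config>) -> &'static str {
    match s3 {
        Some(_options) => "upload the snapshot to S3",
        None => "write the snapshot to the local snapshot dir",
    }
}

fn main() {
    assert_eq!(snapshot(None), "write the snapshot to the local snapshot dir");
    let cfg = S3Config { bucket_url: "https://s3.eu-central-1.amazonaws.com".into() };
    assert_eq!(snapshot(Some(&cfg)), "upload the snapshot to S3");
}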

@@ -291,12 +236,7 @@ impl IndexScheduler {
     pub(super) async fn process_snapshot_to_s3(
         &self,
         progress: Progress,
-        bucket_url: String,
-        bucket_region: String,
-        bucket_name: String,
-        snapshot_prefix: PathBuf,
-        access_key: String,
-        secret_key: String,
+        opts: S3SnapshotOptions,
         mut tasks: Vec<Task>,
     ) -> Result<Vec<Task>> {
         use std::collections::VecDeque;

@@ -304,9 +244,11 @@ impl IndexScheduler {
         use std::io::{self, Seek as _, SeekFrom, Write as _};
         use std::os::fd::OwnedFd;
         use std::path::Path;
+        use std::path::PathBuf;
         use std::time::Duration;
 
         use bytes::{Bytes, BytesMut};
+        use path_slash::PathBufExt as _;
         use reqwest::header::ETAG;
         use reqwest::Client;
         use reqwest::Response;

@@ -314,33 +256,29 @@ impl IndexScheduler {
         use rusty_s3::{Bucket, Credentials, UrlStyle};
         use tokio::task::JoinHandle;
 
+        let S3SnapshotOptions {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level: level,
+        } = opts;
+
         const ONE_HOUR: Duration = Duration::from_secs(3600);
         // Parts are 375MiB which enables databases of up to 3.5TiB. Must be at least 2x5MiB.
         const PART_SIZE: usize = 375 * 1024 * 1024;
 
-        // The maximum number of parts that can be uploaded in parallel.
-        const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
-        let max_in_flight_parts: usize = match std::env::var(S3_MAX_IN_FLIGHT_PARTS) {
-            Ok(val) => val.parse().expect("Failed to parse MEILI_S3_MAX_IN_FLIGHT_PARTS"),
-            Err(VarError::NotPresent) => 10,
-            Err(e) => panic!("Failed to read {}: {}", S3_MAX_IN_FLIGHT_PARTS, e),
-        };
-
-        // The compression level, defaults to no compression (0)
-        const S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
-        let level: u32 = match std::env::var(S3_COMPRESSION_LEVEL) {
-            Ok(val) => val.parse().expect("Failed to parse MEILI_S3_COMPRESSION_LEVEL"),
-            Err(VarError::NotPresent) => 0,
-            Err(e) => panic!("Failed to read {}: {}", S3_COMPRESSION_LEVEL, e),
-        };
-
         let client = Client::new();
         // TODO Remove this unwrap
-        let url = bucket_url.parse().unwrap();
-        let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
-        let credential = Credentials::new(access_key, secret_key);
+        let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
+        let url = s3_bucket_url.parse().unwrap();
+        let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region).unwrap();
+        let credential = Credentials::new(s3_access_key, s3_secret_key);
         // TODO change this and use the database name like in the original version
-        let object_path = snapshot_prefix.join("data.ms.snapshot");
+        let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
         let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
 
         eprintln!("Starting the upload of the snapshot to {object}");
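
Note: the "up to 3.5TiB" comment follows from S3's documented multipart limits, at most 10,000 parts per upload and a 5 MiB minimum part size. A quick check of the arithmetic:

// Where the ~3.5 TiB figure comes from.
const PART_SIZE: usize = 375 * 1024 * 1024; // 375 MiB
const S3_MAX_PARTS: u64 = 10_000; // S3's per-upload part limit

fn main() {
    assert!(PART_SIZE >= 2 * 5 * 1024 * 1024); // "Must be at least 2x5MiB."
    let max_bytes = PART_SIZE as u64 * S3_MAX_PARTS;
    // Prints roughly 3.58 TiB, i.e. the "up to 3.5TiB" in the comment.
    println!("max object size: {:.2} TiB", max_bytes as f64 / (1u64 << 40) as f64);
}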

@@ -365,7 +303,7 @@ impl IndexScheduler {
         let mut etags = Vec::<String>::new();
         let mut in_flight =
             VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
-                max_in_flight_parts,
+                s3_max_in_flight_parts,
             );
 
         for part_number in 1u16.. {

@@ -378,7 +316,7 @@ impl IndexScheduler {
             let url = part_upload.sign(ONE_HOUR);
 
             // Wait for a buffer to be ready if there are in-flight parts that landed
-            let mut buffer = if in_flight.len() >= max_in_flight_parts {
+            let mut buffer = if in_flight.len() >= s3_max_in_flight_parts {
                 let (request, buffer) = in_flight.pop_front().unwrap();
                 let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
                 let etag =
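
Note: the VecDeque of JoinHandles above is a bounded in-flight queue: once the cap is reached, the oldest upload is awaited (and its buffer reclaimed) before a new part starts. A self-contained sketch of the same pattern, assuming tokio:

use std::collections::VecDeque;

use tokio::task::JoinHandle;

async fn upload_all(parts: Vec<Vec<u8>>, max_in_flight: usize) {
    let mut in_flight: VecDeque<JoinHandle<usize>> = VecDeque::with_capacity(max_in_flight);
    for part in parts {
        if in_flight.len() >= max_in_flight {
            // Await the oldest upload before starting another one.
            in_flight.pop_front().unwrap().await.unwrap();
        }
        // Stand-in for the real presigned PUT request of one part.
        in_flight.push_back(tokio::spawn(async move { part.len() }));
    }
    // Drain whatever is still in flight at the end.
    while let Some(task) = in_flight.pop_front() {
        task.await.unwrap();
    }
}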

@@ -217,6 +217,7 @@ struct Infos {
     import_snapshot: bool,
     schedule_snapshot: Option<u64>,
     snapshot_dir: bool,
+    uses_s3_snapshots: bool,
     ignore_missing_snapshot: bool,
     ignore_snapshot_if_db_exists: bool,
     http_addr: bool,

@@ -285,6 +286,7 @@ impl Infos {
             indexer_options,
             config_file_path,
             no_analytics: _,
+            s3_snapshot_options,
         } = options;
 
         let schedule_snapshot = match schedule_snapshot {

@@ -348,6 +350,7 @@ impl Infos {
             import_snapshot: import_snapshot.is_some(),
             schedule_snapshot,
             snapshot_dir: snapshot_dir != PathBuf::from("snapshots/"),
+            uses_s3_snapshots: s3_snapshot_options.is_some(),
             ignore_missing_snapshot,
             ignore_snapshot_if_db_exists,
             http_addr: http_addr != default_http_addr(),

@@ -233,7 +233,11 @@ pub fn setup_meilisearch(
         task_db_size: opt.max_task_db_size.as_u64() as usize,
         index_base_map_size: opt.max_index_size.as_u64() as usize,
         enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
-        indexer_config: Arc::new((&opt.indexer_options).try_into()?),
+        indexer_config: Arc::new({
+            let s3_snapshot_options =
+                opt.s3_snapshot_options.clone().map(|opt| opt.try_into()).transpose()?;
+            IndexerConfig { s3_snapshot_options, ..(&opt.indexer_options).try_into()? }
+        }),
         autobatching_enabled: true,
         cleanup_enabled: !opt.experimental_replication_parameters,
         max_number_of_tasks: 1_000_000,
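
Note: `.map(|opt| opt.try_into()).transpose()?` is the standard Option/Result flip: transpose turns Option<Result<T, E>> into Result<Option<T>, E>, so `?` only propagates a conversion error when S3 options were actually provided. A minimal sketch:

fn parse_level(input: Option<&str>) -> Result<Option<u32>, std::num::ParseIntError> {
    // Option<&str> -> Option<Result<u32, _>> -> Result<Option<u32>, _>
    input.map(|s| s.parse::<u32>()).transpose()
}

fn main() {
    assert_eq!(parse_level(None), Ok(None)); // absent: no error possible
    assert_eq!(parse_level(Some("3")), Ok(Some(3))); // present and valid
    assert!(parse_level(Some("high")).is_err()); // present and invalid
}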

@@ -534,7 +538,11 @@ fn import_dump(
     let indexer_config = if base_config.max_threads.is_none() {
         let (thread_pool, _) = default_thread_pool_and_threads();
 
-        let _config = IndexerConfig { thread_pool, ..*base_config };
+        let _config = IndexerConfig {
+            thread_pool,
+            s3_snapshot_options: base_config.s3_snapshot_options.clone(),
+            ..*base_config
+        };
         backup_config = _config;
         &backup_config
     } else {
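
Note: the new field is cloned by hand here because functional-update syntax from a borrowed struct (`..*base_config`) can only copy out Copy fields, and Option<S3SnapshotOptions> is not Copy. A sketch with illustrative types:

#[derive(Clone)]
struct Config {
    thread_pool: usize,         // stand-in for the field being overridden
    max_threads: Option<usize>, // Copy: picked up by `..*base`
    label: Option<String>,      // not Copy: must be cloned explicitly
}

fn rebuild(base: &Config, thread_pool: usize) -> Config {
    Config {
        thread_pool,
        // Without this line, `..*base` would try to move `label` out of a
        // borrow and fail to compile.
        label: base.label.clone(),
        ..*base
    }
}

fn main() {
    let base = Config { thread_pool: 4, max_threads: Some(8), label: None };
    assert_eq!(rebuild(&base, 2).max_threads, Some(8));
}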

@@ -12,7 +12,7 @@ use std::{env, fmt, fs};
 use byte_unit::{Byte, ParseError, UnitType};
 use clap::Parser;
 use meilisearch_types::features::InstanceTogglableFeatures;
-use meilisearch_types::milli::update::IndexerConfig;
+use meilisearch_types::milli::update::{IndexerConfig, S3SnapshotOptions};
 use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
 use rustls::server::{ServerSessionMemoryCache, WebPkiClientVerifier};
 use rustls::RootCertStore;

@@ -74,6 +74,17 @@ const MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES: &str =
 const MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION: &str = "MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION";
 const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS: &str =
     "MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS";
 
+// Related to S3 snapshots
+const MEILI_S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
+const MEILI_S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
+const MEILI_S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
+const MEILI_S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
+const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
+const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
+const MEILI_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
+const MEILI_S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
+
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
 const DEFAULT_HTTP_ADDR: &str = "localhost:7700";

@@ -83,6 +94,8 @@ const DEFAULT_SNAPSHOT_DIR: &str = "snapshots/";
 const DEFAULT_SNAPSHOT_INTERVAL_SEC: u64 = 86400;
 const DEFAULT_SNAPSHOT_INTERVAL_SEC_STR: &str = "86400";
 const DEFAULT_DUMP_DIR: &str = "dumps/";
+const DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS: NonZeroUsize = NonZeroUsize::new(10).unwrap();
+const DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL: u32 = 0;
 
 const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
 const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
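
Note: `NonZeroUsize::new(10).unwrap()` in a const is legal because Option::unwrap is usable in const contexts on recent Rust (stabilized around 1.83, an assumption worth checking against the project's MSRV), so an invalid literal fails at compile time rather than at startup:

use std::num::NonZeroUsize;

const DEFAULT_PARTS: NonZeroUsize = NonZeroUsize::new(10).unwrap();

fn main() {
    assert_eq!(DEFAULT_PARTS.get(), 10);
    // `NonZeroUsize::new(0).unwrap()` here would be a compile-time error.
}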

@@ -479,6 +492,10 @@ pub struct Opt {
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
 
+    #[serde(flatten)]
+    #[clap(flatten)]
+    pub s3_snapshot_options: Option<S3SnapshotOpts>,
+
     /// Set the path to a configuration file that should be used to setup the engine.
     /// Format must be TOML.
     #[clap(long)]

@@ -580,6 +597,7 @@ impl Opt {
             experimental_limit_batched_tasks_total_size,
             experimental_embedding_cache_entries,
             experimental_no_snapshot_compaction,
+            s3_snapshot_options,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);

@@ -681,6 +699,9 @@ impl Opt {
             experimental_no_snapshot_compaction.to_string(),
         );
         indexer_options.export_to_env();
+        if let Some(s3_snapshot_options) = s3_snapshot_options {
+            s3_snapshot_options.export_to_env();
+        }
     }
 
     pub fn get_ssl_config(&self) -> anyhow::Result<Option<rustls::ServerConfig>> {

@@ -849,6 +870,16 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
     type Error = anyhow::Error;
 
     fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
+        let IndexerOpts {
+            max_indexing_memory,
+            max_indexing_threads,
+            skip_index_budget,
+            experimental_no_edition_2024_for_settings,
+            experimental_no_edition_2024_for_dumps,
+            experimental_no_edition_2024_for_prefix_post_processing,
+            experimental_no_edition_2024_for_facet_post_processing,
+        } = other;
+
         let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
             .num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2))
             .build()?;

@@ -856,21 +887,124 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
         Ok(Self {
             thread_pool,
             log_every_n: Some(DEFAULT_LOG_EVERY_N),
-            max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
-            max_threads: *other.max_indexing_threads,
+            max_memory: max_indexing_memory.map(|b| b.as_u64() as usize),
+            max_threads: max_indexing_threads.0,
             max_positions_per_attributes: None,
-            skip_index_budget: other.skip_index_budget,
-            experimental_no_edition_2024_for_settings: other
-                .experimental_no_edition_2024_for_settings,
-            experimental_no_edition_2024_for_dumps: other.experimental_no_edition_2024_for_dumps,
+            skip_index_budget: *skip_index_budget,
+            experimental_no_edition_2024_for_settings: *experimental_no_edition_2024_for_settings,
+            experimental_no_edition_2024_for_dumps: *experimental_no_edition_2024_for_dumps,
             chunk_compression_type: Default::default(),
             chunk_compression_level: Default::default(),
             documents_chunk_size: Default::default(),
             max_nb_chunks: Default::default(),
-            experimental_no_edition_2024_for_prefix_post_processing: other
-                .experimental_no_edition_2024_for_prefix_post_processing,
-            experimental_no_edition_2024_for_facet_post_processing: other
-                .experimental_no_edition_2024_for_facet_post_processing,
+            experimental_no_edition_2024_for_prefix_post_processing:
+                *experimental_no_edition_2024_for_prefix_post_processing,
+            experimental_no_edition_2024_for_facet_post_processing:
+                *experimental_no_edition_2024_for_facet_post_processing,
+            s3_snapshot_options: None,
         })
     }
 }
+#[derive(Debug, Clone, Parser, Deserialize)]
+// This group is a bit tricky but makes it possible to require all listed fields if one of them
+// is specified. It lets us keep an Option for the S3SnapshotOpts configuration.
+// <https://github.com/clap-rs/clap/issues/5092#issuecomment-2616986075>
+#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix", "s3_access_key", "s3_secret_key"])]
+pub struct S3SnapshotOpts {
+    /// The S3 bucket URL in the format https://s3.<region>.amazonaws.com.
+    #[clap(long, env = MEILI_S3_BUCKET_URL, required = false)]
+    #[serde(default)]
+    pub s3_bucket_url: String,
+
+    /// The region in the format us-east-1.
+    #[clap(long, env = MEILI_S3_BUCKET_REGION, required = false)]
+    #[serde(default)]
+    pub s3_bucket_region: String,
+
+    /// The bucket name.
+    #[clap(long, env = MEILI_S3_BUCKET_NAME, required = false)]
+    #[serde(default)]
+    pub s3_bucket_name: String,
+
+    /// The prefix path where to put the snapshot, uses normal slashes (/).
+    #[clap(long, env = MEILI_S3_SNAPSHOT_PREFIX, required = false)]
+    #[serde(default)]
+    pub s3_snapshot_prefix: String,
+
+    /// The S3 access key.
+    #[clap(long, env = MEILI_S3_ACCESS_KEY, required = false)]
+    #[serde(default)]
+    pub s3_access_key: String,
+
+    /// The S3 secret key.
+    #[clap(long, env = MEILI_S3_SECRET_KEY, required = false)]
+    #[serde(default)]
+    pub s3_secret_key: String,
+
+    /// The maximum number of parts that can be uploaded in parallel.
+    #[clap(long, env = MEILI_S3_MAX_IN_FLIGHT_PARTS, default_value_t = default_s3_snapshot_max_in_flight_parts())]
+    #[serde(default = "default_s3_snapshot_max_in_flight_parts")]
+    pub s3_max_in_flight_parts: NonZeroUsize,
+
+    /// The compression level. Defaults to no compression (0).
+    #[clap(long, env = MEILI_S3_COMPRESSION_LEVEL, default_value_t = default_s3_snapshot_compression_level())]
+    #[serde(default = "default_s3_snapshot_compression_level")]
+    pub s3_compression_level: u32,
+}
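
Note: the group attribute is what keeps partial S3 configuration from slipping through. A minimal, self-contained sketch of the same trick (names are illustrative and the behavior is not verified against every clap 4.x release): flattening an Option<T> whose group requires all of its members means passing any one flag forces all of them, while passing none yields None.

use clap::Parser;

#[derive(Debug, Parser)]
struct Demo {
    #[clap(flatten)]
    s3: Option<S3>,
}

#[derive(Debug, clap::Args)]
#[group(requires_all = ["bucket_url", "access_key"])]
struct S3 {
    #[clap(long, required = false)]
    bucket_url: String,
    #[clap(long, required = false)]
    access_key: String,
}

fn main() {
    // No S3 flags at all: the whole group is absent.
    assert!(Demo::parse_from(["demo"]).s3.is_none());
    // Only one member of the group: rejected at parse time.
    assert!(Demo::try_parse_from(["demo", "--bucket-url", "b"]).is_err());
    // All members present: parsed into Some(S3 { .. }).
    assert!(Demo::parse_from(["demo", "--bucket-url", "b", "--access-key", "k"]).s3.is_some());
}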
+impl S3SnapshotOpts {
+    /// Exports the values to their corresponding env vars if they are not set.
+    pub fn export_to_env(self) {
+        let S3SnapshotOpts {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level,
+        } = self;
+
+        export_to_env_if_not_present(MEILI_S3_BUCKET_URL, s3_bucket_url);
+        export_to_env_if_not_present(MEILI_S3_BUCKET_REGION, s3_bucket_region);
+        export_to_env_if_not_present(MEILI_S3_BUCKET_NAME, s3_bucket_name);
+        export_to_env_if_not_present(MEILI_S3_SNAPSHOT_PREFIX, s3_snapshot_prefix);
+        export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
+        export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
+        export_to_env_if_not_present(
+            MEILI_S3_MAX_IN_FLIGHT_PARTS,
+            s3_max_in_flight_parts.to_string(),
+        );
+        export_to_env_if_not_present(MEILI_S3_COMPRESSION_LEVEL, s3_compression_level.to_string());
+    }
+}
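
Note: going by its name and doc comment, export_to_env_if_not_present presumably behaves like the sketch below (this is an assumption, not the actual implementation): CLI or config-file values are pushed into the environment only when the variable is absent, so an explicitly set env var keeps precedence.

fn export_to_env_if_not_present(key: &str, value: impl AsRef<str>) {
    if std::env::var_os(key).is_none() {
        // NOTE: std::env::set_var is unsafe on edition 2024; this sketch
        // assumes edition 2021 and single-threaded startup code.
        std::env::set_var(key, value.as_ref());
    }
}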
+impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
+    type Error = anyhow::Error;
+
+    fn try_from(other: S3SnapshotOpts) -> Result<Self, Self::Error> {
+        let S3SnapshotOpts {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level,
+        } = other;
+
+        Ok(S3SnapshotOptions {
+            s3_bucket_url,
+            s3_bucket_region,
+            s3_bucket_name,
+            s3_snapshot_prefix,
+            s3_access_key,
+            s3_secret_key,
+            s3_max_in_flight_parts,
+            s3_compression_level,
+        })
+    }
+}

@@ -1089,6 +1223,14 @@ fn default_snapshot_interval_sec() -> &'static str {
     DEFAULT_SNAPSHOT_INTERVAL_SEC_STR
 }
 
+fn default_s3_snapshot_max_in_flight_parts() -> NonZeroUsize {
+    DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS
+}
+
+fn default_s3_snapshot_compression_level() -> u32 {
+    DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL
+}
+
 fn default_dump_dir() -> PathBuf {
     PathBuf::from(DEFAULT_DUMP_DIR)
 }

@@ -1,3 +1,5 @@
+use std::num::NonZeroUsize;
+
 use grenad::CompressionType;
 
 use super::GrenadParameters;

@@ -20,6 +22,7 @@ pub struct IndexerConfig {
     pub experimental_no_edition_2024_for_dumps: bool,
     pub experimental_no_edition_2024_for_prefix_post_processing: bool,
     pub experimental_no_edition_2024_for_facet_post_processing: bool,
+    pub s3_snapshot_options: Option<S3SnapshotOptions>,
 }
 
 impl IndexerConfig {

@@ -37,6 +40,18 @@ impl IndexerConfig {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct S3SnapshotOptions {
+    pub s3_bucket_url: String,
+    pub s3_bucket_region: String,
+    pub s3_bucket_name: String,
+    pub s3_snapshot_prefix: String,
+    pub s3_access_key: String,
+    pub s3_secret_key: String,
+    pub s3_max_in_flight_parts: NonZeroUsize,
+    pub s3_compression_level: u32,
+}
+
 /// By default use only 1 thread for indexing in tests
 #[cfg(test)]
 pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<usize>) {

@@ -76,6 +91,7 @@ impl Default for IndexerConfig {
             experimental_no_edition_2024_for_dumps: false,
             experimental_no_edition_2024_for_prefix_post_processing: false,
             experimental_no_edition_2024_for_facet_post_processing: false,
+            s3_snapshot_options: None,
         }
     }
 }

@@ -5,7 +5,7 @@ pub use self::concurrent_available_ids::ConcurrentAvailableIds;
 pub use self::facet::bulk::FacetsUpdateBulk;
 pub use self::facet::incremental::FacetsUpdateIncrementalInner;
 pub use self::index_documents::{request_threads, *};
-pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
+pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig, S3SnapshotOptions};
 pub use self::new::ChannelCongestion;
 pub use self::settings::{validate_embedding_settings, Setting, Settings};
 pub use self::update_step::UpdateIndexingStep;