Compare commits

...

33 Commits

Author SHA1 Message Date
Kerollmops
cbc0878158 Introduce some backoff retries 2025-11-04 17:46:01 +01:00
Kerollmops
8967e40083 Support cancelation 2025-11-04 16:30:02 +01:00
Kerollmops
f6a0db9cb8 Clean up the code 2025-11-04 14:28:03 +01:00
Kerollmops
b64b083003 Support clean CLI options 2025-11-04 14:28:03 +01:00
Kerollmops
eab2b806e9 Remove unused imports/code on Windows 2025-11-04 09:07:59 +01:00
Kerollmops
f7d9425c59 Remove unused imports/code on Windows 2025-11-03 18:52:58 +01:00
Kerollmops
f7f68f81a4 Make it run on Windows as we use a standard io::pipe 2025-11-03 18:19:54 +01:00
Kerollmops
6e0d0d306b Make clippy happy 2025-11-03 17:45:29 +01:00
Kerollmops
34dc67aea8 Make the compression level configurable 2025-10-24 11:47:47 +02:00
Kerollmops
fbe56822f0 Disable compression entirely to avoid being CPU bound 2025-10-23 13:32:32 +02:00
Clément Renault
5e1af30b42 Improve the way we create the snapshot path 2025-10-20 16:48:14 +02:00
Clément Renault
d298b21a95 Remove useless dependencies 2025-10-20 16:48:14 +02:00
Kerollmops
a4ad87febf Make it finaly work but without async on the write side 2025-10-20 16:48:14 +02:00
Kerollmops
2caa2be441 Seeking the tasks/data.mdb file to the begining made the trick 2025-10-20 16:48:14 +02:00
Kerollmops
a829ded023 Improve understanding of S3-related errors 2025-10-20 16:48:14 +02:00
Kerollmops
98be43b66b Retrieve the bytesMut only when released 2025-10-20 16:48:14 +02:00
Kerollmops
3ec4750426 Fix minimum part size 2025-10-20 16:48:14 +02:00
Kerollmops
d717ec3486 Improve error messaging when missing env var 2025-10-20 16:48:14 +02:00
Clément Renault
0c06bdefac WIP 2025-10-20 16:48:13 +02:00
Kerollmops
62a8133bcd WIP Do more tests 2025-10-20 16:48:13 +02:00
Clément Renault
c1f7542dfa WIP sending multiparts of 250MiB 2025-10-20 16:48:13 +02:00
Clément Renault
9105e8bb8c Rename the update_path function 2025-10-20 16:48:13 +02:00
Clément Renault
59e2394e69 Geenrate an async tarball 2025-10-20 16:48:13 +02:00
Kerollmops
430bc91c4c WIP 2025-10-20 16:48:13 +02:00
Kerollmops
1172dce093 Make max in flights parts fro upload configurable 2025-10-20 16:48:13 +02:00
Kerollmops
21140a33e6 Use a good mem advice for uploads 2025-10-20 16:48:13 +02:00
Kerollmops
9ed7a81495 Move the S3 snapshots to disk into a dedicated method 2025-10-20 16:48:13 +02:00
Clément Renault
c8c1d95efd Upload ten parts at a time 2025-10-20 16:48:13 +02:00
Clément Renault
10d1e26478 Use the Bytes crate to send the parts 2025-10-20 16:48:13 +02:00
Clément Renault
cd781d267b Upload indexes under their uuids 2025-10-20 16:48:13 +02:00
Clément Renault
b16af4a763 Initial working S3 uploads to RustFS 2025-10-20 16:48:13 +02:00
Louis Dureuil
1514d13ab3 Pass tokio handle to index-scheduler 2025-10-20 16:48:13 +02:00
Kerollmops
d51f13a59e Use the latest version of heed 2025-10-20 16:48:13 +02:00
20 changed files with 790 additions and 50 deletions

93
Cargo.lock generated
View File

@@ -1123,7 +1123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159fa412eae48a1d94d0b9ecdb85c97ce56eb2a347c62394d3fdbf221adabc1a"
dependencies = [
"path-matchers",
"path-slash",
"path-slash 0.1.5",
]
[[package]]
@@ -2838,9 +2838,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "heed"
version = "0.22.1-nested-rtxns"
version = "0.22.1-nested-rtxns-2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ff115ba5712b1f1fc7617b195f5c2f139e29c397ff79da040cd19db75ccc240"
checksum = "a644ab0b1e8234a7c82e83e26b24af1e4a4b8317629fa6ccd9ea2608c1595c73"
dependencies = [
"bitflags 2.9.4",
"byteorder",
@@ -3242,6 +3242,7 @@ dependencies = [
"bumpalo",
"bumparaw-collections",
"byte-unit",
"bytes",
"convert_case 0.8.0",
"crossbeam-channel",
"csv",
@@ -3258,14 +3259,19 @@ dependencies = [
"meilisearch-types",
"memmap2",
"page_size",
"path-slash 0.2.1",
"rayon",
"reqwest",
"roaring 0.10.12",
"rusty-s3",
"serde",
"serde_json",
"synchronoise",
"tar",
"tempfile",
"thiserror 2.0.16",
"time",
"tokio",
"tracing",
"ureq",
"uuid",
@@ -3465,6 +3471,30 @@ dependencies = [
"regex",
]
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "jobserver"
version = "0.1.34"
@@ -3988,6 +4018,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "md5"
version = "0.7.0"
@@ -4745,6 +4785,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498a099351efa4becc6a19c72aa9270598e8fd274ca47052e37455241c88b696"
[[package]]
name = "path-slash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -4962,6 +5008,15 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "potential_utf"
version = "0.1.3"
@@ -5139,6 +5194,16 @@ dependencies = [
"version_check",
]
[[package]]
name = "quick-xml"
version = "0.38.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -5414,6 +5479,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2 0.4.12",
"http 1.3.1",
"http-body",
"http-body-util",
@@ -5704,6 +5770,25 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "rusty-s3"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fac2edd2f0b56bd79a7343f49afc01c2d41010df480538a510e0abc56044f66c"
dependencies = [
"base64 0.22.1",
"hmac",
"jiff",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"url",
"zeroize",
]
[[package]]
name = "ryu"
version = "1.0.20"
@@ -6182,7 +6267,7 @@ checksum = "f9c425c07353535ef55b45420f5a8b0a397cd9bc3d7e5236497ca0d90604aa9b"
dependencies = [
"change-detection",
"mime_guess",
"path-slash",
"path-slash 0.1.5",
]
[[package]]

View File

@@ -60,7 +60,7 @@ impl FileStore {
/// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
let path = self.get_update_path(uuid);
let path = self.update_path(uuid);
let file = match StdFile::open(path) {
Ok(file) => file,
Err(e) => {
@@ -72,7 +72,7 @@ impl FileStore {
}
/// Returns the path that correspond to this uuid, the path could not exists.
pub fn get_update_path(&self, uuid: Uuid) -> PathBuf {
pub fn update_path(&self, uuid: Uuid) -> PathBuf {
self.path.join(uuid.to_string())
}

View File

@@ -14,6 +14,7 @@ license.workspace = true
anyhow = "1.0.98"
bincode = "1.3.3"
byte-unit = "5.1.6"
bytes = "1.10.1"
bumpalo = "3.18.1"
bumparaw-collections = "0.1.4"
convert_case = "0.8.0"
@@ -28,10 +29,12 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.7"
page_size = "0.6.0"
path-slash = "0.2.1"
rayon = "1.10.0"
roaring = { version = "0.10.12", features = ["serde"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.140", features = ["preserve_order"] }
tar = "0.4.44"
synchronoise = "1.0.1"
tempfile = "3.20.0"
thiserror = "2.0.12"
@@ -45,6 +48,9 @@ tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
backoff = "0.4.0"
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
rusty-s3 = "0.8.1"
tokio = { version = "1.47.1", features = ["full"] }
[dev-dependencies]
big_s = "1.0.2"

View File

@@ -5,6 +5,7 @@ use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::milli::index::RollbackOutcome;
use meilisearch_types::tasks::{Kind, Status};
use meilisearch_types::{heed, milli};
use reqwest::StatusCode;
use thiserror::Error;
use crate::TaskId;
@@ -127,6 +128,14 @@ pub enum Error {
#[error("Aborted task")]
AbortedTask,
#[error("S3 error: status: {status}, body: {body}")]
S3Error { status: StatusCode, body: String },
#[error("S3 HTTP error: {0}")]
S3HttpError(reqwest::Error),
#[error("S3 XML error: {0}")]
S3XmlError(Box<dyn std::error::Error + Send + Sync>),
#[error("S3 bucket error: {0}")]
S3BucketError(rusty_s3::BucketError),
#[error(transparent)]
Dump(#[from] dump::Error),
#[error(transparent)]
@@ -226,6 +235,10 @@ impl Error {
| Error::TaskCancelationWithEmptyQuery
| Error::FromRemoteWhenExporting { .. }
| Error::AbortedTask
| Error::S3Error { .. }
| Error::S3HttpError(_)
| Error::S3XmlError(_)
| Error::S3BucketError(_)
| Error::Dump(_)
| Error::Heed(_)
| Error::Milli { .. }
@@ -293,8 +306,14 @@ impl ErrorCode for Error {
Error::BatchNotFound(_) => Code::BatchNotFound,
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
// TODO: not sure of the Code to use
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
Error::S3Error { status, .. } if status.is_client_error() => {
Code::InvalidS3SnapshotRequest
}
Error::S3Error { .. } => Code::S3SnapshotServerError,
Error::S3HttpError(_) => Code::S3SnapshotServerError,
Error::S3XmlError(_) => Code::S3SnapshotServerError,
Error::S3BucketError(_) => Code::InvalidS3SnapshotParameters,
Error::Dump(e) => e.error_code(),
Error::Milli { error, .. } => error.error_code(),
Error::ProcessBatchPanicked(_) => Code::Internal,

View File

@@ -36,6 +36,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
run_loop_iteration: _,
embedders: _,
chat_settings: _,
runtime: _,
} = scheduler;
let rtxn = env.read_txn().unwrap();

View File

@@ -216,6 +216,9 @@ pub struct IndexScheduler {
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
#[cfg(test)]
run_loop_iteration: Arc<RwLock<usize>>,
/// The tokio runtime used for asynchronous tasks.
runtime: Option<tokio::runtime::Handle>,
}
impl IndexScheduler {
@@ -242,6 +245,7 @@ impl IndexScheduler {
run_loop_iteration: self.run_loop_iteration.clone(),
features: self.features.clone(),
chat_settings: self.chat_settings,
runtime: self.runtime.clone(),
}
}
@@ -260,6 +264,7 @@ impl IndexScheduler {
options: IndexSchedulerOptions,
auth_env: Env<WithoutTls>,
from_db_version: (u32, u32, u32),
runtime: Option<tokio::runtime::Handle>,
#[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>,
) -> Result<Self> {
@@ -341,6 +346,7 @@ impl IndexScheduler {
run_loop_iteration: Arc::new(RwLock::new(0)),
features,
chat_settings,
runtime,
};
this.run();

View File

@@ -25,6 +25,7 @@ use convert_case::{Case, Casing as _};
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::milli;
use meilisearch_types::milli::update::S3SnapshotOptions;
use meilisearch_types::tasks::Status;
use process_batch::ProcessBatchInfo;
use rayon::current_num_threads;
@@ -87,11 +88,14 @@ pub struct Scheduler {
/// Snapshot compaction status.
pub(crate) experimental_no_snapshot_compaction: bool,
/// S3 Snapshot options.
pub(crate) s3_snapshot_options: Option<S3SnapshotOptions>,
}
impl Scheduler {
pub(crate) fn private_clone(&self) -> Scheduler {
Scheduler {
pub(crate) fn private_clone(&self) -> Self {
Self {
must_stop_processing: self.must_stop_processing.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
@@ -103,23 +107,52 @@ impl Scheduler {
version_file_path: self.version_file_path.clone(),
embedding_cache_cap: self.embedding_cache_cap,
experimental_no_snapshot_compaction: self.experimental_no_snapshot_compaction,
s3_snapshot_options: self.s3_snapshot_options.clone(),
}
}
pub fn new(options: &IndexSchedulerOptions, auth_env: Env<WithoutTls>) -> Scheduler {
let IndexSchedulerOptions {
version_file_path,
auth_path: _,
tasks_path: _,
update_file_path: _,
indexes_path: _,
snapshots_path,
dumps_path,
cli_webhook_url: _,
cli_webhook_authorization: _,
task_db_size: _,
index_base_map_size: _,
enable_mdb_writemap: _,
index_growth_amount: _,
index_count: _,
indexer_config,
autobatching_enabled,
cleanup_enabled: _,
max_number_of_tasks: _,
max_number_of_batched_tasks,
batched_tasks_size_limit,
instance_features: _,
auto_upgrade: _,
embedding_cache_cap,
experimental_no_snapshot_compaction,
} = options;
Scheduler {
must_stop_processing: MustStopProcessing::default(),
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled: options.autobatching_enabled,
max_number_of_batched_tasks: options.max_number_of_batched_tasks,
batched_tasks_size_limit: options.batched_tasks_size_limit,
dumps_path: options.dumps_path.clone(),
snapshots_path: options.snapshots_path.clone(),
autobatching_enabled: *autobatching_enabled,
max_number_of_batched_tasks: *max_number_of_batched_tasks,
batched_tasks_size_limit: *batched_tasks_size_limit,
dumps_path: dumps_path.clone(),
snapshots_path: snapshots_path.clone(),
auth_env,
version_file_path: options.version_file_path.clone(),
embedding_cache_cap: options.embedding_cache_cap,
experimental_no_snapshot_compaction: options.experimental_no_snapshot_compaction,
version_file_path: version_file_path.clone(),
embedding_cache_cap: *embedding_cache_cap,
experimental_no_snapshot_compaction: *experimental_no_snapshot_compaction,
s3_snapshot_options: indexer_config.s3_snapshot_options.clone(),
}
}
}

View File

@@ -4,6 +4,8 @@ use std::sync::atomic::Ordering;
use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
#[cfg(unix)]
use meilisearch_types::milli::update::S3SnapshotOptions;
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
@@ -78,10 +80,32 @@ impl IndexScheduler {
pub(super) fn process_snapshot(
&self,
progress: Progress,
mut tasks: Vec<Task>,
tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
match self.scheduler.s3_snapshot_options.clone() {
Some(options) => {
#[cfg(not(unix))]
{
let _ = options;
panic!("Non-unix platform does not support S3 snapshotting");
}
#[cfg(unix)]
self.runtime
.as_ref()
.expect("Runtime not initialized")
.block_on(self.process_snapshot_to_s3(progress, options, tasks))
}
None => self.process_snapshots_to_disk(progress, tasks),
}
}
fn process_snapshots_to_disk(
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>, Error> {
fs::create_dir_all(&self.scheduler.snapshots_path)?;
let temp_snapshot_dir = tempfile::tempdir()?;
@@ -140,7 +164,7 @@ impl IndexScheduler {
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.queue.file_store.get_update_path(content_uuid);
let src = self.queue.file_store.update_path(content_uuid);
let dst = update_files_dir.join(content_uuid.to_string());
fs::copy(src, dst)?;
}
@@ -206,4 +230,335 @@ impl IndexScheduler {
Ok(tasks)
}
#[cfg(unix)]
#[allow(clippy::too_many_arguments)]
pub(super) async fn process_snapshot_to_s3(
&self,
progress: Progress,
opts: S3SnapshotOptions,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
use std::collections::VecDeque;
use std::fs::File;
use std::io::{self, Seek as _, SeekFrom, Write as _};
use std::os::fd::OwnedFd;
use std::path::Path;
use std::path::PathBuf;
use bytes::{Bytes, BytesMut};
use path_slash::PathBufExt as _;
use reqwest::header::ETAG;
use reqwest::Client;
use reqwest::Response;
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
use rusty_s3::{Bucket, BucketError, Credentials, UrlStyle};
use tokio::task::JoinHandle;
let S3SnapshotOptions {
s3_bucket_url,
s3_bucket_region,
s3_bucket_name,
s3_snapshot_prefix,
s3_access_key,
s3_secret_key,
s3_max_in_flight_parts,
s3_compression_level: level,
s3_signature_duration,
s3_multipart_part_size,
} = opts;
let must_stop_processing = self.scheduler.must_stop_processing.clone();
let retry_backoff = backoff::ExponentialBackoff::default();
let (reader, writer) = std::io::pipe()?;
let uploader_task = tokio::spawn(async move {
let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
let url = s3_bucket_url
.parse()
.map_err(BucketError::ParseError)
.map_err(Error::S3BucketError)?;
let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
.map_err(Error::S3BucketError)?;
let credential = Credentials::new(s3_access_key, s3_secret_key);
// TODO change this and use the database name like in the original version
let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
let action = bucket.create_multipart_upload(Some(&credential), &object);
let url = action.sign(s3_signature_duration);
let client = Client::new();
let resp = client.post(url).send().await.map_err(Error::S3HttpError)?;
let status = resp.status();
let body = match resp.error_for_status_ref() {
Ok(_) => resp.text().await.map_err(Error::S3HttpError)?,
Err(_) => {
return Err(Error::S3Error {
status,
body: resp.text().await.unwrap_or_default(),
})
}
};
let multipart = CreateMultipartUpload::parse_response(&body)
.map_err(|e| Error::S3XmlError(Box::new(e)))?;
tracing::debug!("Starting the upload of the snapshot to {object}");
// We use this bumpalo for etags strings.
let bump = bumpalo::Bump::new();
let mut etags = Vec::<&str>::new();
let mut in_flight =
VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
s3_max_in_flight_parts.get(),
);
// Part numbers start at 1 and cannot be larger than 10k
for part_number in 1u16.. {
if must_stop_processing.get() {
return Err(Error::AbortedTask);
}
let part_upload = bucket.upload_part(
Some(&credential),
&object,
part_number,
multipart.upload_id(),
);
let url = part_upload.sign(s3_signature_duration);
// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
let (request, buffer) = in_flight.pop_front().unwrap();
let resp = request.await.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(response) => response,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
});
}
};
let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
let mut buffer = match buffer.try_into_mut() {
Ok(buffer) => buffer,
Err(_) => unreachable!("Impossible to convert into BytesMut"),
};
etags.push(bump.alloc_str(etag.to_str().unwrap()));
buffer.clear();
buffer
} else {
BytesMut::with_capacity(s3_multipart_part_size as usize)
};
// If we successfully read enough bytes,
// we can continue and send the buffer/part
while buffer.len() < (s3_multipart_part_size as usize / 2) {
// Wait for the pipe to be readable
reader.readable().await?;
match reader.try_read_buf(&mut buffer) {
Ok(0) => break,
// We read some bytes but maybe not enough
Ok(_) => continue,
// The readiness event is a false positive.
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => return Err(e.into()),
}
}
if buffer.is_empty() {
// Break the loop if the buffer is
// empty after we tried to read bytes
break;
}
let body = buffer.freeze();
tracing::trace!("Sending part {part_number}");
let task = tokio::spawn({
let client = client.clone();
let body = body.clone();
backoff::future::retry(retry_backoff.clone(), move || {
let client = client.clone();
let url = url.clone();
let body = body.clone();
async move {
match client.put(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
}
}
})
});
in_flight.push_back((task, body));
}
for (join_handle, _buffer) in in_flight {
let resp = join_handle.await.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(response) => response,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
etags.push(bump.alloc_str(etag.to_str().unwrap()));
}
tracing::trace!("Finalizing the multipart upload");
let action = bucket.complete_multipart_upload(
Some(&credential),
&object,
multipart.upload_id(),
etags.iter().map(AsRef::as_ref),
);
let url = action.sign(s3_signature_duration);
let body = action.body();
let resp = backoff::future::retry(retry_backoff, move || {
let client = client.clone();
let url = url.clone();
let body = body.clone();
async move {
match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
}
}
})
.await
.map_err(Error::S3HttpError)?;
let status = resp.status();
let body =
resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;
if status.is_success() {
Ok(())
} else {
Err(Error::S3Error { status, body })
}
});
let index_scheduler = IndexScheduler::private_clone(self);
let builder_task = tokio::task::spawn_blocking(move || {
let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
let mut tarball = tar::Builder::new(writer);
// 1. Snapshot the version file
tarball.append_path_with_name(
&index_scheduler.scheduler.version_file_path,
VERSION_FILE_NAME,
)?;
// 2. Snapshot the index scheduler LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
// Note: Seeking to the start of the file is necessary to ensure
// the tarball reads the file from the beginning. I still
// don't know why the read cursor is not at the beginning.
tasks_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("tasks").join("data.mdb");
tarball.append_file(path, &mut tasks_env_file)?;
drop(tasks_env_file);
// 2.3 Create a read transaction on the index-scheduler
let rtxn = index_scheduler.env.read_txn()?;
// 2.4 Create the update files directory
// And only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
// TODO I need to create the update files directory (even if empty)
// I should probably simply use the append_dir_all method
// but I'll loose the progression.
let update_files_dir = Path::new("update_files");
for task_id in enqueued {
let task = index_scheduler
.queue
.tasks
.get_task(&rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = index_scheduler.queue.file_store.update_path(content_uuid);
let mut update_file = File::open(src)?;
let path = update_files_dir.join(content_uuid.to_string());
tarball.append_file(path, &mut update_file)?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}
// 3. Snapshot every indexes
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
let index_mapping = index_scheduler.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let indexes_dir = Path::new("indexes");
let indexes_references: Vec<_> = index_scheduler
.index_mapper
.index_mapping
.iter(&rtxn)?
.map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
.collect::<Result<_, Error>>()?;
// It's prettier to use a for loop instead of the IndexMapper::try_for_each_index
// method, especially when we need to access the UUID, local path and index number.
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
&name, i as u32, nb_indexes,
));
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
let mut index_file = index.try_clone_inner_file().unwrap();
index_file.seek(SeekFrom::Start(0))?;
tracing::trace!("Appending index file for {name} in {}", path.display());
tarball.append_file(path, &mut index_file)?;
}
drop(rtxn);
// 4. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let mut auth_env_file =
index_scheduler.scheduler.auth_env.try_clone_inner_file().unwrap();
auth_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("auth").join("data.mdb");
tarball.append_file(path, &mut auth_env_file)?;
let mut gzencoder = tarball.into_inner()?;
gzencoder.flush()?;
gzencoder.try_finish()?;
let mut writer = gzencoder.finish()?;
writer.flush()?;
Result::<_, Error>::Ok(())
});
let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);
// Check uploader result first to early return on task abortion.
uploader_result.unwrap()?;
builder_result.unwrap()?;
for task in &mut tasks {
task.status = Status::Succeeded;
}
Ok(tasks)
}
}

View File

@@ -126,7 +126,7 @@ impl IndexScheduler {
std::fs::create_dir_all(&options.auth_path).unwrap();
let auth_env = open_auth_store_env(&options.auth_path).unwrap();
let index_scheduler =
Self::new(options, auth_env, version, sender, planned_failures).unwrap();
Self::new(options, auth_env, version, None, sender, planned_failures).unwrap();
// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.

View File

@@ -390,6 +390,9 @@ TooManyVectors , InvalidRequest , BAD_REQU
UnretrievableDocument , Internal , BAD_REQUEST ;
UnretrievableErrorCode , InvalidRequest , BAD_REQUEST ;
UnsupportedMediaType , InvalidRequest , UNSUPPORTED_MEDIA_TYPE ;
InvalidS3SnapshotRequest , Internal , BAD_REQUEST ;
InvalidS3SnapshotParameters , Internal , BAD_REQUEST ;
S3SnapshotServerError , Internal , BAD_GATEWAY ;
// Experimental features
VectorEmbeddingError , InvalidRequest , BAD_REQUEST ;

View File

@@ -217,6 +217,7 @@ struct Infos {
import_snapshot: bool,
schedule_snapshot: Option<u64>,
snapshot_dir: bool,
uses_s3_snapshots: bool,
ignore_missing_snapshot: bool,
ignore_snapshot_if_db_exists: bool,
http_addr: bool,
@@ -285,6 +286,7 @@ impl Infos {
indexer_options,
config_file_path,
no_analytics: _,
s3_snapshot_options,
} = options;
let schedule_snapshot = match schedule_snapshot {
@@ -348,6 +350,7 @@ impl Infos {
import_snapshot: import_snapshot.is_some(),
schedule_snapshot,
snapshot_dir: snapshot_dir != PathBuf::from("snapshots/"),
uses_s3_snapshots: s3_snapshot_options.is_some(),
ignore_missing_snapshot,
ignore_snapshot_if_db_exists,
http_addr: http_addr != default_http_addr(),

View File

@@ -216,7 +216,10 @@ enum OnFailure {
KeepDb,
}
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
pub fn setup_meilisearch(
opt: &Opt,
handle: tokio::runtime::Handle,
) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
let index_scheduler_opt = IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
@@ -230,7 +233,11 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
task_db_size: opt.max_task_db_size.as_u64() as usize,
index_base_map_size: opt.max_index_size.as_u64() as usize,
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: Arc::new((&opt.indexer_options).try_into()?),
indexer_config: Arc::new({
let s3_snapshot_options =
opt.s3_snapshot_options.clone().map(|opt| opt.try_into()).transpose()?;
IndexerConfig { s3_snapshot_options, ..(&opt.indexer_options).try_into()? }
}),
autobatching_enabled: true,
cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000,
@@ -256,6 +263,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
handle,
)?,
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
@@ -273,7 +281,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
bail!("snapshot doesn't exist at {}", snapshot_path.display())
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
}
} else if let Some(ref path) = opt.import_dump {
let src_path_exists = path.exists();
@@ -284,6 +292,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
handle,
)?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
Ok(()) => (index_scheduler, auth_controller),
@@ -304,10 +313,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
}
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
};
// We create a loop in a thread that registers snapshotCreation tasks
@@ -338,6 +347,7 @@ fn open_or_create_database_unchecked(
index_scheduler_opt: IndexSchedulerOptions,
on_failure: OnFailure,
version: (u32, u32, u32),
handle: tokio::runtime::Handle,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
@@ -345,7 +355,7 @@ fn open_or_create_database_unchecked(
let auth_env = open_auth_store_env(&index_scheduler_opt.auth_path).unwrap();
let auth_controller = AuthController::new(auth_env.clone(), &opt.master_key);
let index_scheduler_builder = || -> anyhow::Result<_> {
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version)?)
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version, Some(handle))?)
};
match (
@@ -452,6 +462,7 @@ fn open_or_create_database(
index_scheduler_opt: IndexSchedulerOptions,
empty_db: bool,
binary_version: (u32, u32, u32),
handle: tokio::runtime::Handle,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
let version = if !empty_db {
check_version(opt, &index_scheduler_opt, binary_version)?
@@ -459,7 +470,7 @@ fn open_or_create_database(
binary_version
};
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version, handle)
}
fn import_dump(
@@ -527,7 +538,11 @@ fn import_dump(
let indexer_config = if base_config.max_threads.is_none() {
let (thread_pool, _) = default_thread_pool_and_threads();
let _config = IndexerConfig { thread_pool, ..*base_config };
let _config = IndexerConfig {
thread_pool,
s3_snapshot_options: base_config.s3_snapshot_options.clone(),
..*base_config
};
backup_config = _config;
&backup_config
} else {

View File

@@ -76,7 +76,10 @@ fn on_panic(info: &std::panic::PanicHookInfo) {
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
try_main().await.inspect_err(|error| {
// won't panic inside of tokio::main
let runtime = tokio::runtime::Handle::current();
try_main(runtime).await.inspect_err(|error| {
tracing::error!(%error);
let mut current = error.source();
let mut depth = 0;
@@ -88,7 +91,7 @@ async fn main() -> anyhow::Result<()> {
})
}
async fn try_main() -> anyhow::Result<()> {
async fn try_main(runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
let (opt, config_read_from) = Opt::try_build()?;
std::panic::set_hook(Box::new(on_panic));
@@ -122,7 +125,7 @@ async fn try_main() -> anyhow::Result<()> {
_ => (),
}
let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?;
let (index_scheduler, auth_controller) = setup_meilisearch(&opt, runtime)?;
let analytics =
analytics::Analytics::new(&opt, index_scheduler.clone(), auth_controller.clone()).await;

View File

@@ -7,12 +7,13 @@ use std::ops::Deref;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::{env, fmt, fs};
use byte_unit::{Byte, ParseError, UnitType};
use clap::Parser;
use meilisearch_types::features::InstanceTogglableFeatures;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::update::{IndexerConfig, S3SnapshotOptions};
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
use rustls::server::{ServerSessionMemoryCache, WebPkiClientVerifier};
use rustls::RootCertStore;
@@ -74,6 +75,19 @@ const MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES: &str =
const MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION: &str = "MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION";
const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS: &str =
"MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS";
// Related to S3 snapshots
const MEILI_S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
const MEILI_S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
const MEILI_S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
const MEILI_S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
const MEILI_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
const MEILI_S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
const MEILI_S3_SIGNATURE_DURATION_SECONDS: &str = "MEILI_S3_SIGNATURE_DURATION_SECONDS";
const MEILI_S3_MULTIPART_PART_SIZE: &str = "MEILI_S3_MULTIPART_PART_SIZE";
const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
const DEFAULT_DB_PATH: &str = "./data.ms";
const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
@@ -83,6 +97,10 @@ const DEFAULT_SNAPSHOT_DIR: &str = "snapshots/";
const DEFAULT_SNAPSHOT_INTERVAL_SEC: u64 = 86400;
const DEFAULT_SNAPSHOT_INTERVAL_SEC_STR: &str = "86400";
const DEFAULT_DUMP_DIR: &str = "dumps/";
const DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS: NonZeroUsize = NonZeroUsize::new(10).unwrap();
const DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL: u32 = 0;
const DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS: u64 = 8 * 3600; // 8 hours
const DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE: Byte = Byte::from_u64(375 * 1024 * 1024); // 375 MiB
const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
@@ -479,6 +497,10 @@ pub struct Opt {
#[clap(flatten)]
pub indexer_options: IndexerOpts,
#[serde(flatten)]
#[clap(flatten)]
pub s3_snapshot_options: Option<S3SnapshotOpts>,
/// Set the path to a configuration file that should be used to setup the engine.
/// Format must be TOML.
#[clap(long)]
@@ -580,6 +602,7 @@ impl Opt {
experimental_limit_batched_tasks_total_size,
experimental_embedding_cache_entries,
experimental_no_snapshot_compaction,
s3_snapshot_options,
} = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -681,6 +704,9 @@ impl Opt {
experimental_no_snapshot_compaction.to_string(),
);
indexer_options.export_to_env();
if let Some(s3_snapshot_options) = s3_snapshot_options {
s3_snapshot_options.export_to_env();
}
}
pub fn get_ssl_config(&self) -> anyhow::Result<Option<rustls::ServerConfig>> {
@@ -849,6 +875,16 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error;
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
let IndexerOpts {
max_indexing_memory,
max_indexing_threads,
skip_index_budget,
experimental_no_edition_2024_for_settings,
experimental_no_edition_2024_for_dumps,
experimental_no_edition_2024_for_prefix_post_processing,
experimental_no_edition_2024_for_facet_post_processing,
} = other;
let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
.num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2))
.build()?;
@@ -856,21 +892,152 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
Ok(Self {
thread_pool,
log_every_n: Some(DEFAULT_LOG_EVERY_N),
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
max_threads: *other.max_indexing_threads,
max_memory: max_indexing_memory.map(|b| b.as_u64() as usize),
max_threads: max_indexing_threads.0,
max_positions_per_attributes: None,
skip_index_budget: other.skip_index_budget,
experimental_no_edition_2024_for_settings: other
.experimental_no_edition_2024_for_settings,
experimental_no_edition_2024_for_dumps: other.experimental_no_edition_2024_for_dumps,
skip_index_budget: *skip_index_budget,
experimental_no_edition_2024_for_settings: *experimental_no_edition_2024_for_settings,
experimental_no_edition_2024_for_dumps: *experimental_no_edition_2024_for_dumps,
chunk_compression_type: Default::default(),
chunk_compression_level: Default::default(),
documents_chunk_size: Default::default(),
max_nb_chunks: Default::default(),
experimental_no_edition_2024_for_prefix_post_processing: other
.experimental_no_edition_2024_for_prefix_post_processing,
experimental_no_edition_2024_for_facet_post_processing: other
.experimental_no_edition_2024_for_facet_post_processing,
experimental_no_edition_2024_for_prefix_post_processing:
*experimental_no_edition_2024_for_prefix_post_processing,
experimental_no_edition_2024_for_facet_post_processing:
*experimental_no_edition_2024_for_facet_post_processing,
s3_snapshot_options: None,
})
}
}
#[derive(Debug, Clone, Parser, Deserialize)]
// This group is a bit tricky but makes it possible to require all listed fields if one of them
// is specified. It lets us keep an Option for the S3SnapshotOpts configuration.
// <https://github.com/clap-rs/clap/issues/5092#issuecomment-2616986075>
#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix", "s3_access_key", "s3_secret_key"])]
pub struct S3SnapshotOpts {
/// The S3 bucket URL in the format https://s3.<region>.amazonaws.com.
#[clap(long, env = MEILI_S3_BUCKET_URL, required = false)]
#[serde(default)]
pub s3_bucket_url: String,
/// The region in the format us-east-1.
#[clap(long, env = MEILI_S3_BUCKET_REGION, required = false)]
#[serde(default)]
pub s3_bucket_region: String,
/// The bucket name.
#[clap(long, env = MEILI_S3_BUCKET_NAME, required = false)]
#[serde(default)]
pub s3_bucket_name: String,
/// The prefix path where to put the snapshot, uses normal slashes (/).
#[clap(long, env = MEILI_S3_SNAPSHOT_PREFIX, required = false)]
#[serde(default)]
pub s3_snapshot_prefix: String,
/// The S3 access key.
#[clap(long, env = MEILI_S3_ACCESS_KEY, required = false)]
#[serde(default)]
pub s3_access_key: String,
/// The S3 secret key.
#[clap(long, env = MEILI_S3_SECRET_KEY, required = false)]
#[serde(default)]
pub s3_secret_key: String,
/// The maximum number of parts that can be uploaded in parallel.
#[clap(long, env = MEILI_S3_MAX_IN_FLIGHT_PARTS, default_value_t = default_s3_snapshot_max_in_flight_parts())]
#[serde(default = "default_s3_snapshot_max_in_flight_parts")]
pub s3_max_in_flight_parts: NonZeroUsize,
/// The compression level. Defaults to no compression (0).
#[clap(long, env = MEILI_S3_COMPRESSION_LEVEL, default_value_t = default_s3_snapshot_compression_level())]
#[serde(default = "default_s3_snapshot_compression_level")]
pub s3_compression_level: u32,
/// The signature duration for the multipart upload.
#[clap(long, env = MEILI_S3_SIGNATURE_DURATION_SECONDS, default_value_t = default_s3_snapshot_signature_duration_seconds())]
#[serde(default = "default_s3_snapshot_signature_duration_seconds")]
pub s3_signature_duration_seconds: u64,
/// The size of the the multipart parts.
///
/// Must not be less than 10MiB and larger than 8GiB. Yes,
/// twice the boundaries of the AWS S3 multipart upload
/// because we use it a bit differently internally.
#[clap(long, env = MEILI_S3_MULTIPART_PART_SIZE, default_value_t = default_s3_snapshot_multipart_part_size())]
#[serde(default = "default_s3_snapshot_multipart_part_size")]
pub s3_multipart_part_size: Byte,
}
impl S3SnapshotOpts {
/// Exports the values to their corresponding env vars if they are not set.
pub fn export_to_env(self) {
let S3SnapshotOpts {
s3_bucket_url,
s3_bucket_region,
s3_bucket_name,
s3_snapshot_prefix,
s3_access_key,
s3_secret_key,
s3_max_in_flight_parts,
s3_compression_level,
s3_signature_duration_seconds,
s3_multipart_part_size,
} = self;
export_to_env_if_not_present(MEILI_S3_BUCKET_URL, s3_bucket_url);
export_to_env_if_not_present(MEILI_S3_BUCKET_REGION, s3_bucket_region);
export_to_env_if_not_present(MEILI_S3_BUCKET_NAME, s3_bucket_name);
export_to_env_if_not_present(MEILI_S3_SNAPSHOT_PREFIX, s3_snapshot_prefix);
export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
export_to_env_if_not_present(
MEILI_S3_MAX_IN_FLIGHT_PARTS,
s3_max_in_flight_parts.to_string(),
);
export_to_env_if_not_present(MEILI_S3_COMPRESSION_LEVEL, s3_compression_level.to_string());
export_to_env_if_not_present(
MEILI_S3_SIGNATURE_DURATION_SECONDS,
s3_signature_duration_seconds.to_string(),
);
export_to_env_if_not_present(
MEILI_S3_MULTIPART_PART_SIZE,
s3_multipart_part_size.to_string(),
);
}
}
impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
type Error = anyhow::Error;
fn try_from(other: S3SnapshotOpts) -> Result<Self, Self::Error> {
let S3SnapshotOpts {
s3_bucket_url,
s3_bucket_region,
s3_bucket_name,
s3_snapshot_prefix,
s3_access_key,
s3_secret_key,
s3_max_in_flight_parts,
s3_compression_level,
s3_signature_duration_seconds,
s3_multipart_part_size,
} = other;
Ok(S3SnapshotOptions {
s3_bucket_url,
s3_bucket_region,
s3_bucket_name,
s3_snapshot_prefix,
s3_access_key,
s3_secret_key,
s3_max_in_flight_parts,
s3_compression_level,
s3_signature_duration: Duration::from_secs(s3_signature_duration_seconds),
s3_multipart_part_size: s3_multipart_part_size.as_u64(),
})
}
}
@@ -1089,6 +1256,22 @@ fn default_snapshot_interval_sec() -> &'static str {
DEFAULT_SNAPSHOT_INTERVAL_SEC_STR
}
fn default_s3_snapshot_max_in_flight_parts() -> NonZeroUsize {
DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS
}
fn default_s3_snapshot_compression_level() -> u32 {
DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL
}
fn default_s3_snapshot_signature_duration_seconds() -> u64 {
DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS
}
fn default_s3_snapshot_multipart_part_size() -> Byte {
DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE
}
fn default_dump_dir() -> PathBuf {
PathBuf::from(DEFAULT_DUMP_DIR)
}

View File

@@ -49,8 +49,8 @@ impl Server<Owned> {
}
let options = default_settings(dir.path());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
Server { service, _dir: Some(dir), _marker: PhantomData }
@@ -65,7 +65,9 @@ impl Server<Owned> {
options.master_key = Some("MASTER_KEY".to_string());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
Server { service, _dir: Some(dir), _marker: PhantomData }
@@ -78,7 +80,9 @@ impl Server<Owned> {
}
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
let (index_scheduler, auth) = setup_meilisearch(&options)?;
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options, handle)?;
let service = Service { index_scheduler, auth, options, api_key: None };
Ok(Server { service, _dir: None, _marker: PhantomData })
@@ -217,8 +221,9 @@ impl Server<Shared> {
}
let options = default_settings(dir.path());
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let service = Service { index_scheduler, auth, api_key: None, options };
Server { service, _dir: Some(dir), _marker: PhantomData }

View File

@@ -68,7 +68,7 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
for uuid in file_store.all_uuids().context("while retrieving uuids from file store")? {
let uuid = uuid.context("while retrieving uuid from file store")?;
let update_file_path = file_store.get_update_path(uuid);
let update_file_path = file_store.update_path(uuid);
let update_file = file_store
.get_update(uuid)
.with_context(|| format!("while getting update file for uuid {uuid:?}"))?;

View File

@@ -34,7 +34,7 @@ grenad = { version = "0.5.0", default-features = false, features = [
"rayon",
"tempfile",
] }
heed = { version = "0.22.1-nested-rtxns", default-features = false, features = [
heed = { version = "0.22.1-nested-rtxns-2", default-features = false, features = [
"serde-json",
"serde-bincode",
] }

View File

@@ -425,6 +425,10 @@ impl Index {
self.env.info().map_size
}
pub fn try_clone_inner_file(&self) -> Result<File> {
Ok(self.env.try_clone_inner_file()?)
}
pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> {
self.env.copy_to_file(file, option).map_err(Into::into)
}

View File

@@ -1,3 +1,6 @@
use std::num::NonZeroUsize;
use std::time::Duration;
use grenad::CompressionType;
use super::GrenadParameters;
@@ -20,6 +23,7 @@ pub struct IndexerConfig {
pub experimental_no_edition_2024_for_dumps: bool,
pub experimental_no_edition_2024_for_prefix_post_processing: bool,
pub experimental_no_edition_2024_for_facet_post_processing: bool,
pub s3_snapshot_options: Option<S3SnapshotOptions>,
}
impl IndexerConfig {
@@ -37,6 +41,20 @@ impl IndexerConfig {
}
}
#[derive(Debug, Clone)]
pub struct S3SnapshotOptions {
pub s3_bucket_url: String,
pub s3_bucket_region: String,
pub s3_bucket_name: String,
pub s3_snapshot_prefix: String,
pub s3_access_key: String,
pub s3_secret_key: String,
pub s3_max_in_flight_parts: NonZeroUsize,
pub s3_compression_level: u32,
pub s3_signature_duration: Duration,
pub s3_multipart_part_size: u64,
}
/// By default use only 1 thread for indexing in tests
#[cfg(test)]
pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<usize>) {
@@ -76,6 +94,7 @@ impl Default for IndexerConfig {
experimental_no_edition_2024_for_dumps: false,
experimental_no_edition_2024_for_prefix_post_processing: false,
experimental_no_edition_2024_for_facet_post_processing: false,
s3_snapshot_options: None,
}
}
}

View File

@@ -5,7 +5,7 @@ pub use self::concurrent_available_ids::ConcurrentAvailableIds;
pub use self::facet::bulk::FacetsUpdateBulk;
pub use self::facet::incremental::FacetsUpdateIncrementalInner;
pub use self::index_documents::{request_threads, *};
pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig, S3SnapshotOptions};
pub use self::new::ChannelCongestion;
pub use self::settings::{validate_embedding_settings, Setting, Settings};
pub use self::update_step::UpdateIndexingStep;