mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 04:36:32 +00:00
Compare commits
33 Commits
v1.24.0
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cbc0878158 | ||
|
|
8967e40083 | ||
|
|
f6a0db9cb8 | ||
|
|
b64b083003 | ||
|
|
eab2b806e9 | ||
|
|
f7d9425c59 | ||
|
|
f7f68f81a4 | ||
|
|
6e0d0d306b | ||
|
|
34dc67aea8 | ||
|
|
fbe56822f0 | ||
|
|
5e1af30b42 | ||
|
|
d298b21a95 | ||
|
|
a4ad87febf | ||
|
|
2caa2be441 | ||
|
|
a829ded023 | ||
|
|
98be43b66b | ||
|
|
3ec4750426 | ||
|
|
d717ec3486 | ||
|
|
0c06bdefac | ||
|
|
62a8133bcd | ||
|
|
c1f7542dfa | ||
|
|
9105e8bb8c | ||
|
|
59e2394e69 | ||
|
|
430bc91c4c | ||
|
|
1172dce093 | ||
|
|
21140a33e6 | ||
|
|
9ed7a81495 | ||
|
|
c8c1d95efd | ||
|
|
10d1e26478 | ||
|
|
cd781d267b | ||
|
|
b16af4a763 | ||
|
|
1514d13ab3 | ||
|
|
d51f13a59e |
93
Cargo.lock
generated
93
Cargo.lock
generated
@@ -1123,7 +1123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "159fa412eae48a1d94d0b9ecdb85c97ce56eb2a347c62394d3fdbf221adabc1a"
|
||||
dependencies = [
|
||||
"path-matchers",
|
||||
"path-slash",
|
||||
"path-slash 0.1.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2838,9 +2838,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
||||
|
||||
[[package]]
|
||||
name = "heed"
|
||||
version = "0.22.1-nested-rtxns"
|
||||
version = "0.22.1-nested-rtxns-2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ff115ba5712b1f1fc7617b195f5c2f139e29c397ff79da040cd19db75ccc240"
|
||||
checksum = "a644ab0b1e8234a7c82e83e26b24af1e4a4b8317629fa6ccd9ea2608c1595c73"
|
||||
dependencies = [
|
||||
"bitflags 2.9.4",
|
||||
"byteorder",
|
||||
@@ -3242,6 +3242,7 @@ dependencies = [
|
||||
"bumpalo",
|
||||
"bumparaw-collections",
|
||||
"byte-unit",
|
||||
"bytes",
|
||||
"convert_case 0.8.0",
|
||||
"crossbeam-channel",
|
||||
"csv",
|
||||
@@ -3258,14 +3259,19 @@ dependencies = [
|
||||
"meilisearch-types",
|
||||
"memmap2",
|
||||
"page_size",
|
||||
"path-slash 0.2.1",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"roaring 0.10.12",
|
||||
"rusty-s3",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"synchronoise",
|
||||
"tar",
|
||||
"tempfile",
|
||||
"thiserror 2.0.16",
|
||||
"time",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"ureq",
|
||||
"uuid",
|
||||
@@ -3465,6 +3471,30 @@ dependencies = [
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jiff"
|
||||
version = "0.2.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
|
||||
dependencies = [
|
||||
"jiff-static",
|
||||
"log",
|
||||
"portable-atomic",
|
||||
"portable-atomic-util",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jiff-static"
|
||||
version = "0.2.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.106",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.34"
|
||||
@@ -3988,6 +4018,16 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
|
||||
|
||||
[[package]]
|
||||
name = "md-5"
|
||||
version = "0.10.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "md5"
|
||||
version = "0.7.0"
|
||||
@@ -4745,6 +4785,12 @@ version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "498a099351efa4becc6a19c72aa9270598e8fd274ca47052e37455241c88b696"
|
||||
|
||||
[[package]]
|
||||
name = "path-slash"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
|
||||
|
||||
[[package]]
|
||||
name = "pbkdf2"
|
||||
version = "0.12.2"
|
||||
@@ -4962,6 +5008,15 @@ version = "1.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
|
||||
|
||||
[[package]]
|
||||
name = "portable-atomic-util"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
|
||||
dependencies = [
|
||||
"portable-atomic",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "potential_utf"
|
||||
version = "0.1.3"
|
||||
@@ -5139,6 +5194,16 @@ dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.38.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.11.9"
|
||||
@@ -5414,6 +5479,7 @@ dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2 0.4.12",
|
||||
"http 1.3.1",
|
||||
"http-body",
|
||||
"http-body-util",
|
||||
@@ -5704,6 +5770,25 @@ version = "1.0.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
|
||||
|
||||
[[package]]
|
||||
name = "rusty-s3"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fac2edd2f0b56bd79a7343f49afc01c2d41010df480538a510e0abc56044f66c"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"hmac",
|
||||
"jiff",
|
||||
"md-5",
|
||||
"percent-encoding",
|
||||
"quick-xml",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"url",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ryu"
|
||||
version = "1.0.20"
|
||||
@@ -6182,7 +6267,7 @@ checksum = "f9c425c07353535ef55b45420f5a8b0a397cd9bc3d7e5236497ca0d90604aa9b"
|
||||
dependencies = [
|
||||
"change-detection",
|
||||
"mime_guess",
|
||||
"path-slash",
|
||||
"path-slash 0.1.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -60,7 +60,7 @@ impl FileStore {
|
||||
|
||||
/// Returns the file corresponding to the requested uuid.
|
||||
pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
|
||||
let path = self.get_update_path(uuid);
|
||||
let path = self.update_path(uuid);
|
||||
let file = match StdFile::open(path) {
|
||||
Ok(file) => file,
|
||||
Err(e) => {
|
||||
@@ -72,7 +72,7 @@ impl FileStore {
|
||||
}
|
||||
|
||||
/// Returns the path that correspond to this uuid, the path could not exists.
|
||||
pub fn get_update_path(&self, uuid: Uuid) -> PathBuf {
|
||||
pub fn update_path(&self, uuid: Uuid) -> PathBuf {
|
||||
self.path.join(uuid.to_string())
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ license.workspace = true
|
||||
anyhow = "1.0.98"
|
||||
bincode = "1.3.3"
|
||||
byte-unit = "5.1.6"
|
||||
bytes = "1.10.1"
|
||||
bumpalo = "3.18.1"
|
||||
bumparaw-collections = "0.1.4"
|
||||
convert_case = "0.8.0"
|
||||
@@ -28,10 +29,12 @@ meilisearch-auth = { path = "../meilisearch-auth" }
|
||||
meilisearch-types = { path = "../meilisearch-types" }
|
||||
memmap2 = "0.9.7"
|
||||
page_size = "0.6.0"
|
||||
path-slash = "0.2.1"
|
||||
rayon = "1.10.0"
|
||||
roaring = { version = "0.10.12", features = ["serde"] }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = { version = "1.0.140", features = ["preserve_order"] }
|
||||
tar = "0.4.44"
|
||||
synchronoise = "1.0.1"
|
||||
tempfile = "3.20.0"
|
||||
thiserror = "2.0.12"
|
||||
@@ -45,6 +48,9 @@ tracing = "0.1.41"
|
||||
ureq = "2.12.1"
|
||||
uuid = { version = "1.17.0", features = ["serde", "v4"] }
|
||||
backoff = "0.4.0"
|
||||
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
|
||||
rusty-s3 = "0.8.1"
|
||||
tokio = { version = "1.47.1", features = ["full"] }
|
||||
|
||||
[dev-dependencies]
|
||||
big_s = "1.0.2"
|
||||
|
||||
@@ -5,6 +5,7 @@ use meilisearch_types::error::{Code, ErrorCode};
|
||||
use meilisearch_types::milli::index::RollbackOutcome;
|
||||
use meilisearch_types::tasks::{Kind, Status};
|
||||
use meilisearch_types::{heed, milli};
|
||||
use reqwest::StatusCode;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::TaskId;
|
||||
@@ -127,6 +128,14 @@ pub enum Error {
|
||||
#[error("Aborted task")]
|
||||
AbortedTask,
|
||||
|
||||
#[error("S3 error: status: {status}, body: {body}")]
|
||||
S3Error { status: StatusCode, body: String },
|
||||
#[error("S3 HTTP error: {0}")]
|
||||
S3HttpError(reqwest::Error),
|
||||
#[error("S3 XML error: {0}")]
|
||||
S3XmlError(Box<dyn std::error::Error + Send + Sync>),
|
||||
#[error("S3 bucket error: {0}")]
|
||||
S3BucketError(rusty_s3::BucketError),
|
||||
#[error(transparent)]
|
||||
Dump(#[from] dump::Error),
|
||||
#[error(transparent)]
|
||||
@@ -226,6 +235,10 @@ impl Error {
|
||||
| Error::TaskCancelationWithEmptyQuery
|
||||
| Error::FromRemoteWhenExporting { .. }
|
||||
| Error::AbortedTask
|
||||
| Error::S3Error { .. }
|
||||
| Error::S3HttpError(_)
|
||||
| Error::S3XmlError(_)
|
||||
| Error::S3BucketError(_)
|
||||
| Error::Dump(_)
|
||||
| Error::Heed(_)
|
||||
| Error::Milli { .. }
|
||||
@@ -293,8 +306,14 @@ impl ErrorCode for Error {
|
||||
Error::BatchNotFound(_) => Code::BatchNotFound,
|
||||
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
|
||||
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
|
||||
// TODO: not sure of the Code to use
|
||||
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
|
||||
Error::S3Error { status, .. } if status.is_client_error() => {
|
||||
Code::InvalidS3SnapshotRequest
|
||||
}
|
||||
Error::S3Error { .. } => Code::S3SnapshotServerError,
|
||||
Error::S3HttpError(_) => Code::S3SnapshotServerError,
|
||||
Error::S3XmlError(_) => Code::S3SnapshotServerError,
|
||||
Error::S3BucketError(_) => Code::InvalidS3SnapshotParameters,
|
||||
Error::Dump(e) => e.error_code(),
|
||||
Error::Milli { error, .. } => error.error_code(),
|
||||
Error::ProcessBatchPanicked(_) => Code::Internal,
|
||||
|
||||
@@ -36,6 +36,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||
run_loop_iteration: _,
|
||||
embedders: _,
|
||||
chat_settings: _,
|
||||
runtime: _,
|
||||
} = scheduler;
|
||||
|
||||
let rtxn = env.read_txn().unwrap();
|
||||
|
||||
@@ -216,6 +216,9 @@ pub struct IndexScheduler {
|
||||
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
|
||||
#[cfg(test)]
|
||||
run_loop_iteration: Arc<RwLock<usize>>,
|
||||
|
||||
/// The tokio runtime used for asynchronous tasks.
|
||||
runtime: Option<tokio::runtime::Handle>,
|
||||
}
|
||||
|
||||
impl IndexScheduler {
|
||||
@@ -242,6 +245,7 @@ impl IndexScheduler {
|
||||
run_loop_iteration: self.run_loop_iteration.clone(),
|
||||
features: self.features.clone(),
|
||||
chat_settings: self.chat_settings,
|
||||
runtime: self.runtime.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,6 +264,7 @@ impl IndexScheduler {
|
||||
options: IndexSchedulerOptions,
|
||||
auth_env: Env<WithoutTls>,
|
||||
from_db_version: (u32, u32, u32),
|
||||
runtime: Option<tokio::runtime::Handle>,
|
||||
#[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>,
|
||||
#[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>,
|
||||
) -> Result<Self> {
|
||||
@@ -341,6 +346,7 @@ impl IndexScheduler {
|
||||
run_loop_iteration: Arc::new(RwLock::new(0)),
|
||||
features,
|
||||
chat_settings,
|
||||
runtime,
|
||||
};
|
||||
|
||||
this.run();
|
||||
|
||||
@@ -25,6 +25,7 @@ use convert_case::{Case, Casing as _};
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::heed::{Env, WithoutTls};
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::milli::update::S3SnapshotOptions;
|
||||
use meilisearch_types::tasks::Status;
|
||||
use process_batch::ProcessBatchInfo;
|
||||
use rayon::current_num_threads;
|
||||
@@ -87,11 +88,14 @@ pub struct Scheduler {
|
||||
|
||||
/// Snapshot compaction status.
|
||||
pub(crate) experimental_no_snapshot_compaction: bool,
|
||||
|
||||
/// S3 Snapshot options.
|
||||
pub(crate) s3_snapshot_options: Option<S3SnapshotOptions>,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub(crate) fn private_clone(&self) -> Scheduler {
|
||||
Scheduler {
|
||||
pub(crate) fn private_clone(&self) -> Self {
|
||||
Self {
|
||||
must_stop_processing: self.must_stop_processing.clone(),
|
||||
wake_up: self.wake_up.clone(),
|
||||
autobatching_enabled: self.autobatching_enabled,
|
||||
@@ -103,23 +107,52 @@ impl Scheduler {
|
||||
version_file_path: self.version_file_path.clone(),
|
||||
embedding_cache_cap: self.embedding_cache_cap,
|
||||
experimental_no_snapshot_compaction: self.experimental_no_snapshot_compaction,
|
||||
s3_snapshot_options: self.s3_snapshot_options.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(options: &IndexSchedulerOptions, auth_env: Env<WithoutTls>) -> Scheduler {
|
||||
let IndexSchedulerOptions {
|
||||
version_file_path,
|
||||
auth_path: _,
|
||||
tasks_path: _,
|
||||
update_file_path: _,
|
||||
indexes_path: _,
|
||||
snapshots_path,
|
||||
dumps_path,
|
||||
cli_webhook_url: _,
|
||||
cli_webhook_authorization: _,
|
||||
task_db_size: _,
|
||||
index_base_map_size: _,
|
||||
enable_mdb_writemap: _,
|
||||
index_growth_amount: _,
|
||||
index_count: _,
|
||||
indexer_config,
|
||||
autobatching_enabled,
|
||||
cleanup_enabled: _,
|
||||
max_number_of_tasks: _,
|
||||
max_number_of_batched_tasks,
|
||||
batched_tasks_size_limit,
|
||||
instance_features: _,
|
||||
auto_upgrade: _,
|
||||
embedding_cache_cap,
|
||||
experimental_no_snapshot_compaction,
|
||||
} = options;
|
||||
|
||||
Scheduler {
|
||||
must_stop_processing: MustStopProcessing::default(),
|
||||
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
|
||||
wake_up: Arc::new(SignalEvent::auto(true)),
|
||||
autobatching_enabled: options.autobatching_enabled,
|
||||
max_number_of_batched_tasks: options.max_number_of_batched_tasks,
|
||||
batched_tasks_size_limit: options.batched_tasks_size_limit,
|
||||
dumps_path: options.dumps_path.clone(),
|
||||
snapshots_path: options.snapshots_path.clone(),
|
||||
autobatching_enabled: *autobatching_enabled,
|
||||
max_number_of_batched_tasks: *max_number_of_batched_tasks,
|
||||
batched_tasks_size_limit: *batched_tasks_size_limit,
|
||||
dumps_path: dumps_path.clone(),
|
||||
snapshots_path: snapshots_path.clone(),
|
||||
auth_env,
|
||||
version_file_path: options.version_file_path.clone(),
|
||||
embedding_cache_cap: options.embedding_cache_cap,
|
||||
experimental_no_snapshot_compaction: options.experimental_no_snapshot_compaction,
|
||||
version_file_path: version_file_path.clone(),
|
||||
embedding_cache_cap: *embedding_cache_cap,
|
||||
experimental_no_snapshot_compaction: *experimental_no_snapshot_compaction,
|
||||
s3_snapshot_options: indexer_config.s3_snapshot_options.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use meilisearch_types::heed::CompactionOption;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
#[cfg(unix)]
|
||||
use meilisearch_types::milli::update::S3SnapshotOptions;
|
||||
use meilisearch_types::tasks::{Status, Task};
|
||||
use meilisearch_types::{compression, VERSION_FILE_NAME};
|
||||
|
||||
@@ -78,10 +80,32 @@ impl IndexScheduler {
|
||||
pub(super) fn process_snapshot(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>> {
|
||||
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
|
||||
|
||||
match self.scheduler.s3_snapshot_options.clone() {
|
||||
Some(options) => {
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
let _ = options;
|
||||
panic!("Non-unix platform does not support S3 snapshotting");
|
||||
}
|
||||
#[cfg(unix)]
|
||||
self.runtime
|
||||
.as_ref()
|
||||
.expect("Runtime not initialized")
|
||||
.block_on(self.process_snapshot_to_s3(progress, options, tasks))
|
||||
}
|
||||
None => self.process_snapshots_to_disk(progress, tasks),
|
||||
}
|
||||
}
|
||||
|
||||
fn process_snapshots_to_disk(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>, Error> {
|
||||
fs::create_dir_all(&self.scheduler.snapshots_path)?;
|
||||
let temp_snapshot_dir = tempfile::tempdir()?;
|
||||
|
||||
@@ -140,7 +164,7 @@ impl IndexScheduler {
|
||||
let task =
|
||||
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
if let Some(content_uuid) = task.content_uuid() {
|
||||
let src = self.queue.file_store.get_update_path(content_uuid);
|
||||
let src = self.queue.file_store.update_path(content_uuid);
|
||||
let dst = update_files_dir.join(content_uuid.to_string());
|
||||
fs::copy(src, dst)?;
|
||||
}
|
||||
@@ -206,4 +230,335 @@ impl IndexScheduler {
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn process_snapshot_to_s3(
|
||||
&self,
|
||||
progress: Progress,
|
||||
opts: S3SnapshotOptions,
|
||||
mut tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>> {
|
||||
use std::collections::VecDeque;
|
||||
use std::fs::File;
|
||||
use std::io::{self, Seek as _, SeekFrom, Write as _};
|
||||
use std::os::fd::OwnedFd;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use path_slash::PathBufExt as _;
|
||||
use reqwest::header::ETAG;
|
||||
use reqwest::Client;
|
||||
use reqwest::Response;
|
||||
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
|
||||
use rusty_s3::{Bucket, BucketError, Credentials, UrlStyle};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
let S3SnapshotOptions {
|
||||
s3_bucket_url,
|
||||
s3_bucket_region,
|
||||
s3_bucket_name,
|
||||
s3_snapshot_prefix,
|
||||
s3_access_key,
|
||||
s3_secret_key,
|
||||
s3_max_in_flight_parts,
|
||||
s3_compression_level: level,
|
||||
s3_signature_duration,
|
||||
s3_multipart_part_size,
|
||||
} = opts;
|
||||
|
||||
let must_stop_processing = self.scheduler.must_stop_processing.clone();
|
||||
let retry_backoff = backoff::ExponentialBackoff::default();
|
||||
|
||||
let (reader, writer) = std::io::pipe()?;
|
||||
let uploader_task = tokio::spawn(async move {
|
||||
let reader = OwnedFd::from(reader);
|
||||
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
|
||||
|
||||
let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
|
||||
let url = s3_bucket_url
|
||||
.parse()
|
||||
.map_err(BucketError::ParseError)
|
||||
.map_err(Error::S3BucketError)?;
|
||||
let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
|
||||
.map_err(Error::S3BucketError)?;
|
||||
let credential = Credentials::new(s3_access_key, s3_secret_key);
|
||||
// TODO change this and use the database name like in the original version
|
||||
let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
|
||||
let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
|
||||
|
||||
let action = bucket.create_multipart_upload(Some(&credential), &object);
|
||||
let url = action.sign(s3_signature_duration);
|
||||
let client = Client::new();
|
||||
let resp = client.post(url).send().await.map_err(Error::S3HttpError)?;
|
||||
let status = resp.status();
|
||||
let body = match resp.error_for_status_ref() {
|
||||
Ok(_) => resp.text().await.map_err(Error::S3HttpError)?,
|
||||
Err(_) => {
|
||||
return Err(Error::S3Error {
|
||||
status,
|
||||
body: resp.text().await.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
let multipart = CreateMultipartUpload::parse_response(&body)
|
||||
.map_err(|e| Error::S3XmlError(Box::new(e)))?;
|
||||
tracing::debug!("Starting the upload of the snapshot to {object}");
|
||||
|
||||
// We use this bumpalo for etags strings.
|
||||
let bump = bumpalo::Bump::new();
|
||||
let mut etags = Vec::<&str>::new();
|
||||
let mut in_flight =
|
||||
VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
|
||||
s3_max_in_flight_parts.get(),
|
||||
);
|
||||
|
||||
// Part numbers start at 1 and cannot be larger than 10k
|
||||
for part_number in 1u16.. {
|
||||
if must_stop_processing.get() {
|
||||
return Err(Error::AbortedTask);
|
||||
}
|
||||
|
||||
let part_upload = bucket.upload_part(
|
||||
Some(&credential),
|
||||
&object,
|
||||
part_number,
|
||||
multipart.upload_id(),
|
||||
);
|
||||
let url = part_upload.sign(s3_signature_duration);
|
||||
|
||||
// Wait for a buffer to be ready if there are in-flight parts that landed
|
||||
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
|
||||
let (request, buffer) = in_flight.pop_front().unwrap();
|
||||
let resp = request.await.unwrap().map_err(Error::S3HttpError)?;
|
||||
let resp = match resp.error_for_status_ref() {
|
||||
Ok(response) => response,
|
||||
Err(_) => {
|
||||
return Err(Error::S3Error {
|
||||
status: resp.status(),
|
||||
body: resp.text().await.unwrap_or_default(),
|
||||
});
|
||||
}
|
||||
};
|
||||
let etag =
|
||||
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
|
||||
let mut buffer = match buffer.try_into_mut() {
|
||||
Ok(buffer) => buffer,
|
||||
Err(_) => unreachable!("Impossible to convert into BytesMut"),
|
||||
};
|
||||
etags.push(bump.alloc_str(etag.to_str().unwrap()));
|
||||
buffer.clear();
|
||||
buffer
|
||||
} else {
|
||||
BytesMut::with_capacity(s3_multipart_part_size as usize)
|
||||
};
|
||||
|
||||
// If we successfully read enough bytes,
|
||||
// we can continue and send the buffer/part
|
||||
while buffer.len() < (s3_multipart_part_size as usize / 2) {
|
||||
// Wait for the pipe to be readable
|
||||
reader.readable().await?;
|
||||
|
||||
match reader.try_read_buf(&mut buffer) {
|
||||
Ok(0) => break,
|
||||
// We read some bytes but maybe not enough
|
||||
Ok(_) => continue,
|
||||
// The readiness event is a false positive.
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
if buffer.is_empty() {
|
||||
// Break the loop if the buffer is
|
||||
// empty after we tried to read bytes
|
||||
break;
|
||||
}
|
||||
|
||||
let body = buffer.freeze();
|
||||
tracing::trace!("Sending part {part_number}");
|
||||
let task = tokio::spawn({
|
||||
let client = client.clone();
|
||||
let body = body.clone();
|
||||
backoff::future::retry(retry_backoff.clone(), move || {
|
||||
let client = client.clone();
|
||||
let url = url.clone();
|
||||
let body = body.clone();
|
||||
async move {
|
||||
match client.put(url).body(body).send().await {
|
||||
Ok(resp) if resp.status().is_client_error() => {
|
||||
resp.error_for_status().map_err(backoff::Error::Permanent)
|
||||
}
|
||||
Ok(resp) => Ok(resp),
|
||||
Err(e) => Err(backoff::Error::transient(e)),
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
in_flight.push_back((task, body));
|
||||
}
|
||||
|
||||
for (join_handle, _buffer) in in_flight {
|
||||
let resp = join_handle.await.unwrap().map_err(Error::S3HttpError)?;
|
||||
let resp = match resp.error_for_status_ref() {
|
||||
Ok(response) => response,
|
||||
Err(_) => {
|
||||
return Err(Error::S3Error {
|
||||
status: resp.status(),
|
||||
body: resp.text().await.unwrap_or_default(),
|
||||
})
|
||||
}
|
||||
};
|
||||
let etag =
|
||||
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
|
||||
etags.push(bump.alloc_str(etag.to_str().unwrap()));
|
||||
}
|
||||
|
||||
tracing::trace!("Finalizing the multipart upload");
|
||||
|
||||
let action = bucket.complete_multipart_upload(
|
||||
Some(&credential),
|
||||
&object,
|
||||
multipart.upload_id(),
|
||||
etags.iter().map(AsRef::as_ref),
|
||||
);
|
||||
let url = action.sign(s3_signature_duration);
|
||||
let body = action.body();
|
||||
let resp = backoff::future::retry(retry_backoff, move || {
|
||||
let client = client.clone();
|
||||
let url = url.clone();
|
||||
let body = body.clone();
|
||||
async move {
|
||||
match client.post(url).body(body).send().await {
|
||||
Ok(resp) if resp.status().is_client_error() => {
|
||||
resp.error_for_status().map_err(backoff::Error::Permanent)
|
||||
}
|
||||
Ok(resp) => Ok(resp),
|
||||
Err(e) => Err(backoff::Error::transient(e)),
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(Error::S3HttpError)?;
|
||||
|
||||
let status = resp.status();
|
||||
let body =
|
||||
resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;
|
||||
|
||||
if status.is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::S3Error { status, body })
|
||||
}
|
||||
});
|
||||
|
||||
let index_scheduler = IndexScheduler::private_clone(self);
|
||||
let builder_task = tokio::task::spawn_blocking(move || {
|
||||
let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
|
||||
let mut tarball = tar::Builder::new(writer);
|
||||
|
||||
// 1. Snapshot the version file
|
||||
tarball.append_path_with_name(
|
||||
&index_scheduler.scheduler.version_file_path,
|
||||
VERSION_FILE_NAME,
|
||||
)?;
|
||||
|
||||
// 2. Snapshot the index scheduler LMDB env
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
||||
let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
|
||||
// Note: Seeking to the start of the file is necessary to ensure
|
||||
// the tarball reads the file from the beginning. I still
|
||||
// don't know why the read cursor is not at the beginning.
|
||||
tasks_env_file.seek(SeekFrom::Start(0))?;
|
||||
let path = Path::new("tasks").join("data.mdb");
|
||||
tarball.append_file(path, &mut tasks_env_file)?;
|
||||
drop(tasks_env_file);
|
||||
|
||||
// 2.3 Create a read transaction on the index-scheduler
|
||||
let rtxn = index_scheduler.env.read_txn()?;
|
||||
|
||||
// 2.4 Create the update files directory
|
||||
// And only copy the update files of the enqueued tasks
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
|
||||
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
||||
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
|
||||
progress.update_progress(update_file_progress);
|
||||
// TODO I need to create the update files directory (even if empty)
|
||||
// I should probably simply use the append_dir_all method
|
||||
// but I'll loose the progression.
|
||||
let update_files_dir = Path::new("update_files");
|
||||
for task_id in enqueued {
|
||||
let task = index_scheduler
|
||||
.queue
|
||||
.tasks
|
||||
.get_task(&rtxn, task_id)?
|
||||
.ok_or(Error::CorruptedTaskQueue)?;
|
||||
if let Some(content_uuid) = task.content_uuid() {
|
||||
let src = index_scheduler.queue.file_store.update_path(content_uuid);
|
||||
let mut update_file = File::open(src)?;
|
||||
let path = update_files_dir.join(content_uuid.to_string());
|
||||
tarball.append_file(path, &mut update_file)?;
|
||||
}
|
||||
atomic.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// 3. Snapshot every indexes
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
|
||||
let index_mapping = index_scheduler.index_mapper.index_mapping;
|
||||
let nb_indexes = index_mapping.len(&rtxn)? as u32;
|
||||
let indexes_dir = Path::new("indexes");
|
||||
let indexes_references: Vec<_> = index_scheduler
|
||||
.index_mapper
|
||||
.index_mapping
|
||||
.iter(&rtxn)?
|
||||
.map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
|
||||
.collect::<Result<_, Error>>()?;
|
||||
|
||||
// It's prettier to use a for loop instead of the IndexMapper::try_for_each_index
|
||||
// method, especially when we need to access the UUID, local path and index number.
|
||||
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
|
||||
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
||||
&name, i as u32, nb_indexes,
|
||||
));
|
||||
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
|
||||
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
|
||||
let mut index_file = index.try_clone_inner_file().unwrap();
|
||||
index_file.seek(SeekFrom::Start(0))?;
|
||||
tracing::trace!("Appending index file for {name} in {}", path.display());
|
||||
tarball.append_file(path, &mut index_file)?;
|
||||
}
|
||||
|
||||
drop(rtxn);
|
||||
|
||||
// 4. Snapshot the auth LMDB env
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
|
||||
let mut auth_env_file =
|
||||
index_scheduler.scheduler.auth_env.try_clone_inner_file().unwrap();
|
||||
auth_env_file.seek(SeekFrom::Start(0))?;
|
||||
let path = Path::new("auth").join("data.mdb");
|
||||
tarball.append_file(path, &mut auth_env_file)?;
|
||||
|
||||
let mut gzencoder = tarball.into_inner()?;
|
||||
gzencoder.flush()?;
|
||||
gzencoder.try_finish()?;
|
||||
let mut writer = gzencoder.finish()?;
|
||||
writer.flush()?;
|
||||
|
||||
Result::<_, Error>::Ok(())
|
||||
});
|
||||
|
||||
let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);
|
||||
|
||||
// Check uploader result first to early return on task abortion.
|
||||
uploader_result.unwrap()?;
|
||||
builder_result.unwrap()?;
|
||||
|
||||
for task in &mut tasks {
|
||||
task.status = Status::Succeeded;
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ impl IndexScheduler {
|
||||
std::fs::create_dir_all(&options.auth_path).unwrap();
|
||||
let auth_env = open_auth_store_env(&options.auth_path).unwrap();
|
||||
let index_scheduler =
|
||||
Self::new(options, auth_env, version, sender, planned_failures).unwrap();
|
||||
Self::new(options, auth_env, version, None, sender, planned_failures).unwrap();
|
||||
|
||||
// To be 100% consistent between all test we're going to start the scheduler right now
|
||||
// and ensure it's in the expected starting state.
|
||||
|
||||
@@ -390,6 +390,9 @@ TooManyVectors , InvalidRequest , BAD_REQU
|
||||
UnretrievableDocument , Internal , BAD_REQUEST ;
|
||||
UnretrievableErrorCode , InvalidRequest , BAD_REQUEST ;
|
||||
UnsupportedMediaType , InvalidRequest , UNSUPPORTED_MEDIA_TYPE ;
|
||||
InvalidS3SnapshotRequest , Internal , BAD_REQUEST ;
|
||||
InvalidS3SnapshotParameters , Internal , BAD_REQUEST ;
|
||||
S3SnapshotServerError , Internal , BAD_GATEWAY ;
|
||||
|
||||
// Experimental features
|
||||
VectorEmbeddingError , InvalidRequest , BAD_REQUEST ;
|
||||
|
||||
@@ -217,6 +217,7 @@ struct Infos {
|
||||
import_snapshot: bool,
|
||||
schedule_snapshot: Option<u64>,
|
||||
snapshot_dir: bool,
|
||||
uses_s3_snapshots: bool,
|
||||
ignore_missing_snapshot: bool,
|
||||
ignore_snapshot_if_db_exists: bool,
|
||||
http_addr: bool,
|
||||
@@ -285,6 +286,7 @@ impl Infos {
|
||||
indexer_options,
|
||||
config_file_path,
|
||||
no_analytics: _,
|
||||
s3_snapshot_options,
|
||||
} = options;
|
||||
|
||||
let schedule_snapshot = match schedule_snapshot {
|
||||
@@ -348,6 +350,7 @@ impl Infos {
|
||||
import_snapshot: import_snapshot.is_some(),
|
||||
schedule_snapshot,
|
||||
snapshot_dir: snapshot_dir != PathBuf::from("snapshots/"),
|
||||
uses_s3_snapshots: s3_snapshot_options.is_some(),
|
||||
ignore_missing_snapshot,
|
||||
ignore_snapshot_if_db_exists,
|
||||
http_addr: http_addr != default_http_addr(),
|
||||
|
||||
@@ -216,7 +216,10 @@ enum OnFailure {
|
||||
KeepDb,
|
||||
}
|
||||
|
||||
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
|
||||
pub fn setup_meilisearch(
|
||||
opt: &Opt,
|
||||
handle: tokio::runtime::Handle,
|
||||
) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
|
||||
let index_scheduler_opt = IndexSchedulerOptions {
|
||||
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
|
||||
auth_path: opt.db_path.join("auth"),
|
||||
@@ -230,7 +233,11 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
task_db_size: opt.max_task_db_size.as_u64() as usize,
|
||||
index_base_map_size: opt.max_index_size.as_u64() as usize,
|
||||
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
||||
indexer_config: Arc::new((&opt.indexer_options).try_into()?),
|
||||
indexer_config: Arc::new({
|
||||
let s3_snapshot_options =
|
||||
opt.s3_snapshot_options.clone().map(|opt| opt.try_into()).transpose()?;
|
||||
IndexerConfig { s3_snapshot_options, ..(&opt.indexer_options).try_into()? }
|
||||
}),
|
||||
autobatching_enabled: true,
|
||||
cleanup_enabled: !opt.experimental_replication_parameters,
|
||||
max_number_of_tasks: 1_000_000,
|
||||
@@ -256,6 +263,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
index_scheduler_opt,
|
||||
OnFailure::RemoveDb,
|
||||
binary_version, // the db is empty
|
||||
handle,
|
||||
)?,
|
||||
Err(e) => {
|
||||
std::fs::remove_dir_all(&opt.db_path)?;
|
||||
@@ -273,7 +281,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
bail!("snapshot doesn't exist at {}", snapshot_path.display())
|
||||
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
|
||||
} else {
|
||||
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
|
||||
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
|
||||
}
|
||||
} else if let Some(ref path) = opt.import_dump {
|
||||
let src_path_exists = path.exists();
|
||||
@@ -284,6 +292,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
index_scheduler_opt,
|
||||
OnFailure::RemoveDb,
|
||||
binary_version, // the db is empty
|
||||
handle,
|
||||
)?;
|
||||
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
|
||||
Ok(()) => (index_scheduler, auth_controller),
|
||||
@@ -304,10 +313,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
|
||||
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
|
||||
} else {
|
||||
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
|
||||
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
|
||||
}
|
||||
} else {
|
||||
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
|
||||
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
|
||||
};
|
||||
|
||||
// We create a loop in a thread that registers snapshotCreation tasks
|
||||
@@ -338,6 +347,7 @@ fn open_or_create_database_unchecked(
|
||||
index_scheduler_opt: IndexSchedulerOptions,
|
||||
on_failure: OnFailure,
|
||||
version: (u32, u32, u32),
|
||||
handle: tokio::runtime::Handle,
|
||||
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
||||
// we don't want to create anything in the data.ms yet, thus we
|
||||
// wrap our two builders in a closure that'll be executed later.
|
||||
@@ -345,7 +355,7 @@ fn open_or_create_database_unchecked(
|
||||
let auth_env = open_auth_store_env(&index_scheduler_opt.auth_path).unwrap();
|
||||
let auth_controller = AuthController::new(auth_env.clone(), &opt.master_key);
|
||||
let index_scheduler_builder = || -> anyhow::Result<_> {
|
||||
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version)?)
|
||||
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version, Some(handle))?)
|
||||
};
|
||||
|
||||
match (
|
||||
@@ -452,6 +462,7 @@ fn open_or_create_database(
|
||||
index_scheduler_opt: IndexSchedulerOptions,
|
||||
empty_db: bool,
|
||||
binary_version: (u32, u32, u32),
|
||||
handle: tokio::runtime::Handle,
|
||||
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
||||
let version = if !empty_db {
|
||||
check_version(opt, &index_scheduler_opt, binary_version)?
|
||||
@@ -459,7 +470,7 @@ fn open_or_create_database(
|
||||
binary_version
|
||||
};
|
||||
|
||||
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
|
||||
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version, handle)
|
||||
}
|
||||
|
||||
fn import_dump(
|
||||
@@ -527,7 +538,11 @@ fn import_dump(
|
||||
let indexer_config = if base_config.max_threads.is_none() {
|
||||
let (thread_pool, _) = default_thread_pool_and_threads();
|
||||
|
||||
let _config = IndexerConfig { thread_pool, ..*base_config };
|
||||
let _config = IndexerConfig {
|
||||
thread_pool,
|
||||
s3_snapshot_options: base_config.s3_snapshot_options.clone(),
|
||||
..*base_config
|
||||
};
|
||||
backup_config = _config;
|
||||
&backup_config
|
||||
} else {
|
||||
|
||||
@@ -76,7 +76,10 @@ fn on_panic(info: &std::panic::PanicHookInfo) {
|
||||
|
||||
#[actix_web::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
try_main().await.inspect_err(|error| {
|
||||
// won't panic inside of tokio::main
|
||||
let runtime = tokio::runtime::Handle::current();
|
||||
|
||||
try_main(runtime).await.inspect_err(|error| {
|
||||
tracing::error!(%error);
|
||||
let mut current = error.source();
|
||||
let mut depth = 0;
|
||||
@@ -88,7 +91,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
})
|
||||
}
|
||||
|
||||
async fn try_main() -> anyhow::Result<()> {
|
||||
async fn try_main(runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
|
||||
let (opt, config_read_from) = Opt::try_build()?;
|
||||
|
||||
std::panic::set_hook(Box::new(on_panic));
|
||||
@@ -122,7 +125,7 @@ async fn try_main() -> anyhow::Result<()> {
|
||||
_ => (),
|
||||
}
|
||||
|
||||
let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?;
|
||||
let (index_scheduler, auth_controller) = setup_meilisearch(&opt, runtime)?;
|
||||
|
||||
let analytics =
|
||||
analytics::Analytics::new(&opt, index_scheduler.clone(), auth_controller.clone()).await;
|
||||
|
||||
@@ -7,12 +7,13 @@ use std::ops::Deref;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{env, fmt, fs};
|
||||
|
||||
use byte_unit::{Byte, ParseError, UnitType};
|
||||
use clap::Parser;
|
||||
use meilisearch_types::features::InstanceTogglableFeatures;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
use meilisearch_types::milli::update::{IndexerConfig, S3SnapshotOptions};
|
||||
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
|
||||
use rustls::server::{ServerSessionMemoryCache, WebPkiClientVerifier};
|
||||
use rustls::RootCertStore;
|
||||
@@ -74,6 +75,19 @@ const MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES: &str =
|
||||
const MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION: &str = "MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION";
|
||||
const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS: &str =
|
||||
"MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS";
|
||||
|
||||
// Related to S3 snapshots
|
||||
const MEILI_S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
|
||||
const MEILI_S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
|
||||
const MEILI_S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
|
||||
const MEILI_S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
|
||||
const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
|
||||
const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
|
||||
const MEILI_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
|
||||
const MEILI_S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
|
||||
const MEILI_S3_SIGNATURE_DURATION_SECONDS: &str = "MEILI_S3_SIGNATURE_DURATION_SECONDS";
|
||||
const MEILI_S3_MULTIPART_PART_SIZE: &str = "MEILI_S3_MULTIPART_PART_SIZE";
|
||||
|
||||
const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
|
||||
const DEFAULT_DB_PATH: &str = "./data.ms";
|
||||
const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
|
||||
@@ -83,6 +97,10 @@ const DEFAULT_SNAPSHOT_DIR: &str = "snapshots/";
|
||||
const DEFAULT_SNAPSHOT_INTERVAL_SEC: u64 = 86400;
|
||||
const DEFAULT_SNAPSHOT_INTERVAL_SEC_STR: &str = "86400";
|
||||
const DEFAULT_DUMP_DIR: &str = "dumps/";
|
||||
const DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS: NonZeroUsize = NonZeroUsize::new(10).unwrap();
|
||||
const DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL: u32 = 0;
|
||||
const DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS: u64 = 8 * 3600; // 8 hours
|
||||
const DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE: Byte = Byte::from_u64(375 * 1024 * 1024); // 375 MiB
|
||||
|
||||
const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
|
||||
const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
|
||||
@@ -479,6 +497,10 @@ pub struct Opt {
|
||||
#[clap(flatten)]
|
||||
pub indexer_options: IndexerOpts,
|
||||
|
||||
#[serde(flatten)]
|
||||
#[clap(flatten)]
|
||||
pub s3_snapshot_options: Option<S3SnapshotOpts>,
|
||||
|
||||
/// Set the path to a configuration file that should be used to setup the engine.
|
||||
/// Format must be TOML.
|
||||
#[clap(long)]
|
||||
@@ -580,6 +602,7 @@ impl Opt {
|
||||
experimental_limit_batched_tasks_total_size,
|
||||
experimental_embedding_cache_entries,
|
||||
experimental_no_snapshot_compaction,
|
||||
s3_snapshot_options,
|
||||
} = self;
|
||||
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
|
||||
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
|
||||
@@ -681,6 +704,9 @@ impl Opt {
|
||||
experimental_no_snapshot_compaction.to_string(),
|
||||
);
|
||||
indexer_options.export_to_env();
|
||||
if let Some(s3_snapshot_options) = s3_snapshot_options {
|
||||
s3_snapshot_options.export_to_env();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_ssl_config(&self) -> anyhow::Result<Option<rustls::ServerConfig>> {
|
||||
@@ -849,6 +875,16 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
||||
let IndexerOpts {
|
||||
max_indexing_memory,
|
||||
max_indexing_threads,
|
||||
skip_index_budget,
|
||||
experimental_no_edition_2024_for_settings,
|
||||
experimental_no_edition_2024_for_dumps,
|
||||
experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing,
|
||||
} = other;
|
||||
|
||||
let thread_pool = ThreadPoolNoAbortBuilder::new_for_indexing()
|
||||
.num_threads(other.max_indexing_threads.unwrap_or_else(|| num_cpus::get() / 2))
|
||||
.build()?;
|
||||
@@ -856,21 +892,152 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||
Ok(Self {
|
||||
thread_pool,
|
||||
log_every_n: Some(DEFAULT_LOG_EVERY_N),
|
||||
max_memory: other.max_indexing_memory.map(|b| b.as_u64() as usize),
|
||||
max_threads: *other.max_indexing_threads,
|
||||
max_memory: max_indexing_memory.map(|b| b.as_u64() as usize),
|
||||
max_threads: max_indexing_threads.0,
|
||||
max_positions_per_attributes: None,
|
||||
skip_index_budget: other.skip_index_budget,
|
||||
experimental_no_edition_2024_for_settings: other
|
||||
.experimental_no_edition_2024_for_settings,
|
||||
experimental_no_edition_2024_for_dumps: other.experimental_no_edition_2024_for_dumps,
|
||||
skip_index_budget: *skip_index_budget,
|
||||
experimental_no_edition_2024_for_settings: *experimental_no_edition_2024_for_settings,
|
||||
experimental_no_edition_2024_for_dumps: *experimental_no_edition_2024_for_dumps,
|
||||
chunk_compression_type: Default::default(),
|
||||
chunk_compression_level: Default::default(),
|
||||
documents_chunk_size: Default::default(),
|
||||
max_nb_chunks: Default::default(),
|
||||
experimental_no_edition_2024_for_prefix_post_processing: other
|
||||
.experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing: other
|
||||
.experimental_no_edition_2024_for_facet_post_processing,
|
||||
experimental_no_edition_2024_for_prefix_post_processing:
|
||||
*experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing:
|
||||
*experimental_no_edition_2024_for_facet_post_processing,
|
||||
s3_snapshot_options: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Parser, Deserialize)]
|
||||
// This group is a bit tricky but makes it possible to require all listed fields if one of them
|
||||
// is specified. It lets us keep an Option for the S3SnapshotOpts configuration.
|
||||
// <https://github.com/clap-rs/clap/issues/5092#issuecomment-2616986075>
|
||||
#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix", "s3_access_key", "s3_secret_key"])]
|
||||
pub struct S3SnapshotOpts {
|
||||
/// The S3 bucket URL in the format https://s3.<region>.amazonaws.com.
|
||||
#[clap(long, env = MEILI_S3_BUCKET_URL, required = false)]
|
||||
#[serde(default)]
|
||||
pub s3_bucket_url: String,
|
||||
|
||||
/// The region in the format us-east-1.
|
||||
#[clap(long, env = MEILI_S3_BUCKET_REGION, required = false)]
|
||||
#[serde(default)]
|
||||
pub s3_bucket_region: String,
|
||||
|
||||
/// The bucket name.
|
||||
#[clap(long, env = MEILI_S3_BUCKET_NAME, required = false)]
|
||||
#[serde(default)]
|
||||
pub s3_bucket_name: String,
|
||||
|
||||
/// The prefix path where to put the snapshot, uses normal slashes (/).
|
||||
#[clap(long, env = MEILI_S3_SNAPSHOT_PREFIX, required = false)]
|
||||
#[serde(default)]
|
||||
pub s3_snapshot_prefix: String,
|
||||
|
||||
/// The S3 access key.
|
||||
#[clap(long, env = MEILI_S3_ACCESS_KEY, required = false)]
|
||||
#[serde(default)]
|
||||
pub s3_access_key: String,
|
||||
|
||||
/// The S3 secret key.
|
||||
#[clap(long, env = MEILI_S3_SECRET_KEY, required = false)]
|
||||
#[serde(default)]
|
||||
pub s3_secret_key: String,
|
||||
|
||||
/// The maximum number of parts that can be uploaded in parallel.
|
||||
#[clap(long, env = MEILI_S3_MAX_IN_FLIGHT_PARTS, default_value_t = default_s3_snapshot_max_in_flight_parts())]
|
||||
#[serde(default = "default_s3_snapshot_max_in_flight_parts")]
|
||||
pub s3_max_in_flight_parts: NonZeroUsize,
|
||||
|
||||
/// The compression level. Defaults to no compression (0).
|
||||
#[clap(long, env = MEILI_S3_COMPRESSION_LEVEL, default_value_t = default_s3_snapshot_compression_level())]
|
||||
#[serde(default = "default_s3_snapshot_compression_level")]
|
||||
pub s3_compression_level: u32,
|
||||
|
||||
/// The signature duration for the multipart upload.
|
||||
#[clap(long, env = MEILI_S3_SIGNATURE_DURATION_SECONDS, default_value_t = default_s3_snapshot_signature_duration_seconds())]
|
||||
#[serde(default = "default_s3_snapshot_signature_duration_seconds")]
|
||||
pub s3_signature_duration_seconds: u64,
|
||||
|
||||
/// The size of the the multipart parts.
|
||||
///
|
||||
/// Must not be less than 10MiB and larger than 8GiB. Yes,
|
||||
/// twice the boundaries of the AWS S3 multipart upload
|
||||
/// because we use it a bit differently internally.
|
||||
#[clap(long, env = MEILI_S3_MULTIPART_PART_SIZE, default_value_t = default_s3_snapshot_multipart_part_size())]
|
||||
#[serde(default = "default_s3_snapshot_multipart_part_size")]
|
||||
pub s3_multipart_part_size: Byte,
|
||||
}
|
||||
|
||||
impl S3SnapshotOpts {
|
||||
/// Exports the values to their corresponding env vars if they are not set.
|
||||
pub fn export_to_env(self) {
|
||||
let S3SnapshotOpts {
|
||||
s3_bucket_url,
|
||||
s3_bucket_region,
|
||||
s3_bucket_name,
|
||||
s3_snapshot_prefix,
|
||||
s3_access_key,
|
||||
s3_secret_key,
|
||||
s3_max_in_flight_parts,
|
||||
s3_compression_level,
|
||||
s3_signature_duration_seconds,
|
||||
s3_multipart_part_size,
|
||||
} = self;
|
||||
|
||||
export_to_env_if_not_present(MEILI_S3_BUCKET_URL, s3_bucket_url);
|
||||
export_to_env_if_not_present(MEILI_S3_BUCKET_REGION, s3_bucket_region);
|
||||
export_to_env_if_not_present(MEILI_S3_BUCKET_NAME, s3_bucket_name);
|
||||
export_to_env_if_not_present(MEILI_S3_SNAPSHOT_PREFIX, s3_snapshot_prefix);
|
||||
export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
|
||||
export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
|
||||
export_to_env_if_not_present(
|
||||
MEILI_S3_MAX_IN_FLIGHT_PARTS,
|
||||
s3_max_in_flight_parts.to_string(),
|
||||
);
|
||||
export_to_env_if_not_present(MEILI_S3_COMPRESSION_LEVEL, s3_compression_level.to_string());
|
||||
export_to_env_if_not_present(
|
||||
MEILI_S3_SIGNATURE_DURATION_SECONDS,
|
||||
s3_signature_duration_seconds.to_string(),
|
||||
);
|
||||
export_to_env_if_not_present(
|
||||
MEILI_S3_MULTIPART_PART_SIZE,
|
||||
s3_multipart_part_size.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(other: S3SnapshotOpts) -> Result<Self, Self::Error> {
|
||||
let S3SnapshotOpts {
|
||||
s3_bucket_url,
|
||||
s3_bucket_region,
|
||||
s3_bucket_name,
|
||||
s3_snapshot_prefix,
|
||||
s3_access_key,
|
||||
s3_secret_key,
|
||||
s3_max_in_flight_parts,
|
||||
s3_compression_level,
|
||||
s3_signature_duration_seconds,
|
||||
s3_multipart_part_size,
|
||||
} = other;
|
||||
|
||||
Ok(S3SnapshotOptions {
|
||||
s3_bucket_url,
|
||||
s3_bucket_region,
|
||||
s3_bucket_name,
|
||||
s3_snapshot_prefix,
|
||||
s3_access_key,
|
||||
s3_secret_key,
|
||||
s3_max_in_flight_parts,
|
||||
s3_compression_level,
|
||||
s3_signature_duration: Duration::from_secs(s3_signature_duration_seconds),
|
||||
s3_multipart_part_size: s3_multipart_part_size.as_u64(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1089,6 +1256,22 @@ fn default_snapshot_interval_sec() -> &'static str {
|
||||
DEFAULT_SNAPSHOT_INTERVAL_SEC_STR
|
||||
}
|
||||
|
||||
fn default_s3_snapshot_max_in_flight_parts() -> NonZeroUsize {
|
||||
DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS
|
||||
}
|
||||
|
||||
fn default_s3_snapshot_compression_level() -> u32 {
|
||||
DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL
|
||||
}
|
||||
|
||||
fn default_s3_snapshot_signature_duration_seconds() -> u64 {
|
||||
DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS
|
||||
}
|
||||
|
||||
fn default_s3_snapshot_multipart_part_size() -> Byte {
|
||||
DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE
|
||||
}
|
||||
|
||||
fn default_dump_dir() -> PathBuf {
|
||||
PathBuf::from(DEFAULT_DUMP_DIR)
|
||||
}
|
||||
|
||||
@@ -49,8 +49,8 @@ impl Server<Owned> {
|
||||
}
|
||||
|
||||
let options = default_settings(dir.path());
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
|
||||
Server { service, _dir: Some(dir), _marker: PhantomData }
|
||||
@@ -65,7 +65,9 @@ impl Server<Owned> {
|
||||
|
||||
options.master_key = Some("MASTER_KEY".to_string());
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
|
||||
Server { service, _dir: Some(dir), _marker: PhantomData }
|
||||
@@ -78,7 +80,9 @@ impl Server<Owned> {
|
||||
}
|
||||
|
||||
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options)?;
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options, handle)?;
|
||||
let service = Service { index_scheduler, auth, options, api_key: None };
|
||||
|
||||
Ok(Server { service, _dir: None, _marker: PhantomData })
|
||||
@@ -217,8 +221,9 @@ impl Server<Shared> {
|
||||
}
|
||||
|
||||
let options = default_settings(dir.path());
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
|
||||
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
|
||||
let service = Service { index_scheduler, auth, api_key: None, options };
|
||||
|
||||
Server { service, _dir: Some(dir), _marker: PhantomData }
|
||||
|
||||
@@ -68,7 +68,7 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
|
||||
|
||||
for uuid in file_store.all_uuids().context("while retrieving uuids from file store")? {
|
||||
let uuid = uuid.context("while retrieving uuid from file store")?;
|
||||
let update_file_path = file_store.get_update_path(uuid);
|
||||
let update_file_path = file_store.update_path(uuid);
|
||||
let update_file = file_store
|
||||
.get_update(uuid)
|
||||
.with_context(|| format!("while getting update file for uuid {uuid:?}"))?;
|
||||
|
||||
@@ -34,7 +34,7 @@ grenad = { version = "0.5.0", default-features = false, features = [
|
||||
"rayon",
|
||||
"tempfile",
|
||||
] }
|
||||
heed = { version = "0.22.1-nested-rtxns", default-features = false, features = [
|
||||
heed = { version = "0.22.1-nested-rtxns-2", default-features = false, features = [
|
||||
"serde-json",
|
||||
"serde-bincode",
|
||||
] }
|
||||
|
||||
@@ -425,6 +425,10 @@ impl Index {
|
||||
self.env.info().map_size
|
||||
}
|
||||
|
||||
pub fn try_clone_inner_file(&self) -> Result<File> {
|
||||
Ok(self.env.try_clone_inner_file()?)
|
||||
}
|
||||
|
||||
pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> {
|
||||
self.env.copy_to_file(file, option).map_err(Into::into)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
use std::num::NonZeroUsize;
|
||||
use std::time::Duration;
|
||||
|
||||
use grenad::CompressionType;
|
||||
|
||||
use super::GrenadParameters;
|
||||
@@ -20,6 +23,7 @@ pub struct IndexerConfig {
|
||||
pub experimental_no_edition_2024_for_dumps: bool,
|
||||
pub experimental_no_edition_2024_for_prefix_post_processing: bool,
|
||||
pub experimental_no_edition_2024_for_facet_post_processing: bool,
|
||||
pub s3_snapshot_options: Option<S3SnapshotOptions>,
|
||||
}
|
||||
|
||||
impl IndexerConfig {
|
||||
@@ -37,6 +41,20 @@ impl IndexerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct S3SnapshotOptions {
|
||||
pub s3_bucket_url: String,
|
||||
pub s3_bucket_region: String,
|
||||
pub s3_bucket_name: String,
|
||||
pub s3_snapshot_prefix: String,
|
||||
pub s3_access_key: String,
|
||||
pub s3_secret_key: String,
|
||||
pub s3_max_in_flight_parts: NonZeroUsize,
|
||||
pub s3_compression_level: u32,
|
||||
pub s3_signature_duration: Duration,
|
||||
pub s3_multipart_part_size: u64,
|
||||
}
|
||||
|
||||
/// By default use only 1 thread for indexing in tests
|
||||
#[cfg(test)]
|
||||
pub fn default_thread_pool_and_threads() -> (ThreadPoolNoAbort, Option<usize>) {
|
||||
@@ -76,6 +94,7 @@ impl Default for IndexerConfig {
|
||||
experimental_no_edition_2024_for_dumps: false,
|
||||
experimental_no_edition_2024_for_prefix_post_processing: false,
|
||||
experimental_no_edition_2024_for_facet_post_processing: false,
|
||||
s3_snapshot_options: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ pub use self::concurrent_available_ids::ConcurrentAvailableIds;
|
||||
pub use self::facet::bulk::FacetsUpdateBulk;
|
||||
pub use self::facet::incremental::FacetsUpdateIncrementalInner;
|
||||
pub use self::index_documents::{request_threads, *};
|
||||
pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig};
|
||||
pub use self::indexer_config::{default_thread_pool_and_threads, IndexerConfig, S3SnapshotOptions};
|
||||
pub use self::new::ChannelCongestion;
|
||||
pub use self::settings::{validate_embedding_settings, Setting, Settings};
|
||||
pub use self::update_step::UpdateIndexingStep;
|
||||
|
||||
Reference in New Issue
Block a user