Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-11-26 15:59:10 +00:00
Compare commits: prototype- ... prototype- (6 commits)

| SHA1 |
|---|
| 7d7cb4c3d1 |
| a1d35d9e52 |
| 9f17ab51d4 |
| 5ecb4eb79e |
| 0a91c091c6 |
| 47a15fbe24 |
Cargo.lock (generated): 93 changed lines
@@ -1123,7 +1123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159fa412eae48a1d94d0b9ecdb85c97ce56eb2a347c62394d3fdbf221adabc1a"
dependencies = [
"path-matchers",
"path-slash 0.1.5",
"path-slash",
]

[[package]]
@@ -2838,9 +2838,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"

[[package]]
name = "heed"
version = "0.22.1-nested-rtxns-2"
version = "0.22.1-nested-rtxns"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a644ab0b1e8234a7c82e83e26b24af1e4a4b8317629fa6ccd9ea2608c1595c73"
checksum = "0ff115ba5712b1f1fc7617b195f5c2f139e29c397ff79da040cd19db75ccc240"
dependencies = [
"bitflags 2.9.4",
"byteorder",
@@ -3242,7 +3242,6 @@ dependencies = [
"bumpalo",
"bumparaw-collections",
"byte-unit",
"bytes",
"convert_case 0.8.0",
"crossbeam-channel",
"csv",
@@ -3259,19 +3258,14 @@ dependencies = [
"meilisearch-types",
"memmap2",
"page_size",
"path-slash 0.2.1",
"rayon",
"reqwest",
"roaring 0.10.12",
"rusty-s3",
"serde",
"serde_json",
"synchronoise",
"tar",
"tempfile",
"thiserror 2.0.16",
"time",
"tokio",
"tracing",
"ureq",
"uuid",
@@ -3471,30 +3465,6 @@ dependencies = [
"regex",
]

[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]

[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]

[[package]]
name = "jobserver"
version = "0.1.34"
@@ -4018,16 +3988,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"

[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]

[[package]]
name = "md5"
version = "0.7.0"
@@ -4785,12 +4745,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498a099351efa4becc6a19c72aa9270598e8fd274ca47052e37455241c88b696"

[[package]]
name = "path-slash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"

[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -5008,15 +4962,6 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"

[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]

[[package]]
name = "potential_utf"
version = "0.1.3"
@@ -5194,16 +5139,6 @@ dependencies = [
"version_check",
]

[[package]]
name = "quick-xml"
version = "0.38.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
dependencies = [
"memchr",
"serde",
]

[[package]]
name = "quinn"
version = "0.11.9"
@@ -5479,7 +5414,6 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2 0.4.12",
"http 1.3.1",
"http-body",
"http-body-util",
@@ -5770,25 +5704,6 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"

[[package]]
name = "rusty-s3"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fac2edd2f0b56bd79a7343f49afc01c2d41010df480538a510e0abc56044f66c"
dependencies = [
"base64 0.22.1",
"hmac",
"jiff",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"url",
"zeroize",
]

[[package]]
name = "ryu"
version = "1.0.20"
@@ -6267,7 +6182,7 @@ checksum = "f9c425c07353535ef55b45420f5a8b0a397cd9bc3d7e5236497ca0d90604aa9b"
dependencies = [
"change-detection",
"mime_guess",
"path-slash 0.1.5",
"path-slash",
]

[[package]]
@@ -60,7 +60,7 @@ impl FileStore {

/// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
let path = self.update_path(uuid);
let path = self.get_update_path(uuid);
let file = match StdFile::open(path) {
Ok(file) => file,
Err(e) => {
@@ -72,7 +72,7 @@ impl FileStore {
}

/// Returns the path that correspond to this uuid, the path could not exists.
pub fn update_path(&self, uuid: Uuid) -> PathBuf {
pub fn get_update_path(&self, uuid: Uuid) -> PathBuf {
self.path.join(uuid.to_string())
}
@@ -14,7 +14,6 @@ license.workspace = true
anyhow = "1.0.98"
bincode = "1.3.3"
byte-unit = "5.1.6"
bytes = "1.10.1"
bumpalo = "3.18.1"
bumparaw-collections = "0.1.4"
convert_case = "0.8.0"
@@ -29,12 +28,10 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.7"
page_size = "0.6.0"
path-slash = "0.2.1"
rayon = "1.10.0"
roaring = { version = "0.10.12", features = ["serde"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.140", features = ["preserve_order"] }
tar = "0.4.44"
synchronoise = "1.0.1"
tempfile = "3.20.0"
thiserror = "2.0.12"
@@ -48,9 +45,6 @@ tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
backoff = "0.4.0"
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
rusty-s3 = "0.8.1"
tokio = { version = "1.47.1", features = ["full"] }

[dev-dependencies]
big_s = "1.0.2"
@@ -36,7 +36,6 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
run_loop_iteration: _,
embedders: _,
chat_settings: _,
runtime: _,
} = scheduler;

let rtxn = env.read_txn().unwrap();

@@ -216,8 +216,6 @@ pub struct IndexScheduler {
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
#[cfg(test)]
run_loop_iteration: Arc<RwLock<usize>>,

runtime: Option<tokio::runtime::Handle>,
}

impl IndexScheduler {
@@ -244,7 +242,6 @@ impl IndexScheduler {
run_loop_iteration: self.run_loop_iteration.clone(),
features: self.features.clone(),
chat_settings: self.chat_settings,
runtime: self.runtime.clone(),
}
}

@@ -263,7 +260,6 @@ impl IndexScheduler {
options: IndexSchedulerOptions,
auth_env: Env<WithoutTls>,
from_db_version: (u32, u32, u32),
runtime: Option<tokio::runtime::Handle>,
#[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>,
) -> Result<Self> {
@@ -345,7 +341,6 @@ impl IndexScheduler {
run_loop_iteration: Arc::new(RwLock::new(0)),
features,
chat_settings,
runtime,
};

this.run();
@@ -1,20 +1,11 @@
use std::collections::VecDeque;
use std::env::VarError;
use std::ffi::OsStr;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::time::Duration;

use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
use path_slash::PathBufExt;
use reqwest::header::ETAG;
use reqwest::Client;
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
use rusty_s3::{Bucket, Credentials, UrlStyle};

use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
@@ -87,73 +78,10 @@ impl IndexScheduler {
pub(super) fn process_snapshot(
&self,
progress: Progress,
tasks: Vec<Task>,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);

const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
const S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";

let bucket_url = std::env::var(S3_BUCKET_URL).map_err(|e| (S3_BUCKET_URL, e));
let bucket_region = std::env::var(S3_BUCKET_REGION).map_err(|e| (S3_BUCKET_REGION, e));
let bucket_name = std::env::var(S3_BUCKET_NAME).map_err(|e| (S3_BUCKET_NAME, e));
let snapshot_prefix =
std::env::var(S3_SNAPSHOT_PREFIX).map_err(|e| (S3_SNAPSHOT_PREFIX, e));
let access_key = std::env::var(S3_ACCESS_KEY).map_err(|e| (S3_ACCESS_KEY, e));
let secret_key = std::env::var(S3_SECRET_KEY).map_err(|e| (S3_SECRET_KEY, e));
match (bucket_url, bucket_region, bucket_name, snapshot_prefix, access_key, secret_key) {
(
Ok(bucket_url),
Ok(bucket_region),
Ok(bucket_name),
Ok(snapshot_prefix),
Ok(access_key),
Ok(secret_key),
) => {
let runtime = self.runtime.as_ref().expect("Runtime not initialized");
#[cfg(not(unix))]
panic!("Non-unix platform does not support S3 snapshotting");
#[cfg(unix)]
runtime.block_on(self.process_snapshot_to_s3(
progress,
bucket_url,
bucket_region,
bucket_name,
PathBuf::from_slash(snapshot_prefix),
access_key,
secret_key,
tasks,
))
}
(
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
) => self.process_snapshots_to_disk(progress, tasks),
(Err((var, e)), _, _, _, _, _)
| (_, Err((var, e)), _, _, _, _)
| (_, _, Err((var, e)), _, _, _)
| (_, _, _, Err((var, e)), _, _)
| (_, _, _, _, Err((var, e)), _)
| (_, _, _, _, _, Err((var, e))) => {
// TODO: Handle error gracefully
panic!("Error while reading environment variables: {}: {}", var, e);
}
}
}

fn process_snapshots_to_disk(
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>, Error> {
fs::create_dir_all(&self.scheduler.snapshots_path)?;
let temp_snapshot_dir = tempfile::tempdir()?;
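For reference, a minimal sketch of the configuration rule encoded by the match above: the six MEILI_S3_* variables must either all be set, which selects the S3 upload path, or all be absent, which falls back to the regular on-disk snapshot; any partial configuration is rejected. The `s3_config` helper name and the string error type are assumptions of this sketch, not part of the patch.

```rust
use std::env::VarError;

/// Returns Some(values) when every MEILI_S3_* variable is set, None when none
/// of them are set, and an error for any partial configuration.
fn s3_config() -> Result<Option<Vec<String>>, String> {
    const VARS: [&str; 6] = [
        "MEILI_S3_BUCKET_URL",
        "MEILI_S3_BUCKET_REGION",
        "MEILI_S3_BUCKET_NAME",
        "MEILI_S3_SNAPSHOT_PREFIX",
        "MEILI_S3_ACCESS_KEY",
        "MEILI_S3_SECRET_KEY",
    ];
    let values: Vec<_> = VARS.into_iter().map(std::env::var).collect();
    if values.iter().all(|value| value.is_ok()) {
        Ok(Some(values.into_iter().map(Result::unwrap).collect()))
    } else if values.iter().all(|value| matches!(value, Err(VarError::NotPresent))) {
        Ok(None) // fall back to the snapshot-to-disk code path
    } else {
        Err("the MEILI_S3_* variables must be either all set or all unset".into())
    }
}

fn main() {
    match s3_config() {
        Ok(Some(_)) => println!("S3 snapshotting enabled"),
        Ok(None) => println!("snapshotting to disk"),
        Err(error) => eprintln!("{error}"),
    }
}
```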
@@ -212,7 +140,7 @@ impl IndexScheduler {
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.queue.file_store.update_path(content_uuid);
let src = self.queue.file_store.get_update_path(content_uuid);
let dst = update_files_dir.join(content_uuid.to_string());
fs::copy(src, dst)?;
}
@@ -278,292 +206,4 @@ impl IndexScheduler {

Ok(tasks)
}

#[cfg(unix)]
pub(super) async fn process_snapshot_to_s3(
&self,
progress: Progress,
bucket_url: String,
bucket_region: String,
bucket_name: String,
snapshot_prefix: PathBuf,
access_key: String,
secret_key: String,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
use std::fs::File;
use std::io::{self, Seek as _, SeekFrom, Write as _};
use std::os::fd::OwnedFd;
use std::path::Path;

use bytes::{Bytes, BytesMut};
use reqwest::Response;
use tokio::task::JoinHandle;

const ONE_HOUR: Duration = Duration::from_secs(3600);
// default part size is 250MiB
// TODO use 375MiB
// It must be at least 2x5MiB
const PART_SIZE: usize = 10 * 1024 * 1024;

// The maximum number of parts that can be uploaded in parallel.
const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
let max_in_flight_parts: usize = match std::env::var(S3_MAX_IN_FLIGHT_PARTS) {
Ok(val) => val.parse().expect("Failed to parse MEILI_S3_MAX_IN_FLIGHT_PARTS"),
Err(VarError::NotPresent) => 10,
Err(e) => panic!("Failed to read {}: {}", S3_MAX_IN_FLIGHT_PARTS, e),
};

// The compression level, defaults to no compression (0)
const S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
let level: u32 = match std::env::var(S3_COMPRESSION_LEVEL) {
Ok(val) => val.parse().expect("Failed to parse MEILI_S3_COMPRESSION_LEVEL"),
Err(VarError::NotPresent) => 0,
Err(e) => panic!("Failed to read {}: {}", S3_COMPRESSION_LEVEL, e),
};

let client = Client::new();
// TODO Remove this unwrap
let url = bucket_url.parse().unwrap();
let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
let credential = Credentials::new(access_key, secret_key);
// TODO change this and use the database name like in the original version
let object_path = snapshot_prefix.join("data.ms.snapshot");
let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();

eprintln!("Starting the upload of the snapshot to {object}");

// TODO implement exponential backoff on upload requests: https://docs.rs/backoff
// TODO return a result with actual errors
// TODO sign for longer than an hour?
// NOTE to make it work on Windows we could try using std::io::pipe instead.
// However, we are still using the tokio unix pipe in the async upload loop.
let (reader, writer) = std::io::pipe()?;
let uploader_task = tokio::spawn(async move {
let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
let action = bucket.create_multipart_upload(Some(&credential), &object);
// TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
// If the part is deleted (like a TTL) we should sign it for at least 24 hours.
let url = action.sign(ONE_HOUR);
let resp = client.post(url).send().await.unwrap().error_for_status().unwrap();
let body = resp.text().await.unwrap();

let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
let mut etags = Vec::<String>::new();
let mut in_flight =
VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
max_in_flight_parts,
);

for part_number in 1u16.. {
let part_upload = bucket.upload_part(
Some(&credential),
&object,
part_number,
multipart.upload_id(),
);
let url = part_upload.sign(ONE_HOUR);

// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= max_in_flight_parts {
let (request, buffer) = in_flight.pop_front().unwrap();
let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
let mut buffer = match buffer.try_into_mut() {
Ok(buffer) => buffer,
Err(_) => panic!("Valid to convert into BytesMut"),
};
// TODO use bumpalo to reduce the number of allocations
etags.push(etag.to_str().unwrap().to_owned());
buffer.clear();
buffer
} else {
// TODO Base this on the available memory
BytesMut::with_capacity(PART_SIZE)
};

while buffer.len() < (PART_SIZE / 2) {
eprintln!(
"buffer is {:.2}% full, trying to read more",
buffer.len() as f32 / buffer.capacity() as f32 * 100.0
);

// Wait for the pipe to be readable
reader.readable().await?;

match reader.try_read_buf(&mut buffer) {
Ok(0) => break,
// We read some bytes but maybe not enough
Ok(n) => {
eprintln!("Read {} bytes from pipe, continuing", n);
continue;
}
// The readiness event is a false positive.
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
eprintln!("received a WouldBlock");
continue;
}
Err(e) => return Err(e.into()),
}
}

eprintln!(
"buffer is {:.2}% full",
buffer.len() as f32 / buffer.capacity() as f32 * 100.0
);

if buffer.is_empty() {
eprintln!("buffer is empty, breaking part number loop");
// Break the loop if the buffer is empty
// after we tried to read bytes
break;
}

let body = buffer.freeze();
eprintln!("Sending part {}", part_number);
let task = tokio::spawn(client.put(url).body(body.clone()).send());
in_flight.push_back((task, body));
}

for (join_handle, _buffer) in in_flight {
let resp = join_handle.await.unwrap().unwrap().error_for_status().unwrap();
let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
// TODO use bumpalo to reduce the number of allocations
etags.push(etag.to_str().unwrap().to_owned());
}

eprintln!("Finalizing the multipart upload");

let action = bucket.complete_multipart_upload(
Some(&credential),
&object,
multipart.upload_id(),
etags.iter().map(AsRef::as_ref),
);
let url = action.sign(ONE_HOUR);
let resp = client.post(url).body(action.body()).send().await.unwrap();
let status = resp.status();
let text = resp.text().await.unwrap();
eprintln!("Status: {status}, Text: {text}");

// TODO do a better check and do not assert
// assert!(resp.status().is_success());

Result::<_, Error>::Ok(())
});

// TODO not a big fan of this clone
// remove it and get all the necessary data from the scheduler
let index_scheduler = IndexScheduler::private_clone(self);
let builder_task = tokio::task::spawn_blocking(move || {
// NOTE enabling compression still generates a corrupted tarball
let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
let mut tarball = tar::Builder::new(writer);

// 1. Snapshot the version file
tarball.append_path_with_name(
&index_scheduler.scheduler.version_file_path,
VERSION_FILE_NAME,
)?;

// 2. Snapshot the index scheduler LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
// NOTE That made the trick !!! Should I memory map instead?
tasks_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("tasks").join("data.mdb");
// NOTE when commenting this line, the tarball works better
tarball.append_file(path, &mut tasks_env_file)?;
drop(tasks_env_file);

// 2.3 Create a read transaction on the index-scheduler
let rtxn = index_scheduler.env.read_txn()?;

// 2.4 Create the update files directory
// And only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
// TODO I need to create the update files directory (even if empty)
// I should probably simply use the append_dir_all method
// but I'll loose the progression.
let update_files_dir = Path::new("update_files");
for task_id in enqueued {
let task = index_scheduler
.queue
.tasks
.get_task(&rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = index_scheduler.queue.file_store.update_path(content_uuid);
let mut update_file = File::open(src)?;
let path = update_files_dir.join(content_uuid.to_string());
tarball.append_file(path, &mut update_file)?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}

// 3. Snapshot every indexes
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
let index_mapping = index_scheduler.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let indexes_dir = Path::new("indexes");
let indexes_references: Vec<_> = index_scheduler
.index_mapper
.index_mapping
.iter(&rtxn)?
.map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
.collect::<Result<_, Error>>()?;

dbg!(&indexes_references);

// Note that we need to collect and open all of the indexes files because
// otherwise, using a for loop, we would have to have a Send rtxn.
// TODO I don't need to do this trick if my writer is NOT async
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
&name, i as u32, nb_indexes,
));
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
let mut index_file = index.try_clone_inner_file().unwrap();
index_file.seek(SeekFrom::Start(0))?;
eprintln!("Appending index file for {} in {}", name, path.display());
tarball.append_file(path, &mut index_file)?;
}

drop(rtxn);

// 4. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let mut auth_env_file =
index_scheduler.scheduler.auth_env.try_clone_inner_file().unwrap();
auth_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("auth").join("data.mdb");
tarball.append_file(path, &mut auth_env_file)?;

let mut gzencoder = tarball.into_inner()?;
gzencoder.flush()?;
gzencoder.try_finish()?;
let mut writer = gzencoder.finish()?;
writer.flush()?;

Result::<_, Error>::Ok(())
});

let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);

builder_result.unwrap()?;
uploader_result.unwrap()?;

for task in &mut tasks {
task.status = Status::Succeeded;
}

Ok(tasks)
}
}
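For reference, a minimal sketch of the presigned multipart-upload flow that the uploader task above drives, reduced to a single, sequentially uploaded part and stripped of the pipe, tarball, and in-flight bookkeeping. The endpoint, bucket name, region, credentials, and object key are placeholder values; only rusty-s3 and reqwest calls that already appear in the patch are used.

```rust
use std::time::Duration;

use reqwest::header::ETAG;
use reqwest::Client;
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
use rusty_s3::{Bucket, Credentials, UrlStyle};

async fn upload_snapshot_part(part: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
    const ONE_HOUR: Duration = Duration::from_secs(3600);
    let client = Client::new();
    let bucket = Bucket::new(
        "http://localhost:9000".parse()?, // placeholder endpoint
        UrlStyle::Path,
        "snapshots", // placeholder bucket name
        "us-east-1", // placeholder region
    )?;
    let credential = Credentials::new("access_key", "secret_key");
    let object = "prefix/data.ms.snapshot";

    // 1. Create the multipart upload and read the upload id from the XML response.
    let action = bucket.create_multipart_upload(Some(&credential), object);
    let body =
        client.post(action.sign(ONE_HOUR)).send().await?.error_for_status()?.text().await?;
    let multipart = CreateMultipartUpload::parse_response(&body).unwrap();

    // 2. Upload one part through a presigned URL and keep its ETag.
    let part_upload = bucket.upload_part(Some(&credential), object, 1, multipart.upload_id());
    let resp =
        client.put(part_upload.sign(ONE_HOUR)).body(part).send().await?.error_for_status()?;
    let etag = resp.headers().get(ETAG).expect("UploadPart returns an ETag").to_str()?.to_owned();

    // 3. Complete the upload with the collected ETags, in part order.
    let action = bucket.complete_multipart_upload(
        Some(&credential),
        object,
        multipart.upload_id(),
        [etag.as_str()].into_iter(),
    );
    client.post(action.sign(ONE_HOUR)).body(action.body()).send().await?.error_for_status()?;
    Ok(())
}
```

The patch itself streams a gzip-compressed tarball into the write end of a pipe while the uploader reads from the other end, keeps up to MEILI_S3_MAX_IN_FLIGHT_PARTS part uploads in flight, and collects the returned ETags before completing the upload.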
@@ -126,7 +126,7 @@ impl IndexScheduler {
std::fs::create_dir_all(&options.auth_path).unwrap();
let auth_env = open_auth_store_env(&options.auth_path).unwrap();
let index_scheduler =
Self::new(options, auth_env, version, None, sender, planned_failures).unwrap();
Self::new(options, auth_env, version, sender, planned_failures).unwrap();

// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.

@@ -216,10 +216,7 @@ enum OnFailure {
KeepDb,
}

pub fn setup_meilisearch(
opt: &Opt,
handle: tokio::runtime::Handle,
) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
let index_scheduler_opt = IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
@@ -259,7 +256,6 @@ pub fn setup_meilisearch(
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
handle,
)?,
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
@@ -277,7 +273,7 @@ pub fn setup_meilisearch(
bail!("snapshot doesn't exist at {}", snapshot_path.display())
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
}
} else if let Some(ref path) = opt.import_dump {
let src_path_exists = path.exists();
@@ -288,7 +284,6 @@ pub fn setup_meilisearch(
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
handle,
)?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
Ok(()) => (index_scheduler, auth_controller),
@@ -309,10 +304,10 @@ pub fn setup_meilisearch(
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
}
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
};

// We create a loop in a thread that registers snapshotCreation tasks
@@ -343,7 +338,6 @@ fn open_or_create_database_unchecked(
index_scheduler_opt: IndexSchedulerOptions,
on_failure: OnFailure,
version: (u32, u32, u32),
handle: tokio::runtime::Handle,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
@@ -351,7 +345,7 @@ fn open_or_create_database_unchecked(
let auth_env = open_auth_store_env(&index_scheduler_opt.auth_path).unwrap();
let auth_controller = AuthController::new(auth_env.clone(), &opt.master_key);
let index_scheduler_builder = || -> anyhow::Result<_> {
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version, Some(handle))?)
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version)?)
};

match (
@@ -458,7 +452,6 @@ fn open_or_create_database(
index_scheduler_opt: IndexSchedulerOptions,
empty_db: bool,
binary_version: (u32, u32, u32),
handle: tokio::runtime::Handle,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
let version = if !empty_db {
check_version(opt, &index_scheduler_opt, binary_version)?
@@ -466,7 +459,7 @@ fn open_or_create_database(
binary_version
};

open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version, handle)
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
}

fn import_dump(
@@ -76,10 +76,7 @@ fn on_panic(info: &std::panic::PanicHookInfo) {

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
// won't panic inside of tokio::main
let runtime = tokio::runtime::Handle::current();

try_main(runtime).await.inspect_err(|error| {
try_main().await.inspect_err(|error| {
tracing::error!(%error);
let mut current = error.source();
let mut depth = 0;
@@ -91,7 +88,7 @@ async fn main() -> anyhow::Result<()> {
})
}

async fn try_main(runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
async fn try_main() -> anyhow::Result<()> {
let (opt, config_read_from) = Opt::try_build()?;

std::panic::set_hook(Box::new(on_panic));
@@ -125,7 +122,7 @@ async fn try_main(runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
_ => (),
}

let (index_scheduler, auth_controller) = setup_meilisearch(&opt, runtime)?;
let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?;

let analytics =
analytics::Analytics::new(&opt, index_scheduler.clone(), auth_controller.clone()).await;
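For reference, a minimal sketch (function names assumed) of the handle-threading pattern this side of the change introduces: main() captures tokio::runtime::Handle::current() and hands it down to synchronous code, which can later block_on async work such as the S3 upload. It assumes tokio with the macros, rt-multi-thread, and time features enabled, which is covered by the patch's tokio dependency with features = ["full"].

```rust
fn run_scheduler(handle: tokio::runtime::Handle) {
    // Synchronous context: drive async work through the stored handle.
    handle.block_on(async {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    });
}

#[tokio::main]
async fn main() {
    // Inside the runtime, Handle::current() is the cheap way to grab a handle.
    let handle = tokio::runtime::Handle::current();
    // Hand it to blocking code on a dedicated thread so runtime workers are not blocked.
    tokio::task::spawn_blocking(move || run_scheduler(handle)).await.unwrap();
}
```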
@@ -49,8 +49,8 @@ impl Server<Owned> {
}

let options = default_settings(dir.path());
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();

let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };

Server { service, _dir: Some(dir), _marker: PhantomData }
@@ -65,9 +65,7 @@ impl Server<Owned> {

options.master_key = Some("MASTER_KEY".to_string());

let handle = tokio::runtime::Handle::current();

let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };

Server { service, _dir: Some(dir), _marker: PhantomData }
@@ -80,9 +78,7 @@ impl Server<Owned> {
}

pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
let handle = tokio::runtime::Handle::current();

let (index_scheduler, auth) = setup_meilisearch(&options, handle)?;
let (index_scheduler, auth) = setup_meilisearch(&options)?;
let service = Service { index_scheduler, auth, options, api_key: None };

Ok(Server { service, _dir: None, _marker: PhantomData })
@@ -221,9 +217,8 @@ impl Server<Shared> {
}

let options = default_settings(dir.path());
let handle = tokio::runtime::Handle::current();

let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, api_key: None, options };

Server { service, _dir: Some(dir), _marker: PhantomData }
@@ -68,7 +68,7 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {

for uuid in file_store.all_uuids().context("while retrieving uuids from file store")? {
let uuid = uuid.context("while retrieving uuid from file store")?;
let update_file_path = file_store.update_path(uuid);
let update_file_path = file_store.get_update_path(uuid);
let update_file = file_store
.get_update(uuid)
.with_context(|| format!("while getting update file for uuid {uuid:?}"))?;

@@ -34,7 +34,7 @@ grenad = { version = "0.5.0", default-features = false, features = [
"rayon",
"tempfile",
] }
heed = { version = "0.22.1-nested-rtxns-2", default-features = false, features = [
heed = { version = "0.22.1-nested-rtxns", default-features = false, features = [
"serde-json",
"serde-bincode",
] }
@@ -425,10 +425,6 @@ impl Index {
self.env.info().map_size
}

pub fn try_clone_inner_file(&self) -> Result<File> {
Ok(self.env.try_clone_inner_file()?)
}

pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> {
self.env.copy_to_file(file, option).map_err(Into::into)
}

@@ -1192,12 +1192,12 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
Metadata { docid, external_docid: "", extractor_id },
value,
unused_vectors_distribution,
)?;
);
}
}

// send last chunk
let on_embed = session.drain(unused_vectors_distribution)?;
let on_embed = session.drain(unused_vectors_distribution);
on_embed.finish()
}
@@ -684,7 +684,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
metadata,
input,
unused_vectors_distribution,
)?;
);
}
ExtractorDiff::Unchanged => { /* nothing to do */ }
}
@@ -724,7 +724,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
if old_is_user_provided || full_reindex {
session.on_embed_mut().clear_vectors(docid);
}
session.request_embedding(metadata, input, unused_vectors_distribution)?;
session.request_embedding(metadata, input, unused_vectors_distribution);
}
ExtractorDiff::Unchanged => { /* do nothing */ }
}
@@ -764,7 +764,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
document_template,
doc_alloc,
new_fields_ids_map,
);
)
.ignore_errors();

update_autogenerated(
docid,
@@ -850,7 +851,8 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
document_template,
doc_alloc,
new_fields_ids_map,
);
)
.ignore_errors();

insert_autogenerated(
docid,
@@ -885,10 +887,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
match self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.drain(unused_vectors_distribution)?;
session.drain(unused_vectors_distribution);
}
ChunkType::Fragments { fragments: _, session } => {
session.drain(unused_vectors_distribution)?;
session.drain(unused_vectors_distribution);
}
}
Ok(())
@@ -1036,7 +1038,7 @@ where
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };

if let Some(new_rendered) = new_rendered {
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)?
session.request_embedding(metadata, new_rendered, unused_vectors_distribution)
} else {
// remove any existing embedding
OnEmbed::process_embedding_response(
@@ -1072,7 +1074,7 @@ where
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() },
new_rendered,
unused_vectors_distribution,
)?;
);
}
}
@@ -91,6 +91,7 @@ struct EmbedderData {
request: RequestData,
response: Response,
configuration_source: ConfigurationSource,
max_retry_duration: std::time::Duration,
}

#[derive(Debug)]
@@ -182,10 +183,15 @@ impl Embedder {
) -> Result<Self, NewEmbedderError> {
let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));

let timeout = std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(30);

let client = ureq::AgentBuilder::new()
.max_idle_connections(REQUEST_PARALLELISM * 2)
.max_idle_connections_per_host(REQUEST_PARALLELISM * 2)
.timeout(std::time::Duration::from_secs(30))
.timeout(std::time::Duration::from_secs(timeout))
.build();

let request = RequestData::new(
@@ -196,6 +202,14 @@ impl Embedder {

let response = Response::new(options.response, &request)?;

let max_retry_duration =
std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(60);

let max_retry_duration = std::time::Duration::from_secs(max_retry_duration);

let data = EmbedderData {
client,
bearer,
@@ -204,6 +218,7 @@ impl Embedder {
response,
configuration_source,
headers: options.headers,
max_retry_duration,
};

let dimensions = if let Some(dimensions) = options.dimensions {
@@ -457,7 +472,7 @@ where
}
}?;

let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
let retry_duration = retry_duration.min(data.max_retry_duration); // don't wait more than the max duration

// randomly up to double the retry duration
let retry_duration = retry_duration
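For reference, a minimal sketch (helper name assumed) of the environment-variable-with-default pattern used for the two new knobs above: MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS falls back to 30 and MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS falls back to 60 when unset; like the patch, it panics on a non-numeric value.

```rust
fn env_secs_or(name: &str, default: u64) -> std::time::Duration {
    let secs = std::env::var(name)
        .ok()
        .map(|value| value.parse().expect("expected an integer number of seconds"))
        .unwrap_or(default);
    std::time::Duration::from_secs(secs)
}

fn main() {
    let timeout = env_secs_or("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS", 30);
    let max_retry = env_secs_or("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS", 60);
    println!("request timeout: {timeout:?}, max retry duration: {max_retry:?}");
}
```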
@@ -5,7 +5,7 @@ use serde_json::Value;
use super::error::EmbedError;
use super::{Embedder, Embedding};
use crate::progress::EmbedderStats;
use crate::{DocumentId, Result, ThreadPoolNoAbort};
use crate::{DocumentId, ThreadPoolNoAbort};
type ExtractorId = u8;

#[derive(Clone, Copy)]
@@ -108,32 +108,28 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
metadata: Metadata<'doc>,
rendered: I,
unused_vectors_distribution: &C::ErrorMetadata,
) -> Result<()> {
) {
if self.inputs.len() < self.inputs.capacity() {
self.inputs.push(rendered);
self.metadata.push(metadata);
return Ok(());
return;
}

self.embed_chunks(unused_vectors_distribution)
}

pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
self.embed_chunks(unused_vectors_distribution)?;
Ok(self.on_embed)
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> C {
self.embed_chunks(unused_vectors_distribution);
self.on_embed
}

#[allow(clippy::too_many_arguments)]
fn embed_chunks(&mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<()> {
fn embed_chunks(&mut self, _unused_vectors_distribution: &C::ErrorMetadata) {
if self.inputs.is_empty() {
return Ok(());
return;
}
let res = match I::embed_ref(
self.inputs.as_slice(),
self.embedder,
self.threads,
self.embedder_stats,
) {
match I::embed_ref(self.inputs.as_slice(), self.embedder, self.threads, self.embedder_stats)
{
Ok(embeddings) => {
for (metadata, embedding) in self.metadata.iter().copied().zip(embeddings) {
self.on_embed.process_embedding_response(EmbeddingResponse {
@@ -141,27 +137,37 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
embedding: Some(embedding),
});
}
Ok(())
}
Err(error) => {
// reset metadata and inputs, and send metadata to the error processing.
let doc_alloc = self.metadata.bump();
let metadata = std::mem::replace(
&mut self.metadata,
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
tracing::warn!(
%error,
"error embedding batch of documents, retrying one by one"
);
self.inputs.clear();
return Err(self.on_embed.process_embedding_error(
error,
self.embedder_name,
unused_vectors_distribution,
metadata,
));
// retry with one call per input
for (metadata, input) in self.metadata.iter().copied().zip(self.inputs.chunks(1)) {
match I::embed_ref(input, self.embedder, self.threads, self.embedder_stats) {
Ok(mut embeddings) => {
let Some(embedding) = embeddings.pop() else {
continue;
};
self.on_embed.process_embedding_response(EmbeddingResponse {
metadata,
embedding: Some(embedding),
})
}
Err(err) => {
tracing::warn!(
docid = metadata.external_docid,
%err,
"error embedding document"
);
}
}
}
}
};
self.inputs.clear();
self.metadata.clear();
res
}

pub(crate) fn embedder_name(&self) -> &'doc str {
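For reference, a minimal sketch of the fallback strategy the new embed_chunks implements (the embed_batch stand-in is assumed and is not the crate's real API): when a batched embedding request fails, the error is logged and every input is retried one at a time, so a single bad document no longer fails the whole batch.

```rust
fn embed_all(inputs: &[String]) -> Vec<Option<Vec<f32>>> {
    match embed_batch(inputs) {
        Ok(embeddings) => embeddings.into_iter().map(Some).collect(),
        Err(error) => {
            eprintln!("error embedding batch of documents, retrying one by one: {error}");
            inputs
                .iter()
                .map(|input| match embed_batch(std::slice::from_ref(input)) {
                    Ok(mut embeddings) => embeddings.pop(),
                    Err(err) => {
                        eprintln!("error embedding document: {err}");
                        None
                    }
                })
                .collect()
        }
    }
}

// Stand-in for the real embedder call; it only exists to make the sketch compile.
fn embed_batch(inputs: &[String]) -> Result<Vec<Vec<f32>>, String> {
    Ok(inputs.iter().map(|text| vec![text.len() as f32]).collect())
}

fn main() {
    let embeddings = embed_all(&["hello".to_string(), "world".to_string()]);
    println!("{} documents embedded", embeddings.iter().filter(|e| e.is_some()).count());
}
```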