Compare commits

...

24 Commits

Author SHA1 Message Date
Kerollmops
fbe56822f0 Disable compression entirely to avoid being CPU bound 2025-10-23 13:32:32 +02:00
Clément Renault
5e1af30b42 Improve the way we create the snapshot path 2025-10-20 16:48:14 +02:00
Clément Renault
d298b21a95 Remove useless dependencies 2025-10-20 16:48:14 +02:00
Kerollmops
a4ad87febf Make it finally work, but without async on the write side 2025-10-20 16:48:14 +02:00
Kerollmops
2caa2be441 Seeking the tasks/data.mdb file to the beginning did the trick 2025-10-20 16:48:14 +02:00
Kerollmops
a829ded023 Improve understanding of S3-related errors 2025-10-20 16:48:14 +02:00
Kerollmops
98be43b66b Retrieve the BytesMut only when released 2025-10-20 16:48:14 +02:00
Kerollmops
3ec4750426 Fix minimum part size 2025-10-20 16:48:14 +02:00
Kerollmops
d717ec3486 Improve error messaging when missing env var 2025-10-20 16:48:14 +02:00
Clément Renault
0c06bdefac WIP 2025-10-20 16:48:13 +02:00
Kerollmops
62a8133bcd WIP Do more tests 2025-10-20 16:48:13 +02:00
Clément Renault
c1f7542dfa WIP sending multiparts of 250MiB 2025-10-20 16:48:13 +02:00
Clément Renault
9105e8bb8c Rename the update_path function 2025-10-20 16:48:13 +02:00
Clément Renault
59e2394e69 Generate an async tarball 2025-10-20 16:48:13 +02:00
Kerollmops
430bc91c4c WIP 2025-10-20 16:48:13 +02:00
Kerollmops
1172dce093 Make max in-flight parts for upload configurable 2025-10-20 16:48:13 +02:00
Kerollmops
21140a33e6 Use a good mem advice for uploads 2025-10-20 16:48:13 +02:00
Kerollmops
9ed7a81495 Move the S3 snapshots to disk into a dedicated method 2025-10-20 16:48:13 +02:00
Clément Renault
c8c1d95efd Upload ten parts at a time 2025-10-20 16:48:13 +02:00
Clément Renault
10d1e26478 Use the Bytes crate to send the parts 2025-10-20 16:48:13 +02:00
Clément Renault
cd781d267b Upload indexes under their uuids 2025-10-20 16:48:13 +02:00
Clément Renault
b16af4a763 Initial working S3 uploads to RustFS 2025-10-20 16:48:13 +02:00
Louis Dureuil
1514d13ab3 Pass tokio handle to index-scheduler 2025-10-20 16:48:13 +02:00
Kerollmops
d51f13a59e Use the latest version of heed 2025-10-20 16:48:13 +02:00
13 changed files with 492 additions and 25 deletions

Cargo.lock (generated, 93 changed lines)
View File

@@ -1123,7 +1123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "159fa412eae48a1d94d0b9ecdb85c97ce56eb2a347c62394d3fdbf221adabc1a"
dependencies = [
"path-matchers",
"path-slash",
"path-slash 0.1.5",
]
[[package]]
@@ -2838,9 +2838,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "heed"
version = "0.22.1-nested-rtxns"
version = "0.22.1-nested-rtxns-2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ff115ba5712b1f1fc7617b195f5c2f139e29c397ff79da040cd19db75ccc240"
checksum = "a644ab0b1e8234a7c82e83e26b24af1e4a4b8317629fa6ccd9ea2608c1595c73"
dependencies = [
"bitflags 2.9.4",
"byteorder",
@@ -3242,6 +3242,7 @@ dependencies = [
"bumpalo",
"bumparaw-collections",
"byte-unit",
"bytes",
"convert_case 0.8.0",
"crossbeam-channel",
"csv",
@@ -3258,14 +3259,19 @@ dependencies = [
"meilisearch-types",
"memmap2",
"page_size",
"path-slash 0.2.1",
"rayon",
"reqwest",
"roaring 0.10.12",
"rusty-s3",
"serde",
"serde_json",
"synchronoise",
"tar",
"tempfile",
"thiserror 2.0.16",
"time",
"tokio",
"tracing",
"ureq",
"uuid",
@@ -3465,6 +3471,30 @@ dependencies = [
"regex",
]
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "jobserver"
version = "0.1.34"
@@ -3988,6 +4018,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "md5"
version = "0.7.0"
@@ -4745,6 +4785,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "498a099351efa4becc6a19c72aa9270598e8fd274ca47052e37455241c88b696"
[[package]]
name = "path-slash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e91099d4268b0e11973f036e885d652fb0b21fedcf69738c627f94db6a44f42"
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -4962,6 +5008,15 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "potential_utf"
version = "0.1.3"
@@ -5139,6 +5194,16 @@ dependencies = [
"version_check",
]
[[package]]
name = "quick-xml"
version = "0.38.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -5414,6 +5479,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2 0.4.12",
"http 1.3.1",
"http-body",
"http-body-util",
@@ -5704,6 +5770,25 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "rusty-s3"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fac2edd2f0b56bd79a7343f49afc01c2d41010df480538a510e0abc56044f66c"
dependencies = [
"base64 0.22.1",
"hmac",
"jiff",
"md-5",
"percent-encoding",
"quick-xml",
"serde",
"serde_json",
"sha2",
"url",
"zeroize",
]
[[package]]
name = "ryu"
version = "1.0.20"
@@ -6182,7 +6267,7 @@ checksum = "f9c425c07353535ef55b45420f5a8b0a397cd9bc3d7e5236497ca0d90604aa9b"
dependencies = [
"change-detection",
"mime_guess",
"path-slash",
"path-slash 0.1.5",
]
[[package]]
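Most of the new lockfile entries (jiff, jiff-static, md-5, quick-xml, portable-atomic-util) arrive as transitive dependencies of rusty-s3, which signs S3 requests locally rather than pulling in a full SDK. A minimal sketch of that presigned-URL flow, with placeholder endpoint, bucket, and credentials:

use std::time::Duration;

use rusty_s3::{Bucket, Credentials, S3Action as _, UrlStyle};

fn main() {
    // Placeholder endpoint and keys; RustFS or MinIO behave the same way.
    let url = "http://localhost:9000".parse().unwrap();
    let bucket = Bucket::new(url, UrlStyle::Path, "snapshots", "us-east-1").unwrap();
    let credentials = Credentials::new("ACCESS_KEY", "SECRET_KEY");

    // Signing is pure computation: no network I/O happens here. Any HTTP
    // client (the PR uses reqwest) can then PUT the body to this URL.
    let action = bucket.put_object(Some(&credentials), "data.ms.snapshot");
    let signed_url = action.sign(Duration::from_secs(3600));
    println!("{signed_url}");
}

Because signing is offline, switching between S3-compatible stores only means changing the endpoint.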

View File

@@ -60,7 +60,7 @@ impl FileStore {
/// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
let path = self.get_update_path(uuid);
let path = self.update_path(uuid);
let file = match StdFile::open(path) {
Ok(file) => file,
Err(e) => {
@@ -72,7 +72,7 @@ impl FileStore {
}
/// Returns the path that corresponds to this uuid; the path may not exist.
pub fn get_update_path(&self, uuid: Uuid) -> PathBuf {
pub fn update_path(&self, uuid: Uuid) -> PathBuf {
self.path.join(uuid.to_string())
}
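The rename drops the get_ prefix, following the usual Rust accessor convention, and the doc comment's caveat still applies: the method only builds a path and promises nothing about what is on disk. A tiny standalone sketch (the free function and paths are hypothetical):

use std::path::{Path, PathBuf};
use uuid::Uuid;

// Hypothetical standalone version of the accessor: it only joins a path,
// so callers like `get_update` must still handle a failing open().
fn update_path(store_root: &Path, uuid: Uuid) -> PathBuf {
    store_root.join(uuid.to_string())
}

fn main() {
    let path = update_path(Path::new("data.ms/update_files"), Uuid::new_v4());
    println!("{} exists: {}", path.display(), path.exists());
}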

View File

@@ -14,6 +14,7 @@ license.workspace = true
anyhow = "1.0.98"
bincode = "1.3.3"
byte-unit = "5.1.6"
bytes = "1.10.1"
bumpalo = "3.18.1"
bumparaw-collections = "0.1.4"
convert_case = "0.8.0"
@@ -28,10 +29,12 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.7"
page_size = "0.6.0"
path-slash = "0.2.1"
rayon = "1.10.0"
roaring = { version = "0.10.12", features = ["serde"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.140", features = ["preserve_order"] }
tar = "0.4.44"
synchronoise = "1.0.1"
tempfile = "3.20.0"
thiserror = "2.0.12"
@@ -45,6 +48,9 @@ tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
backoff = "0.4.0"
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
rusty-s3 = "0.8.1"
tokio = { version = "1.47.1", features = ["full"] }
[dev-dependencies]
big_s = "1.0.2"
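Of the added dependencies, bytes is what enables the part-buffer recycling from the commit list ("Retrieve the BytesMut only when released"): a frozen Bytes can be cloned cheaply for the HTTP client, and once the last clone is dropped the allocation can be reclaimed rather than reallocated. A standalone sketch of that cycle:

use bytes::{Bytes, BytesMut};

fn main() {
    // Fill a mutable buffer, then freeze it into cheaply cloneable `Bytes`.
    let mut buf = BytesMut::with_capacity(1024);
    buf.extend_from_slice(b"part payload");
    let frozen: Bytes = buf.freeze();

    // A clone of `frozen` is what would be handed to the HTTP client.
    let for_upload = frozen.clone();
    drop(for_upload); // the request completed; the last other clone is gone

    // `try_into_mut` succeeds only when this is the sole reference, so the
    // buffer and its allocation can be cleared and reused for the next part.
    match frozen.try_into_mut() {
        Ok(mut reclaimed) => {
            reclaimed.clear();
            assert!(reclaimed.is_empty());
        }
        Err(_still_shared) => unreachable!("all clones were dropped"),
    }
}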

View File

@@ -36,6 +36,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
run_loop_iteration: _,
embedders: _,
chat_settings: _,
runtime: _,
} = scheduler;
let rtxn = env.read_txn().unwrap();

View File

@@ -216,6 +216,8 @@ pub struct IndexScheduler {
/// A counter that is incremented before every call to [`tick`](IndexScheduler::tick)
#[cfg(test)]
run_loop_iteration: Arc<RwLock<usize>>,
runtime: Option<tokio::runtime::Handle>,
}
impl IndexScheduler {
@@ -242,6 +244,7 @@ impl IndexScheduler {
run_loop_iteration: self.run_loop_iteration.clone(),
features: self.features.clone(),
chat_settings: self.chat_settings,
runtime: self.runtime.clone(),
}
}
@@ -260,6 +263,7 @@ impl IndexScheduler {
options: IndexSchedulerOptions,
auth_env: Env<WithoutTls>,
from_db_version: (u32, u32, u32),
runtime: Option<tokio::runtime::Handle>,
#[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>,
) -> Result<Self> {
@@ -341,6 +345,7 @@ impl IndexScheduler {
run_loop_iteration: Arc::new(RwLock::new(0)),
features,
chat_settings,
runtime,
};
this.run();
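The new runtime: Option<tokio::runtime::Handle> field is how a synchronous scheduler borrows an async runtime it does not own: a handle is captured where a runtime already exists (Handle::current() in main) and block_on later drives the upload future from the scheduler's own thread. A minimal sketch of the pattern, with illustrative names:

use tokio::runtime::Handle;

// Synchronous component holding an optional handle to an external runtime,
// mirroring the field added to IndexScheduler.
struct Scheduler {
    runtime: Option<Handle>,
}

impl Scheduler {
    // Called from a plain (non-async) thread.
    fn snapshot(&self) {
        let handle = self.runtime.as_ref().expect("runtime not initialized");
        // Drive an async future to completion from synchronous code.
        handle.block_on(async {
            // the S3 upload would run here
        });
    }
}

#[tokio::main]
async fn main() {
    // Capture the ambient runtime, as try_main now does before
    // handing it to setup_meilisearch.
    let scheduler = Scheduler { runtime: Some(Handle::current()) };
    // block_on must not run on a runtime worker, so hop to a blocking thread.
    tokio::task::spawn_blocking(move || scheduler.snapshot()).await.unwrap();
}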

View File

@@ -1,11 +1,20 @@
use std::collections::VecDeque;
use std::env::VarError;
use std::ffi::OsStr;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::time::Duration;
use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
use path_slash::PathBufExt;
use reqwest::header::ETAG;
use reqwest::Client;
use rusty_s3::actions::{CreateMultipartUpload, S3Action as _};
use rusty_s3::{Bucket, Credentials, UrlStyle};
use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
@@ -78,10 +87,73 @@ impl IndexScheduler {
pub(super) fn process_snapshot(
&self,
progress: Progress,
mut tasks: Vec<Task>,
tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
const S3_BUCKET_URL: &str = "MEILI_S3_BUCKET_URL";
const S3_BUCKET_REGION: &str = "MEILI_S3_BUCKET_REGION";
const S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
const S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
const S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
const S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
let bucket_url = std::env::var(S3_BUCKET_URL).map_err(|e| (S3_BUCKET_URL, e));
let bucket_region = std::env::var(S3_BUCKET_REGION).map_err(|e| (S3_BUCKET_REGION, e));
let bucket_name = std::env::var(S3_BUCKET_NAME).map_err(|e| (S3_BUCKET_NAME, e));
let snapshot_prefix =
std::env::var(S3_SNAPSHOT_PREFIX).map_err(|e| (S3_SNAPSHOT_PREFIX, e));
let access_key = std::env::var(S3_ACCESS_KEY).map_err(|e| (S3_ACCESS_KEY, e));
let secret_key = std::env::var(S3_SECRET_KEY).map_err(|e| (S3_SECRET_KEY, e));
match (bucket_url, bucket_region, bucket_name, snapshot_prefix, access_key, secret_key) {
(
Ok(bucket_url),
Ok(bucket_region),
Ok(bucket_name),
Ok(snapshot_prefix),
Ok(access_key),
Ok(secret_key),
) => {
let runtime = self.runtime.as_ref().expect("Runtime not initialized");
#[cfg(not(unix))]
panic!("Non-unix platform does not support S3 snapshotting");
#[cfg(unix)]
runtime.block_on(self.process_snapshot_to_s3(
progress,
bucket_url,
bucket_region,
bucket_name,
PathBuf::from_slash(snapshot_prefix),
access_key,
secret_key,
tasks,
))
}
(
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
Err((_, VarError::NotPresent)),
) => self.process_snapshots_to_disk(progress, tasks),
(Err((var, e)), _, _, _, _, _)
| (_, Err((var, e)), _, _, _, _)
| (_, _, Err((var, e)), _, _, _)
| (_, _, _, Err((var, e)), _, _)
| (_, _, _, _, Err((var, e)), _)
| (_, _, _, _, _, Err((var, e))) => {
// TODO: Handle error gracefully
panic!("Error while reading environment variables: {}: {}", var, e);
}
}
}
fn process_snapshots_to_disk(
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>, Error> {
fs::create_dir_all(&self.scheduler.snapshots_path)?;
let temp_snapshot_dir = tempfile::tempdir()?;
@@ -140,7 +212,7 @@ impl IndexScheduler {
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.queue.file_store.get_update_path(content_uuid);
let src = self.queue.file_store.update_path(content_uuid);
let dst = update_files_dir.join(content_uuid.to_string());
fs::copy(src, dst)?;
}
@@ -206,4 +278,283 @@ impl IndexScheduler {
Ok(tasks)
}
#[cfg(unix)]
pub(super) async fn process_snapshot_to_s3(
&self,
progress: Progress,
bucket_url: String,
bucket_region: String,
bucket_name: String,
snapshot_prefix: PathBuf,
access_key: String,
secret_key: String,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
use std::fs::File;
use std::io::{self, Seek as _, SeekFrom, Write as _};
use std::os::fd::OwnedFd;
use std::path::Path;
use bytes::{Bytes, BytesMut};
use reqwest::Response;
use tokio::task::JoinHandle;
const ONE_HOUR: Duration = Duration::from_secs(3600);
// default part size is 250MiB
// TODO use 375MiB
// It must be at least 2x5MiB
const PART_SIZE: usize = 10 * 1024 * 1024;
// The maximum number of parts that can be uploaded in parallel.
const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
let max_in_flight_parts: usize = match std::env::var(S3_MAX_IN_FLIGHT_PARTS) {
Ok(val) => val.parse().expect("Failed to parse MEILI_S3_MAX_IN_FLIGHT_PARTS"),
Err(_) => 10,
};
let client = Client::new();
// TODO Remove this unwrap
let url = bucket_url.parse().unwrap();
let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
let credential = Credentials::new(access_key, secret_key);
// TODO change this and use the database name like in the original version
let object_path = snapshot_prefix.join("data.ms.snapshot");
let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
eprintln!("Starting the upload of the snapshot to {object}");
// TODO implement exponential backoff on upload requests: https://docs.rs/backoff
// TODO return a result with actual errors
// TODO sign for longer than an hour?
// NOTE to make it work on Windows we could try using std::io::pipe instead.
// However, we are still using the tokio unix pipe in the async upload loop.
let (reader, writer) = std::io::pipe()?;
let uploader_task = tokio::spawn(async move {
let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
let action = bucket.create_multipart_upload(Some(&credential), &object);
// TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
// If the part is deleted (like a TTL) we should sign it for at least 24 hours.
let url = action.sign(ONE_HOUR);
let resp = client.post(url).send().await.unwrap().error_for_status().unwrap();
let body = resp.text().await.unwrap();
let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
let mut etags = Vec::<String>::new();
let mut in_flight =
VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
max_in_flight_parts,
);
for part_number in 1u16.. {
let part_upload = bucket.upload_part(
Some(&credential),
&object,
part_number,
multipart.upload_id(),
);
let url = part_upload.sign(ONE_HOUR);
// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= max_in_flight_parts {
let (request, buffer) = in_flight.pop_front().unwrap();
let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
let mut buffer = match buffer.try_into_mut() {
Ok(buffer) => buffer,
Err(_) => panic!("Valid to convert into BytesMut"),
};
// TODO use bumpalo to reduce the number of allocations
etags.push(etag.to_str().unwrap().to_owned());
buffer.clear();
buffer
} else {
// TODO Base this on the available memory
BytesMut::with_capacity(PART_SIZE)
};
while buffer.len() < (PART_SIZE / 2) {
eprintln!(
"buffer is {:.2}% full, trying to read more",
buffer.len() as f32 / buffer.capacity() as f32 * 100.0
);
// Wait for the pipe to be readable
reader.readable().await?;
match reader.try_read_buf(&mut buffer) {
Ok(0) => break,
// We read some bytes but maybe not enough
Ok(n) => {
eprintln!("Read {} bytes from pipe, continuing", n);
continue;
}
// The readiness event is a false positive.
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
eprintln!("received a WouldBlock");
continue;
}
Err(e) => return Err(e.into()),
}
}
eprintln!(
"buffer is {:.2}% full",
buffer.len() as f32 / buffer.capacity() as f32 * 100.0
);
if buffer.is_empty() {
eprintln!("buffer is empty, breaking part number loop");
// Break the loop if the buffer is empty
// after we tried to read bytes
break;
}
let body = buffer.freeze();
eprintln!("Sending part {}", part_number);
let task = tokio::spawn(client.put(url).body(body.clone()).send());
in_flight.push_back((task, body));
}
for (join_handle, _buffer) in in_flight {
let resp = join_handle.await.unwrap().unwrap().error_for_status().unwrap();
let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
// TODO use bumpalo to reduce the number of allocations
etags.push(etag.to_str().unwrap().to_owned());
}
eprintln!("Finalizing the multipart upload");
let action = bucket.complete_multipart_upload(
Some(&credential),
&object,
multipart.upload_id(),
etags.iter().map(AsRef::as_ref),
);
let url = action.sign(ONE_HOUR);
let resp = client.post(url).body(action.body()).send().await.unwrap();
let status = resp.status();
let text = resp.text().await.unwrap();
eprintln!("Status: {status}, Text: {text}");
// TODO do a better check and do not assert
// assert!(resp.status().is_success());
Result::<_, Error>::Ok(())
});
// TODO not a big fan of this clone
// remove it and get all the necessary data from the scheduler
let index_scheduler = IndexScheduler::private_clone(self);
let builder_task = tokio::task::spawn_blocking(move || {
// NOTE enabling compression still generates a corrupted tarball
let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::none());
let mut tarball = tar::Builder::new(writer);
// 1. Snapshot the version file
tarball.append_path_with_name(
&index_scheduler.scheduler.version_file_path,
VERSION_FILE_NAME,
)?;
// 2. Snapshot the index scheduler LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
// NOTE That did the trick! Should I memory map instead?
tasks_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("tasks").join("data.mdb");
// NOTE when commenting this line, the tarball works better
tarball.append_file(path, &mut tasks_env_file)?;
drop(tasks_env_file);
// 2.3 Create a read transaction on the index-scheduler
let rtxn = index_scheduler.env.read_txn()?;
// 2.4 Create the update files directory
// And only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
// TODO I need to create the update files directory (even if empty)
// I should probably simply use the append_dir_all method
// but I'll lose the progression.
let update_files_dir = Path::new("update_files");
for task_id in enqueued {
let task = index_scheduler
.queue
.tasks
.get_task(&rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = index_scheduler.queue.file_store.update_path(content_uuid);
let mut update_file = File::open(src)?;
let path = update_files_dir.join(content_uuid.to_string());
tarball.append_file(path, &mut update_file)?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}
// 3. Snapshot every index
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
let index_mapping = index_scheduler.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let indexes_dir = Path::new("indexes");
let indexes_references: Vec<_> = index_scheduler
.index_mapper
.index_mapping
.iter(&rtxn)?
.map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
.collect::<Result<_, Error>>()?;
dbg!(&indexes_references);
// Note that we need to collect and open all of the indexes files because
// otherwise, using a for loop, we would have to have a Send rtxn.
// TODO I don't need to do this trick if my writer is NOT async
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
&name, i as u32, nb_indexes,
));
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
let mut index_file = index.try_clone_inner_file().unwrap();
index_file.seek(SeekFrom::Start(0))?;
eprintln!("Appending index file for {} in {}", name, path.display());
tarball.append_file(path, &mut index_file)?;
}
drop(rtxn);
// 4. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let mut auth_env_file =
index_scheduler.scheduler.auth_env.try_clone_inner_file().unwrap();
auth_env_file.seek(SeekFrom::Start(0))?;
let path = Path::new("auth").join("data.mdb");
tarball.append_file(path, &mut auth_env_file)?;
let mut gzencoder = tarball.into_inner()?;
gzencoder.flush()?;
gzencoder.try_finish()?;
let mut writer = gzencoder.finish()?;
writer.flush()?;
Result::<_, Error>::Ok(())
});
let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);
builder_result.unwrap()?;
uploader_result.unwrap()?;
for task in &mut tasks {
task.status = Status::Succeeded;
}
Ok(tasks)
}
}
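The section above reduces to two tasks joined at the end: a spawn_blocking tarball builder writing into an anonymous pipe, and an async uploader draining the read end into part-sized buffers. A stripped-down sketch of that plumbing, unix-only like the real code and needing a Rust with std::io::pipe (1.87+); the S3 calls are elided:

use std::io::{self, Write as _};
use std::os::fd::OwnedFd;

use bytes::BytesMut;

#[tokio::main]
async fn main() -> io::Result<()> {
    // Anonymous pipe: the blocking writer feeds the async reader, so the
    // tarball never needs to exist on disk.
    let (reader, writer) = std::io::pipe()?;

    // Writer side: ordinary blocking std::io, kept off the async workers.
    let builder = tokio::task::spawn_blocking(move || -> io::Result<()> {
        let mut writer = writer;
        for i in 0..4u32 {
            writer.write_all(format!("chunk {i}\n").as_bytes())?;
        }
        Ok(()) // dropping the writer closes the pipe, the reader sees EOF
    });

    // Reader side: wrap the fd in tokio's unix pipe Receiver. This is the
    // unix-only part that gates S3 snapshotting behind #[cfg(unix)].
    let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(OwnedFd::from(reader))?;
    let mut buffer = BytesMut::with_capacity(64);
    loop {
        reader.readable().await?;
        match reader.try_read_buf(&mut buffer) {
            Ok(0) => break,    // EOF: the builder finished
            Ok(_) => continue, // accumulate until a part is large enough
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    // The real code freezes each full buffer and uploads it as one part;
    // here we only show what came through.
    print!("{}", String::from_utf8_lossy(&buffer));

    builder.await.expect("builder task panicked")?;
    Ok(())
}

Backpressure falls out naturally: once the pipe's kernel buffer is full, the blocking writer stalls until the uploader drains it.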

View File

@@ -126,7 +126,7 @@ impl IndexScheduler {
std::fs::create_dir_all(&options.auth_path).unwrap();
let auth_env = open_auth_store_env(&options.auth_path).unwrap();
let index_scheduler =
Self::new(options, auth_env, version, sender, planned_failures).unwrap();
Self::new(options, auth_env, version, None, sender, planned_failures).unwrap();
// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.

View File

@@ -216,7 +216,10 @@ enum OnFailure {
KeepDb,
}
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
pub fn setup_meilisearch(
opt: &Opt,
handle: tokio::runtime::Handle,
) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
let index_scheduler_opt = IndexSchedulerOptions {
version_file_path: opt.db_path.join(VERSION_FILE_NAME),
auth_path: opt.db_path.join("auth"),
@@ -256,6 +259,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
handle,
)?,
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
@@ -273,7 +277,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
bail!("snapshot doesn't exist at {}", snapshot_path.display())
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
}
} else if let Some(ref path) = opt.import_dump {
let src_path_exists = path.exists();
@@ -284,6 +288,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
handle,
)?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
Ok(()) => (index_scheduler, auth_controller),
@@ -304,10 +309,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
}
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)?
};
// We create a loop in a thread that registers snapshotCreation tasks
@@ -338,6 +343,7 @@ fn open_or_create_database_unchecked(
index_scheduler_opt: IndexSchedulerOptions,
on_failure: OnFailure,
version: (u32, u32, u32),
handle: tokio::runtime::Handle,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
@@ -345,7 +351,7 @@ fn open_or_create_database_unchecked(
let auth_env = open_auth_store_env(&index_scheduler_opt.auth_path).unwrap();
let auth_controller = AuthController::new(auth_env.clone(), &opt.master_key);
let index_scheduler_builder = || -> anyhow::Result<_> {
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version)?)
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version, Some(handle))?)
};
match (
@@ -452,6 +458,7 @@ fn open_or_create_database(
index_scheduler_opt: IndexSchedulerOptions,
empty_db: bool,
binary_version: (u32, u32, u32),
handle: tokio::runtime::Handle,
) -> anyhow::Result<(IndexScheduler, AuthController)> {
let version = if !empty_db {
check_version(opt, &index_scheduler_opt, binary_version)?
@@ -459,7 +466,7 @@ fn open_or_create_database(
binary_version
};
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version, handle)
}
fn import_dump(

View File

@@ -76,7 +76,10 @@ fn on_panic(info: &std::panic::PanicHookInfo) {
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
try_main().await.inspect_err(|error| {
// won't panic inside of tokio::main
let runtime = tokio::runtime::Handle::current();
try_main(runtime).await.inspect_err(|error| {
tracing::error!(%error);
let mut current = error.source();
let mut depth = 0;
@@ -88,7 +91,7 @@ async fn main() -> anyhow::Result<()> {
})
}
async fn try_main() -> anyhow::Result<()> {
async fn try_main(runtime: tokio::runtime::Handle) -> anyhow::Result<()> {
let (opt, config_read_from) = Opt::try_build()?;
std::panic::set_hook(Box::new(on_panic));
@@ -122,7 +125,7 @@ async fn try_main() -> anyhow::Result<()> {
_ => (),
}
let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?;
let (index_scheduler, auth_controller) = setup_meilisearch(&opt, runtime)?;
let analytics =
analytics::Analytics::new(&opt, index_scheduler.clone(), auth_controller.clone()).await;

View File

@@ -49,8 +49,8 @@ impl Server<Owned> {
}
let options = default_settings(dir.path());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
Server { service, _dir: Some(dir), _marker: PhantomData }
@@ -65,7 +65,9 @@ impl Server<Owned> {
options.master_key = Some("MASTER_KEY".to_string());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
Server { service, _dir: Some(dir), _marker: PhantomData }
@@ -78,7 +80,9 @@ impl Server<Owned> {
}
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
let (index_scheduler, auth) = setup_meilisearch(&options)?;
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options, handle)?;
let service = Service { index_scheduler, auth, options, api_key: None };
Ok(Server { service, _dir: None, _marker: PhantomData })
@@ -217,8 +221,9 @@ impl Server<Shared> {
}
let options = default_settings(dir.path());
let handle = tokio::runtime::Handle::current();
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap();
let service = Service { index_scheduler, auth, api_key: None, options };
Server { service, _dir: Some(dir), _marker: PhantomData }

View File

@@ -68,7 +68,7 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
for uuid in file_store.all_uuids().context("while retrieving uuids from file store")? {
let uuid = uuid.context("while retrieving uuid from file store")?;
let update_file_path = file_store.get_update_path(uuid);
let update_file_path = file_store.update_path(uuid);
let update_file = file_store
.get_update(uuid)
.with_context(|| format!("while getting update file for uuid {uuid:?}"))?;

View File

@@ -34,7 +34,7 @@ grenad = { version = "0.5.0", default-features = false, features = [
"rayon",
"tempfile",
] }
heed = { version = "0.22.1-nested-rtxns", default-features = false, features = [
heed = { version = "0.22.1-nested-rtxns-2", default-features = false, features = [
"serde-json",
"serde-bincode",
] }

View File

@@ -425,6 +425,10 @@ impl Index {
self.env.info().map_size
}
pub fn try_clone_inner_file(&self) -> Result<File> {
Ok(self.env.try_clone_inner_file()?)
}
pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> {
self.env.copy_to_file(file, option).map_err(Into::into)
}
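try_clone_inner_file hands out a duplicated descriptor for the live LMDB data file, and a duplicated descriptor shares the original's read offset, which is exactly why the snapshot code rewinds before every append ("Seeking the tasks/data.mdb file to the beginning did the trick"). A sketch of the pattern, assuming heed's Env::try_clone_inner_file from the pinned 0.22.1-nested-rtxns-2:

use std::fs::File;
use std::io::{Seek as _, SeekFrom, Write};
use std::path::Path;

// Append an already-open file to a tar archive. Without the rewind,
// `append_file` would start reading at whatever offset the duplicated
// descriptor inherited and archive a truncated database.
fn append_env_file<W: Write>(
    tarball: &mut tar::Builder<W>,
    mut env_file: File, // e.g. from heed's Env::try_clone_inner_file
    name: &Path,
) -> std::io::Result<()> {
    env_file.seek(SeekFrom::Start(0))?;
    tarball.append_file(name, &mut env_file)
}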