From 2caa2be4412c482aa72c7bd802f6d93804e105e9 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Fri, 17 Oct 2025 15:40:41 +0200
Subject: [PATCH] Seeking the tasks/data.mdb file to the beginning did the
 trick

---
 .../scheduler/process_snapshot_creation.rs | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
index 42a127c67..0e9f5b838 100644
--- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
@@ -290,6 +290,7 @@ impl IndexScheduler {
         mut tasks: Vec<Task>,
     ) -> Result<Vec<Task>> {
         use std::io;
+        use std::io::SeekFrom;
         use std::path::Path;
 
         use async_compression::tokio::write::GzipEncoder;
@@ -297,7 +298,7 @@ impl IndexScheduler {
         use bytes::{Bytes, BytesMut};
         use meilisearch_types::milli::update::new::StdResult;
         use tokio::fs::File;
-        use tokio::io::AsyncReadExt;
+        use tokio::io::{AsyncSeekExt as _, AsyncWriteExt as _};
         use tokio::task::JoinHandle;
 
         const ONE_HOUR: Duration = Duration::from_secs(3600);
@@ -325,7 +326,8 @@ impl IndexScheduler {
         // TODO return a result with actual errors
         // TODO sign for longer than an hour?
         // TODO Use a better thing than a String for the object path
-        let (writer, mut reader) = tokio::net::unix::pipe::pipe()?;
+        // NOTE to make it work on Windows we could try using std::io::pipe instead
+        let (writer, reader) = tokio::net::unix::pipe::pipe()?;
         let uploader_task = tokio::spawn(async move {
             let action = bucket.create_multipart_upload(Some(&credential), &object);
             // TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
@@ -444,7 +446,7 @@ impl IndexScheduler {
         // remove it and get all the necessary data from the scheduler
         let index_scheduler = IndexScheduler::private_clone(self);
         let builder_task = tokio::task::spawn(async move {
-            // let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
+            // let writer = GzipEncoder::with_quality(writer, Level::Fastest);
             let mut tarball = tokio_tar::Builder::new(writer);
 
             // 1. Snapshot the version file
@@ -459,8 +461,10 @@ impl IndexScheduler {
             progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
             let mut tasks_env_file =
                 index_scheduler.env.try_clone_inner_file().map(File::from_std)?;
+            // NOTE That did the trick! Should I memory map instead?
+            tasks_env_file.seek(SeekFrom::Start(0)).await?;
             let path = Path::new("tasks").join("data.mdb");
-            // NOTE when commenting this line, the tarballl works better
+            // NOTE when commenting this line, the tarball works better
             tarball.append_file(path, &mut tasks_env_file).await?;
             drop(tasks_env_file);
 
@@ -473,6 +477,9 @@ impl IndexScheduler {
             let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
             let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
             progress.update_progress(update_file_progress);
+            // TODO I need to create the update files directory (even if empty)
+            //      I should probably simply use the append_dir_all method
+            //      but I'll lose the progress reporting.
             let update_files_dir = Path::new("update_files");
             for task_id in enqueued {
                 let task = index_scheduler
@@ -505,6 +512,7 @@ impl IndexScheduler {
 
             // Note that we need to collect and open all of the indexes files because
             // otherwise, using a for loop, we would have to have a Send rtxn.
+            // TODO I don't need to do this trick if my writer is NOT async
             for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
                 progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
                     &name, i as u32, nb_indexes,
@@ -512,6 +520,7 @@ impl IndexScheduler {
                 let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
                 let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
                 let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
+                index_file.seek(SeekFrom::Start(0)).await?;
                 eprintln!("Appending index file for {} in {}", name, path.display());
                 tarball.append_file(path, &mut index_file).await?;
             }
@@ -526,10 +535,12 @@ impl IndexScheduler {
                 .try_clone_inner_file()
                 .map(File::from_std)
                 .unwrap();
+            auth_env_file.seek(SeekFrom::Start(0)).await?;
             let path = Path::new("auth").join("data.mdb");
             tarball.append_file(path, &mut auth_env_file).await?;
 
-            tarball.into_inner().await?;
+            let mut inner = tarball.into_inner().await?;
+            inner.flush().await?;
 
             Result::<_, Error>::Ok(())
         });
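
A note on the fix, for readers of this patch: try_clone_inner_file() duplicates the LMDB file descriptor, and a duplicated descriptor shares the original's file offset, so it is usually not positioned at byte 0 when the snapshot runs. Rewinding it before handing it to tokio_tar is what makes the archived data.mdb complete, and flushing the writer returned by into_inner() ensures the tar trailer actually reaches the pipe. The sketch below is not part of the patch: it reuses the tokio and tokio-tar crates seen above, but the append_env_file helper and the on-disk paths are illustrative assumptions only.

// A minimal sketch, assuming `tokio` (with the "fs", "io-util", "macros" and
// "rt-multi-thread" features) and `tokio-tar` as dependencies.
use std::io::SeekFrom;
use std::path::Path;

use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

/// Append a file whose descriptor was cloned from a live environment (so its
/// offset may be anywhere) to a tar archive, copying it from the start.
async fn append_env_file(
    tarball: &mut tokio_tar::Builder<File>,
    std_file: std::fs::File, // e.g. what `env.try_clone_inner_file()` returns
    path_in_archive: &Path,
) -> std::io::Result<()> {
    let mut file = File::from_std(std_file);
    // The duplicated descriptor shares the original's offset: rewind it so the
    // whole data.mdb ends up in the tarball, not just whatever follows the offset.
    file.seek(SeekFrom::Start(0)).await?;
    tarball.append_file(path_in_archive, &mut file).await?;
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Write the archive to a plain file here; the patch streams it into a pipe.
    let output = File::create("snapshot.tar").await?;
    let mut tarball = tokio_tar::Builder::new(output);

    let tasks_db = std::fs::File::open("tasks/data.mdb")?; // illustrative path
    append_env_file(&mut tarball, tasks_db, Path::new("tasks/data.mdb")).await?;

    // `into_inner()` finishes the archive and hands back the writer, but buffered
    // bytes may still be pending, hence the explicit flush (the other half of the fix).
    let mut inner = tarball.into_inner().await?;
    inner.flush().await?;
    Ok(())
}

The same pattern applies to every environment file the snapshot copies (tasks, each index, auth): rewind the cloned descriptor, append it, and only consider the archive done once the underlying writer has been flushed.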