Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-11-22 04:36:32 +00:00
Make it finaly work but without async on the write side
Committed by Clément Renault
Parent: 2caa2be441
Commit: a4ad87febf
Cargo.lock (generated), 1 addition
@@ -3313,6 +3313,7 @@ dependencies = [
  "serde",
  "serde_json",
+ "synchronoise",
  "tar",
  "tempfile",
  "thiserror 2.0.16",
  "time",
@@ -35,6 +35,7 @@ rayon = "1.10.0"
 roaring = { version = "0.10.12", features = ["serde"] }
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = { version = "1.0.140", features = ["preserve_order"] }
 tar = "0.4.44"
+synchronoise = "1.0.1"
 tempfile = "3.20.0"
 thiserror = "2.0.12"
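The only manifest change here is the new synchronoise dependency; none of the hunks in this commit show it being used. Purely as an illustration, and assuming it backs something like the scheduler's wake-up signal (a hypothetical use, not shown in this diff), a minimal SignalEvent sketch:

    use std::sync::Arc;
    use std::thread;

    use synchronoise::SignalEvent;

    fn main() {
        // An auto-resetting event: wait() blocks until another thread signals it,
        // then the event rearms itself for the next wait.
        let wake_up = Arc::new(SignalEvent::auto(false));

        let scheduler = {
            let wake_up = Arc::clone(&wake_up);
            thread::spawn(move || {
                wake_up.wait();
                println!("woken up, time to process a batch");
            })
        };

        // Some other thread registers work, then wakes the scheduler thread up.
        wake_up.signal();
        scheduler.join().unwrap();
    }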
@@ -289,16 +289,13 @@ impl IndexScheduler {
         secret_key: String,
         mut tasks: Vec<Task>,
     ) -> Result<Vec<Task>> {
-        use std::io;
-        use std::io::SeekFrom;
+        use std::fs::File;
+        use std::io::{self, Seek as _, SeekFrom, Write as _};
+        use std::os::fd::OwnedFd;
         use std::path::Path;

-        use async_compression::tokio::write::GzipEncoder;
-        use async_compression::Level;
         use bytes::{Bytes, BytesMut};
         use meilisearch_types::milli::update::new::StdResult;
-        use tokio::fs::File;
-        use tokio::io::{AsyncSeekExt as _, AsyncWriteExt as _};
         use tokio::task::JoinHandle;

         const ONE_HOUR: Duration = Duration::from_secs(3600);
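The import block trades tokio's async extension traits (AsyncSeekExt, AsyncWriteExt) for std's synchronous Seek and Write, imported anonymously (`Seek as _`, `Write as _`) so only their methods come into scope. A minimal sketch of the synchronous style with a hypothetical helper; note that a cloned file handle shares its cursor with the original, which is why it gets rewound before copying:

    use std::fs::File;
    use std::io::{Seek as _, SeekFrom, Write as _};

    // Rewind a (possibly cloned) file handle and copy it into another file using
    // the synchronous traits; the old code needed AsyncSeekExt/AsyncWriteExt and
    // `.await` for the same calls.
    fn rewind_and_copy(mut src: File, mut dst: File) -> std::io::Result<u64> {
        // A cloned handle shares its cursor with the original, so start from 0.
        src.seek(SeekFrom::Start(0))?;
        let copied = std::io::copy(&mut src, &mut dst)?;
        dst.flush()?;
        Ok(copied)
    }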
@@ -327,8 +324,11 @@ impl IndexScheduler {
         // TODO sign for longer than an hour?
         // TODO Use a better thing than a String for the object path
         // NOTE to make it work on Windows we could try using std::io::pipe instead
-        let (writer, reader) = tokio::net::unix::pipe::pipe()?;
+        // let (writer, reader) = tokio::net::unix::pipe::pipe()?;
+        let (reader, writer) = std::io::pipe()?;
         let uploader_task = tokio::spawn(async move {
+            let reader = OwnedFd::from(reader);
+            let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
             let action = bucket.create_multipart_upload(Some(&credential), &object);
             // TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
             // If the part is deleted (like a TTL) we should sign it for at least 24 hours.
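The pipe itself also changes shape: the write end is now a plain std::io::pipe handle driven by blocking code, while the read end is converted into a tokio Receiver inside the async uploader, exactly as the added lines above do. A minimal, self-contained sketch of that split, mirroring the calls in the hunk (Unix-only; std::io::pipe needs Rust 1.87+, tokio needs its rt-multi-thread, macros, net and io-util features; names and data are hypothetical):

    use std::io::Write as _;
    use std::os::fd::OwnedFd;

    use tokio::io::AsyncReadExt as _;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Blocking producer, async consumer: the same split as in the hunk above.
        let (reader, writer) = std::io::pipe()?;

        // Write side: plain std::io on the blocking thread pool, no async at all.
        let producer = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
            let mut writer = writer;
            for i in 0..10 {
                writeln!(writer, "chunk {i}")?;
            }
            // `writer` is dropped when the closure returns, which closes the pipe.
            Ok(())
        });

        // Read side: turn the read end into a tokio Receiver and read it asynchronously.
        let consumer = tokio::spawn(async move {
            let reader = OwnedFd::from(reader);
            let mut reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
            let mut contents = Vec::new();
            reader.read_to_end(&mut contents).await?;
            Ok::<_, std::io::Error>(contents)
        });

        producer.await.unwrap()?;
        let contents = consumer.await.unwrap()?;
        println!("read {} bytes from the pipe", contents.len());
        Ok(())
    }

Dropping the writer when the blocking task returns is what lets the async reader observe end-of-file.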
@@ -445,27 +445,25 @@ impl IndexScheduler {
         // TODO not a big fan of this clone
         // remove it and get all the necessary data from the scheduler
         let index_scheduler = IndexScheduler::private_clone(self);
-        let builder_task = tokio::task::spawn(async move {
-            // let writer = GzipEncoder::with_quality(writer, Level::Fastest);
-            let mut tarball = tokio_tar::Builder::new(writer);
+        let builder_task = tokio::task::spawn_blocking(move || {
+            // NOTE enabling compression still generates a corrupted tarball
+            let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::fast());
+            let mut tarball = tar::Builder::new(writer);

             // 1. Snapshot the version file
-            tarball
-                .append_path_with_name(
-                    &index_scheduler.scheduler.version_file_path,
-                    VERSION_FILE_NAME,
-                )
-                .await?;
+            tarball.append_path_with_name(
+                &index_scheduler.scheduler.version_file_path,
+                VERSION_FILE_NAME,
+            )?;

             // 2. Snapshot the index scheduler LMDB env
             progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
-            let mut tasks_env_file =
-                index_scheduler.env.try_clone_inner_file().map(File::from_std)?;
+            let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
             // NOTE That made the trick !!! Should I memory map instead?
-            tasks_env_file.seek(SeekFrom::Start(0)).await?;
+            tasks_env_file.seek(SeekFrom::Start(0))?;
             let path = Path::new("tasks").join("data.mdb");
             // NOTE when commenting this line, the tarball works better
-            tarball.append_file(path, &mut tasks_env_file).await?;
+            tarball.append_file(path, &mut tasks_env_file)?;
             drop(tasks_env_file);

             // 2.3 Create a read transaction on the index-scheduler
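The tarball is now built on tokio's blocking pool with the synchronous tar crate writing through a flate2 GzEncoder. A minimal sketch of the two append calls used above, with hypothetical paths and any Write implementation standing in for the pipe writer (the encoder teardown is shown after the last hunk):

    use std::fs::File;
    use std::io::Write;

    use flate2::write::GzEncoder;
    use flate2::Compression;

    // Build a gzipped tarball through any Write sink (in the commit this is the
    // write end of the pipe). Paths here are hypothetical.
    fn build_snapshot(sink: impl Write) -> std::io::Result<()> {
        let writer = GzEncoder::new(sink, Compression::fast());
        let mut tarball = tar::Builder::new(writer);

        // Store an on-disk file in the archive under a chosen name.
        tarball.append_path_with_name("VERSION", "VERSION")?;

        // Stream an already-open file handle into the archive under a chosen path.
        let mut data = File::open("tasks/data.mdb")?;
        tarball.append_file("tasks/data.mdb", &mut data)?;

        // Write the tar footer and get the encoder back; finishing the gzip
        // stream itself is shown in the last sketch below.
        let mut encoder = tarball.into_inner()?;
        encoder.try_finish()?;
        Ok(())
    }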
@@ -489,9 +487,9 @@ impl IndexScheduler {
                     .ok_or(Error::CorruptedTaskQueue)?;
                 if let Some(content_uuid) = task.content_uuid() {
                     let src = index_scheduler.queue.file_store.update_path(content_uuid);
-                    let mut update_file = File::open(src).await?;
+                    let mut update_file = File::open(src)?;
                     let path = update_files_dir.join(content_uuid.to_string());
-                    tarball.append_file(path, &mut update_file).await?;
+                    tarball.append_file(path, &mut update_file)?;
                 }
                 atomic.fetch_add(1, Ordering::Relaxed);
             }
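Progress inside the loop is reported by bumping a shared counter with Relaxed ordering: only the count itself is communicated, so no stronger memory ordering is needed. A self-contained sketch of that pattern:

    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    fn main() {
        // Shared progress counter bumped with Relaxed ordering: nothing else is
        // synchronized through it, the count is the whole message.
        let atomic = Arc::new(AtomicU32::new(0));

        let worker = {
            let atomic = Arc::clone(&atomic);
            std::thread::spawn(move || {
                for _task in 0..100 {
                    // ... append the task's update file to the tarball ...
                    atomic.fetch_add(1, Ordering::Relaxed);
                }
            })
        };

        worker.join().unwrap();
        println!("processed {} update files", atomic.load(Ordering::Relaxed));
    }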
@@ -519,28 +517,27 @@ impl IndexScheduler {
                 ));
                 let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
                 let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
-                let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
-                index_file.seek(SeekFrom::Start(0)).await?;
+                let mut index_file = index.try_clone_inner_file().unwrap();
+                index_file.seek(SeekFrom::Start(0))?;
                 eprintln!("Appending index file for {} in {}", name, path.display());
-                tarball.append_file(path, &mut index_file).await?;
+                tarball.append_file(path, &mut index_file)?;
             }

             drop(rtxn);

             // 4. Snapshot the auth LMDB env
             progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
-            let mut auth_env_file = index_scheduler
-                .scheduler
-                .auth_env
-                .try_clone_inner_file()
-                .map(File::from_std)
-                .unwrap();
-            auth_env_file.seek(SeekFrom::Start(0)).await?;
+            let mut auth_env_file =
+                index_scheduler.scheduler.auth_env.try_clone_inner_file().unwrap();
+            auth_env_file.seek(SeekFrom::Start(0))?;
             let path = Path::new("auth").join("data.mdb");
-            tarball.append_file(path, &mut auth_env_file).await?;
+            tarball.append_file(path, &mut auth_env_file)?;

-            let mut inner = tarball.into_inner().await?;
-            inner.flush().await?;
+            let mut gzencoder = tarball.into_inner()?;
+            gzencoder.flush()?;
+            gzencoder.try_finish()?;
+            let mut writer = gzencoder.finish()?;
+            writer.flush()?;

             Result::<_, Error>::Ok(())
         });
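The teardown is where the shape really changes: tar::Builder::into_inner writes the tar footer and hands back the GzEncoder, and the encoder only emits the gzip trailer once it is finished, so skipping finish()/try_finish() leaves a truncated stream; the final flush pushes everything into the inner writer (the pipe end). A minimal sketch of the same sequence over an arbitrary writer:

    use std::io::Write as _;

    use flate2::write::GzEncoder;
    use flate2::Compression;

    // Finish a gzip stream explicitly; without finish()/try_finish() the trailer
    // is never written and the output is not a valid .gz. `sink` is any Write impl.
    fn finish_gzip(sink: impl std::io::Write) -> std::io::Result<()> {
        let mut encoder = GzEncoder::new(sink, Compression::fast());
        encoder.write_all(b"bytes produced by tar::Builder in the real code")?;

        encoder.flush()?;                  // flush what the compressor has buffered
        encoder.try_finish()?;             // write the gzip trailer, keep ownership
        let mut sink = encoder.finish()?;  // unwrap the inner writer
        sink.flush()?;                     // push everything into the pipe/file
        Ok(())
    }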