mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 04:36:32 +00:00
Seeking the tasks/data.mdb file to the begining made the trick
This commit is contained in:
committed by
Clément Renault
parent
a829ded023
commit
2caa2be441
@@ -290,6 +290,7 @@ impl IndexScheduler {
|
|||||||
mut tasks: Vec<Task>,
|
mut tasks: Vec<Task>,
|
||||||
) -> Result<Vec<Task>> {
|
) -> Result<Vec<Task>> {
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::io::SeekFrom;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
use async_compression::tokio::write::GzipEncoder;
|
use async_compression::tokio::write::GzipEncoder;
|
||||||
@@ -297,7 +298,7 @@ impl IndexScheduler {
|
|||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use meilisearch_types::milli::update::new::StdResult;
|
use meilisearch_types::milli::update::new::StdResult;
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::{AsyncSeekExt as _, AsyncWriteExt as _};
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
const ONE_HOUR: Duration = Duration::from_secs(3600);
|
const ONE_HOUR: Duration = Duration::from_secs(3600);
|
||||||
@@ -325,7 +326,8 @@ impl IndexScheduler {
|
|||||||
// TODO return a result with actual errors
|
// TODO return a result with actual errors
|
||||||
// TODO sign for longer than an hour?
|
// TODO sign for longer than an hour?
|
||||||
// TODO Use a better thing than a String for the object path
|
// TODO Use a better thing than a String for the object path
|
||||||
let (writer, mut reader) = tokio::net::unix::pipe::pipe()?;
|
// NOTE to make it work on Windows we could try using std::io::pipe instead
|
||||||
|
let (writer, reader) = tokio::net::unix::pipe::pipe()?;
|
||||||
let uploader_task = tokio::spawn(async move {
|
let uploader_task = tokio::spawn(async move {
|
||||||
let action = bucket.create_multipart_upload(Some(&credential), &object);
|
let action = bucket.create_multipart_upload(Some(&credential), &object);
|
||||||
// TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
|
// TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
|
||||||
@@ -444,7 +446,7 @@ impl IndexScheduler {
|
|||||||
// remove it and get all the necessary data from the scheduler
|
// remove it and get all the necessary data from the scheduler
|
||||||
let index_scheduler = IndexScheduler::private_clone(self);
|
let index_scheduler = IndexScheduler::private_clone(self);
|
||||||
let builder_task = tokio::task::spawn(async move {
|
let builder_task = tokio::task::spawn(async move {
|
||||||
// let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
|
// let writer = GzipEncoder::with_quality(writer, Level::Fastest);
|
||||||
let mut tarball = tokio_tar::Builder::new(writer);
|
let mut tarball = tokio_tar::Builder::new(writer);
|
||||||
|
|
||||||
// 1. Snapshot the version file
|
// 1. Snapshot the version file
|
||||||
@@ -459,8 +461,10 @@ impl IndexScheduler {
|
|||||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
||||||
let mut tasks_env_file =
|
let mut tasks_env_file =
|
||||||
index_scheduler.env.try_clone_inner_file().map(File::from_std)?;
|
index_scheduler.env.try_clone_inner_file().map(File::from_std)?;
|
||||||
|
// NOTE That made the trick !!! Should I memory map instead?
|
||||||
|
tasks_env_file.seek(SeekFrom::Start(0)).await?;
|
||||||
let path = Path::new("tasks").join("data.mdb");
|
let path = Path::new("tasks").join("data.mdb");
|
||||||
// NOTE when commenting this line, the tarballl works better
|
// NOTE when commenting this line, the tarball works better
|
||||||
tarball.append_file(path, &mut tasks_env_file).await?;
|
tarball.append_file(path, &mut tasks_env_file).await?;
|
||||||
drop(tasks_env_file);
|
drop(tasks_env_file);
|
||||||
|
|
||||||
@@ -473,6 +477,9 @@ impl IndexScheduler {
|
|||||||
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
||||||
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
|
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
|
||||||
progress.update_progress(update_file_progress);
|
progress.update_progress(update_file_progress);
|
||||||
|
// TODO I need to create the update files directory (even if empty)
|
||||||
|
// I should probably simply use the append_dir_all method
|
||||||
|
// but I'll loose the progression.
|
||||||
let update_files_dir = Path::new("update_files");
|
let update_files_dir = Path::new("update_files");
|
||||||
for task_id in enqueued {
|
for task_id in enqueued {
|
||||||
let task = index_scheduler
|
let task = index_scheduler
|
||||||
@@ -505,6 +512,7 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
// Note that we need to collect and open all of the indexes files because
|
// Note that we need to collect and open all of the indexes files because
|
||||||
// otherwise, using a for loop, we would have to have a Send rtxn.
|
// otherwise, using a for loop, we would have to have a Send rtxn.
|
||||||
|
// TODO I don't need to do this trick if my writer is NOT async
|
||||||
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
|
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
|
||||||
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
||||||
&name, i as u32, nb_indexes,
|
&name, i as u32, nb_indexes,
|
||||||
@@ -512,6 +520,7 @@ impl IndexScheduler {
|
|||||||
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
|
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
|
||||||
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
|
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
|
||||||
let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
|
let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
|
||||||
|
index_file.seek(SeekFrom::Start(0)).await?;
|
||||||
eprintln!("Appending index file for {} in {}", name, path.display());
|
eprintln!("Appending index file for {} in {}", name, path.display());
|
||||||
tarball.append_file(path, &mut index_file).await?;
|
tarball.append_file(path, &mut index_file).await?;
|
||||||
}
|
}
|
||||||
@@ -526,10 +535,12 @@ impl IndexScheduler {
|
|||||||
.try_clone_inner_file()
|
.try_clone_inner_file()
|
||||||
.map(File::from_std)
|
.map(File::from_std)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
auth_env_file.seek(SeekFrom::Start(0)).await?;
|
||||||
let path = Path::new("auth").join("data.mdb");
|
let path = Path::new("auth").join("data.mdb");
|
||||||
tarball.append_file(path, &mut auth_env_file).await?;
|
tarball.append_file(path, &mut auth_env_file).await?;
|
||||||
|
|
||||||
tarball.into_inner().await?;
|
let mut inner = tarball.into_inner().await?;
|
||||||
|
inner.flush().await?;
|
||||||
|
|
||||||
Result::<_, Error>::Ok(())
|
Result::<_, Error>::Ok(())
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user