mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-11-22 04:36:32 +00:00)
WIP Do more tests
committed by Clément Renault
parent c1f7542dfa, commit 62a8133bcd
@@ -286,18 +286,14 @@ impl IndexScheduler {
-        use async_compression::tokio::write::GzipEncoder;
-        use async_compression::Level;
         use bytes::{Bytes, BytesMut};
         use rusty_s3::actions::UploadPart;
         use tokio::fs::File;
         use tokio::io::AsyncReadExt;
         use tokio::task::JoinHandle;

         const ONE_HOUR: Duration = Duration::from_secs(3600);
-        // default part size is 250MiB
-        const MIN_PART_SIZE: usize = 250 * 1024 * 1024;
-        // 10MiB
-        const TEN_MIB: usize = 10 * 1024 * 1024;
         // The maximum number of parts that can be uploaded to a single multipart upload.
         const MAX_NUMBER_PARTS: usize = 10_000;
+        // TODO use 375MiB
+        const PART_SIZE: usize = 500 * 1024 * 1024;

         // The maximum number of parts that can be uploaded in parallel.
         const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
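
Taken together, these constants bound the snapshot size: S3 refuses more than 10,000 parts per multipart upload, so the part size decides the largest uploadable snapshot. A back-of-the-envelope check (not part of the commit; 64-bit arithmetic assumed):

    const PART_SIZE: usize = 500 * 1024 * 1024; // 500 MiB
    const MAX_NUMBER_PARTS: usize = 10_000;
    // 10_000 parts * 500 MiB = 5_000_000 MiB, roughly 4.77 TiB per snapshot object.
    const MAX_OBJECT_SIZE: usize = PART_SIZE * MAX_NUMBER_PARTS;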
@@ -320,6 +316,8 @@ impl IndexScheduler {
         // TODO Use a better thing than a String for the object path
         let (writer, mut reader) = tokio::net::unix::pipe::pipe()?;
         let uploader_task = tokio::spawn(async move {
+            use meilisearch_types::milli::update::new::StdResult;
+
             let action = bucket.create_multipart_upload(Some(&credential), object);
             // TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
             // If the part is deleted (like a TTL) we should sign it for at least 24 hours.
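
The two TODO comments above flag a real constraint: a presigned URL embeds its expiry, rusty_s3 takes that expiry as the argument to S3Action::sign, and requests made after it are rejected. A minimal sketch of the fix the second comment suggests, signing for 24 hours instead of ONE_HOUR; `bucket`, `credential`, and `object` stand in for the values already in scope in the diff:

    use std::time::Duration;

    use rusty_s3::{Bucket, Credentials, S3Action};

    const TWENTY_FOUR_HOURS: Duration = Duration::from_secs(24 * 60 * 60);

    fn presigned_create_multipart(bucket: &Bucket, credential: &Credentials, object: &str) -> url::Url {
        // Same action as in the diff, but signed with a validity long enough
        // to outlive a slow snapshot upload.
        let action = bucket.create_multipart_upload(Some(credential), object);
        action.sign(TWENTY_FOUR_HOURS)
    }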
@@ -331,7 +329,7 @@ impl IndexScheduler {
             let mut etags = Vec::<String>::new();

             let mut in_flight = VecDeque::<(
-                JoinHandle<std::result::Result<reqwest::Response, reqwest::Error>>,
+                JoinHandle<StdResult<reqwest::Response, reqwest::Error>>,
                 Bytes,
             )>::with_capacity(max_in_flight_parts);
             for part_number in 1u16.. {
@@ -346,7 +344,7 @@ impl IndexScheduler {
                 // Wait for a buffer to be ready if there are in-flight parts that landed
                 let mut buffer = if in_flight.len() >= max_in_flight_parts {
                     let (request, buffer) = in_flight.pop_front().unwrap();
-                    let mut buffer = buffer.try_into_mut().expect("to convert into a mut buffer");
+                    let mut buffer = buffer.try_into_mut().expect("Valid to convert into BytesMut");
                     let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
                     let etag =
                         resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
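
The expect on try_into_mut leans on a bytes-crate guarantee worth spelling out: Bytes::try_into_mut only succeeds when the caller holds the last handle to the allocation, which is expected here because the clone handed to reqwest has been dropped by the time the awaited request completes. A standalone illustration of the recycling trick (not the commit's code):

    use bytes::{Bytes, BytesMut};

    fn main() {
        let mut buffer = BytesMut::with_capacity(1024);
        buffer.extend_from_slice(b"part payload");

        let body: Bytes = buffer.freeze(); // frozen, cheap to clone
        let for_request = body.clone();    // handle given to the HTTP client
        drop(for_request);                 // request finished, clone dropped

        // `body` is now the only handle, so the allocation is reclaimed as a
        // writable BytesMut instead of allocating a fresh part buffer.
        let recycled: BytesMut = body.try_into_mut().expect("last remaining handle");
        assert_eq!(&recycled[..], &b"part payload"[..]);
    }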
@@ -356,20 +354,27 @@ impl IndexScheduler {
                     buffer
                 } else {
                     // TODO Base this on the available memory
-                    BytesMut::with_capacity(MIN_PART_SIZE)
+                    BytesMut::with_capacity(PART_SIZE)
                 };

-                while buffer.len() < (MIN_PART_SIZE / 2) {
+                while buffer.len() < (PART_SIZE / 2) {
                     if reader.read_buf(&mut buffer).await? == 0 {
+                        eprintln!(
+                            "breaking because read returned 0. Buffer len {}; capacity {}",
+                            buffer.len(),
+                            buffer.capacity(),
+                        );
                         break;
                     }
                 }

                 if buffer.is_empty() {
+                    eprintln!("buffer is empty, breaking part number loop");
                     break;
                 }

                 let body = buffer.freeze();
+                eprintln!("Sending part {}", part_number);
                 let task = tokio::spawn(client.put(url).body(body.clone()).send());
                 in_flight.push_back((task, body));
             }
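
This loop implements back-pressure by hand: at most max_in_flight_parts uploads are alive at once, and the oldest one is always awaited first, so parts complete in submission order (which is what lets the etags accumulate in a plain Vec) and a finished request's buffer is recycled for the next read. A stripped-down sketch of the same discipline with the S3 specifics removed; everything below is illustrative, not commit code:

    use std::collections::VecDeque;
    use std::future::Future;

    use tokio::task::JoinHandle;

    // Run `jobs` with at most `max_in_flight` tasks alive at once, completing
    // them oldest-first so results keep submission order.
    async fn run_bounded<T, F>(jobs: Vec<F>, max_in_flight: usize) -> Vec<T>
    where
        T: Send + 'static,
        F: Future<Output = T> + Send + 'static,
    {
        let mut in_flight: VecDeque<JoinHandle<T>> = VecDeque::with_capacity(max_in_flight);
        let mut results = Vec::new();
        for job in jobs {
            if in_flight.len() >= max_in_flight {
                // Await the oldest task before admitting a new one.
                results.push(in_flight.pop_front().unwrap().await.unwrap());
            }
            in_flight.push_back(tokio::spawn(job));
        }
        for handle in in_flight {
            results.push(handle.await.unwrap());
        }
        results
    }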
@@ -407,9 +412,9 @@ impl IndexScheduler {
         // TODO not a big fan of this clone
         // remove it and get all the necessary data from the scheduler
         let index_scheduler = IndexScheduler::private_clone(self);
-        let builder_task = tokio::task::spawn_local(async move {
-            let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
-            let mut tarball = tokio_tar::Builder::new(compressed_writer);
+        let builder_task = tokio::task::spawn(async move {
+            // let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
+            let mut tarball = tokio_tar::Builder::new(writer);

             // 1. Snapshot the version file
             tarball
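
This is the producing half of the pipe created next to the uploader task: the builder writes the tarball into the writer end while the uploader drains the reader end, so the snapshot streams to S3 without ever existing as an on-disk file (compression is disabled for now, hence the commented-out GzipEncoder). A self-contained sketch of that plumbing, assuming a Unix target and a tokio recent enough to ship net::unix::pipe::pipe:

    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let (mut writer, mut reader) = tokio::net::unix::pipe::pipe()?;

        // Stands in for the tarball builder task.
        let producer = tokio::spawn(async move {
            for chunk in [&b"tar "[..], &b"stream "[..], &b"contents"[..]] {
                writer.write_all(chunk).await?;
            }
            Ok::<_, std::io::Error>(()) // dropping `writer` here is the EOF signal
        });

        // Stands in for the uploader task reading its part buffers.
        let mut received = Vec::new();
        reader.read_to_end(&mut received).await?;
        producer.await.unwrap()?;
        assert_eq!(received, b"tar stream contents");
        Ok(())
    }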
@@ -424,6 +429,7 @@ impl IndexScheduler {
             let mut tasks_env_file =
                 index_scheduler.env.try_clone_inner_file().map(File::from_std)?;
             let path = Path::new("tasks").join("data.mdb");
+            // NOTE when commenting this line, the tarballl works better
             tarball.append_file(path, &mut tasks_env_file).await?;
             drop(tasks_env_file);

@@ -457,14 +463,25 @@ impl IndexScheduler {
             let index_mapping = index_scheduler.index_mapper.index_mapping;
             let nb_indexes = index_mapping.len(&rtxn)? as u32;
             let indexes_dir = Path::new("indexes");
-            for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
-                let (name, uuid) = result?;
+            let indexes_references: Vec<_> = index_scheduler
+                .index_mapper
+                .index_mapping
+                .iter(&rtxn)?
+                .map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
+                .collect::<Result<_, Error>>()?;
+
+            dbg!(&indexes_references);
+
+            // Note that we need to collect and open all of the indexes files because
+            // otherwise, using a for loop, we would have to have a Send rtxn.
+            for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
                 progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
-                    name, i as u32, nb_indexes,
+                    &name, i as u32, nb_indexes,
                 ));
-                let index = index_scheduler.index_mapper.index(&rtxn, name)?;
-                let path = indexes_dir.join(uuid.to_string());
+                let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
+                let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
                 let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
+                eprintln!("Appending index file for {} in {}", name, path.display());
                 tarball.append_file(path, &mut index_file).await?;
             }

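
The collect-then-iterate shape is justified by the new comment: an LMDB read transaction is not Send, so a loop that kept borrowing the rtxn through an iterator across the .await on append_file would make the whole future non-Send. Gathering owned (name, uuid) pairs first ends that borrow before the awaits. A minimal illustration of the general pattern with a placeholder type; nothing below is meilisearch API:

    use std::marker::PhantomData;

    // Raw pointers are !Send, which makes `Rtxn` !Send, like an LMDB rtxn.
    struct Rtxn(PhantomData<*const ()>);

    fn list_indexes(_rtxn: &Rtxn) -> Vec<(String, u32)> {
        vec![("movies".into(), 1), ("songs".into(), 2)]
    }

    async fn append_to_tarball(_name: &str, _uuid: u32) {
        // stands in for tarball.append_file(...).await
    }

    async fn snapshot() {
        // Collect owned pairs first so the !Send `Rtxn` is dropped
        // before the first `.await`.
        let refs: Vec<(String, u32)> = {
            let rtxn = Rtxn(PhantomData);
            list_indexes(&rtxn)
        };
        for (name, uuid) in refs {
            append_to_tarball(&name, uuid).await;
        }
    }

    fn main() {
        fn assert_send<T: Send>(_: T) {}
        assert_send(snapshot()); // compiles only because `rtxn` never crosses an await
    }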
@@ -488,8 +505,8 @@ impl IndexScheduler {

         let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);

-        uploader_result.unwrap()?;
         builder_result.unwrap()?;
+        uploader_result.unwrap()?;

         for task in &mut tasks {
             task.status = Status::Succeeded;