WIP Do more tests

Kerollmops
2025-10-15 19:26:22 +02:00
parent 63f247cdda
commit 4c30f090c7


@@ -286,18 +286,14 @@ impl IndexScheduler {
use async_compression::tokio::write::GzipEncoder;
use async_compression::Level;
use bytes::{Bytes, BytesMut};
use rusty_s3::actions::UploadPart;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;
const ONE_HOUR: Duration = Duration::from_secs(3600);
// default part size is 250MiB
const MIN_PART_SIZE: usize = 250 * 1024 * 1024;
// 10MiB
const TEN_MIB: usize = 10 * 1024 * 1024;
// The maximum number of parts that can be uploaded to a single multipart upload.
const MAX_NUMBER_PARTS: usize = 10_000;
// TODO use 375MiB
const PART_SIZE: usize = 500 * 1024 * 1024;
// The maximum number of parts that can be uploaded in parallel.
const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
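S3 caps a multipart upload at MAX_NUMBER_PARTS = 10,000 parts, so the part size bounds the snapshot size: 500 MiB parts allow roughly 4.77 TiB, while the 375 MiB from the TODO would allow roughly 3.58 TiB. The constant above holds an environment variable name, which suggests the number of in-flight parts is configurable at runtime; a minimal sketch of reading it, assuming a fallback default of 10 (the real default is not visible in this diff):

```rust
use std::env;

const S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
// Hypothetical fallback; the actual default is not shown in this diff.
const DEFAULT_MAX_IN_FLIGHT_PARTS: usize = 10;

fn max_in_flight_parts() -> usize {
    env::var(S3_MAX_IN_FLIGHT_PARTS)
        .ok()
        .and_then(|value| value.parse().ok())
        .unwrap_or(DEFAULT_MAX_IN_FLIGHT_PARTS)
}
```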
@@ -320,6 +316,8 @@ impl IndexScheduler {
// TODO Use a better thing than a String for the object path
let (writer, mut reader) = tokio::net::unix::pipe::pipe()?;
let uploader_task = tokio::spawn(async move {
use meilisearch_types::milli::update::new::StdResult;
let action = bucket.create_multipart_upload(Some(&credential), object);
// TODO Question: if the URL is only signed for an hour and a snapshot takes longer than an hour, what happens?
// If the pending upload can be deleted (like a TTL), we should sign for at least 24 hours.
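On the TODO above: a presigned URL embeds its expiry in the signature, so a part PUT issued after the hour is rejected by S3; the pending upload itself persists unless a bucket lifecycle rule aborts it. Since rusty_s3 takes the expiry as a parameter of `sign` (and SigV4 allows up to seven days), signing for 24 hours is a one-line change. A sketch, assuming the `url` crate for the returned `Url`:

```rust
use std::time::Duration;

use rusty_s3::{Bucket, Credentials, S3Action};

const ONE_DAY: Duration = Duration::from_secs(24 * 3600);

// The expiry is baked into the URL at signing time, so a long-running
// snapshot only needs a long enough duration here.
fn sign_part_url(
    bucket: &Bucket,
    credentials: &Credentials,
    object: &str,
    part_number: u16,
    upload_id: &str,
) -> url::Url {
    bucket
        .upload_part(Some(credentials), object, part_number, upload_id)
        .sign(ONE_DAY)
}
```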
@@ -331,7 +329,7 @@ impl IndexScheduler {
let mut etags = Vec::<String>::new();
let mut in_flight = VecDeque::<(
JoinHandle<std::result::Result<reqwest::Response, reqwest::Error>>,
JoinHandle<StdResult<reqwest::Response, reqwest::Error>>,
Bytes,
)>::with_capacity(max_in_flight_parts);
for part_number in 1u16.. {
@@ -346,7 +344,7 @@ impl IndexScheduler {
// If the in-flight queue is full, wait for the oldest part to land and reuse its buffer
let mut buffer = if in_flight.len() >= max_in_flight_parts {
let (request, buffer) = in_flight.pop_front().unwrap();
let mut buffer = buffer.try_into_mut().expect("to convert into a mut buffer");
let mut buffer = buffer.try_into_mut().expect("Valid to convert into BytesMut");
let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
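The hunk above is the back-pressure step: when the queue holds `max_in_flight_parts` entries, the oldest part must land before a new buffer is handed out, and its `Bytes` is turned back into a `BytesMut` for reuse. A sketch of that step with the unwraps replaced by propagated errors (assuming `anyhow` for brevity); the `try_into_mut` only succeeds because reqwest has dropped its clone of the body by the time the response arrives:

```rust
use std::collections::VecDeque;

use bytes::{Bytes, BytesMut};
use reqwest::header::ETAG;
use tokio::task::JoinHandle;

type InFlight = VecDeque<(JoinHandle<Result<reqwest::Response, reqwest::Error>>, Bytes)>;

// Drain the oldest in-flight part: record its ETag and reclaim its buffer.
async fn drain_oldest(
    in_flight: &mut InFlight,
    etags: &mut Vec<String>,
) -> anyhow::Result<BytesMut> {
    let (request, buffer) = in_flight.pop_front().expect("caller checked non-empty");
    // JoinError first, then the reqwest error, then any HTTP error status.
    let resp = request.await??.error_for_status()?;
    let etag = resp
        .headers()
        .get(ETAG)
        .expect("every successful UploadPart response carries an ETag");
    etags.push(etag.to_str()?.to_owned());
    // Only succeeds because reqwest dropped its clone of the body when the
    // request completed, leaving this single reference.
    let mut buffer = buffer.try_into_mut().expect("single Bytes reference left");
    buffer.clear(); // reuse the allocation for the next part
    Ok(buffer)
}
```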
@@ -356,20 +354,27 @@ impl IndexScheduler {
buffer
} else {
// TODO Base this on the available memory
BytesMut::with_capacity(MIN_PART_SIZE)
BytesMut::with_capacity(PART_SIZE)
};
while buffer.len() < (MIN_PART_SIZE / 2) {
while buffer.len() < (PART_SIZE / 2) {
if reader.read_buf(&mut buffer).await? == 0 {
eprintln!(
"breaking because read returned 0. Buffer len {}; capacity {}",
buffer.len(),
buffer.capacity(),
);
break;
}
}
if buffer.is_empty() {
eprintln!("buffer is empty, breaking part number loop");
break;
}
let body = buffer.freeze();
eprintln!("Sending part {}", part_number);
let task = tokio::spawn(client.put(url).body(body.clone()).send());
in_flight.push_back((task, body));
}
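The `etags` collected in this loop are what S3 ultimately needs to stitch the parts together. The diff does not show the completion step, but with rusty_s3 it would look roughly like this sketch, assuming the `upload_id` parsed from the CreateMultipartUpload response and a shared `reqwest::Client`:

```rust
use std::time::Duration;

use rusty_s3::{Bucket, Credentials, S3Action};

// Sketch of the completion step: S3 assembles the object once it receives
// the ordered list of part ETags.
async fn complete_upload(
    client: &reqwest::Client,
    bucket: &Bucket,
    credentials: &Credentials,
    object: &str,
    upload_id: &str,
    etags: &[String],
) -> Result<(), reqwest::Error> {
    let action = bucket.complete_multipart_upload(
        Some(credentials),
        object,
        upload_id,
        etags.iter().map(String::as_str),
    );
    let url = action.sign(Duration::from_secs(3600));
    client
        .post(url)
        .body(action.body())
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```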
@@ -407,9 +412,9 @@ impl IndexScheduler {
// TODO not a big fan of this clone
// remove it and get all the necessary data from the scheduler
let index_scheduler = IndexScheduler::private_clone(self);
let builder_task = tokio::task::spawn_local(async move {
let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
let mut tarball = tokio_tar::Builder::new(compressed_writer);
let builder_task = tokio::task::spawn(async move {
// let compressed_writer = GzipEncoder::with_quality(writer, Level::Fastest);
let mut tarball = tokio_tar::Builder::new(writer);
// 1. Snapshot the version file
tarball
@@ -424,6 +429,7 @@ impl IndexScheduler {
let mut tasks_env_file =
index_scheduler.env.try_clone_inner_file().map(File::from_std)?;
let path = Path::new("tasks").join("data.mdb");
// NOTE when this line is commented out, the tarball works better
tarball.append_file(path, &mut tasks_env_file).await?;
drop(tasks_env_file);
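Stepping back, the two tasks meet through the unix pipe created earlier: the builder task streams the tarball into `writer` while the uploader drains `reader` into parts, so the archive never touches disk. A minimal, Unix-only sketch of that plumbing, with plain bytes standing in for the tarball and the S3 parts:

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (mut writer, mut reader) = tokio::net::unix::pipe::pipe()?;

    // Stand-in for the builder task: it would write the tarball here.
    let producer = tokio::spawn(async move {
        writer.write_all(b"streamed archive bytes").await?;
        drop(writer); // closing the writer is what ends the reader loop
        std::io::Result::Ok(())
    });

    // Stand-in for the uploader task: it would cut this stream into parts.
    let consumer = tokio::spawn(async move {
        let mut buffer = Vec::new();
        // read_buf returns 0 once the writer half is closed
        while reader.read_buf(&mut buffer).await? != 0 {}
        println!("received {} bytes", buffer.len());
        std::io::Result::Ok(())
    });

    let (producer_result, consumer_result) = tokio::join!(producer, consumer);
    producer_result.unwrap()?;
    consumer_result.unwrap()?;
    Ok(())
}
```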
@@ -457,14 +463,25 @@ impl IndexScheduler {
let index_mapping = index_scheduler.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
let indexes_dir = Path::new("indexes");
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, uuid) = result?;
let indexes_references: Vec<_> = index_scheduler
.index_mapper
.index_mapping
.iter(&rtxn)?
.map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
.collect::<Result<_, Error>>()?;
dbg!(&indexes_references);
// Note that we collect the index names and UUIDs up front because
// iterating them directly in this loop would require the rtxn to be Send.
for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
&name, i as u32, nb_indexes,
));
let index = index_scheduler.index_mapper.index(&rtxn, name)?;
let path = indexes_dir.join(uuid.to_string());
let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
let mut index_file = index.try_clone_inner_file().map(File::from_std).unwrap();
eprintln!("Appending index file for {} in {}", name, path.display());
tarball.append_file(path, &mut index_file).await?;
}
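The collect-first pattern in this hunk is worth spelling out: an LMDB read transaction is not Send, so iterating it directly inside this loop would make the spawned future non-Send. A sketch of the problem in miniature, with `Rtxn` and `list_names` as hypothetical stand-ins for the heed types:

```rust
use std::marker::PhantomData;

// Hypothetical stand-in for heed's read transaction: the raw pointer
// field makes it !Send, like the real RoTxn.
struct Rtxn(PhantomData<*const ()>);

// Hypothetical stand-in for iterating the index mapping with `rtxn`.
fn list_names(_rtxn: &Rtxn) -> impl Iterator<Item = String> + '_ {
    ["movies", "products"].into_iter().map(String::from)
}

async fn archive(_name: &str) { /* stand-in for tarball.append_file(...) */ }

fn spawn_snapshot(rtxn: Rtxn) -> tokio::task::JoinHandle<()> {
    // Collecting ends the borrow of `rtxn` before any `.await`; if the
    // loop below iterated `list_names(&rtxn)` directly, the future would
    // capture the !Send `rtxn` and `tokio::spawn` would refuse it.
    let names: Vec<String> = list_names(&rtxn).collect();
    tokio::spawn(async move {
        for name in names {
            archive(&name).await;
        }
    })
}
```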
@@ -488,8 +505,8 @@ impl IndexScheduler {
let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);
uploader_result.unwrap()?;
builder_result.unwrap()?;
uploader_result.unwrap()?;
for task in &mut tasks {
task.status = Status::Succeeded;