mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-10 05:36:35 +00:00
Upload ten parts at a time
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::env::VarError;
|
||||
use std::ffi::OsStr;
|
||||
use std::fs;
|
||||
@@ -278,6 +279,8 @@ impl IndexScheduler {
|
||||
const TEN_MIB: usize = 10 * 1024 * 1024;
|
||||
// The maximum number of parts that can be uploaded to a single multipart upload.
|
||||
const MAX_NUMBER_PARTS: usize = 10_000;
|
||||
// The maximum number of parts that can be uploaded in parallel.
|
||||
const MAX_IN_FLIGHT_PARTS: usize = 10;
|
||||
|
||||
let client = Client::new();
|
||||
// TODO Remove this unwrap
|
||||
@@ -322,6 +325,7 @@ impl IndexScheduler {
|
||||
let part_size = mmap.len() / MAX_NUMBER_PARTS;
|
||||
let part_size = if part_size < TEN_MIB { MIN_PART_SIZE } else { part_size };
|
||||
|
||||
let mut in_flight_parts = VecDeque::with_capacity(MAX_IN_FLIGHT_PARTS);
|
||||
let number_of_parts = mmap.len().div_ceil(part_size);
|
||||
for i in 0..number_of_parts {
|
||||
let part_number = u16::try_from(i).unwrap().checked_add(1).unwrap();
|
||||
@@ -340,8 +344,27 @@ impl IndexScheduler {
|
||||
mmap.slice(part_size * i..part_size * (i + 1))
|
||||
};
|
||||
|
||||
let resp =
|
||||
client.put(url).body(body).send().await.unwrap().error_for_status().unwrap();
|
||||
let task = tokio::spawn(client.put(url).body(body).send());
|
||||
in_flight_parts.push_back(task);
|
||||
|
||||
if in_flight_parts.len() == MAX_IN_FLIGHT_PARTS {
|
||||
let resp = in_flight_parts
|
||||
.pop_front()
|
||||
.unwrap()
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.error_for_status()
|
||||
.unwrap();
|
||||
let etag =
|
||||
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
|
||||
// TODO use bumpalo to reduce the number of allocations
|
||||
etags.push(etag.to_str().unwrap().to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
for join_handle in in_flight_parts {
|
||||
let resp = join_handle.await.unwrap().unwrap().error_for_status().unwrap();
|
||||
let etag =
|
||||
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
|
||||
// TODO use bumpalo to reduce the number of allocations
|
||||
|
Reference in New Issue
Block a user