Use the Bytes crate to send the parts

This commit is contained in:
Clément Renault
2025-10-09 17:41:16 +02:00
parent 3b75394357
commit 05e9a7eefe
3 changed files with 42 additions and 37 deletions

1
Cargo.lock generated
View File

@@ -3242,6 +3242,7 @@ dependencies = [
"bumpalo", "bumpalo",
"bumparaw-collections", "bumparaw-collections",
"byte-unit", "byte-unit",
"bytes",
"convert_case 0.8.0", "convert_case 0.8.0",
"crossbeam-channel", "crossbeam-channel",
"csv", "csv",

View File

@@ -14,6 +14,7 @@ license.workspace = true
anyhow = "1.0.98" anyhow = "1.0.98"
bincode = "1.3.3" bincode = "1.3.3"
byte-unit = "5.1.6" byte-unit = "5.1.6"
bytes = "1.10.1"
bumpalo = "3.18.1" bumpalo = "3.18.1"
bumparaw-collections = "0.1.4" bumparaw-collections = "0.1.4"
convert_case = "0.8.0" convert_case = "0.8.0"

View File

@@ -272,6 +272,12 @@ impl IndexScheduler {
mut tasks: Vec<Task>, mut tasks: Vec<Task>,
) -> Result<Vec<Task>> { ) -> Result<Vec<Task>> {
const ONE_HOUR: Duration = Duration::from_secs(3600); const ONE_HOUR: Duration = Duration::from_secs(3600);
// Default part size (250MiB), used when the computed part size is below 10MiB
const MIN_PART_SIZE: usize = 250 * 1024 * 1024;
// 10MiB: a computed part size below this threshold is replaced by MIN_PART_SIZE
const TEN_MIB: usize = 10 * 1024 * 1024;
// The maximum number of parts that can be uploaded to a single multipart upload.
const MAX_NUMBER_PARTS: usize = 10_000;
let client = Client::new(); let client = Client::new();
// TODO Remove this unwrap // TODO Remove this unwrap
@@ -302,6 +308,7 @@ impl IndexScheduler {
.try_clone_inner_file() .try_clone_inner_file()
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?; .map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
let mmap = unsafe { memmap2::Mmap::map(&file)? }; let mmap = unsafe { memmap2::Mmap::map(&file)? };
let mmap = bytes::Bytes::from_owner(mmap);
let object = uuid.to_string(); let object = uuid.to_string();
let action = bucket.create_multipart_upload(Some(&credential), &object); let action = bucket.create_multipart_upload(Some(&credential), &object);
@@ -312,15 +319,11 @@ impl IndexScheduler {
let multipart = CreateMultipartUpload::parse_response(&body).unwrap(); let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
let mut etags = Vec::<String>::new(); let mut etags = Vec::<String>::new();
// chunks of 250MiB let part_size = mmap.len() / MAX_NUMBER_PARTS;
let (chunks, remaining_chunk) = mmap.as_chunks::<{ 250 * 1024 * 1024 }>(); let part_size = if part_size < TEN_MIB { MIN_PART_SIZE } else { part_size };
for (i, chunk) in chunks let number_of_parts = mmap.len().div_ceil(part_size);
.iter() for i in 0..number_of_parts {
.map(|chunk| chunk.as_slice())
.chain(std::iter::once(remaining_chunk))
.enumerate()
{
let part_number = u16::try_from(i).unwrap().checked_add(1).unwrap(); let part_number = u16::try_from(i).unwrap().checked_add(1).unwrap();
let part_upload = bucket.upload_part( let part_upload = bucket.upload_part(
Some(&credential), Some(&credential),
@@ -330,40 +333,40 @@ impl IndexScheduler {
); );
let url = part_upload.sign(ONE_HOUR); let url = part_upload.sign(ONE_HOUR);
let resp = client // Make sure we do not read out of bound
.put(url) let body = if mmap.len() < part_size * (i + 1) {
// TODO Change this for Bytes::from_owned + Bytes::slice mmap.slice(part_size * i..)
.body(chunk.to_vec()) } else {
.send() mmap.slice(part_size * i..part_size * (i + 1))
.await };
.unwrap()
.error_for_status() let resp =
.unwrap(); client.put(url).body(body).send().await.unwrap().error_for_status().unwrap();
let etag = let etag =
resp.headers().get(ETAG).expect("every UploadPart request returns an Etag"); resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
// TODO use bumpalo to reduce the number of allocations // TODO use bumpalo to reduce the number of allocations
etags.push(etag.to_str().unwrap().to_owned()); etags.push(etag.to_str().unwrap().to_owned());
let action = bucket.complete_multipart_upload(
Some(&credential),
&object,
multipart.upload_id(),
etags.iter().map(AsRef::as_ref),
);
let url = action.sign(ONE_HOUR);
let resp = client
.post(url)
.body(action.body())
.send()
.await
.unwrap()
.error_for_status()
.unwrap();
let body = resp.text().await.unwrap();
// TODO remove this
println!("it worked! {body}");
} }
let action = bucket.complete_multipart_upload(
Some(&credential),
&object,
multipart.upload_id(),
etags.iter().map(AsRef::as_ref),
);
let url = action.sign(ONE_HOUR);
let resp = client
.post(url)
.body(action.body())
.send()
.await
.unwrap()
.error_for_status()
.unwrap();
let body = resp.text().await.unwrap();
// TODO remove this
println!("it worked! {body}");
} }
for task in &mut tasks { for task in &mut tasks {