diff --git a/Cargo.lock b/Cargo.lock
index 6450b7f84..a5205e8ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3242,6 +3242,7 @@ dependencies = [
  "bumpalo",
  "bumparaw-collections",
  "byte-unit",
+ "bytes",
  "convert_case 0.8.0",
  "crossbeam-channel",
  "csv",
diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml
index 9d01df3e5..cdfdbbb8d 100644
--- a/crates/index-scheduler/Cargo.toml
+++ b/crates/index-scheduler/Cargo.toml
@@ -14,6 +14,7 @@ license.workspace = true
 anyhow = "1.0.98"
 bincode = "1.3.3"
 byte-unit = "5.1.6"
+bytes = "1.10.1"
 bumpalo = "3.18.1"
 bumparaw-collections = "0.1.4"
 convert_case = "0.8.0"
diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
index f3d8f43e3..d0b5b1623 100644
--- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
@@ -272,6 +272,12 @@ impl IndexScheduler {
         mut tasks: Vec<Task>,
     ) -> Result<Vec<Task>> {
         const ONE_HOUR: Duration = Duration::from_secs(3600);
+        // The default part size is 250MiB.
+        const MIN_PART_SIZE: usize = 250 * 1024 * 1024;
+        // 10MiB
+        const TEN_MIB: usize = 10 * 1024 * 1024;
+        // The maximum number of parts that can be uploaded in a single multipart upload.
+        const MAX_NUMBER_PARTS: usize = 10_000;
 
         let client = Client::new();
         // TODO Remove this unwrap
@@ -302,6 +308,7 @@ impl IndexScheduler {
                 .try_clone_inner_file()
                 .map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
             let mmap = unsafe { memmap2::Mmap::map(&file)? };
+            let mmap = bytes::Bytes::from_owner(mmap);
 
             let object = uuid.to_string();
             let action = bucket.create_multipart_upload(Some(&credential), &object);
@@ -312,15 +319,11 @@ impl IndexScheduler {
             let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
 
             let mut etags = Vec::<String>::new();
-            // chunks of 250MiB
-            let (chunks, remaining_chunk) = mmap.as_chunks::<{ 250 * 1024 * 1024 }>();
+            let part_size = mmap.len() / MAX_NUMBER_PARTS;
+            let part_size = if part_size < TEN_MIB { MIN_PART_SIZE } else { part_size };
 
-            for (i, chunk) in chunks
-                .iter()
-                .map(|chunk| chunk.as_slice())
-                .chain(std::iter::once(remaining_chunk))
-                .enumerate()
-            {
+            let number_of_parts = mmap.len().div_ceil(part_size);
+            for i in 0..number_of_parts {
                 let part_number = u16::try_from(i).unwrap().checked_add(1).unwrap();
                 let part_upload = bucket.upload_part(
                     Some(&credential),
@@ -330,40 +333,40 @@ impl IndexScheduler {
                 );
                 let url = part_upload.sign(ONE_HOUR);
 
-                let resp = client
-                    .put(url)
-                    // TODO Change this for Bytes::from_owned + Bytes::slice
-                    .body(chunk.to_vec())
-                    .send()
-                    .await
-                    .unwrap()
-                    .error_for_status()
-                    .unwrap();
+                // Make sure we do not read out of bounds.
+                let body = if mmap.len() < part_size * (i + 1) {
+                    mmap.slice(part_size * i..)
+                } else {
+                    mmap.slice(part_size * i..part_size * (i + 1))
+                };
+
+                let resp =
+                    client.put(url).body(body).send().await.unwrap().error_for_status().unwrap();
 
                 let etag =
                     resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
                 // TODO use bumpalo to reduce the number of allocations
                 etags.push(etag.to_str().unwrap().to_owned());
-
-                let action = bucket.complete_multipart_upload(
-                    Some(&credential),
-                    &object,
-                    multipart.upload_id(),
-                    etags.iter().map(AsRef::as_ref),
-                );
-                let url = action.sign(ONE_HOUR);
-
-                let resp = client
-                    .post(url)
-                    .body(action.body())
-                    .send()
-                    .await
-                    .unwrap()
-                    .error_for_status()
-                    .unwrap();
-                let body = resp.text().await.unwrap();
-                // TODO remove this
-                println!("it worked! {body}");
             }
+
+            let action = bucket.complete_multipart_upload(
+                Some(&credential),
+                &object,
+                multipart.upload_id(),
+                etags.iter().map(AsRef::as_ref),
+            );
+            let url = action.sign(ONE_HOUR);
+            let resp = client
+                .post(url)
+                .body(action.body())
+                .send()
+                .await
+                .unwrap()
+                .error_for_status()
+                .unwrap();
+
+            let body = resp.text().await.unwrap();
+            // TODO remove this
+            println!("it worked! {body}");
         }
 
         for task in &mut tasks {
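
As a rough illustration of the new part sizing in the hunk above (a standalone sketch reusing the constants the patch introduces; the compute_part_size helper is hypothetical and is not code from this diff): S3-style multipart uploads accept at most 10,000 parts, so the part size is derived from the file length and falls back to 250MiB whenever the computed size would drop below 10MiB.

// Standalone sketch of the part-sizing logic; compute_part_size is a
// hypothetical helper used only to illustrate how the constants interact.
fn compute_part_size(total_len: usize) -> (usize, usize) {
    const MIN_PART_SIZE: usize = 250 * 1024 * 1024; // fallback part size (250MiB)
    const TEN_MIB: usize = 10 * 1024 * 1024; // threshold below which we fall back
    const MAX_NUMBER_PARTS: usize = 10_000; // multipart uploads cap out at 10,000 parts

    // Spread the file over at most MAX_NUMBER_PARTS parts...
    let part_size = total_len / MAX_NUMBER_PARTS;
    // ...but never use a part size below the threshold.
    let part_size = if part_size < TEN_MIB { MIN_PART_SIZE } else { part_size };
    let number_of_parts = total_len.div_ceil(part_size);
    (part_size, number_of_parts)
}

fn main() {
    // A 1GiB snapshot yields 5 parts of 250MiB (the last slice is shorter),
    // well under the 10,000-part limit.
    let (part_size, parts) = compute_part_size(1024 * 1024 * 1024);
    assert_eq!(part_size, 250 * 1024 * 1024);
    assert_eq!(parts, 5);
}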