From 3b75394357f265394595967921ffc74ca3ab1f65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 9 Oct 2025 16:44:59 +0200 Subject: [PATCH] Upload indexes under their uuids --- .../scheduler/process_snapshot_creation.rs | 100 +++++++++++++----- crates/milli/src/index.rs | 4 + 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 8e6f6954e..f3d8f43e3 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -280,14 +280,7 @@ impl IndexScheduler { let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap(); let credential = Credentials::new(access_key, secret_key); - let object = "test.txt"; - let action = bucket.create_multipart_upload(Some(&credential), object); - let url = action.sign(ONE_HOUR); - let resp = client.post(url).send().await.unwrap().error_for_status().unwrap(); - let body = resp.text().await.unwrap(); - - let multipart = CreateMultipartUpload::parse_response(&body).unwrap(); - let mut etags = Vec::::new(); + let rtxn = self.read_txn()?; // Every part must be between 5 MB and 5 GB in size, except for the last part // A maximum of 10,000 parts can be uploaded to a single multipart upload. @@ -296,27 +289,82 @@ impl IndexScheduler { // A part number uniquely identifies a part and also defines its position within // the object being created. If you upload a new part using the same part number // that was used with a previous part, the previously uploaded part is overwritten. 
- let part_upload = bucket.upload_part(Some(&credential), object, 1, multipart.upload_id()); - let url = part_upload.sign(ONE_HOUR); + progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes); + let index_mapping = self.index_mapper.index_mapping; + let nb_indexes = index_mapping.len(&rtxn)? as u32; + for (i, result) in index_mapping.iter(&rtxn)?.enumerate() { + let (name, uuid) = result?; + progress.update_progress(VariableNameStep::::new( + name, i as u32, nb_indexes, + )); + let index = self.index_mapper.index(&rtxn, name)?; + let file = index + .try_clone_inner_file() + .map_err(|e| Error::from_milli(e, Some(name.to_string())))?; + let mmap = unsafe { memmap2::Mmap::map(&file)? }; - let body = "123456789"; - let resp = client.put(url).body(body).send().await.unwrap().error_for_status().unwrap(); - let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag"); - // TODO use bumpalo to reduce the number of allocations - etags.push(etag.to_str().unwrap().to_owned()); + let object = uuid.to_string(); + let action = bucket.create_multipart_upload(Some(&credential), &object); + let url = action.sign(ONE_HOUR); + let resp = client.post(url).send().await.unwrap().error_for_status().unwrap(); + let body = resp.text().await.unwrap(); - let action = bucket.complete_multipart_upload( - Some(&credential), - object, - multipart.upload_id(), - etags.iter().map(AsRef::as_ref), - ); - let url = action.sign(ONE_HOUR); + let multipart = CreateMultipartUpload::parse_response(&body).unwrap(); + let mut etags = Vec::::new(); - let resp = - client.post(url).body(action.body()).send().await.unwrap().error_for_status().unwrap(); - let body = resp.text().await.unwrap(); - println!("it worked! 
{body}"); + // chunks of 250MiB + let (chunks, remaining_chunk) = mmap.as_chunks::<{ 250 * 1024 * 1024 }>(); + + for (i, chunk) in chunks + .iter() + .map(|chunk| chunk.as_slice()) + .chain(std::iter::once(remaining_chunk)) + .enumerate() + { + let part_number = u16::try_from(i).unwrap().checked_add(1).unwrap(); + let part_upload = bucket.upload_part( + Some(&credential), + &object, + part_number, + multipart.upload_id(), + ); + let url = part_upload.sign(ONE_HOUR); + + let resp = client + .put(url) + // TODO Change this for Bytes::from_owned + Bytes::slice + .body(chunk.to_vec()) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + let etag = + resp.headers().get(ETAG).expect("every UploadPart request returns an Etag"); + // TODO use bumpalo to reduce the number of allocations + etags.push(etag.to_str().unwrap().to_owned()); + } + + let action = bucket.complete_multipart_upload( + Some(&credential), + &object, + multipart.upload_id(), + etags.iter().map(AsRef::as_ref), + ); + let url = action.sign(ONE_HOUR); + + let resp = client + .post(url) + .body(action.body()) + .send() + .await + .unwrap() + .error_for_status() + .unwrap(); + let body = resp.text().await.unwrap(); + // TODO remove this + println!("it worked! {body}"); + } for task in &mut tasks { task.status = Status::Succeeded; diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index d5f5a45dc..22c2cdd99 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -425,6 +425,10 @@ impl Index { self.env.info().map_size } + pub fn try_clone_inner_file(&self) -> Result { + Ok(self.env.try_clone_inner_file()?) + } + pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> { self.env.copy_to_file(file, option).map_err(Into::into) }