Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-10-10 13:46:28 +00:00
Upload indexes under their uuids
@@ -280,14 +280,7 @@ impl IndexScheduler {
         let bucket = Bucket::new(url, UrlStyle::Path, bucket_name, bucket_region).unwrap();
         let credential = Credentials::new(access_key, secret_key);
 
-        let object = "test.txt";
-        let action = bucket.create_multipart_upload(Some(&credential), object);
-        let url = action.sign(ONE_HOUR);
-        let resp = client.post(url).send().await.unwrap().error_for_status().unwrap();
-        let body = resp.text().await.unwrap();
-
-        let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
-        let mut etags = Vec::<String>::new();
+        let rtxn = self.read_txn()?;
 
         // Every part must be between 5 MB and 5 GB in size, except for the last part
         // A maximum of 10,000 parts can be uploaded to a single multipart upload.
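
Note: the removed lines above were the hard-coded "test.txt" smoke test of the multipart API; the replacement opens a scheduler read transaction so the next hunk can iterate the real index mapping. For reference, here is a standalone sketch of the same CreateMultipartUpload handshake, assuming the rusty-s3, reqwest, and tokio crates already used by this code; the endpoint, bucket name, region, and keys below are placeholders:

    use std::time::Duration;

    use rusty_s3::actions::CreateMultipartUpload;
    use rusty_s3::{Bucket, Credentials, S3Action, UrlStyle};

    const ONE_HOUR: Duration = Duration::from_secs(3600);

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder endpoint and credentials, e.g. a local MinIO instance.
        let url = "http://localhost:9000".parse()?;
        let bucket = Bucket::new(url, UrlStyle::Path, "snapshots", "us-east-1")?;
        let credential = Credentials::new("access_key", "secret_key");
        let client = reqwest::Client::new();

        // POST the presigned CreateMultipartUpload URL; S3 answers with an
        // XML body carrying the upload id that every subsequent UploadPart
        // and the final CompleteMultipartUpload must repeat.
        let action = bucket.create_multipart_upload(Some(&credential), "test.txt");
        let resp = client.post(action.sign(ONE_HOUR)).send().await?.error_for_status()?;
        let body = resp.text().await?;
        let multipart = CreateMultipartUpload::parse_response(&body)?;
        println!("upload id: {}", multipart.upload_id());
        Ok(())
    }
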
@@ -296,27 +289,82 @@ impl IndexScheduler {
         // A part number uniquely identifies a part and also defines its position within
         // the object being created. If you upload a new part using the same part number
         // that was used with a previous part, the previously uploaded part is overwritten.
-        let part_upload = bucket.upload_part(Some(&credential), object, 1, multipart.upload_id());
-        let url = part_upload.sign(ONE_HOUR);
-
-        let body = "123456789";
-        let resp = client.put(url).body(body).send().await.unwrap().error_for_status().unwrap();
-        let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
-        // TODO use bumpalo to reduce the number of allocations
-        etags.push(etag.to_str().unwrap().to_owned());
-
-        let action = bucket.complete_multipart_upload(
-            Some(&credential),
-            object,
-            multipart.upload_id(),
-            etags.iter().map(AsRef::as_ref),
-        );
-        let url = action.sign(ONE_HOUR);
-
-        let resp =
-            client.post(url).body(action.body()).send().await.unwrap().error_for_status().unwrap();
-        let body = resp.text().await.unwrap();
-        println!("it worked! {body}");
+        progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
+        let index_mapping = self.index_mapper.index_mapping;
+        let nb_indexes = index_mapping.len(&rtxn)? as u32;
+        for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
+            let (name, uuid) = result?;
+            progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
+                name, i as u32, nb_indexes,
+            ));
+            let index = self.index_mapper.index(&rtxn, name)?;
+            let file = index
+                .try_clone_inner_file()
+                .map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
+            let mmap = unsafe { memmap2::Mmap::map(&file)? };
+
+            let object = uuid.to_string();
+            let action = bucket.create_multipart_upload(Some(&credential), &object);
+            let url = action.sign(ONE_HOUR);
+            let resp = client.post(url).send().await.unwrap().error_for_status().unwrap();
+            let body = resp.text().await.unwrap();
+
+            let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
+            let mut etags = Vec::<String>::new();
+
+            // chunks of 250MiB
+            let (chunks, remaining_chunk) = mmap.as_chunks::<{ 250 * 1024 * 1024 }>();
+
+            for (i, chunk) in chunks
+                .iter()
+                .map(|chunk| chunk.as_slice())
+                .chain(std::iter::once(remaining_chunk))
+                .enumerate()
+            {
+                let part_number = u16::try_from(i).unwrap().checked_add(1).unwrap();
+                let part_upload = bucket.upload_part(
+                    Some(&credential),
+                    &object,
+                    part_number,
+                    multipart.upload_id(),
+                );
+                let url = part_upload.sign(ONE_HOUR);
+
+                let resp = client
+                    .put(url)
+                    // TODO Change this for Bytes::from_owned + Bytes::slice
+                    .body(chunk.to_vec())
+                    .send()
+                    .await
+                    .unwrap()
+                    .error_for_status()
+                    .unwrap();
+                let etag =
+                    resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
+                // TODO use bumpalo to reduce the number of allocations
+                etags.push(etag.to_str().unwrap().to_owned());
+
+                let action = bucket.complete_multipart_upload(
+                    Some(&credential),
+                    &object,
+                    multipart.upload_id(),
+                    etags.iter().map(AsRef::as_ref),
+                );
+                let url = action.sign(ONE_HOUR);
+
+                let resp = client
+                    .post(url)
+                    .body(action.body())
+                    .send()
+                    .await
+                    .unwrap()
+                    .error_for_status()
+                    .unwrap();
+                let body = resp.text().await.unwrap();
+                // TODO remove this
+                println!("it worked! {body}");
+            }
+        }
 
         for task in &mut tasks {
             task.status = Status::Succeeded;
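
Note: the 250 MiB part size sits inside the 5 MB to 5 GB window quoted in the comments, and `as_chunks` splits the mmap into whole 250 MiB chunks plus a shorter remainder that becomes the final part. A back-of-the-envelope check of those limits, as a sketch; the constants mirror the AWS limits quoted above, and the 1.6 GiB figure is a made-up example:

    // Limits quoted in the comments above (AWS multipart upload).
    const PART_SIZE: u64 = 250 * 1024 * 1024; // one part per mmap chunk
    const MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // every part but the last
    const MAX_PART_SIZE: u64 = 5 * 1024 * 1024 * 1024;
    const MAX_PARTS: u64 = 10_000;

    fn main() {
        // The chosen part size is inside the allowed window.
        assert!((MIN_PART_SIZE..=MAX_PART_SIZE).contains(&PART_SIZE));

        // Whole chunks plus a shorter trailing part, like mmap.as_chunks():
        // a hypothetical 1.6 GiB index needs six full parts and a remainder.
        let index_len: u64 = 1_600 * 1024 * 1024;
        let parts = index_len.div_ceil(PART_SIZE);
        assert_eq!(parts, 7); // six 250 MiB parts + one 100 MiB final part

        // The 10,000-part ceiling caps one object at ~2.38 TiB at this size.
        println!("max object: {} bytes", MAX_PARTS * PART_SIZE);
    }
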
@@ -425,6 +425,10 @@ impl Index {
         self.env.info().map_size
     }
 
+    pub fn try_clone_inner_file(&self) -> Result<File> {
+        Ok(self.env.try_clone_inner_file()?)
+    }
+
     pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> {
         self.env.copy_to_file(file, option).map_err(Into::into)
     }
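
Note: `try_clone_inner_file` hands the scheduler a duplicated handle to the LMDB data file backing the index, which the second hunk then maps with memmap2. A minimal sketch of that consumer pattern using only std and memmap2; the `data.mdb` path is a placeholder standing in for the file the new accessor returns:

    use std::fs::File;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Stand-in for Index::try_clone_inner_file(): open the data file
        // and duplicate the handle so it can be mapped independently.
        let file = File::open("data.mdb")?;
        let cloned: File = file.try_clone()?;
        // Safety: Mmap::map is unsafe because the mapped bytes may change
        // underneath us if the file is written to while the map is alive.
        let mmap = unsafe { memmap2::Mmap::map(&cloned)? };
        println!("mapped {} bytes", mmap.len());
        Ok(())
    }
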