move to our new S3 lib

Tamo
2023-09-28 11:18:56 +02:00
parent 6325cda74f
commit 98b67f217a
9 changed files with 116 additions and 212 deletions

index-scheduler/Cargo.toml

@@ -34,7 +34,7 @@ uuid = { version = "1.3.1", features = ["serde", "v4"] }
 tokio = { version = "1.27.0", features = ["full"] }
 zookeeper = "0.8.0"
 parking_lot = "0.12.1"
-rust-s3 = { version = "0.33.0", default-features = false, features = ["sync-rustls-tls"] }
+strois = { git = "http://github.com/meilisearch/strois", branch = "main" }
 
 [dev-dependencies]
 big_s = "1.0.2"
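
The dependency swap is the whole story: rust-s3 0.33.0 with its sync rustls backend goes out, and strois, Meilisearch's own S3 client, comes in straight from its main branch (the git dependency is unpinned, so builds track whatever main holds). The rest of the commit is the mechanical fallout at each call site. Below is a hedged sketch of the strois surface the hunks rely on; the four method names and the `.key` field are visible in the diff, while the exact signatures, generics, and error type are inferred, not confirmed:

    use strois::Bucket;

    fn sketch(s3: &Bucket, mut big_file: std::fs::File) {
        // fetch an object and deserialize its JSON body in one call
        let _task: serde_json::Value =
            s3.get_object_json(format!("tasks/{:0>10}", 0)).unwrap();

        // upload a large file in fixed-size parts (size in bytes)
        s3.put_object_multipart("snapshots/tasks.mdb".to_string(), &mut big_file, 50 * 1024 * 1024)
            .unwrap();

        // stream an object down into any io::Write
        let mut out: Vec<u8> = Vec::new();
        s3.get_object_to_writer("snapshots/tasks.mdb".to_string(), &mut out).unwrap();

        // list objects under a prefix; each entry is its own Result
        let src = "snapshots/indexes".to_string();
        for object in s3.list_objects(&src).unwrap() {
            println!("{}", object.unwrap().key);
        }
    }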

index-scheduler/src/lib.rs

@@ -46,6 +46,7 @@ use dump::{KindDump, TaskDump, UpdateFile};
 pub use error::Error;
 pub use features::RoFeatures;
 use file_store::FileStore;
+use strois::Bucket;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
 use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
@@ -56,7 +57,6 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
 use roaring::RoaringBitmap;
-use s3::Bucket;
 use synchronoise::SignalEvent;
 use tempfile::TempDir;
 use time::format_description::well_known::Rfc3339;
@@ -74,6 +74,9 @@ use crate::utils::{check_index_swap_validity, clamp_to_page_size};
 pub(crate) type BEI128 =
     meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
 
+/// This is the size of the buffer we use to send each part of a multipart request to S3.
+pub const S3_PART_SIZE: usize = 50 * 1024 * 1024; // 50MiB
+
 /// Defines a subset of tasks to be retrieved from the [`IndexScheduler`].
 ///
 /// An empty/default query (where each field is set to `None`) matches all tasks.
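
A 50 MiB part sits comfortably inside S3's multipart limits: every part except the last must be at least 5 MiB, a single upload may carry at most 10,000 parts, and no part may exceed 5 GiB. At this size one upload can ship an object of roughly 488 GiB. The arithmetic, checked:

    const S3_PART_SIZE: usize = 50 * 1024 * 1024; // 52_428_800 bytes
    const S3_MAX_PARTS: usize = 10_000; // S3's hard cap on parts per upload

    fn main() {
        // the largest object one upload can carry at this part size
        let max_object = S3_PART_SIZE * S3_MAX_PARTS;
        assert_eq!(max_object, 524_288_000_000); // ~488 GiB
        // a 1 GiB snapshot needs ceil(1 GiB / 50 MiB) parts
        let parts = (1024 * 1024 * 1024 + S3_PART_SIZE - 1) / S3_PART_SIZE;
        assert_eq!(parts, 21);
    }
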
@@ -467,14 +470,7 @@ impl IndexScheduler {
                 .unwrap();
             let s3 = inner.options.s3.as_ref().unwrap();
             let task =
-                s3.get_object(format!("/tasks/{id:0>10}")).unwrap();
-            assert_eq!(
-                task.status_code(),
-                200,
-                "could not reach the s3: {:?}",
-                task.as_str()
-            );
-            let task = serde_json::from_slice(task.as_slice()).unwrap();
+                s3.get_object_json(format!("tasks/{id:0>10}")).unwrap();
             inner.register_raw_task(&mut wtxn, &task).unwrap();
             // we received a new tasks, we must wake up
             self.wake_up.signal();
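
Two things change at every task read. The rust-s3 ritual of fetching, asserting on the HTTP status code, and running serde_json::from_slice by hand collapses into one fallible get_object_json call, and the leading slash quietly drops out of the key ("/tasks/…" becomes "tasks/…"): S3 keys are literal strings, and strois evidently takes them without the slash rust-s3 tolerated. The zero-padding in the key is what keeps S3's lexicographic listing in numeric task order; a std-only check:

    fn main() {
        // {id:0>10} zero-pads the task id to a fixed width...
        let key = format!("tasks/{:0>10}", 42u32);
        assert_eq!(key, "tasks/0000000042");
        // ...so string comparison agrees with numeric comparison
        assert!("tasks/0000000042" < "tasks/0000000100");
    }
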
@@ -511,14 +507,8 @@ impl IndexScheduler {
                 .map(|(_, id)| id.parse::<u32>().unwrap())
                 .unwrap();
             let s3 = inner.options.s3.as_ref().unwrap();
-            let task = s3.get_object(format!("tasks/{id:0>10}")).unwrap();
-            assert_eq!(
-                task.status_code(),
-                200,
-                "could not reach the s3: {:?}",
-                task.as_str()
-            );
-            let task = serde_json::from_slice(task.as_slice()).unwrap();
+            let task =
+                s3.get_object_json(format!("tasks/{id:0>10}")).unwrap();
             inner.register_raw_task(&mut wtxn, &task).unwrap();
             wtxn.commit().unwrap();
         }
@@ -589,11 +579,7 @@ impl IndexScheduler {
             );
             let mut version_file_path =
                 File::open(&inner.version_file_path).unwrap();
-            s3.put_object_stream(&mut version_file_path, dst)
-                .or_else(|e| match e {
-                    s3::error::S3Error::Http(404, _) => Ok(200),
-                    e => Err(e),
-                })
+            s3.put_object_multipart(dst, &mut version_file_path, S3_PART_SIZE)
                 .unwrap();
             version_file_path.sync_data().unwrap();
             drop(version_file_path);
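
The upload sites lose the `.or_else` that swallowed `s3::error::S3Error::Http(404, _)`, a workaround the old streaming PUT needed, and gain an explicit part size instead. A multipart upload drains its reader in fixed-size chunks and issues one UploadPart request per chunk; here is a client-agnostic sketch of that buffering loop (std only, HTTP calls elided, and not strois's actual code):

    use std::io::Read;

    // Feed `src` to `upload` in parts of exactly `part_size` bytes (except
    // the last): the loop every multipart client runs in some form.
    fn for_each_part<R: Read>(
        mut src: R,
        part_size: usize,
        mut upload: impl FnMut(&[u8]),
    ) -> std::io::Result<()> {
        let mut buf = vec![0u8; part_size];
        loop {
            // fill the buffer as far as the reader allows
            let mut filled = 0;
            while filled < part_size {
                match src.read(&mut buf[filled..])? {
                    0 => break, // EOF
                    n => filled += n,
                }
            }
            if filled == 0 {
                break; // nothing left; the client would now complete the upload
            }
            upload(&buf[..filled]); // one UploadPart request per chunk
            if filled < part_size {
                break; // a short part can only be the last one
            }
        }
        Ok(())
    }
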
@@ -610,14 +596,11 @@ impl IndexScheduler {
                 heed::CompactionOption::Enabled,
             )
             .unwrap();
-            s3.put_object_stream(
-                &mut file,
+            s3.put_object_multipart(
                 format!("{snapshot_dir}/tasks.mdb"),
+                &mut file,
+                S3_PART_SIZE,
             )
-            .or_else(|e| match e {
-                s3::error::S3Error::Http(404, _) => Ok(200),
-                e => Err(e),
-            })
             .unwrap();
 
             temp.close().unwrap();
@@ -637,12 +620,12 @@ impl IndexScheduler {
                 heed::CompactionOption::Enabled,
             )
             .unwrap();
-            s3.put_object_stream(&mut file, format!("{dst}/{uuid}.mdb"))
-                .or_else(|e| match e {
-                    s3::error::S3Error::Http(404, _) => Ok(200),
-                    e => Err(e),
-                })
-                .unwrap();
+            s3.put_object_multipart(
+                format!("{dst}/{uuid}.mdb"),
+                &mut file,
+                S3_PART_SIZE,
+            )
+            .unwrap();
 
             temp.close().unwrap();
         }
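
Both database uploads follow the same shape: heed's copy_to_path with CompactionOption::Enabled writes a compacted, point-in-time copy of the LMDB environment into a temp directory, and that copy, never the live database file, is what streams to S3. Also note the argument order flips: put_object_stream took (reader, key) while put_object_multipart takes (key, reader, part_size). A hypothetical condensation of the pattern; env, s3, and key are stand-in parameters, not the scheduler's real fields, and error types are glossed over:

    // Sketch only: `copy_to_path` / `CompactionOption` are the heed APIs
    // used in the hunks above; `put_object_multipart` is the strois call.
    fn snapshot_to_s3(env: &heed::Env, s3: &strois::Bucket, key: String) -> anyhow::Result<()> {
        let temp = tempfile::TempDir::new()?;
        let path = temp.path().join("data.mdb");
        // compacted, point-in-time copy; the live env keeps serving meanwhile
        let _copy = env.copy_to_path(&path, heed::CompactionOption::Enabled)?;
        // reopen so the upload reads from the start of the file
        let mut file = std::fs::File::open(&path)?;
        s3.put_object_multipart(key, &mut file, 50 * 1024 * 1024)?;
        temp.close()?;
        Ok(())
    }
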
@@ -925,26 +908,23 @@ fn load_snapshot(this: &IndexScheduler, path: &str) -> anyhow::Result<()> {
     log::info!("Downloading the index scheduler database.");
     let tasks_snapshot = format!("{path}/tasks.mdb");
-    let status = s3.get_object_to_writer(tasks_snapshot, &mut tasks_file)?;
-    assert!(matches!(status, 200 | 202));
+    s3.get_object_to_writer(tasks_snapshot, &mut tasks_file)?;
 
     log::info!("Downloading the indexes databases");
     let indexes_files = tempfile::TempDir::new_in(&base_path)?;
     let src = format!("{path}/indexes");
 
-    let uuids = s3.list(src.clone(), None)?.into_iter().flat_map(|lbr| {
-        lbr.contents.into_iter().map(|o| {
-            let (_, name) = o.key.rsplit_once('/').unwrap();
-            name.to_string()
-        })
+    let uuids = s3.list_objects(&src)?.into_iter().map(|lbr| {
+        let key = lbr.unwrap().key;
+        let (_, name) = key.rsplit_once('/').unwrap();
+        name.to_string()
     });
 
     for uuid in uuids {
         log::info!("\tDownloading the index {}", uuid);
         std::fs::create_dir_all(indexes_files.path().join(&uuid).with_extension(""))?;
         let path = indexes_files.path().join(&uuid).with_extension("").join("data.mdb");
         let mut file = File::create(path)?;
-        let status = s3.get_object_to_writer(format!("{src}/{uuid}"), &mut file)?;
-        assert!(matches!(status, 200 | 202));
+        s3.get_object_to_writer(format!("{src}/{uuid}"), &mut file)?;
     }
 
     // 3. Lock the index-mapper and close all the env
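
The restore path mirrors the upload naming. list_objects yields one Result per object (hence the unwrap inside the closure) with keys like {path}/indexes/{uuid}.mdb; rsplit_once keeps the {uuid}.mdb file name, and with_extension("") strips the suffix locally so each index gets its own directory holding a data.mdb. The std-only key juggling, runnable as-is with a made-up key:

    use std::path::Path;

    fn main() {
        let key = "snapshots/0001/indexes/8f2d1b4e.mdb"; // made-up object key
        let (_, name) = key.rsplit_once('/').unwrap(); // "8f2d1b4e.mdb"
        let dir = Path::new("/tmp/indexes").join(name).with_extension("");
        assert_eq!(dir, Path::new("/tmp/indexes/8f2d1b4e"));
        // get_object_to_writer then streams the object into dir/data.mdb
        assert_eq!(dir.join("data.mdb"), Path::new("/tmp/indexes/8f2d1b4e/data.mdb"));
    }
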
@@ -1273,7 +1253,7 @@ impl IndexSchedulerInner {
         for uuid in batch.content_uuids() {
             // TODO use a real UUIDv4
             let (_, file) = self.file_store.new_update_with_uuid(uuid.as_u128())?;
-            s3.get_object_to_writer(&format!("/update-files/{}", uuid), &mut &*file).unwrap();
+            s3.get_object_to_writer(&format!("update-files/{}", uuid), &mut &*file).unwrap();
             file.persist()?;
         }
     }
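
The last hunk is the same key fix as the task reads: "/update-files/{uuid}" and "update-files/{uuid}" name two different objects, so the leading slash has to go. The `&mut &*file` in the context line deserves a note: the update file wrapper derefs to a File, and std implements io::Write for &File, so a shared reference works as the writer. A self-contained illustration:

    use std::fs::File;
    use std::io::Write;

    // std implements io::Write for &File, so `&mut &File` satisfies any
    // `W: Write` bound without exclusive access to the File itself.
    fn write_through_shared(file: &File) -> std::io::Result<()> {
        (&mut &*file).write_all(b"streamed bytes")
    }

    fn main() -> std::io::Result<()> {
        let file = File::create("/tmp/strois-example")?;
        write_through_shared(&file)
    }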