Clean up the code

Kerollmops
2025-11-04 14:27:58 +01:00
parent b64b083003
commit f6a0db9cb8
5 changed files with 158 additions and 84 deletions

View File

@@ -5,6 +5,7 @@ use meilisearch_types::error::{Code, ErrorCode};
 use meilisearch_types::milli::index::RollbackOutcome;
 use meilisearch_types::tasks::{Kind, Status};
 use meilisearch_types::{heed, milli};
+use reqwest::StatusCode;
 use thiserror::Error;
 
 use crate::TaskId;
@@ -127,6 +128,14 @@ pub enum Error {
#[error("Aborted task")] #[error("Aborted task")]
AbortedTask, AbortedTask,
#[error("S3 error: status: {status}, body: {body}")]
S3Error { status: StatusCode, body: String },
#[error("S3 HTTP error: {0}")]
S3HttpError(reqwest::Error),
#[error("S3 XML error: {0}")]
S3XmlError(Box<dyn std::error::Error + Send + Sync>),
#[error("S3 bucket error: {0}")]
S3BucketError(rusty_s3::BucketError),
#[error(transparent)] #[error(transparent)]
Dump(#[from] dump::Error), Dump(#[from] dump::Error),
#[error(transparent)] #[error(transparent)]
@@ -226,6 +235,10 @@ impl Error {
             | Error::TaskCancelationWithEmptyQuery
             | Error::FromRemoteWhenExporting { .. }
             | Error::AbortedTask
+            | Error::S3Error { .. }
+            | Error::S3HttpError(_)
+            | Error::S3XmlError(_)
+            | Error::S3BucketError(_)
             | Error::Dump(_)
             | Error::Heed(_)
             | Error::Milli { .. }
@@ -293,8 +306,14 @@ impl ErrorCode for Error {
             Error::BatchNotFound(_) => Code::BatchNotFound,
             Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
             Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
-            // TODO: not sure of the Code to use
             Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
+            Error::S3Error { status, .. } if status.is_client_error() => {
+                Code::InvalidS3SnapshotRequest
+            }
+            Error::S3Error { .. } => Code::S3SnapshotServerError,
+            Error::S3HttpError(_) => Code::S3SnapshotServerError,
+            Error::S3XmlError(_) => Code::S3SnapshotServerError,
+            Error::S3BucketError(_) => Code::InvalidS3SnapshotParameters,
             Error::Dump(e) => e.error_code(),
             Error::Milli { error, .. } => error.error_code(),
             Error::ProcessBatchPanicked(_) => Code::Internal,
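The new variants split S3 failures by origin: an S3Error carrying a 4xx status maps to Code::InvalidS3SnapshotRequest, any other S3Error as well as HTTP and XML failures map to Code::S3SnapshotServerError, and bucket configuration problems map to Code::InvalidS3SnapshotParameters. The snippet below is a minimal usage sketch, not part of the commit, assuming the Error enum above is in scope; the s3_code_for and demo helpers are made up for illustration.

use meilisearch_types::error::{Code, ErrorCode};
use reqwest::StatusCode;

// Hypothetical helper: wrap a status/body pair in the new variant and ask for its Code.
fn s3_code_for(status: StatusCode, body: &str) -> Code {
    Error::S3Error { status, body: body.to_owned() }.error_code()
}

fn demo() {
    // A 403 returned by the bucket is a request problem (4xx)...
    assert!(matches!(s3_code_for(StatusCode::FORBIDDEN, "AccessDenied"), Code::InvalidS3SnapshotRequest));
    // ...while a 500 from the S3 provider maps to the new server-error code.
    assert!(matches!(s3_code_for(StatusCode::INTERNAL_SERVER_ERROR, ""), Code::S3SnapshotServerError));
}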

View File

@@ -245,7 +245,6 @@ impl IndexScheduler {
         use std::os::fd::OwnedFd;
         use std::path::Path;
         use std::path::PathBuf;
-        use std::time::Duration;
 
         use bytes::{Bytes, BytesMut};
         use path_slash::PathBufExt as _;
@@ -265,47 +264,59 @@ impl IndexScheduler {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level: level,
+            s3_signature_duration,
+            s3_multipart_part_size,
         } = opts;
 
-        const ONE_HOUR: Duration = Duration::from_secs(3600);
-        // Parts are 375MiB which enables databases of up to 3.5TiB. Must be at least 2x5MiB.
-        const PART_SIZE: usize = 375 * 1024 * 1024;
-
-        let client = Client::new();
-        // TODO Remove this unwrap
-        let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
-        let url = s3_bucket_url.parse().unwrap();
-        let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region).unwrap();
-        let credential = Credentials::new(s3_access_key, s3_secret_key);
-        // TODO change this and use the database name like in the original version
-        let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
-        let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
-        eprintln!("Starting the upload of the snapshot to {object}");
-
         // TODO implement exponential backoff on upload requests: https://docs.rs/backoff
         // TODO return a result with actual errors
-        // TODO sign for longer than an hour?
+        // NOTE to make it work on Windows we could try using std::io::pipe instead.
+        // However, we are still using the tokio unix pipe in the async upload loop.
         let (reader, writer) = std::io::pipe()?;
         let uploader_task = tokio::spawn(async move {
+            use rusty_s3::BucketError;
+
             let reader = OwnedFd::from(reader);
             let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
-            let action = bucket.create_multipart_upload(Some(&credential), &object);
-            // TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
-            // If the part is deleted (like a TTL) we should sign it for at least 24 hours.
-            let url = action.sign(ONE_HOUR);
-            let resp = client.post(url).send().await.unwrap().error_for_status().unwrap();
-            let body = resp.text().await.unwrap();
-            let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
-            let mut etags = Vec::<String>::new();
+            let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
+            let url = s3_bucket_url
+                .parse()
+                .map_err(BucketError::ParseError)
+                .map_err(Error::S3BucketError)?;
+            let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
+                .map_err(Error::S3BucketError)?;
+            let credential = Credentials::new(s3_access_key, s3_secret_key);
+            // TODO change this and use the database name like in the original version
+            let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
+            let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
+            let action = bucket.create_multipart_upload(Some(&credential), &object);
+            let url = action.sign(s3_signature_duration);
+            let client = Client::new();
+            let resp = client.post(url).send().await.map_err(Error::S3HttpError)?;
+            let status = resp.status();
+            let body = match resp.error_for_status_ref() {
+                Ok(_) => resp.text().await.map_err(Error::S3HttpError)?,
+                Err(_) => {
+                    return Err(Error::S3Error {
+                        status,
+                        body: resp.text().await.unwrap_or_default(),
+                    })
+                }
+            };
+            let multipart = CreateMultipartUpload::parse_response(&body)
+                .map_err(|e| Error::S3XmlError(Box::new(e)))?;
+
+            tracing::debug!("Starting the upload of the snapshot to {object}");
+
+            // We use this bumpalo for etags strings.
+            let bump = bumpalo::Bump::new();
+            let mut etags = Vec::<&str>::new();
             let mut in_flight =
                 VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
-                    s3_max_in_flight_parts,
+                    s3_max_in_flight_parts.get(),
                 );
+            // Part numbers start at 1 and cannot be larger than 10k
             for part_number in 1u16.. {
                 let part_upload = bucket.upload_part(
                     Some(&credential),
@@ -313,79 +324,79 @@ impl IndexScheduler {
                     part_number,
                     multipart.upload_id(),
                 );
-                let url = part_upload.sign(ONE_HOUR);
+                let url = part_upload.sign(s3_signature_duration);
 
                 // Wait for a buffer to be ready if there are in-flight parts that landed
-                let mut buffer = if in_flight.len() >= s3_max_in_flight_parts {
+                let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
                     let (request, buffer) = in_flight.pop_front().unwrap();
-                    let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
+                    let resp = request.await.unwrap().map_err(Error::S3HttpError)?;
+                    let resp = match resp.error_for_status_ref() {
+                        Ok(response) => response,
+                        Err(_) => {
+                            return Err(Error::S3Error {
+                                status: resp.status(),
+                                body: resp.text().await.unwrap_or_default(),
+                            });
+                        }
+                    };
                     let etag =
                         resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
                     let mut buffer = match buffer.try_into_mut() {
                         Ok(buffer) => buffer,
-                        Err(_) => panic!("Valid to convert into BytesMut"),
+                        Err(_) => unreachable!("Impossible to convert into BytesMut"),
                     };
-                    // TODO use bumpalo to reduce the number of allocations
-                    etags.push(etag.to_str().unwrap().to_owned());
+                    etags.push(bump.alloc_str(etag.to_str().unwrap()));
                     buffer.clear();
                     buffer
                 } else {
-                    // TODO Base this on the available memory
-                    BytesMut::with_capacity(PART_SIZE)
+                    BytesMut::with_capacity(s3_multipart_part_size as usize)
                 };
 
-                while buffer.len() < (PART_SIZE / 2) {
-                    eprintln!(
-                        "buffer is {:.2}% full, trying to read more",
-                        buffer.len() as f32 / buffer.capacity() as f32 * 100.0
-                    );
+                // If we successfully read enough bytes,
+                // we can continue and send the buffer/part
+                while buffer.len() < (s3_multipart_part_size as usize / 2) {
                     // Wait for the pipe to be readable
                     reader.readable().await?;
 
                     match reader.try_read_buf(&mut buffer) {
                         Ok(0) => break,
                         // We read some bytes but maybe not enough
-                        Ok(n) => {
-                            eprintln!("Read {} bytes from pipe, continuing", n);
-                            continue;
-                        }
+                        Ok(_) => continue,
                         // The readiness event is a false positive.
-                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
-                            eprintln!("received a WouldBlock");
-                            continue;
-                        }
+                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                         Err(e) => return Err(e.into()),
                     }
                 }
 
-                eprintln!(
-                    "buffer is {:.2}% full",
-                    buffer.len() as f32 / buffer.capacity() as f32 * 100.0
-                );
-
                 if buffer.is_empty() {
-                    eprintln!("buffer is empty, breaking part number loop");
-                    // Break the loop if the buffer is empty
-                    // after we tried to read bytes
+                    // Break the loop if the buffer is
+                    // empty after we tried to read bytes
                     break;
                 }
 
                 let body = buffer.freeze();
-                eprintln!("Sending part {}", part_number);
+                tracing::trace!("Sending part {part_number}");
                 let task = tokio::spawn(client.put(url).body(body.clone()).send());
                 in_flight.push_back((task, body));
             }
 
             for (join_handle, _buffer) in in_flight {
-                let resp = join_handle.await.unwrap().unwrap().error_for_status().unwrap();
+                let resp = join_handle.await.unwrap().map_err(Error::S3HttpError)?;
+                let resp = match resp.error_for_status_ref() {
+                    Ok(response) => response,
+                    Err(_) => {
+                        return Err(Error::S3Error {
+                            status: resp.status(),
+                            body: resp.text().await.unwrap_or_default(),
+                        })
+                    }
+                };
                 let etag =
                     resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
-                // TODO use bumpalo to reduce the number of allocations
-                etags.push(etag.to_str().unwrap().to_owned());
+                etags.push(bump.alloc_str(etag.to_str().unwrap()));
             }
 
-            eprintln!("Finalizing the multipart upload");
+            tracing::trace!("Finalizing the multipart upload");
             let action = bucket.complete_multipart_upload(
                 Some(&credential),
@@ -393,23 +404,22 @@ impl IndexScheduler {
                 multipart.upload_id(),
                 etags.iter().map(AsRef::as_ref),
             );
-            let url = action.sign(ONE_HOUR);
+            let url = action.sign(s3_signature_duration);
 
-            let resp = client.post(url).body(action.body()).send().await.unwrap();
+            let resp =
+                client.post(url).body(action.body()).send().await.map_err(Error::S3HttpError)?;
             let status = resp.status();
-            let text = resp.text().await.unwrap();
-            eprintln!("Status: {status}, Text: {text}");
-
-            // TODO do a better check and do not assert
-            // assert!(resp.status().is_success());
-
-            Result::<_, Error>::Ok(())
+            let body =
+                resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;
+
+            if status.is_success() {
+                Ok(())
+            } else {
+                Err(Error::S3Error { status, body })
+            }
         });
 
-        // TODO not a big fan of this clone
-        // remove it and get all the necessary data from the scheduler
         let index_scheduler = IndexScheduler::private_clone(self);
         let builder_task = tokio::task::spawn_blocking(move || {
-            // NOTE enabling compression still generates a corrupted tarball
             let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
             let mut tarball = tar::Builder::new(writer);
@@ -422,10 +432,11 @@ impl IndexScheduler {
             // 2. Snapshot the index scheduler LMDB env
             progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
             let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
-            // NOTE That made the trick !!! Should I memory map instead?
+            // Note: Seeking to the start of the file is necessary to ensure
+            // the tarball reads the file from the beginning. I still
+            // don't know why the read cursor is not at the beginning.
             tasks_env_file.seek(SeekFrom::Start(0))?;
             let path = Path::new("tasks").join("data.mdb");
-            // NOTE when commenting this line, the tarball works better
             tarball.append_file(path, &mut tasks_env_file)?;
             drop(tasks_env_file);
@@ -469,11 +480,8 @@ impl IndexScheduler {
                 .map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
                 .collect::<Result<_, Error>>()?;
 
-            dbg!(&indexes_references);
-            // Note that we need to collect and open all of the indexes files because
-            // otherwise, using a for loop, we would have to have a Send rtxn.
-            // TODO I don't need to do this trick if my writer is NOT async
+            // It's prettier to use a for loop instead of the IndexMapper::try_for_each_index
+            // method, especially when we need to access the UUID, local path and index number.
             for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
                 progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
                     &name, i as u32, nb_indexes,
@@ -482,7 +490,7 @@ impl IndexScheduler {
                 let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
                 let mut index_file = index.try_clone_inner_file().unwrap();
                 index_file.seek(SeekFrom::Start(0))?;
-                eprintln!("Appending index file for {} in {}", name, path.display());
+                tracing::trace!("Appending index file for {name} in {}", path.display());
                 tarball.append_file(path, &mut index_file)?;
             }
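For reference, the uploader side follows the standard S3 multipart protocol: create the upload, PUT each part through a pre-signed URL and collect the returned ETags (part numbers start at 1 and S3 caps them at 10,000), then complete the upload with the ETag list. The snippet below is a condensed, synchronous sketch of that flow, not taken from the commit: it reuses the same rusty_s3 calls but goes through reqwest's blocking client (blocking feature required), drops the pipe, the scheduler error variants and the in-flight pipelining, and the upload_object helper name is made up.

use std::time::Duration;

use reqwest::blocking::Client;
use reqwest::header::ETAG;
use rusty_s3::actions::CreateMultipartUpload;
use rusty_s3::{Bucket, Credentials, S3Action};

// Hypothetical helper: upload the already-split parts of one object.
fn upload_object(
    bucket: &Bucket,
    credential: &Credentials,
    object: &str,
    parts: impl Iterator<Item = Vec<u8>>,
    sign_duration: Duration,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = Client::new();

    // 1. Create the multipart upload and remember its upload id.
    let action = bucket.create_multipart_upload(Some(credential), object);
    let body = client.post(action.sign(sign_duration)).send()?.error_for_status()?.text()?;
    let multipart = CreateMultipartUpload::parse_response(&body)?;

    // 2. Upload every part (part numbers start at 1, at most 10k) and keep the ETags.
    let mut etags = Vec::new();
    for (part_number, part) in (1u16..).zip(parts) {
        let action =
            bucket.upload_part(Some(credential), object, part_number, multipart.upload_id());
        let resp = client.put(action.sign(sign_duration)).body(part).send()?.error_for_status()?;
        let etag = resp.headers().get(ETAG).expect("every UploadPart response carries an ETag");
        etags.push(etag.to_str()?.to_owned());
    }

    // 3. Complete the upload by sending back the ordered list of ETags.
    let action = bucket.complete_multipart_upload(
        Some(credential),
        object,
        multipart.upload_id(),
        etags.iter().map(AsRef::as_ref),
    );
    client.post(action.sign(sign_duration)).body(action.body()).send()?.error_for_status()?;
    Ok(())
}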

View File

@@ -390,6 +390,9 @@ TooManyVectors , InvalidRequest , BAD_REQU
 UnretrievableDocument               , Internal            , BAD_REQUEST ;
 UnretrievableErrorCode              , InvalidRequest      , BAD_REQUEST ;
 UnsupportedMediaType                , InvalidRequest      , UNSUPPORTED_MEDIA_TYPE ;
+InvalidS3SnapshotRequest            , Internal            , BAD_REQUEST ;
+InvalidS3SnapshotParameters         , Internal            , BAD_REQUEST ;
+S3SnapshotServerError               , Internal            , BAD_GATEWAY ;
 
 // Experimental features
 VectorEmbeddingError                , InvalidRequest      , BAD_REQUEST ;

View File

@@ -7,6 +7,7 @@ use std::ops::Deref;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::Duration;
 use std::{env, fmt, fs};
 
 use byte_unit::{Byte, ParseError, UnitType};
@@ -84,6 +85,8 @@ const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
 const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
 const MEILI_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
 const MEILI_S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
+const MEILI_S3_SIGNATURE_DURATION_SECONDS: &str = "MEILI_S3_SIGNATURE_DURATION_SECONDS";
+const MEILI_S3_MULTIPART_PART_SIZE: &str = "MEILI_S3_MULTIPART_PART_SIZE";
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
@@ -96,6 +99,8 @@ const DEFAULT_SNAPSHOT_INTERVAL_SEC_STR: &str = "86400";
 const DEFAULT_DUMP_DIR: &str = "dumps/";
 const DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS: NonZeroUsize = NonZeroUsize::new(10).unwrap();
 const DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL: u32 = 0;
+const DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS: u64 = 8 * 3600; // 8 hours
+const DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE: Byte = Byte::from_u64(375 * 1024 * 1024); // 375 MiB
 
 const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
 const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
@@ -951,6 +956,20 @@ pub struct S3SnapshotOpts {
     #[clap(long, env = MEILI_S3_COMPRESSION_LEVEL, default_value_t = default_s3_snapshot_compression_level())]
     #[serde(default = "default_s3_snapshot_compression_level")]
     pub s3_compression_level: u32,
+
+    /// The signature duration for the multipart upload.
+    #[clap(long, env = MEILI_S3_SIGNATURE_DURATION_SECONDS, default_value_t = default_s3_snapshot_signature_duration_seconds())]
+    #[serde(default = "default_s3_snapshot_signature_duration_seconds")]
+    pub s3_signature_duration_seconds: u64,
+
+    /// The size of the multipart parts.
+    ///
+    /// Must not be less than 10MiB nor larger than 8GiB. Yes,
+    /// twice the boundaries of the AWS S3 multipart upload,
+    /// because we use it a bit differently internally.
+    #[clap(long, env = MEILI_S3_MULTIPART_PART_SIZE, default_value_t = default_s3_snapshot_multipart_part_size())]
+    #[serde(default = "default_s3_snapshot_multipart_part_size")]
+    pub s3_multipart_part_size: Byte,
 }
 
 impl S3SnapshotOpts {
@@ -965,6 +984,8 @@ impl S3SnapshotOpts {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level,
+            s3_signature_duration_seconds,
+            s3_multipart_part_size,
         } = self;
 
         export_to_env_if_not_present(MEILI_S3_BUCKET_URL, s3_bucket_url);
@@ -978,6 +999,14 @@ impl S3SnapshotOpts {
             s3_max_in_flight_parts.to_string(),
         );
         export_to_env_if_not_present(MEILI_S3_COMPRESSION_LEVEL, s3_compression_level.to_string());
+        export_to_env_if_not_present(
+            MEILI_S3_SIGNATURE_DURATION_SECONDS,
+            s3_signature_duration_seconds.to_string(),
+        );
+        export_to_env_if_not_present(
+            MEILI_S3_MULTIPART_PART_SIZE,
+            s3_multipart_part_size.to_string(),
+        );
     }
 }
@@ -994,6 +1023,8 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level,
+            s3_signature_duration_seconds,
+            s3_multipart_part_size,
         } = other;
 
         Ok(S3SnapshotOptions {
@@ -1005,6 +1036,8 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level,
+            s3_signature_duration: Duration::from_secs(s3_signature_duration_seconds),
+            s3_multipart_part_size: s3_multipart_part_size.as_u64(),
         })
     }
 }
@@ -1231,6 +1264,14 @@ fn default_s3_snapshot_compression_level() -> u32 {
     DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL
 }
 
+fn default_s3_snapshot_signature_duration_seconds() -> u64 {
+    DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS
+}
+
+fn default_s3_snapshot_multipart_part_size() -> Byte {
+    DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE
+}
+
 fn default_dump_dir() -> PathBuf {
     PathBuf::from(DEFAULT_DUMP_DIR)
 }
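Note that the commit only documents the 10MiB to 8GiB part-size bounds in the doc comment of s3_multipart_part_size; it does not add a check for them. A hypothetical validation helper, not in the diff, reusing the same byte_unit::Byte type, could look like this:

use byte_unit::Byte;

const MIN_S3_PART_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const MAX_S3_PART_SIZE: u64 = 8 * 1024 * 1024 * 1024; // 8 GiB

// Hypothetical helper, not part of the commit: reject out-of-range part sizes early.
fn validate_s3_multipart_part_size(part_size: Byte) -> Result<u64, String> {
    let bytes = part_size.as_u64();
    if (MIN_S3_PART_SIZE..=MAX_S3_PART_SIZE).contains(&bytes) {
        Ok(bytes)
    } else {
        Err(format!("MEILI_S3_MULTIPART_PART_SIZE must be between 10MiB and 8GiB, got {bytes} bytes"))
    }
}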

View File

@@ -1,4 +1,5 @@
 use std::num::NonZeroUsize;
+use std::time::Duration;
 
 use grenad::CompressionType;
@@ -50,6 +51,8 @@ pub struct S3SnapshotOptions {
     pub s3_secret_key: String,
     pub s3_max_in_flight_parts: NonZeroUsize,
     pub s3_compression_level: u32,
+    pub s3_signature_duration: Duration,
+    pub s3_multipart_part_size: u64,
 }
 
 /// By default use only 1 thread for indexing in tests