diff --git a/crates/index-scheduler/src/error.rs b/crates/index-scheduler/src/error.rs
index 332b7e040..11d6f6e4c 100644
--- a/crates/index-scheduler/src/error.rs
+++ b/crates/index-scheduler/src/error.rs
@@ -5,6 +5,7 @@ use meilisearch_types::error::{Code, ErrorCode};
 use meilisearch_types::milli::index::RollbackOutcome;
 use meilisearch_types::tasks::{Kind, Status};
 use meilisearch_types::{heed, milli};
+use reqwest::StatusCode;
 use thiserror::Error;
 
 use crate::TaskId;
@@ -127,6 +128,14 @@ pub enum Error {
     #[error("Aborted task")]
     AbortedTask,
 
+    #[error("S3 error: status: {status}, body: {body}")]
+    S3Error { status: StatusCode, body: String },
+    #[error("S3 HTTP error: {0}")]
+    S3HttpError(reqwest::Error),
+    #[error("S3 XML error: {0}")]
+    S3XmlError(Box<dyn std::error::Error + Send + Sync>),
+    #[error("S3 bucket error: {0}")]
+    S3BucketError(rusty_s3::BucketError),
     #[error(transparent)]
     Dump(#[from] dump::Error),
     #[error(transparent)]
@@ -226,6 +235,10 @@ impl Error {
             | Error::TaskCancelationWithEmptyQuery
             | Error::FromRemoteWhenExporting { .. }
             | Error::AbortedTask
+            | Error::S3Error { .. }
+            | Error::S3HttpError(_)
+            | Error::S3XmlError(_)
+            | Error::S3BucketError(_)
             | Error::Dump(_)
             | Error::Heed(_)
             | Error::Milli { .. }
@@ -293,8 +306,14 @@ impl ErrorCode for Error {
             Error::BatchNotFound(_) => Code::BatchNotFound,
             Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
             Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
-            // TODO: not sure of the Code to use
             Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
+            Error::S3Error { status, .. } if status.is_client_error() => {
+                Code::InvalidS3SnapshotRequest
+            }
+            Error::S3Error { .. } => Code::S3SnapshotServerError,
+            Error::S3HttpError(_) => Code::S3SnapshotServerError,
+            Error::S3XmlError(_) => Code::S3SnapshotServerError,
+            Error::S3BucketError(_) => Code::InvalidS3SnapshotParameters,
             Error::Dump(e) => e.error_code(),
             Error::Milli { error, .. } => error.error_code(),
             Error::ProcessBatchPanicked(_) => Code::Internal,
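[Review note, not part of the patch] The status/body extraction that follows every `error_for_status_ref()` call is repeated three times in the uploader below. A small helper along these lines could factor it out; `check_s3_response` is a hypothetical name, not an existing function in this codebase:

```rust
use reqwest::Response;

use crate::Error;

/// Hypothetical helper: turn a non-success S3 response into `Error::S3Error`,
/// keeping both the status code and the response body for diagnostics.
async fn check_s3_response(resp: Response) -> Result<Response, Error> {
    match resp.error_for_status_ref() {
        Ok(_) => Ok(resp),
        Err(_) => {
            let status = resp.status();
            // Reading the body consumes the response; fall back to an empty
            // string if the body itself cannot be read.
            let body = resp.text().await.unwrap_or_default();
            Err(Error::S3Error { status, body })
        }
    }
}
```

The upload loop could then call `let resp = check_s3_response(resp).await?;` before reading the `ETag` header, instead of matching on `error_for_status_ref()` inline each time.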
diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
index 278245861..e1a818717 100644
--- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs
@@ -245,7 +245,6 @@ impl IndexScheduler {
         use std::os::fd::OwnedFd;
         use std::path::Path;
         use std::path::PathBuf;
-        use std::time::Duration;
 
         use bytes::{Bytes, BytesMut};
         use path_slash::PathBufExt as _;
@@ -265,47 +264,59 @@ impl IndexScheduler {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level: level,
+            s3_signature_duration,
+            s3_multipart_part_size,
         } = opts;
 
-        const ONE_HOUR: Duration = Duration::from_secs(3600);
-        // Parts are 375MiB which enables databases of up to 3.5TiB. Must be at least 2x5MiB.
-        const PART_SIZE: usize = 375 * 1024 * 1024;
-
-        let client = Client::new();
-        // TODO Remove this unwrap
-        let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
-        let url = s3_bucket_url.parse().unwrap();
-        let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region).unwrap();
-        let credential = Credentials::new(s3_access_key, s3_secret_key);
-        // TODO change this and use the database name like in the original version
-        let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
-        let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
-
-        eprintln!("Starting the upload of the snapshot to {object}");
-        // TODO implement exponential backoff on upload requests: https://docs.rs/backoff
         // TODO return a result with actual errors
-        // TODO sign for longer than an hour?
-        // NOTE to make it work on Windows we could try using std::io::pipe instead.
-        // However, we are still using the tokio unix pipe in the async upload loop.
         let (reader, writer) = std::io::pipe()?;
 
         let uploader_task = tokio::spawn(async move {
+            use rusty_s3::BucketError;
+
             let reader = OwnedFd::from(reader);
             let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
 
-            let action = bucket.create_multipart_upload(Some(&credential), &object);
-            // TODO Question: If it is only signed for an hour and a snapshot takes longer than an hour, what happens?
-            // If the part is deleted (like a TTL) we should sign it for at least 24 hours.
-            let url = action.sign(ONE_HOUR);
-            let resp = client.post(url).send().await.unwrap().error_for_status().unwrap();
-            let body = resp.text().await.unwrap();
-            let multipart = CreateMultipartUpload::parse_response(&body).unwrap();
-            let mut etags = Vec::<String>::new();
+            let s3_snapshot_prefix = PathBuf::from_slash(s3_snapshot_prefix);
+            let url = s3_bucket_url
+                .parse()
+                .map_err(BucketError::ParseError)
+                .map_err(Error::S3BucketError)?;
+            let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
+                .map_err(Error::S3BucketError)?;
+            let credential = Credentials::new(s3_access_key, s3_secret_key);
+            // TODO change this and use the database name like in the original version
+            let object_path = s3_snapshot_prefix.join("data.ms.snapshot");
+            let object = object_path.to_slash().expect("Invalid UTF-8 path").into_owned();
+
+            let action = bucket.create_multipart_upload(Some(&credential), &object);
+            let url = action.sign(s3_signature_duration);
+            let client = Client::new();
+            let resp = client.post(url).send().await.map_err(Error::S3HttpError)?;
+            let status = resp.status();
+            let body = match resp.error_for_status_ref() {
+                Ok(_) => resp.text().await.map_err(Error::S3HttpError)?,
+                Err(_) => {
+                    return Err(Error::S3Error {
+                        status,
+                        body: resp.text().await.unwrap_or_default(),
+                    })
+                }
+            };
+
+            let multipart = CreateMultipartUpload::parse_response(&body)
+                .map_err(|e| Error::S3XmlError(Box::new(e)))?;
+            tracing::debug!("Starting the upload of the snapshot to {object}");
+
+            // We use this bumpalo for etags strings.
+            let bump = bumpalo::Bump::new();
+            let mut etags = Vec::<&str>::new();
             let mut in_flight = VecDeque::<(JoinHandle<reqwest::Result<reqwest::Response>>, Bytes)>::with_capacity(
-                s3_max_in_flight_parts,
+                s3_max_in_flight_parts.get(),
             );
 
+            // Part numbers start at 1 and cannot be larger than 10k
             for part_number in 1u16.. {
                 let part_upload = bucket.upload_part(
                     Some(&credential),
@@ -313,79 +324,79 @@ impl IndexScheduler {
                     part_number,
                     multipart.upload_id(),
                 );
-                let url = part_upload.sign(ONE_HOUR);
+                let url = part_upload.sign(s3_signature_duration);
 
                 // Wait for a buffer to be ready if there are in-flight parts that landed
-                let mut buffer = if in_flight.len() >= s3_max_in_flight_parts {
+                let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
                     let (request, buffer) = in_flight.pop_front().unwrap();
-                    let resp = request.await.unwrap().unwrap().error_for_status().unwrap();
+                    let resp = request.await.unwrap().map_err(Error::S3HttpError)?;
+                    let resp = match resp.error_for_status_ref() {
+                        Ok(response) => response,
+                        Err(_) => {
+                            return Err(Error::S3Error {
+                                status: resp.status(),
+                                body: resp.text().await.unwrap_or_default(),
+                            });
+                        }
+                    };
                     let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
                     let mut buffer = match buffer.try_into_mut() {
                         Ok(buffer) => buffer,
-                        Err(_) => panic!("Valid to convert into BytesMut"),
+                        Err(_) => unreachable!("Impossible to convert into BytesMut"),
                     };
-                    // TODO use bumpalo to reduce the number of allocations
-                    etags.push(etag.to_str().unwrap().to_owned());
+                    etags.push(bump.alloc_str(etag.to_str().unwrap()));
                     buffer.clear();
                     buffer
                 } else {
-                    // TODO Base this on the available memory
-                    BytesMut::with_capacity(PART_SIZE)
+                    BytesMut::with_capacity(s3_multipart_part_size as usize)
                 };
 
-                while buffer.len() < (PART_SIZE / 2) {
-                    eprintln!(
-                        "buffer is {:.2}% full, trying to read more",
-                        buffer.len() as f32 / buffer.capacity() as f32 * 100.0
-                    );
-
+                // If we successfully read enough bytes,
+                // we can continue and send the buffer/part
+                while buffer.len() < (s3_multipart_part_size as usize / 2) {
                     // Wait for the pipe to be readable
                     reader.readable().await?;
 
                     match reader.try_read_buf(&mut buffer) {
                         Ok(0) => break,
                         // We read some bytes but maybe not enough
-                        Ok(n) => {
-                            eprintln!("Read {} bytes from pipe, continuing", n);
-                            continue;
-                        }
+                        Ok(_) => continue,
                         // The readiness event is a false positive.
-                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
-                            eprintln!("received a WouldBlock");
-                            continue;
-                        }
+                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                         Err(e) => return Err(e.into()),
                     }
                 }
 
-                eprintln!(
-                    "buffer is {:.2}% full",
-                    buffer.len() as f32 / buffer.capacity() as f32 * 100.0
-                );
-
                 if buffer.is_empty() {
-                    eprintln!("buffer is empty, breaking part number loop");
-                    // Break the loop if the buffer is empty
-                    // after we tried to read bytes
+                    // Break the loop if the buffer is
+                    // empty after we tried to read bytes
                     break;
                 }
 
                 let body = buffer.freeze();
-                eprintln!("Sending part {}", part_number);
+                tracing::trace!("Sending part {part_number}");
                 let task = tokio::spawn(client.put(url).body(body.clone()).send());
                 in_flight.push_back((task, body));
             }
 
             for (join_handle, _buffer) in in_flight {
-                let resp = join_handle.await.unwrap().unwrap().error_for_status().unwrap();
+                let resp = join_handle.await.unwrap().map_err(Error::S3HttpError)?;
+                let resp = match resp.error_for_status_ref() {
+                    Ok(response) => response,
+                    Err(_) => {
+                        return Err(Error::S3Error {
+                            status: resp.status(),
+                            body: resp.text().await.unwrap_or_default(),
+                        })
+                    }
+                };
                 let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
-                // TODO use bumpalo to reduce the number of allocations
-                etags.push(etag.to_str().unwrap().to_owned());
+                etags.push(bump.alloc_str(etag.to_str().unwrap()));
             }
 
-            eprintln!("Finalizing the multipart upload");
+            tracing::trace!("Finalizing the multipart upload");
 
             let action = bucket.complete_multipart_upload(
                 Some(&credential),
@@ -393,23 +404,22 @@ impl IndexScheduler {
                 multipart.upload_id(),
                 etags.iter().map(AsRef::as_ref),
             );
-            let url = action.sign(ONE_HOUR);
-            let resp = client.post(url).body(action.body()).send().await.unwrap();
+            let url = action.sign(s3_signature_duration);
+            let resp =
+                client.post(url).body(action.body()).send().await.map_err(Error::S3HttpError)?;
             let status = resp.status();
-            let text = resp.text().await.unwrap();
-            eprintln!("Status: {status}, Text: {text}");
+            let body =
+                resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;
 
-            // TODO do a better check and do not assert
-            // assert!(resp.status().is_success());
-
-            Result::<_, Error>::Ok(())
+            if status.is_success() {
+                Ok(())
+            } else {
+                Err(Error::S3Error { status, body })
+            }
         });
 
-        // TODO not a big fan of this clone
-        // remove it and get all the necessary data from the scheduler
         let index_scheduler = IndexScheduler::private_clone(self);
         let builder_task = tokio::task::spawn_blocking(move || {
-            // NOTE enabling compression still generates a corrupted tarball
             let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
             let mut tarball = tar::Builder::new(writer);
@@ -422,10 +432,11 @@ impl IndexScheduler {
             // 2. Snapshot the index scheduler LMDB env
             progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
             let mut tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
-            // NOTE That made the trick !!! Should I memory map instead?
+            // Note: we need to seek to the start of the file so that the tarball
+            // reads it from the beginning. It is unclear why the read cursor is
+            // not already at the start after cloning the inner file.
            tasks_env_file.seek(SeekFrom::Start(0))?;
             let path = Path::new("tasks").join("data.mdb");
-            // NOTE when commenting this line, the tarball works better
             tarball.append_file(path, &mut tasks_env_file)?;
             drop(tasks_env_file);
 
@@ -469,11 +480,8 @@ impl IndexScheduler {
                 .map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
                 .collect::<Result<Vec<_>, Error>>()?;
 
-            dbg!(&indexes_references);
-
-            // Note that we need to collect and open all of the indexes files because
-            // otherwise, using a for loop, we would have to have a Send rtxn.
-            // TODO I don't need to do this trick if my writer is NOT async
+            // It's prettier to use a for loop instead of the IndexMapper::try_for_each_index
+            // method, especially when we need to access the UUID, local path and index number.
             for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
                 progress.update_progress(VariableNameStep::::new(
                     &name, i as u32, nb_indexes,
@@ -482,7 +490,7 @@ impl IndexScheduler {
                 let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
                 let mut index_file = index.try_clone_inner_file().unwrap();
                 index_file.seek(SeekFrom::Start(0))?;
-                eprintln!("Appending index file for {} in {}", name, path.display());
+                tracing::trace!("Appending index file for {name} in {}", path.display());
                 tarball.append_file(path, &mut index_file)?;
             }
 
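[Review note, not part of the patch] The parts are still uploaded without any retry, so a single transient S3 failure aborts the whole multipart upload (the old code carried a `TODO implement exponential backoff` comment for exactly this). A minimal sketch of what a retry wrapper could look like, assuming a plain `tokio::time::sleep` loop rather than the `backoff` crate; `send_part_with_retry`, the attempt count, and the base delay are arbitrary choices made for the example:

```rust
use std::time::Duration;

use bytes::Bytes;
use reqwest::{Client, Response, Url};

/// Hypothetical helper: upload one part, retrying transient failures with
/// exponential backoff. Only server errors and connection/timeout errors
/// are retried; client errors are returned immediately.
async fn send_part_with_retry(
    client: Client,
    url: Url,
    body: Bytes,
    max_attempts: u32,
) -> reqwest::Result<Response> {
    let mut delay = Duration::from_millis(500);
    let mut attempt = 1;
    loop {
        // `Bytes` is reference-counted, so retrying does not copy the part data.
        let result = client.put(url.clone()).body(body.clone()).send().await;
        let retryable = match &result {
            Ok(resp) => resp.status().is_server_error(),
            Err(error) => error.is_timeout() || error.is_connect(),
        };
        if !retryable || attempt >= max_attempts {
            return result;
        }
        tokio::time::sleep(delay).await;
        delay *= 2;
        attempt += 1;
    }
}
```

The spawn in the upload loop would then become something like `tokio::spawn(send_part_with_retry(client.clone(), url, body.clone(), 4))`. One caveat: a retry issued after `s3_signature_duration` has elapsed needs a freshly signed URL, which is another reason to keep the signature duration comfortably longer than the expected upload time.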
diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs
index 902454b0b..2d7185b5b 100644
--- a/crates/meilisearch-types/src/error.rs
+++ b/crates/meilisearch-types/src/error.rs
@@ -390,6 +390,9 @@ TooManyVectors                        , InvalidRequest       , BAD_REQU
 UnretrievableDocument                 , Internal             , BAD_REQUEST ;
 UnretrievableErrorCode                , InvalidRequest       , BAD_REQUEST ;
 UnsupportedMediaType                  , InvalidRequest       , UNSUPPORTED_MEDIA_TYPE ;
+InvalidS3SnapshotRequest              , Internal             , BAD_REQUEST ;
+InvalidS3SnapshotParameters           , Internal             , BAD_REQUEST ;
+S3SnapshotServerError                 , Internal             , BAD_GATEWAY ;
 
 // Experimental features
 VectorEmbeddingError                  , InvalidRequest       , BAD_REQUEST ;
diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs
index 1f4173399..a62a382e7 100644
--- a/crates/meilisearch/src/option.rs
+++ b/crates/meilisearch/src/option.rs
@@ -7,6 +7,7 @@ use std::ops::Deref;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::Duration;
 use std::{env, fmt, fs};
 
 use byte_unit::{Byte, ParseError, UnitType};
@@ -84,6 +85,8 @@ const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
 const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
 const MEILI_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_S3_MAX_IN_FLIGHT_PARTS";
 const MEILI_S3_COMPRESSION_LEVEL: &str = "MEILI_S3_COMPRESSION_LEVEL";
+const MEILI_S3_SIGNATURE_DURATION_SECONDS: &str = "MEILI_S3_SIGNATURE_DURATION_SECONDS";
+const MEILI_S3_MULTIPART_PART_SIZE: &str = "MEILI_S3_MULTIPART_PART_SIZE";
 
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
@@ -96,6 +99,8 @@ const DEFAULT_SNAPSHOT_INTERVAL_SEC_STR: &str = "86400";
 const DEFAULT_DUMP_DIR: &str = "dumps/";
 const DEFAULT_S3_SNAPSHOT_MAX_IN_FLIGHT_PARTS: NonZeroUsize = NonZeroUsize::new(10).unwrap();
 const DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL: u32 = 0;
+const DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS: u64 = 8 * 3600; // 8 hours
+const DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE: Byte = Byte::from_u64(375 * 1024 * 1024); // 375 MiB
 
 const MEILI_MAX_INDEXING_MEMORY: &str = "MEILI_MAX_INDEXING_MEMORY";
 const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
@@ -951,6 +956,20 @@ pub struct S3SnapshotOpts {
     #[clap(long, env = MEILI_S3_COMPRESSION_LEVEL, default_value_t = default_s3_snapshot_compression_level())]
     #[serde(default = "default_s3_snapshot_compression_level")]
     pub s3_compression_level: u32,
+
+    /// The duration, in seconds, for which the signed multipart-upload URLs remain valid.
+    #[clap(long, env = MEILI_S3_SIGNATURE_DURATION_SECONDS, default_value_t = default_s3_snapshot_signature_duration_seconds())]
+    #[serde(default = "default_s3_snapshot_signature_duration_seconds")]
+    pub s3_signature_duration_seconds: u64,
+
+    /// The size of the multipart parts.
+    ///
+    /// Must not be smaller than 10MiB or larger than 8GiB. That is twice the
+    /// boundaries of the AWS S3 multipart upload, because we use it a bit
+    /// differently internally.
+    #[clap(long, env = MEILI_S3_MULTIPART_PART_SIZE, default_value_t = default_s3_snapshot_multipart_part_size())]
+    #[serde(default = "default_s3_snapshot_multipart_part_size")]
+    pub s3_multipart_part_size: Byte,
 }
 
 impl S3SnapshotOpts {
@@ -965,6 +984,8 @@ impl S3SnapshotOpts {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level,
+            s3_signature_duration_seconds,
+            s3_multipart_part_size,
         } = self;
 
         export_to_env_if_not_present(MEILI_S3_BUCKET_URL, s3_bucket_url);
@@ -978,6 +999,14 @@ impl S3SnapshotOpts {
             s3_max_in_flight_parts.to_string(),
         );
         export_to_env_if_not_present(MEILI_S3_COMPRESSION_LEVEL, s3_compression_level.to_string());
+        export_to_env_if_not_present(
+            MEILI_S3_SIGNATURE_DURATION_SECONDS,
+            s3_signature_duration_seconds.to_string(),
+        );
+        export_to_env_if_not_present(
+            MEILI_S3_MULTIPART_PART_SIZE,
+            s3_multipart_part_size.to_string(),
+        );
     }
 }
 
@@ -994,6 +1023,8 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level,
+            s3_signature_duration_seconds,
+            s3_multipart_part_size,
         } = other;
 
         Ok(S3SnapshotOptions {
@@ -1005,6 +1036,8 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
             s3_secret_key,
             s3_max_in_flight_parts,
             s3_compression_level,
+            s3_signature_duration: Duration::from_secs(s3_signature_duration_seconds),
+            s3_multipart_part_size: s3_multipart_part_size.as_u64(),
         })
     }
 }
@@ -1231,6 +1264,14 @@ fn default_s3_snapshot_compression_level() -> u32 {
     DEFAULT_S3_SNAPSHOT_COMPRESSION_LEVEL
 }
 
+fn default_s3_snapshot_signature_duration_seconds() -> u64 {
+    DEFAULT_S3_SNAPSHOT_SIGNATURE_DURATION_SECONDS
+}
+
+fn default_s3_snapshot_multipart_part_size() -> Byte {
+    DEFAULT_S3_SNAPSHOT_MULTIPART_PART_SIZE
+}
+
 fn default_dump_dir() -> PathBuf {
     PathBuf::from(DEFAULT_DUMP_DIR)
 }
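[Review note, not part of the patch] The doc comment on `s3_multipart_part_size` promises a 10MiB–8GiB range, but nothing in the patch enforces it, so an out-of-range value is only discovered once a snapshot is attempted. A possible startup check, as a sketch; the constant names and the `String` error type are placeholders:

```rust
use byte_unit::Byte;

// Documented bounds for --s3-multipart-part-size (hypothetical constant names).
const MIN_PART_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const MAX_PART_SIZE: u64 = 8 * 1024 * 1024 * 1024; // 8 GiB

/// Reject part sizes outside the range promised by the documentation.
fn validate_part_size(part_size: Byte) -> Result<u64, String> {
    let bytes = part_size.as_u64();
    if (MIN_PART_SIZE..=MAX_PART_SIZE).contains(&bytes) {
        Ok(bytes)
    } else {
        Err(format!(
            "--s3-multipart-part-size must be between 10 MiB and 8 GiB, got {bytes} bytes"
        ))
    }
}
```

Wiring such a check into the `TryFrom<S3SnapshotOpts>` conversion would report the misconfiguration when Meilisearch starts rather than when the first snapshot runs.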
diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs
index a3aecb9a2..11201c8ec 100644
--- a/crates/milli/src/update/indexer_config.rs
+++ b/crates/milli/src/update/indexer_config.rs
@@ -1,4 +1,5 @@
 use std::num::NonZeroUsize;
+use std::time::Duration;
 
 use grenad::CompressionType;
 
@@ -50,6 +51,8 @@ pub struct S3SnapshotOptions {
     pub s3_secret_key: String,
     pub s3_max_in_flight_parts: NonZeroUsize,
     pub s3_compression_level: u32,
+    pub s3_signature_duration: Duration,
+    pub s3_multipart_part_size: u64,
 }
 
 /// By default use only 1 thread for indexing in tests
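[Review note, not part of the patch] Since an S3 multipart upload accepts at most 10,000 parts (the patch's own comment notes that part numbers "cannot be larger than 10k", although the `for part_number in 1u16..` loop itself only stops at end-of-pipe), `s3_multipart_part_size` effectively caps both the maximum snapshot size and the amount of part data buffered in memory. A quick sanity check of the defaults, assuming at most 10,000 parts and roughly `s3_max_in_flight_parts` full buffers alive at once:

```rust
const S3_MAX_PARTS: u64 = 10_000; // S3 accepts part numbers 1..=10_000

fn main() {
    let part_size: u64 = 375 * 1024 * 1024; // default --s3-multipart-part-size (375 MiB)
    let max_in_flight: u64 = 10; // default --s3-max-in-flight-parts

    // Largest snapshot that fits in a single multipart upload.
    let max_snapshot = part_size * S3_MAX_PARTS;
    println!("max snapshot size: {:.1} TiB", max_snapshot as f64 / (1u64 << 40) as f64);
    // => about 3.6 TiB, in line with the "up to 3.5TiB" comment removed by this patch

    // Rough upper bound on buffered part data (ignoring allocator and HTTP client overhead).
    let max_buffered = part_size * max_in_flight;
    println!("max buffered parts: {:.1} GiB", max_buffered as f64 / (1u64 << 30) as f64);
    // => about 3.7 GiB with the defaults
}
```

Raising `--s3-multipart-part-size` therefore raises the snapshot size ceiling and the memory ceiling together, which may be worth a line in the option's documentation.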