use std::ffi::OsStr;
use std::fs;
use std::sync::atomic::Ordering;

use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};

use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
use crate::queue::TaskQueue;
use crate::{Error, IndexScheduler, Result};

const UPDATE_FILES_DIR_NAME: &str = "update_files";
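
/// Removes the given tasks from the task queue stored in the snapshot copy at
/// `dst` so that they are not processed again when the snapshot is imported.
///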
/// # Safety
///
/// See [`EnvOpenOptions::open`].
unsafe fn remove_tasks(
    tasks: &[Task],
    dst: &std::path::Path,
    index_base_map_size: usize,
) -> Result<()> {
    let env_options = EnvOpenOptions::new();
    let mut env_options = env_options.read_txn_without_tls();
    let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?;
    let mut wtxn = env.write_txn()?;
    let task_queue = TaskQueue::new(&env, &mut wtxn)?;

    // Destructuring to ensure the code below gets updated if a database gets added in the future.
    let TaskQueue {
        all_tasks,
        status,
        kind,
        index_tasks: _, // snapshot creation tasks are not index tasks
        canceled_by,
        enqueued_at,
        started_at,
        finished_at,
    } = task_queue;
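
    // Remove every trace of each task: its entry in the main all_tasks database
    // and in the status, kind, canceled_by and timestamp databases.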
    for task in tasks {
        all_tasks.delete(&mut wtxn, &task.uid)?;

        let mut tasks = status.get(&wtxn, &task.status)?.unwrap_or_default();
        tasks.remove(task.uid);
        status.put(&mut wtxn, &task.status, &tasks)?;

        let mut tasks = kind.get(&wtxn, &task.kind.as_kind())?.unwrap_or_default();
        tasks.remove(task.uid);
        kind.put(&mut wtxn, &task.kind.as_kind(), &tasks)?;

        canceled_by.delete(&mut wtxn, &task.uid)?;

        let timestamp = task.enqueued_at.unix_timestamp_nanos();
        let mut tasks = enqueued_at.get(&wtxn, &timestamp)?.unwrap_or_default();
        tasks.remove(task.uid);
        enqueued_at.put(&mut wtxn, &timestamp, &tasks)?;

        if let Some(task_started_at) = task.started_at {
            let timestamp = task_started_at.unix_timestamp_nanos();
            let mut tasks = started_at.get(&wtxn, &timestamp)?.unwrap_or_default();
            tasks.remove(task.uid);
            started_at.put(&mut wtxn, &timestamp, &tasks)?;
        }

        if let Some(task_finished_at) = task.finished_at {
            let timestamp = task_finished_at.unix_timestamp_nanos();
            let mut tasks = finished_at.get(&wtxn, &timestamp)?.unwrap_or_default();
            tasks.remove(task.uid);
            finished_at.put(&mut wtxn, &timestamp, &tasks)?;
        }
    }
    wtxn.commit()?;
    Ok(())
}

impl IndexScheduler {
    pub(super) fn process_snapshot(
        &self,
        progress: Progress,
        tasks: Vec<Task>,
    ) -> Result<Vec<Task>> {
        progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
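
        // With S3 options configured, stream the snapshot directly to S3;
        // otherwise write a `.snapshot` tarball into the local snapshots directory.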
        match self.scheduler.s3_snapshot_options.clone() {
            Some(options) => {
                #[cfg(not(unix))]
                {
                    let _ = options;
                    panic!("Non-unix platform does not support S3 snapshotting");
                }
                #[cfg(unix)]
                self.runtime
                    .as_ref()
                    .expect("Runtime not initialized")
                    .block_on(self.process_snapshot_to_s3(progress, options, tasks))
            }
            None => self.process_snapshots_to_disk(progress, tasks),
        }
    }
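
    /// Builds the snapshot on the local filesystem: copies the LMDB environments
    /// and the update files of enqueued tasks into a temporary directory, tarballs
    /// it, and persists the result as a read-only `.snapshot` file.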
    fn process_snapshots_to_disk(
        &self,
        progress: Progress,
        mut tasks: Vec<Task>,
    ) -> Result<Vec<Task>, Error> {
        fs::create_dir_all(&self.scheduler.snapshots_path)?;
        let temp_snapshot_dir = tempfile::tempdir()?;

        // 1. Snapshot the version file.
        let dst = temp_snapshot_dir.path().join(VERSION_FILE_NAME);
        fs::copy(&self.scheduler.version_file_path, dst)?;

        // 2. Snapshot the index-scheduler LMDB env
        //
        // When we call copy_to_path, LMDB opens a read transaction by itself;
        // we can't provide our own. This is an issue because we want to know which
        // update files to copy, and new ones can be enqueued between the copy of the
        // env and the transaction we open to retrieve the enqueued tasks. We therefore
        // open the transaction after copying the env, preferring to copy too many
        // update files rather than not enough.
        //
        // Note that no update file can be deleted between those two read
        // operations because task processing is synchronous.

        // 2.1 First copy the LMDB env of the index-scheduler
        progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
        let dst = temp_snapshot_dir.path().join("tasks");
        fs::create_dir_all(&dst)?;
        let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
            CompactionOption::Disabled
        } else {
            CompactionOption::Enabled
        };
        self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;

        // 2.2 Remove the current snapshot tasks
        //
        // This is done to ensure that the tasks are not processed again when the snapshot is imported
        //
        // # Safety
        //
        // This is safe because we open the env file we just created in a temporary directory.
        // We are sure it's not being used by any other process or thread.
        unsafe {
            remove_tasks(&tasks, &dst, self.index_mapper.index_base_map_size)?;
        }

        // 2.3 Create a read transaction on the index-scheduler
        let rtxn = self.env.read_txn()?;

        // 2.4 Create the update files directory
        let update_files_dir = temp_snapshot_dir.path().join(UPDATE_FILES_DIR_NAME);
        fs::create_dir_all(&update_files_dir)?;

        // 2.5 Only copy the update files of the enqueued tasks
        progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
        let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
        let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
        progress.update_progress(update_file_progress);
        for task_id in enqueued {
            let task =
                self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
            if let Some(content_uuid) = task.content_uuid() {
                let src = self.queue.file_store.update_path(content_uuid);
                let dst = update_files_dir.join(content_uuid.to_string());
                fs::copy(src, dst)?;
            }
            atomic.fetch_add(1, Ordering::Relaxed);
        }

        // 3. Snapshot every index
        progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
        let index_mapping = self.index_mapper.index_mapping;
        let nb_indexes = index_mapping.len(&rtxn)? as u32;

        for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
            let (name, uuid) = result?;
            progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
                name, i as u32, nb_indexes,
            ));
            let index = self.index_mapper.index(&rtxn, name)?;
            let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
            fs::create_dir_all(&dst)?;
            index
                .copy_to_path(dst.join("data.mdb"), compaction_option)
                .map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
        }

        drop(rtxn);

        // 4. Snapshot the auth LMDB env
        progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
        let dst = temp_snapshot_dir.path().join("auth");
        fs::create_dir_all(&dst)?;
        self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), compaction_option)?;

        // 5. Copy and tarball the flat snapshot
        progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
        // 5.1 Find the original name of the database
        // TODO find a better way to get this path
        let mut base_path = self.env.path().to_owned();
        base_path.pop();
        let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");

        // 5.2 Tarball the content of the snapshot in a tempfile with a .snapshot extension
        let snapshot_path = self.scheduler.snapshots_path.join(format!("{}.snapshot", db_name));
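        // Writing into a named tempfile in the snapshots directory and persisting it
        // afterwards makes the final rename atomic, so readers never observe a
        // partially written snapshot.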
        let temp_snapshot_file = tempfile::NamedTempFile::new_in(&self.scheduler.snapshots_path)?;
        compression::to_tar_gz(temp_snapshot_dir.path(), temp_snapshot_file.path())?;
        let file = temp_snapshot_file.persist(snapshot_path)?;

        // 5.3 Change the permissions to make the snapshot read-only
        let mut permissions = file.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            #[allow(clippy::non_octal_unix_permissions)]
            // Each group of three bits maps to rwx: 0b100100100 is r--r--r-- (0o444).
            permissions.set_mode(0b100100100);
        }

        file.set_permissions(permissions)?;

        for task in &mut tasks {
            task.status = Status::Succeeded;
        }

        Ok(tasks)
    }
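
    /// Streams the snapshot tarball directly to an S3 bucket as a multipart
    /// upload, without staging the whole snapshot on the local filesystem.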
    #[cfg(unix)]
    pub(super) async fn process_snapshot_to_s3(
        &self,
        progress: Progress,
        opts: meilisearch_types::milli::update::S3SnapshotOptions,
        mut tasks: Vec<Task>,
    ) -> Result<Vec<Task>> {
        use meilisearch_types::milli::update::S3SnapshotOptions;

        let S3SnapshotOptions {
            s3_bucket_url,
            s3_bucket_region,
            s3_bucket_name,
            s3_snapshot_prefix,
            s3_access_key,
            s3_secret_key,
            s3_max_in_flight_parts,
            s3_compression_level: level,
            s3_signature_duration,
            s3_multipart_part_size,
        } = opts;

        let must_stop_processing = self.scheduler.must_stop_processing.clone();
        let retry_backoff = backoff::ExponentialBackoff::default();
        let db_name = {
            let mut base_path = self.env.path().to_owned();
            base_path.pop();
            base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms").to_string()
        };
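
        // A blocking thread builds the gzipped tarball and writes it into one end
        // of the pipe while the async uploader reads the other end and streams the
        // bytes to S3 part by part; both are joined below.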
        let (reader, writer) = std::io::pipe()?;
        let uploader_task = tokio::spawn(multipart_stream_to_s3(
            s3_bucket_url,
            s3_bucket_region,
            s3_bucket_name,
            s3_snapshot_prefix,
            s3_access_key,
            s3_secret_key,
            s3_max_in_flight_parts,
            s3_signature_duration,
            s3_multipart_part_size,
            must_stop_processing,
            retry_backoff,
            db_name,
            reader,
        ));

        let index_scheduler = IndexScheduler::private_clone(self);
        let builder_task = tokio::task::spawn_blocking(move || {
            stream_tarball_into_pipe(progress, level, writer, index_scheduler)
        });

        let (uploader_result, builder_result) = tokio::join!(uploader_task, builder_task);

        // Check the uploader result first to return early if the task was aborted.
        // safety: JoinHandle can return an error if the task was aborted, cancelled, or panicked.
        uploader_result.unwrap()?;
        builder_result.unwrap()?;

        for task in &mut tasks {
            task.status = Status::Succeeded;
        }

        Ok(tasks)
    }
}

/// Streams a gzipped tarball of the database content into a pipe.
#[cfg(unix)]
fn stream_tarball_into_pipe(
    progress: Progress,
    level: u32,
    writer: std::io::PipeWriter,
    index_scheduler: IndexScheduler,
) -> std::result::Result<(), Error> {
    use std::io::Write as _;
    use std::path::Path;

    let writer = flate2::write::GzEncoder::new(writer, flate2::Compression::new(level));
    let mut tarball = tar::Builder::new(writer);

    // 1. Snapshot the version file
    tarball
        .append_path_with_name(&index_scheduler.scheduler.version_file_path, VERSION_FILE_NAME)?;

    // 2. Snapshot the index scheduler LMDB env
    progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
    let tasks_env_file = index_scheduler.env.try_clone_inner_file()?;
    let path = Path::new("tasks").join("data.mdb");
    append_file_to_tarball(&mut tarball, path, tasks_env_file)?;

    // 2.3 Create a read transaction on the index-scheduler
    let rtxn = index_scheduler.env.read_txn()?;

    // 2.4 Create the update files directory
    // and only copy the update files of the enqueued tasks
    progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
    let enqueued = index_scheduler.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
    let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
    progress.update_progress(update_file_progress);

    // We create the update_files directory so that it
    // always exists even if there are no update files
    let update_files_dir = Path::new(UPDATE_FILES_DIR_NAME);
    let src_update_files_dir = {
        let mut path = index_scheduler.env.path().to_path_buf();
        path.pop();
        path.join(UPDATE_FILES_DIR_NAME)
    };
    tarball.append_dir(update_files_dir, src_update_files_dir)?;

    for task_id in enqueued {
        let task = index_scheduler
            .queue
            .tasks
            .get_task(&rtxn, task_id)?
            .ok_or(Error::CorruptedTaskQueue)?;
        if let Some(content_uuid) = task.content_uuid() {
            use std::fs::File;

            let src = index_scheduler.queue.file_store.update_path(content_uuid);
            let mut update_file = File::open(src)?;
            let path = update_files_dir.join(content_uuid.to_string());
            tarball.append_file(path, &mut update_file)?;
        }
        atomic.fetch_add(1, Ordering::Relaxed);
    }

    // 3. Snapshot every index
    progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
    let index_mapping = index_scheduler.index_mapper.index_mapping;
    let nb_indexes = index_mapping.len(&rtxn)? as u32;
    let indexes_dir = Path::new("indexes");
    let indexes_references: Vec<_> = index_scheduler
        .index_mapper
        .index_mapping
        .iter(&rtxn)?
        .map(|res| res.map_err(Error::from).map(|(name, uuid)| (name.to_string(), uuid)))
        .collect::<Result<_, Error>>()?;

    // It's prettier to use a for loop instead of the IndexMapper::try_for_each_index
    // method, especially when we need to access the UUID, local path and index number.
    for (i, (name, uuid)) in indexes_references.into_iter().enumerate() {
        progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
            &name, i as u32, nb_indexes,
        ));
        let path = indexes_dir.join(uuid.to_string()).join("data.mdb");
        let index = index_scheduler.index_mapper.index(&rtxn, &name)?;
        let index_file = index.try_clone_inner_file()?;
        tracing::trace!("Appending index file for {name} in {}", path.display());
        append_file_to_tarball(&mut tarball, path, index_file)?;
    }

    drop(rtxn);

    // 4. Snapshot the auth LMDB env
    progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
    let auth_env_file = index_scheduler.scheduler.auth_env.try_clone_inner_file()?;
    let path = Path::new("auth").join("data.mdb");
    append_file_to_tarball(&mut tarball, path, auth_env_file)?;

    // Finish the tar archive, then flush and finalize the gzip stream so the
    // uploader sees EOF on the pipe.
    let mut gzencoder = tarball.into_inner()?;
    gzencoder.flush()?;
    gzencoder.try_finish()?;
    let mut writer = gzencoder.finish()?;
    writer.flush()?;

    Result::<_, Error>::Ok(())
}
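
/// Appends a file to the tarball, seeking to the start first since a previous
/// snapshot may have left the file cursor at the end.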
#[cfg(unix)]
fn append_file_to_tarball<W, P>(
    tarball: &mut tar::Builder<W>,
    path: P,
    mut file: fs::File,
) -> Result<(), Error>
where
    W: std::io::Write,
    P: AsRef<std::path::Path>,
{
    use std::io::{Seek as _, SeekFrom};

    // Note: A previous snapshot operation may have left the cursor
    // at the end of the file, so we need to seek to the start.
    file.seek(SeekFrom::Start(0))?;
    tarball.append_file(path, &mut file)?;
    Ok(())
}

/// Streams the content read from the given reader to S3.
#[cfg(unix)]
#[allow(clippy::too_many_arguments)]
async fn multipart_stream_to_s3(
    s3_bucket_url: String,
    s3_bucket_region: String,
    s3_bucket_name: String,
    s3_snapshot_prefix: String,
    s3_access_key: String,
    s3_secret_key: String,
    s3_max_in_flight_parts: std::num::NonZero<usize>,
    s3_signature_duration: std::time::Duration,
    s3_multipart_part_size: u64,
    must_stop_processing: super::MustStopProcessing,
    retry_backoff: backoff::exponential::ExponentialBackoff<backoff::SystemClock>,
    db_name: String,
    reader: std::io::PipeReader,
) -> Result<(), Error> {
    use std::collections::VecDeque;
    use std::io;
    use std::os::fd::OwnedFd;
    use std::path::PathBuf;

    use bytes::{Bytes, BytesMut};
    use reqwest::{Client, Response};
    use rusty_s3::actions::CreateMultipartUpload;
    use rusty_s3::{Bucket, BucketError, Credentials, S3Action as _, UrlStyle};
    use tokio::task::JoinHandle;

    let reader = OwnedFd::from(reader);
    let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
    let s3_snapshot_prefix = PathBuf::from(s3_snapshot_prefix);
    let url =
        s3_bucket_url.parse().map_err(BucketError::ParseError).map_err(Error::S3BucketError)?;
    let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
        .map_err(Error::S3BucketError)?;
    let credential = Credentials::new(s3_access_key, s3_secret_key);

    // Note for the future (rust 1.91+): use with_added_extension, it's prettier
    let object_path = s3_snapshot_prefix.join(format!("{db_name}.snapshot"));
    // Note: This doesn't work on Windows; if a port to that platform is needed,
    // use the slash-path crate or similar to get the correct path separator.
    let object = object_path.display().to_string();
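
    // S3 multipart upload protocol: create the upload to obtain an upload id, send
    // numbered parts while collecting their ETag headers, then complete the upload
    // with the full list of ETags.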
    let action = bucket.create_multipart_upload(Some(&credential), &object);
    let url = action.sign(s3_signature_duration);

    let client = Client::new();
    let resp = client.post(url).send().await.map_err(Error::S3HttpError)?;
    let status = resp.status();

    let body = match resp.error_for_status_ref() {
        Ok(_) => resp.text().await.map_err(Error::S3HttpError)?,
        Err(_) => {
            return Err(Error::S3Error { status, body: resp.text().await.unwrap_or_default() })
        }
    };

    let multipart =
        CreateMultipartUpload::parse_response(&body).map_err(|e| Error::S3XmlError(Box::new(e)))?;
    tracing::debug!("Starting the upload of the snapshot to {object}");

    // We use this bumpalo as an arena for the etag strings.
    let bump = bumpalo::Bump::new();
    let mut etags = Vec::<&str>::new();
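
    // Sliding window of at most `s3_max_in_flight_parts` concurrent part uploads.
    // When the window is full, the oldest part is awaited and its buffer reused.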
    let mut in_flight = VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
        s3_max_in_flight_parts.get(),
    );

    // Part numbers start at 1 and cannot be larger than 10,000
    for part_number in 1u16.. {
        if must_stop_processing.get() {
            return Err(Error::AbortedTask);
        }

        let part_upload =
            bucket.upload_part(Some(&credential), &object, part_number, multipart.upload_id());
        let url = part_upload.sign(s3_signature_duration);

        // If the in-flight window is full, wait for the oldest part to land and
        // reuse its buffer
        let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
            let (handle, buffer) = in_flight.pop_front().expect("At least one in flight request");
            let resp = join_and_map_error(handle).await?;
            extract_and_append_etag(&bump, &mut etags, resp.headers())?;

            let mut buffer = match buffer.try_into_mut() {
                Ok(buffer) => buffer,
                Err(_) => unreachable!("All bytes references were consumed in the task"),
            };
            buffer.clear();
            buffer
        } else {
            BytesMut::with_capacity(s3_multipart_part_size as usize)
        };

        // Read from the pipe until the buffer holds at least half a part size,
        // then send it as the next part
        while buffer.len() < (s3_multipart_part_size as usize / 2) {
            // Wait for the pipe to be readable
            reader.readable().await?;

            match reader.try_read_buf(&mut buffer) {
                Ok(0) => break,
                // We read some bytes but maybe not enough
                Ok(_) => continue,
                // The readiness event is a false positive.
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
                Err(e) => return Err(e.into()),
            }
        }

        if buffer.is_empty() {
            // Break the loop if the buffer is
            // empty after we tried to read bytes
            break;
        }

        let body = buffer.freeze();
        tracing::trace!("Sending part {part_number}");
        let task = tokio::spawn({
            let client = client.clone();
            let body = body.clone();
            backoff::future::retry(retry_backoff.clone(), move || {
                let client = client.clone();
                let url = url.clone();
                let body = body.clone();
                async move {
                    match client.put(url).body(body).send().await {
                        // Client errors are permanent and are not retried
                        Ok(resp) if resp.status().is_client_error() => {
                            resp.error_for_status().map_err(backoff::Error::Permanent)
                        }
                        Ok(resp) => Ok(resp),
                        Err(e) => Err(backoff::Error::transient(e)),
                    }
                }
            })
        });
        in_flight.push_back((task, body));
    }
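
    // Drain the remaining in-flight part uploads and collect their etags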
    for (handle, _buffer) in in_flight {
        let resp = join_and_map_error(handle).await?;
        extract_and_append_etag(&bump, &mut etags, resp.headers())?;
    }

    tracing::debug!("Finalizing the multipart upload");

    let action = bucket.complete_multipart_upload(
        Some(&credential),
        &object,
        multipart.upload_id(),
        etags.iter().map(AsRef::as_ref),
    );
    let url = action.sign(s3_signature_duration);
    let body = action.body();
    let resp = backoff::future::retry(retry_backoff, move || {
        let client = client.clone();
        let url = url.clone();
        let body = body.clone();
        async move {
            match client.post(url).body(body).send().await {
                Ok(resp) if resp.status().is_client_error() => {
                    Err(backoff::Error::Permanent(Error::S3Error {
                        status: resp.status(),
                        body: resp.text().await.unwrap_or_default(),
                    }))
                }
                Ok(resp) => Ok(resp),
                Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))),
            }
        }
    })
    .await?;

    let status = resp.status();
    let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;
    if status.is_success() {
        Ok(())
    } else {
        Err(Error::S3Error { status, body })
    }
}

#[cfg(unix)]
async fn join_and_map_error(
    join_handle: tokio::task::JoinHandle<Result<reqwest::Response, reqwest::Error>>,
) -> Result<reqwest::Response> {
    // safety: This panics if the task (JoinHandle) was aborted, cancelled, or panicked
    let request = join_handle.await.unwrap();
    let resp = request.map_err(Error::S3HttpError)?;
    match resp.error_for_status_ref() {
        Ok(_) => Ok(resp),
        Err(_) => Err(Error::S3Error {
            status: resp.status(),
            body: resp.text().await.unwrap_or_default(),
        }),
    }
}

#[cfg(unix)]
fn extract_and_append_etag<'b>(
    bump: &'b bumpalo::Bump,
    etags: &mut Vec<&'b str>,
    headers: &reqwest::header::HeaderMap,
) -> Result<()> {
    use reqwest::header::ETAG;

    let etag = headers.get(ETAG).ok_or_else(|| Error::S3XmlError("Missing ETag header".into()))?;
    let etag = etag.to_str().map_err(|e| Error::S3XmlError(Box::new(e)))?;
    etags.push(bump.alloc_str(etag));

    Ok(())
}