Extract more logic into dedicated functions

This commit is contained in:
Kerollmops
2025-11-06 11:53:22 +01:00
parent a5c0a282c5
commit 1b74709b91

View File

@@ -423,7 +423,6 @@ async fn multipart_stream_to_s3(
use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf};
use bytes::{Bytes, BytesMut};
use reqwest::header::ETAG;
use reqwest::{Client, Response};
use rusty_s3::S3Action as _;
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle};
@@ -481,25 +480,14 @@ async fn multipart_stream_to_s3(
// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
let (request, buffer) = in_flight.pop_front().expect("At least one in flight request");
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = request.await.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(response) => response,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
});
}
};
let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
let (handle, buffer) = in_flight.pop_front().expect("At least one in flight request");
let resp = join_and_map_error(handle).await?;
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
let mut buffer = match buffer.try_into_mut() {
Ok(buffer) => buffer,
Err(_) => unreachable!("All bytes references were consumed in the task"),
};
let etag = etag.to_str().expect("etag is a valid string");
etags.push(bump.alloc_str(etag));
buffer.clear();
buffer
} else {
@@ -553,21 +541,9 @@ async fn multipart_stream_to_s3(
in_flight.push_back((task, body));
}
for (join_handle, _buffer) in in_flight {
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = join_handle.await.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(response) => response,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
let etag = resp.headers().get(ETAG).expect("every UploadPart request returns an Etag");
let etag = etag.to_str().expect("etag is a valid string");
etags.push(bump.alloc_str(etag));
for (handle, _buffer) in in_flight {
let resp = join_and_map_error(handle).await?;
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
}
tracing::debug!("Finalizing the multipart upload");
@@ -605,3 +581,34 @@ async fn multipart_stream_to_s3(
Err(Error::S3Error { status, body })
}
}
#[cfg(unix)]
async fn join_and_map_error(
join_handle: tokio::task::JoinHandle<Result<reqwest::Response, reqwest::Error>>,
) -> Result<reqwest::Response> {
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let request = join_handle.await.unwrap();
let resp = request.map_err(Error::S3HttpError)?;
match resp.error_for_status_ref() {
Ok(_) => Ok(resp),
Err(_) => Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
}),
}
}
#[cfg(unix)]
fn extract_and_append_etag<'b>(
bump: &'b bumpalo::Bump,
etags: &mut Vec<&'b str>,
headers: &reqwest::header::HeaderMap,
) -> Result<()> {
use reqwest::header::ETAG;
let etag = headers.get(ETAG).ok_or_else(|| Error::S3XmlError("Missing ETag header".into()))?;
let etag = etag.to_str().map_err(|e| Error::S3XmlError(Box::new(e)))?;
etags.push(bump.alloc_str(etag));
Ok(())
}