Replace the hand-made VecDequeue by a FutureUnordered

This commit is contained in:
Kerollmops
2025-11-11 17:26:46 +01:00
parent 89006fd4b3
commit 15321ce924
3 changed files with 39 additions and 29 deletions

1
Cargo.lock generated
View File

@@ -3212,6 +3212,7 @@ dependencies = [
"enum-iterator",
"file-store",
"flate2",
"futures",
"indexmap",
"insta",
"maplit",

View File

@@ -46,10 +46,11 @@ time = { version = "0.3.41", features = [
tracing = "0.1.41"
ureq = "2.12.1"
uuid = { version = "1.17.0", features = ["serde", "v4"] }
backoff = "0.4.0"
backoff = { version = "0.4.0", features = ["tokio"] }
reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false }
rusty-s3 = "0.8.1"
tokio = { version = "1.47.1", features = ["full"] }
futures = "0.3.31"
[dev-dependencies]
big_s = "1.0.2"

View File

@@ -438,13 +438,13 @@ async fn multipart_stream_to_s3(
db_name: String,
reader: std::io::PipeReader,
) -> Result<(), Error> {
use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf};
use std::{os::fd::OwnedFd, path::PathBuf};
use bytes::{Bytes, BytesMut};
use reqwest::{Client, Response};
use bytes::BytesMut;
use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Client;
use rusty_s3::S3Action as _;
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle};
use tokio::task::JoinHandle;
let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
@@ -482,9 +482,7 @@ async fn multipart_stream_to_s3(
// We use this bumpalo for etags strings.
let bump = bumpalo::Bump::new();
let mut etags = Vec::<&str>::new();
let mut in_flight = VecDeque::<(JoinHandle<reqwest::Result<Response>>, Bytes)>::with_capacity(
s3_max_in_flight_parts.get(),
);
let mut in_flight = FuturesUnordered::new();
// Part numbers start at 1 and cannot be larger than 10k
for part_number in 1u16.. {
@@ -498,8 +496,21 @@ async fn multipart_stream_to_s3(
// Wait for a buffer to be ready if there are in-flight parts that landed
let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() {
let (handle, buffer) = in_flight.pop_front().expect("At least one in flight request");
let resp = join_and_map_error(handle).await?;
let (join_result, buffer): (
Result<reqwest::Result<reqwest::Response>, tokio::task::JoinError>,
bytes::Bytes,
) = in_flight.next().await.expect("At least one in flight request");
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(_) => resp,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
let mut buffer = match buffer.try_into_mut() {
@@ -556,11 +567,24 @@ async fn multipart_stream_to_s3(
}
})
});
in_flight.push_back((task, body));
// Wrap the task to return both the result and the buffer
let task_with_buffer = async move { (task.await, body) };
in_flight.push(task_with_buffer);
}
for (handle, _buffer) in in_flight {
let resp = join_and_map_error(handle).await?;
while let Some((join_result, _buffer)) = in_flight.next().await {
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let resp = join_result.unwrap().map_err(Error::S3HttpError)?;
let resp = match resp.error_for_status_ref() {
Ok(_) => resp,
Err(_) => {
return Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
})
}
};
extract_and_append_etag(&bump, &mut etags, resp.headers())?;
}
@@ -600,22 +624,6 @@ async fn multipart_stream_to_s3(
}
}
#[cfg(unix)]
async fn join_and_map_error(
join_handle: tokio::task::JoinHandle<Result<reqwest::Response, reqwest::Error>>,
) -> Result<reqwest::Response> {
// safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked
let request = join_handle.await.unwrap();
let resp = request.map_err(Error::S3HttpError)?;
match resp.error_for_status_ref() {
Ok(_) => Ok(resp),
Err(_) => Err(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
}),
}
}
#[cfg(unix)]
fn extract_and_append_etag<'b>(
bump: &'b bumpalo::Bump,