From 15321ce924977f5b5c6b23332ee38e0b55e6e3c5 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 11 Nov 2025 17:26:46 +0100 Subject: [PATCH] Replace the hand-made VecDeque by a FuturesUnordered --- Cargo.lock | 1 + crates/index-scheduler/Cargo.toml | 3 +- .../scheduler/process_snapshot_creation.rs | 64 +++++++++++-------- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 72bd50014..75a3ff39d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3212,6 +3212,7 @@ dependencies = [ "enum-iterator", "file-store", "flate2", + "futures", "indexmap", "insta", "maplit", diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index fb8b6ff7b..5c81a8d55 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -46,10 +46,11 @@ time = { version = "0.3.41", features = [ tracing = "0.1.41" ureq = "2.12.1" uuid = { version = "1.17.0", features = ["serde", "v4"] } -backoff = "0.4.0" +backoff = { version = "0.4.0", features = ["tokio"] } reqwest = { version = "0.12.23", features = ["rustls-tls", "http2"], default-features = false } rusty-s3 = "0.8.1" tokio = { version = "1.47.1", features = ["full"] } +futures = "0.3.31" [dev-dependencies] big_s = "1.0.2" diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 6297ab6a4..7a3c6ecbc 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -438,13 +438,13 @@ async fn multipart_stream_to_s3( db_name: String, reader: std::io::PipeReader, ) -> Result<(), Error> { - use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf}; + use std::{os::fd::OwnedFd, path::PathBuf}; - use bytes::{Bytes, BytesMut}; - use reqwest::{Client, Response}; + use bytes::BytesMut; + use futures::stream::{FuturesUnordered, StreamExt}; + use reqwest::Client; use 
rusty_s3::S3Action as _; use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle}; - use tokio::task::JoinHandle; let reader = OwnedFd::from(reader); let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?; @@ -482,9 +482,7 @@ async fn multipart_stream_to_s3( // We use this bumpalo for etags strings. let bump = bumpalo::Bump::new(); let mut etags = Vec::<&str>::new(); - let mut in_flight = VecDeque::<(JoinHandle>, Bytes)>::with_capacity( - s3_max_in_flight_parts.get(), - ); + let mut in_flight = FuturesUnordered::new(); // Part numbers start at 1 and cannot be larger than 10k for part_number in 1u16.. { @@ -498,8 +496,21 @@ async fn multipart_stream_to_s3( // Wait for a buffer to be ready if there are in-flight parts that landed let mut buffer = if in_flight.len() >= s3_max_in_flight_parts.get() { - let (handle, buffer) = in_flight.pop_front().expect("At least one in flight request"); - let resp = join_and_map_error(handle).await?; + let (join_result, buffer): ( + Result, tokio::task::JoinError>, + bytes::Bytes, + ) = in_flight.next().await.expect("At least one in flight request"); + // safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked + let resp = join_result.unwrap().map_err(Error::S3HttpError)?; + let resp = match resp.error_for_status_ref() { + Ok(_) => resp, + Err(_) => { + return Err(Error::S3Error { + status: resp.status(), + body: resp.text().await.unwrap_or_default(), + }) + } + }; extract_and_append_etag(&bump, &mut etags, resp.headers())?; let mut buffer = match buffer.try_into_mut() { @@ -556,11 +567,24 @@ async fn multipart_stream_to_s3( } }) }); - in_flight.push_back((task, body)); + + // Wrap the task to return both the result and the buffer + let task_with_buffer = async move { (task.await, body) }; + in_flight.push(task_with_buffer); } - for (handle, _buffer) in in_flight { - let resp = join_and_map_error(handle).await?; + while let Some((join_result, _buffer)) = 
in_flight.next().await { + // safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked + let resp = join_result.unwrap().map_err(Error::S3HttpError)?; + let resp = match resp.error_for_status_ref() { + Ok(_) => resp, + Err(_) => { + return Err(Error::S3Error { + status: resp.status(), + body: resp.text().await.unwrap_or_default(), + }) + } + }; extract_and_append_etag(&bump, &mut etags, resp.headers())?; } @@ -600,22 +624,6 @@ async fn multipart_stream_to_s3( } } -#[cfg(unix)] -async fn join_and_map_error( - join_handle: tokio::task::JoinHandle>, -) -> Result { - // safety: Panic happens if the task (JoinHandle) was aborted, cancelled, or panicked - let request = join_handle.await.unwrap(); - let resp = request.map_err(Error::S3HttpError)?; - match resp.error_for_status_ref() { - Ok(_) => Ok(resp), - Err(_) => Err(Error::S3Error { - status: resp.status(), - body: resp.text().await.unwrap_or_default(), - }), - } -} - #[cfg(unix)] fn extract_and_append_etag<'b>( bump: &'b bumpalo::Bump,