diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 7a3c6ecbc..c01d49625 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -438,6 +438,7 @@ async fn multipart_stream_to_s3( db_name: String, reader: std::io::PipeReader, ) -> Result<(), Error> { + use std::io; use std::{os::fd::OwnedFd, path::PathBuf}; use bytes::BytesMut; @@ -527,8 +528,6 @@ async fn multipart_stream_to_s3( // we can continue and send the buffer/part while buffer.len() < (s3_multipart_part_size as usize / 2) { // Wait for the pipe to be readable - - use std::io; reader.readable().await?; match reader.try_read_buf(&mut buffer) { @@ -604,16 +603,19 @@ async fn multipart_stream_to_s3( let body = body.clone(); async move { match client.post(url).body(body).send().await { - Ok(resp) if resp.status().is_client_error() => { - resp.error_for_status().map_err(backoff::Error::Permanent) - } + Ok(resp) if resp.status().is_client_error() => match resp.error_for_status_ref() { + Ok(_) => Ok(resp), + Err(_) => Err(backoff::Error::Permanent(Error::S3Error { + status: resp.status(), + body: resp.text().await.unwrap_or_default(), + })), + }, Ok(resp) => Ok(resp), - Err(e) => Err(backoff::Error::transient(e)), + Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))), } } }) - .await - .map_err(Error::S3HttpError)?; + .await?; let status = resp.status(); let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;