diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index c8b1acfda..236f3a48b 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -268,10 +268,10 @@ impl IndexScheduler { s3_multipart_part_size, } = opts; - // TODO implement exponential backoff on upload requests: https://docs.rs/backoff - // TODO return a result with actual errors - let (reader, writer) = std::io::pipe()?; let must_stop_processing = self.scheduler.must_stop_processing.clone(); + let retry_backoff = backoff::ExponentialBackoff::default(); + + let (reader, writer) = std::io::pipe()?; let uploader_task = tokio::spawn(async move { let reader = OwnedFd::from(reader); let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?; @@ -379,7 +379,24 @@ impl IndexScheduler { let body = buffer.freeze(); tracing::trace!("Sending part {part_number}"); - let task = tokio::spawn(client.put(url).body(body.clone()).send()); + let task = tokio::spawn({ + let client = client.clone(); + let body = body.clone(); + backoff::future::retry(retry_backoff.clone(), move || { + let client = client.clone(); + let url = url.clone(); + let body = body.clone(); + async move { + match client.put(url).body(body).send().await { + Ok(resp) if resp.status().is_client_error() => { + resp.error_for_status().map_err(backoff::Error::Permanent) + } + Ok(resp) => Ok(resp), + Err(e) => Err(backoff::Error::transient(e)), + } + } + }) + }); in_flight.push_back((task, body)); } @@ -408,8 +425,24 @@ impl IndexScheduler { etags.iter().map(AsRef::as_ref), ); let url = action.sign(s3_signature_duration); - let resp = - client.post(url).body(action.body()).send().await.map_err(Error::S3HttpError)?; + let body = action.body(); + let resp = backoff::future::retry(retry_backoff, move || { + let client = client.clone(); + let url = url.clone(); + let body = body.clone(); + async move { + match client.post(url).body(body).send().await { + Ok(resp) if resp.status().is_client_error() => { + resp.error_for_status().map_err(backoff::Error::Permanent) + } + Ok(resp) => Ok(resp), + Err(e) => Err(backoff::Error::transient(e)), + } + } + }) + .await + .map_err(Error::S3HttpError)?; + let status = resp.status(); let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;