Introduce some backoff retries

This commit is contained in:
Kerollmops
2025-11-04 17:46:01 +01:00
parent 8967e40083
commit cbc0878158

View File

@@ -268,10 +268,10 @@ impl IndexScheduler {
s3_multipart_part_size, s3_multipart_part_size,
} = opts; } = opts;
// TODO implement exponential backoff on upload requests: https://docs.rs/backoff
// TODO return a result with actual errors
let (reader, writer) = std::io::pipe()?;
let must_stop_processing = self.scheduler.must_stop_processing.clone(); let must_stop_processing = self.scheduler.must_stop_processing.clone();
let retry_backoff = backoff::ExponentialBackoff::default();
let (reader, writer) = std::io::pipe()?;
let uploader_task = tokio::spawn(async move { let uploader_task = tokio::spawn(async move {
let reader = OwnedFd::from(reader); let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?; let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
@@ -379,7 +379,24 @@ impl IndexScheduler {
let body = buffer.freeze(); let body = buffer.freeze();
tracing::trace!("Sending part {part_number}"); tracing::trace!("Sending part {part_number}");
let task = tokio::spawn(client.put(url).body(body.clone()).send()); let task = tokio::spawn({
let client = client.clone();
let body = body.clone();
backoff::future::retry(retry_backoff.clone(), move || {
let client = client.clone();
let url = url.clone();
let body = body.clone();
async move {
match client.put(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
}
}
})
});
in_flight.push_back((task, body)); in_flight.push_back((task, body));
} }
@@ -408,8 +425,24 @@ impl IndexScheduler {
etags.iter().map(AsRef::as_ref), etags.iter().map(AsRef::as_ref),
); );
let url = action.sign(s3_signature_duration); let url = action.sign(s3_signature_duration);
let resp = let body = action.body();
client.post(url).body(action.body()).send().await.map_err(Error::S3HttpError)?; let resp = backoff::future::retry(retry_backoff, move || {
let client = client.clone();
let url = url.clone();
let body = body.clone();
async move {
match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
}
}
})
.await
.map_err(Error::S3HttpError)?;
let status = resp.status(); let status = resp.status();
let body = let body =
resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?; resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;