Introduce some backoff retries

This commit is contained in:
Kerollmops
2025-11-04 17:46:01 +01:00
parent dbb5abebb6
commit 3ef1afc0f1

View File

@@ -268,10 +268,10 @@ impl IndexScheduler {
s3_multipart_part_size,
} = opts;
// TODO implement exponential backoff on upload requests: https://docs.rs/backoff
// TODO return a result with actual errors
let (reader, writer) = std::io::pipe()?;
let must_stop_processing = self.scheduler.must_stop_processing.clone();
let retry_backoff = backoff::ExponentialBackoff::default();
let (reader, writer) = std::io::pipe()?;
let uploader_task = tokio::spawn(async move {
let reader = OwnedFd::from(reader);
let reader = tokio::net::unix::pipe::Receiver::from_owned_fd(reader)?;
@@ -379,7 +379,24 @@ impl IndexScheduler {
let body = buffer.freeze();
tracing::trace!("Sending part {part_number}");
let task = tokio::spawn(client.put(url).body(body.clone()).send());
let task = tokio::spawn({
let client = client.clone();
let body = body.clone();
backoff::future::retry(retry_backoff.clone(), move || {
let client = client.clone();
let url = url.clone();
let body = body.clone();
async move {
match client.put(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
}
}
})
});
in_flight.push_back((task, body));
}
@@ -408,8 +425,24 @@ impl IndexScheduler {
etags.iter().map(AsRef::as_ref),
);
let url = action.sign(s3_signature_duration);
let resp =
client.post(url).body(action.body()).send().await.map_err(Error::S3HttpError)?;
let body = action.body();
let resp = backoff::future::retry(retry_backoff, move || {
let client = client.clone();
let url = url.clone();
let body = body.clone();
async move {
match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
}
}
})
.await
.map_err(Error::S3HttpError)?;
let status = resp.status();
let body =
resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;