Merge pull request #5994 from meilisearch/improve-s3-error-messages

Improve S3 upload by showing errors in the task queue
This commit is contained in:
Clément Renault
2025-11-19 16:58:02 +00:00
committed by GitHub
3 changed files with 19 additions and 16 deletions

View File

@@ -438,12 +438,15 @@ async fn multipart_stream_to_s3(
db_name: String, db_name: String,
reader: std::io::PipeReader, reader: std::io::PipeReader,
) -> Result<(), Error> { ) -> Result<(), Error> {
use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf}; use std::collections::VecDeque;
use std::io;
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use reqwest::{Client, Response}; use reqwest::{Client, Response};
use rusty_s3::S3Action as _; use rusty_s3::actions::CreateMultipartUpload;
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle}; use rusty_s3::{Bucket, BucketError, Credentials, S3Action as _, UrlStyle};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
let reader = OwnedFd::from(reader); let reader = OwnedFd::from(reader);
@@ -517,7 +520,6 @@ async fn multipart_stream_to_s3(
while buffer.len() < (s3_multipart_part_size as usize / 2) { while buffer.len() < (s3_multipart_part_size as usize / 2) {
// Wait for the pipe to be readable // Wait for the pipe to be readable
use std::io;
reader.readable().await?; reader.readable().await?;
match reader.try_read_buf(&mut buffer) { match reader.try_read_buf(&mut buffer) {
@@ -581,15 +583,17 @@ async fn multipart_stream_to_s3(
async move { async move {
match client.post(url).body(body).send().await { match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => { Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent) Err(backoff::Error::Permanent(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
}))
} }
Ok(resp) => Ok(resp), Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)), Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))),
} }
} }
}) })
.await .await?;
.map_err(Error::S3HttpError)?;
let status = resp.status(); let status = resp.status();
let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?; let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;

View File

@@ -1,14 +1,14 @@
use crate::search::{Personalize, SearchResult}; use std::time::Duration;
use meilisearch_types::{
error::{Code, ErrorCode, ResponseError}, use meilisearch_types::error::{Code, ErrorCode, ResponseError};
milli::TimeBudget, use meilisearch_types::milli::TimeBudget;
};
use rand::Rng; use rand::Rng;
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use crate::search::{Personalize, SearchResult};
const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank"; const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank";
const MAX_RETRIES: u32 = 10; const MAX_RETRIES: u32 = 10;

View File

@@ -18,10 +18,9 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema; use utoipa::ToSchema;
use uuid::Uuid; use uuid::Uuid;
use crate::search::SearchMetadata;
use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex}; use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex};
use crate::milli::vector::Embedding; use crate::milli::vector::Embedding;
use crate::search::SearchMetadata;
pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0; pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0;