Better report errors happening with the underlying LLM

Clément Renault 2025-06-03 14:07:38 +02:00
parent f827c2442c
commit 201a808fe2
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 200 additions and 32 deletions
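For context on the change below: the old `_meiliReportErrors`-gated `report_error` method sent an ad-hoc `{ "error_code", "message" }` JSON payload; the new code always forwards an OpenAI-style error event instead. A minimal sketch of the payload shape a client can now expect on the SSE stream, with all field values invented for illustration (not taken from the commit):

// Sketch only: the JSON shape produced by serializing the StreamErrorEvent
// type added in this commit. Every value below is illustrative.
use serde_json::json;

fn main() {
    let event = json!({
        "event_id": "f47ac10b-58cc-4372-a567-0e02b2c3d479", // a fresh UUID v4 per event
        "type": "error",                                     // always "error"
        "error": {
            "type": "internal_reqwest_error", // or the error type reported by the LLM API
            "code": "internal",
            "message": "connection reset by peer",
            "param": null,
            "event_id": null
        }
    });
    println!("{}", serde_json::to_string_pretty(&event).unwrap());
}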

Cargo.lock generated

@@ -476,7 +476,7 @@ dependencies = [
 [[package]]
 name = "async-openai"
 version = "0.28.1"
-source = "git+https://github.com/meilisearch/async-openai?branch=optional-type-function#603f1d17bb4530c45fb9a6e93294ab715a7af869"
+source = "git+https://github.com/meilisearch/async-openai?branch=better-error-handling#42d05e5f7dd7cdd46115c0855965f0b3f24754a2"
 dependencies = [
  "async-openai-macros",
  "backoff",
@@ -501,7 +501,7 @@ dependencies = [
 [[package]]
 name = "async-openai-macros"
 version = "0.1.0"
-source = "git+https://github.com/meilisearch/async-openai?branch=optional-type-function#603f1d17bb4530c45fb9a6e93294ab715a7af869"
+source = "git+https://github.com/meilisearch/async-openai?branch=better-error-handling#42d05e5f7dd7cdd46115c0855965f0b3f24754a2"
 dependencies = [
  "proc-macro2",
  "quote",


@@ -113,7 +113,7 @@ utoipa = { version = "5.3.1", features = [
     "openapi_extensions",
 ] }
 utoipa-scalar = { version = "0.3.0", optional = true, features = ["actix-web"] }
-async-openai = { git = "https://github.com/meilisearch/async-openai", branch = "optional-type-function" }
+async-openai = { git = "https://github.com/meilisearch/async-openai", branch = "better-error-handling" }
 actix-web-lab = { version = "0.24.1", default-features = false }

 [dev-dependencies]


@@ -10,7 +10,8 @@ use actix_web::web::{self, Data};
 use actix_web::{Either, HttpRequest, HttpResponse, Responder};
 use actix_web_lab::sse::{self, Event, Sse};
 use async_openai::config::{Config, OpenAIConfig};
-use async_openai::error::OpenAIError;
+use async_openai::error::{ApiError, OpenAIError};
+use async_openai::reqwest_eventsource::Error as EventSourceError;
 use async_openai::types::{
     ChatChoiceStream, ChatCompletionMessageToolCall, ChatCompletionMessageToolCallChunk,
     ChatCompletionRequestAssistantMessage, ChatCompletionRequestAssistantMessageArgs,
@@ -45,6 +46,7 @@ use serde_json::json;
 use tokio::runtime::Handle;
 use tokio::sync::mpsc::error::SendError;
 use tokio::sync::mpsc::Sender;
+use uuid::Uuid;

 use super::ChatsParam;
 use crate::error::MeilisearchHttpError;
@@ -120,9 +122,6 @@ pub struct FunctionSupport {
     /// Defines if we can call the _meiliAppendConversationMessage
     /// function to provide the messages to append into the conversation.
     append_to_conversation: bool,
-    /// Defines if we can call the _meiliReportErrors function
-    /// to inform the front-end about potential errors.
-    report_errors: bool,
 }

 /// Setup search tool in chat completion request
@@ -222,7 +221,7 @@ fn setup_search_tool(
         }),
     );

-    Ok(FunctionSupport { report_progress, report_sources, append_to_conversation, report_errors })
+    Ok(FunctionSupport { report_progress, report_sources, append_to_conversation })
 }

 /// Process search request and return formatted results
@@ -441,6 +440,8 @@ async fn streamed_chat(
     let function_support =
         setup_search_tool(&index_scheduler, filters, &mut chat_completion, &chat_settings.prompts)?;

+    tracing::debug!("Conversation function support: {function_support:?}");
+
     let (tx, rx) = tokio::sync::mpsc::channel(10);
     let tx = SseEventSender(tx);
     let _join_handle = Handle::current().spawn(async move {
@@ -491,7 +492,7 @@ async fn run_conversation<C: Config>(
     function_support: FunctionSupport,
 ) -> Result<ControlFlow<Option<FinishReason>, ()>, SendError<Event>> {
     let mut finish_reason = None;
-
+    // safety: The unwrap can only happen if the stream is not correctly configured.
     let mut response = client.chat().create_stream(chat_completion.clone()).await.unwrap();
     while let Some(result) = response.next().await {
         match result {
@@ -590,10 +591,9 @@
                     }
                 }
             }
-            Err(err) => {
-                if function_support.report_errors {
-                    tx.report_error(err).await?;
-                }
+            Err(error) => {
+                let error = StreamErrorEvent::from_openai_error(error).await.unwrap();
+                tx.send_error(&error).await?;
                 return Ok(ControlFlow::Break(None));
             }
         }
@@ -835,25 +835,6 @@ impl SseEventSender {
         self.send_json(&resp).await
     }

-    pub async fn report_error(&self, error: OpenAIError) -> Result<(), SendError<Event>> {
-        tracing::error!("OpenAI Error: {}", error);
-
-        let (error_code, message) = match error {
-            OpenAIError::Reqwest(e) => ("internal_reqwest_error", e.to_string()),
-            OpenAIError::ApiError(api_error) => ("llm_api_issue", api_error.to_string()),
-            OpenAIError::JSONDeserialize(error) => ("internal_json_deserialize", error.to_string()),
-            OpenAIError::FileSaveError(_) | OpenAIError::FileReadError(_) => unreachable!(),
-            OpenAIError::StreamError(error) => ("llm_api_stream_error", error.to_string()),
-            OpenAIError::InvalidArgument(error) => ("internal_invalid_argument", error.to_string()),
-        };
-
-        self.send_json(&json!({
-            "error_code": error_code,
-            "message": message,
-        }))
-        .await
-    }
-
     pub async fn forward_response(
         &self,
         resp: &CreateChatCompletionStreamResponse,
@@ -861,6 +842,10 @@ impl SseEventSender {
         self.send_json(resp).await
     }

+    pub async fn send_error(&self, error: &StreamErrorEvent) -> Result<(), SendError<Event>> {
+        self.send_json(error).await
+    }
+
     pub async fn stop(self) -> Result<(), SendError<Event>> {
         self.0.send(Event::Data(sse::Data::new("[DONE]"))).await
     }
@@ -941,3 +926,186 @@ fn format_documents<'t, 'doc>(
     Ok(renders)
 }
+
+/// An error that occurs during the streaming process.
+///
+/// It directly comes from the OpenAI API and you can
+/// read more about error events on their website:
+/// <https://platform.openai.com/docs/api-reference/realtime-server-events/error>
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamErrorEvent {
+    /// The unique ID of the server event.
+    event_id: String,
+    /// The event type, must be error.
+    r#type: String,
+    /// Details of the error.
+    error: StreamError,
+}
+
+/// Details of the error.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamError {
+    /// The type of error (e.g., "invalid_request_error", "server_error").
+    r#type: String,
+    /// Error code, if any.
+    code: Option<String>,
+    /// A human-readable error message.
+    message: String,
+    /// Parameter related to the error, if any.
+    param: Option<String>,
+    /// The event_id of the client event that caused the error, if applicable.
+    event_id: Option<String>,
+}
+
+impl StreamErrorEvent {
+    pub async fn from_openai_error(error: OpenAIError) -> Result<Self, reqwest::Error> {
+        let error_type = "error".to_string();
+        match error {
+            OpenAIError::Reqwest(e) => Ok(StreamErrorEvent {
+                event_id: Uuid::new_v4().to_string(),
+                r#type: error_type,
+                error: StreamError {
+                    r#type: "internal_reqwest_error".to_string(),
+                    code: Some("internal".to_string()),
+                    message: e.to_string(),
+                    param: None,
+                    event_id: None,
+                },
+            }),
+            OpenAIError::ApiError(ApiError { message, r#type, param, code }) => {
+                Ok(StreamErrorEvent {
+                    r#type: error_type,
+                    event_id: Uuid::new_v4().to_string(),
+                    error: StreamError {
+                        r#type: r#type.unwrap_or_else(|| "unknown".to_string()),
+                        code,
+                        message,
+                        param,
+                        event_id: None,
+                    },
+                })
+            }
+            OpenAIError::JSONDeserialize(error) => Ok(StreamErrorEvent {
+                event_id: Uuid::new_v4().to_string(),
+                r#type: error_type,
+                error: StreamError {
+                    r#type: "json_deserialize_error".to_string(),
+                    code: Some("internal".to_string()),
+                    message: error.to_string(),
+                    param: None,
+                    event_id: None,
+                },
+            }),
+            OpenAIError::FileSaveError(_) | OpenAIError::FileReadError(_) => unreachable!(),
+            OpenAIError::StreamError(error) => match error {
+                EventSourceError::InvalidStatusCode(_status_code, response) => {
+                    let OpenAiOutsideError {
+                        error: OpenAiInnerError { code, message, param, r#type },
+                    } = response.json().await?;
+
+                    Ok(StreamErrorEvent {
+                        event_id: Uuid::new_v4().to_string(),
+                        r#type: error_type,
+                        error: StreamError { r#type, code, message, param, event_id: None },
+                    })
+                }
+                EventSourceError::InvalidContentType(_header_value, response) => {
+                    let OpenAiOutsideError {
+                        error: OpenAiInnerError { code, message, param, r#type },
+                    } = response.json().await?;
+
+                    Ok(StreamErrorEvent {
+                        event_id: Uuid::new_v4().to_string(),
+                        r#type: error_type,
+                        error: StreamError { r#type, code, message, param, event_id: None },
+                    })
+                }
+                EventSourceError::Utf8(error) => Ok(StreamErrorEvent {
+                    event_id: Uuid::new_v4().to_string(),
+                    r#type: error_type,
+                    error: StreamError {
+                        r#type: "invalid_utf8_error".to_string(),
+                        code: None,
+                        message: error.to_string(),
+                        param: None,
+                        event_id: None,
+                    },
+                }),
+                EventSourceError::Parser(error) => Ok(StreamErrorEvent {
+                    event_id: Uuid::new_v4().to_string(),
+                    r#type: error_type,
+                    error: StreamError {
+                        r#type: "parser_error".to_string(),
+                        code: None,
+                        message: error.to_string(),
+                        param: None,
+                        event_id: None,
+                    },
+                }),
+                EventSourceError::Transport(error) => Ok(StreamErrorEvent {
+                    event_id: Uuid::new_v4().to_string(),
+                    r#type: error_type,
+                    error: StreamError {
+                        r#type: "transport_error".to_string(),
+                        code: None,
+                        message: error.to_string(),
+                        param: None,
+                        event_id: None,
+                    },
+                }),
+                EventSourceError::InvalidLastEventId(message) => Ok(StreamErrorEvent {
+                    event_id: Uuid::new_v4().to_string(),
+                    r#type: error_type,
+                    error: StreamError {
+                        r#type: "invalid_last_event_id".to_string(),
+                        code: None,
+                        message,
+                        param: None,
+                        event_id: None,
+                    },
+                }),
+                EventSourceError::StreamEnded => Ok(StreamErrorEvent {
+                    event_id: Uuid::new_v4().to_string(),
+                    r#type: error_type,
+                    error: StreamError {
+                        r#type: "stream_ended".to_string(),
+                        code: None,
+                        message: "Stream ended".to_string(),
+                        param: None,
+                        event_id: None,
+                    },
+                }),
+            },
+            OpenAIError::InvalidArgument(message) => Ok(StreamErrorEvent {
+                event_id: Uuid::new_v4().to_string(),
+                r#type: error_type,
+                error: StreamError {
+                    r#type: "invalid_argument".to_string(),
+                    code: None,
+                    message,
+                    param: None,
+                    event_id: None,
+                },
+            }),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct OpenAiOutsideError {
+    /// Emitted when an error occurs.
+    error: OpenAiInnerError,
+}
+
+/// Emitted when an error occurs.
+#[derive(Debug, Clone, Deserialize)]
+pub struct OpenAiInnerError {
+    /// The error code.
+    code: Option<String>,
+    /// The error message.
+    message: String,
+    /// The error parameter.
+    param: Option<String>,
+    /// The type of the event. Always `error`.
+    r#type: String,
+}