diff --git a/Cargo.lock b/Cargo.lock index 4a21d2ab6..e310c967d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -476,7 +476,7 @@ dependencies = [ [[package]] name = "async-openai" version = "0.28.1" -source = "git+https://github.com/meilisearch/async-openai?branch=optional-type-function#603f1d17bb4530c45fb9a6e93294ab715a7af869" +source = "git+https://github.com/meilisearch/async-openai?branch=better-error-handling#42d05e5f7dd7cdd46115c0855965f0b3f24754a2" dependencies = [ "async-openai-macros", "backoff", @@ -501,7 +501,7 @@ dependencies = [ [[package]] name = "async-openai-macros" version = "0.1.0" -source = "git+https://github.com/meilisearch/async-openai?branch=optional-type-function#603f1d17bb4530c45fb9a6e93294ab715a7af869" +source = "git+https://github.com/meilisearch/async-openai?branch=better-error-handling#42d05e5f7dd7cdd46115c0855965f0b3f24754a2" dependencies = [ "proc-macro2", "quote", diff --git a/crates/meilisearch/Cargo.toml b/crates/meilisearch/Cargo.toml index 62f7cfa0a..deea9f803 100644 --- a/crates/meilisearch/Cargo.toml +++ b/crates/meilisearch/Cargo.toml @@ -113,7 +113,7 @@ utoipa = { version = "5.3.1", features = [ "openapi_extensions", ] } utoipa-scalar = { version = "0.3.0", optional = true, features = ["actix-web"] } -async-openai = { git = "https://github.com/meilisearch/async-openai", branch = "optional-type-function" } +async-openai = { git = "https://github.com/meilisearch/async-openai", branch = "better-error-handling" } actix-web-lab = { version = "0.24.1", default-features = false } [dev-dependencies] diff --git a/crates/meilisearch/src/routes/chats/chat_completions.rs b/crates/meilisearch/src/routes/chats/chat_completions.rs index 66fa12abf..e14ce3c2c 100644 --- a/crates/meilisearch/src/routes/chats/chat_completions.rs +++ b/crates/meilisearch/src/routes/chats/chat_completions.rs @@ -10,7 +10,8 @@ use actix_web::web::{self, Data}; use actix_web::{Either, HttpRequest, HttpResponse, Responder}; use actix_web_lab::sse::{self, Event, Sse}; use async_openai::config::{Config, OpenAIConfig}; -use async_openai::error::OpenAIError; +use async_openai::error::{ApiError, OpenAIError}; +use async_openai::reqwest_eventsource::Error as EventSourceError; use async_openai::types::{ ChatChoiceStream, ChatCompletionMessageToolCall, ChatCompletionMessageToolCallChunk, ChatCompletionRequestAssistantMessage, ChatCompletionRequestAssistantMessageArgs, @@ -45,6 +46,7 @@ use serde_json::json; use tokio::runtime::Handle; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; +use uuid::Uuid; use super::ChatsParam; use crate::error::MeilisearchHttpError; @@ -120,9 +122,6 @@ pub struct FunctionSupport { /// Defines if we can call the _meiliAppendConversationMessage /// function to provide the messages to append into the conversation. append_to_conversation: bool, - /// Defines if we can call the _meiliReportErrors function - /// to inform the front-end about potential errors. - report_errors: bool, } /// Setup search tool in chat completion request @@ -222,7 +221,7 @@ fn setup_search_tool( }), ); - Ok(FunctionSupport { report_progress, report_sources, append_to_conversation, report_errors }) + Ok(FunctionSupport { report_progress, report_sources, append_to_conversation }) } /// Process search request and return formatted results @@ -441,6 +440,8 @@ async fn streamed_chat( let function_support = setup_search_tool(&index_scheduler, filters, &mut chat_completion, &chat_settings.prompts)?; + tracing::debug!("Conversation function support: {function_support:?}"); + let (tx, rx) = tokio::sync::mpsc::channel(10); let tx = SseEventSender(tx); let _join_handle = Handle::current().spawn(async move { @@ -491,7 +492,7 @@ async fn run_conversation( function_support: FunctionSupport, ) -> Result, ()>, SendError> { let mut finish_reason = None; - + // safety: The unwrap can only happen if the stream is not correctly configured. let mut response = client.chat().create_stream(chat_completion.clone()).await.unwrap(); while let Some(result) = response.next().await { match result { @@ -590,10 +591,9 @@ async fn run_conversation( } } } - Err(err) => { - if function_support.report_errors { - tx.report_error(err).await?; - } + Err(error) => { + let error = StreamErrorEvent::from_openai_error(error).await.unwrap(); + tx.send_error(&error).await?; return Ok(ControlFlow::Break(None)); } } @@ -835,25 +835,6 @@ impl SseEventSender { self.send_json(&resp).await } - pub async fn report_error(&self, error: OpenAIError) -> Result<(), SendError> { - tracing::error!("OpenAI Error: {}", error); - - let (error_code, message) = match error { - OpenAIError::Reqwest(e) => ("internal_reqwest_error", e.to_string()), - OpenAIError::ApiError(api_error) => ("llm_api_issue", api_error.to_string()), - OpenAIError::JSONDeserialize(error) => ("internal_json_deserialize", error.to_string()), - OpenAIError::FileSaveError(_) | OpenAIError::FileReadError(_) => unreachable!(), - OpenAIError::StreamError(error) => ("llm_api_stream_error", error.to_string()), - OpenAIError::InvalidArgument(error) => ("internal_invalid_argument", error.to_string()), - }; - - self.send_json(&json!({ - "error_code": error_code, - "message": message, - })) - .await - } - pub async fn forward_response( &self, resp: &CreateChatCompletionStreamResponse, @@ -861,6 +842,10 @@ impl SseEventSender { self.send_json(resp).await } + pub async fn send_error(&self, error: &StreamErrorEvent) -> Result<(), SendError> { + self.send_json(error).await + } + pub async fn stop(self) -> Result<(), SendError> { self.0.send(Event::Data(sse::Data::new("[DONE]"))).await } @@ -941,3 +926,186 @@ fn format_documents<'t, 'doc>( Ok(renders) } + +/// An error that occurs during the streaming process. +/// +/// It directly comes from the OpenAI API and you can +/// read more about error events on their website: +/// +#[derive(Debug, Serialize, Deserialize)] +pub struct StreamErrorEvent { + /// The unique ID of the server event. + event_id: String, + /// The event type, must be error. + r#type: String, + /// Details of the error. + error: StreamError, +} + +/// Details of the error. +#[derive(Debug, Serialize, Deserialize)] +pub struct StreamError { + /// The type of error (e.g., "invalid_request_error", "server_error"). + r#type: String, + /// Error code, if any. + code: Option, + /// A human-readable error message. + message: String, + /// Parameter related to the error, if any. + param: Option, + /// The event_id of the client event that caused the error, if applicable. + event_id: Option, +} + +impl StreamErrorEvent { + pub async fn from_openai_error(error: OpenAIError) -> Result { + let error_type = "error".to_string(); + match error { + OpenAIError::Reqwest(e) => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "internal_reqwest_error".to_string(), + code: Some("internal".to_string()), + message: e.to_string(), + param: None, + event_id: None, + }, + }), + OpenAIError::ApiError(ApiError { message, r#type, param, code }) => { + Ok(StreamErrorEvent { + r#type: error_type, + event_id: Uuid::new_v4().to_string(), + error: StreamError { + r#type: r#type.unwrap_or_else(|| "unknown".to_string()), + code, + message, + param, + event_id: None, + }, + }) + } + OpenAIError::JSONDeserialize(error) => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "json_deserialize_error".to_string(), + code: Some("internal".to_string()), + message: error.to_string(), + param: None, + event_id: None, + }, + }), + OpenAIError::FileSaveError(_) | OpenAIError::FileReadError(_) => unreachable!(), + OpenAIError::StreamError(error) => match error { + EventSourceError::InvalidStatusCode(_status_code, response) => { + let OpenAiOutsideError { + error: OpenAiInnerError { code, message, param, r#type }, + } = response.json().await?; + + Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { r#type, code, message, param, event_id: None }, + }) + } + EventSourceError::InvalidContentType(_header_value, response) => { + let OpenAiOutsideError { + error: OpenAiInnerError { code, message, param, r#type }, + } = response.json().await?; + + Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { r#type, code, message, param, event_id: None }, + }) + } + EventSourceError::Utf8(error) => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "invalid_utf8_error".to_string(), + code: None, + message: error.to_string(), + param: None, + event_id: None, + }, + }), + EventSourceError::Parser(error) => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "parser_error".to_string(), + code: None, + message: error.to_string(), + param: None, + event_id: None, + }, + }), + EventSourceError::Transport(error) => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "transport_error".to_string(), + code: None, + message: error.to_string(), + param: None, + event_id: None, + }, + }), + EventSourceError::InvalidLastEventId(message) => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "invalid_last_event_id".to_string(), + code: None, + message, + param: None, + event_id: None, + }, + }), + EventSourceError::StreamEnded => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "stream_ended".to_string(), + code: None, + message: "Stream ended".to_string(), + param: None, + event_id: None, + }, + }), + }, + OpenAIError::InvalidArgument(message) => Ok(StreamErrorEvent { + event_id: Uuid::new_v4().to_string(), + r#type: error_type, + error: StreamError { + r#type: "invalid_argument".to_string(), + code: None, + message, + param: None, + event_id: None, + }, + }), + } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct OpenAiOutsideError { + /// Emitted when an error occurs. + error: OpenAiInnerError, +} + +/// Emitted when an error occurs. +#[derive(Debug, Clone, Deserialize)] +pub struct OpenAiInnerError { + /// The error code. + code: Option, + /// The error message. + message: String, + /// The error parameter. + param: Option, + /// The type of the event. Always `error`. + r#type: String, +}