diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 9fb92f041..261fe030c 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -54,7 +54,7 @@ use meilisearch_types::batches::Batch; use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures}; use meilisearch_types::heed::byteorder::BE; use meilisearch_types::heed::types::{SerdeJson, Str, I128}; -use meilisearch_types::heed::{self, Database, Env, RoTxn, WithoutTls}; +use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::milli::index::IndexEmbeddingConfig; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; @@ -154,7 +154,7 @@ pub struct IndexScheduler { features: features::FeatureData, /// Stores the custom chat prompts and other settings of the indexes. - chat_settings: Database>, + pub(crate) chat_settings: Database>, /// Everything related to the processing of the tasks pub scheduler: scheduler::Scheduler, @@ -308,6 +308,14 @@ impl IndexScheduler { Ok(this) } + pub fn write_txn(&self) -> Result { + self.env.write_txn().map_err(|e| e.into()) + } + + pub fn read_txn(&self) -> Result> { + self.env.read_txn().map_err(|e| e.into()) + } + /// Return `Ok(())` if the index scheduler is able to access one of its database. pub fn health(&self) -> Result<()> { let rtxn = self.env.read_txn()?; @@ -384,10 +392,6 @@ impl IndexScheduler { } } - pub fn read_txn(&self) -> Result> { - self.env.read_txn().map_err(|e| e.into()) - } - /// Start the run loop for the given index scheduler. /// /// This function will execute in a different thread and must be called @@ -505,7 +509,7 @@ impl IndexScheduler { /// Returns the total number of indexes available for the specified filter. /// And a `Vec` of the index_uid + its stats - pub fn get_paginated_indexes_stats( + pub fn paginated_indexes_stats( &self, filters: &meilisearch_auth::AuthFilter, from: usize, @@ -546,6 +550,25 @@ impl IndexScheduler { ret.map(|ret| (total, ret)) } + /// Returns the total number of chat workspaces available ~~for the specified filter~~. + /// And a `Vec` of the workspace_uids + pub fn paginated_chat_workspace_uids( + &self, + _filters: &meilisearch_auth::AuthFilter, + from: usize, + limit: usize, + ) -> Result<(usize, Vec)> { + let rtxn = self.read_txn()?; + let total = self.chat_settings.len(&rtxn)?; + let mut iter = self.chat_settings.iter(&rtxn)?.skip(from); + iter.by_ref() + .take(limit) + .map(|ret| ret.map_err(Error::from)) + .map(|ret| ret.map(|(uid, _)| uid.to_string())) + .collect::, Error>>() + .map(|ret| (total as usize, ret)) + } + /// The returned structure contains: /// 1. The name of the property being observed can be `statuses`, `types`, or `indexes`. /// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example. @@ -875,16 +898,21 @@ impl IndexScheduler { res.map(EmbeddingConfigs::new) } - pub fn chat_settings(&self) -> Result> { - let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?; - self.chat_settings.get(&rtxn, "main").map_err(Into::into) + pub fn chat_settings(&self, rtxn: &RoTxn, uid: &str) -> Result> { + self.chat_settings.get(rtxn, uid).map_err(Into::into) } - pub fn put_chat_settings(&self, settings: &serde_json::Value) -> Result<()> { - let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; - self.chat_settings.put(&mut wtxn, "main", settings)?; - wtxn.commit().map_err(Error::HeedTransaction)?; - Ok(()) + pub fn put_chat_settings( + &self, + wtxn: &mut RwTxn, + uid: &str, + settings: &serde_json::Value, + ) -> Result<()> { + self.chat_settings.put(wtxn, uid, settings).map_err(Into::into) + } + + pub fn delete_chat_settings(&self, wtxn: &mut RwTxn, uid: &str) -> Result { + self.chat_settings.delete(wtxn, uid).map_err(Into::into) } } diff --git a/crates/index-scheduler/src/scheduler/test.rs b/crates/index-scheduler/src/scheduler/test.rs index f13af9f87..06bc14051 100644 --- a/crates/index-scheduler/src/scheduler/test.rs +++ b/crates/index-scheduler/src/scheduler/test.rs @@ -894,7 +894,7 @@ fn create_and_list_index() { let err = index_scheduler.index("kefir").map(|_| ()).unwrap_err(); snapshot!(err, @"Index `kefir` not found."); - let empty = index_scheduler.get_paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap(); + let empty = index_scheduler.paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap(); snapshot!(format!("{empty:?}"), @"(0, [])"); // After advancing just once the index should've been created, the wtxn has been released and commited @@ -902,7 +902,7 @@ fn create_and_list_index() { handle.advance_till([InsideProcessBatch]); index_scheduler.index("kefir").unwrap(); - let list = index_scheduler.get_paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap(); + let list = index_scheduler.paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap(); snapshot!(json_string!(list, { "[1][0][1].created_at" => "[date]", "[1][0][1].updated_at" => "[date]", "[1][0][1].used_database_size" => "[bytes]", "[1][0][1].database_size" => "[bytes]" }), @r###" [ 1, diff --git a/crates/meilisearch-types/src/keys.rs b/crates/meilisearch-types/src/keys.rs index 1c1ebad5b..e30ef1008 100644 --- a/crates/meilisearch-types/src/keys.rs +++ b/crates/meilisearch-types/src/keys.rs @@ -323,18 +323,25 @@ pub enum Action { #[serde(rename = "network.update")] #[deserr(rename = "network.update")] NetworkUpdate, + // TODO should we rename it chatCompletions.get ? #[serde(rename = "chat.get")] #[deserr(rename = "chat.get")] Chat, - #[serde(rename = "chatSettings.*")] - #[deserr(rename = "chatSettings.*")] - ChatSettingsAll, - #[serde(rename = "chatSettings.get")] - #[deserr(rename = "chatSettings.get")] - ChatSettingsGet, - #[serde(rename = "chatSettings.update")] - #[deserr(rename = "chatSettings.update")] - ChatSettingsUpdate, + #[serde(rename = "chats.get")] + #[deserr(rename = "chats.get")] + Chats, + #[serde(rename = "chatsSettings.*")] + #[deserr(rename = "chatsSettings.*")] + ChatsSettingsAll, + #[serde(rename = "chatsSettings.get")] + #[deserr(rename = "chatsSettings.get")] + ChatsSettingsGet, + #[serde(rename = "chatsSettings.update")] + #[deserr(rename = "chatsSettings.update")] + ChatsSettingsUpdate, + #[serde(rename = "chatsSettings.delete")] + #[deserr(rename = "chatsSettings.delete")] + ChatsSettingsDelete, } impl Action { @@ -360,9 +367,12 @@ impl Action { SETTINGS_ALL => Some(Self::SettingsAll), SETTINGS_GET => Some(Self::SettingsGet), SETTINGS_UPDATE => Some(Self::SettingsUpdate), - CHAT_SETTINGS_ALL => Some(Self::ChatSettingsAll), - CHAT_SETTINGS_GET => Some(Self::ChatSettingsGet), - CHAT_SETTINGS_UPDATE => Some(Self::ChatSettingsUpdate), + CHAT => Some(Self::Chat), + CHATS_GET => Some(Self::Chats), + CHATS_SETTINGS_ALL => Some(Self::ChatsSettingsAll), + CHATS_SETTINGS_GET => Some(Self::ChatsSettingsGet), + CHATS_SETTINGS_UPDATE => Some(Self::ChatsSettingsUpdate), + CHATS_SETTINGS_DELETE => Some(Self::ChatsSettingsDelete), STATS_ALL => Some(Self::StatsAll), STATS_GET => Some(Self::StatsGet), METRICS_ALL => Some(Self::MetricsAll), @@ -379,7 +389,6 @@ impl Action { EXPERIMENTAL_FEATURES_UPDATE => Some(Self::ExperimentalFeaturesUpdate), NETWORK_GET => Some(Self::NetworkGet), NETWORK_UPDATE => Some(Self::NetworkUpdate), - CHAT => Some(Self::Chat), _otherwise => None, } } @@ -430,7 +439,9 @@ pub mod actions { pub const NETWORK_UPDATE: u8 = NetworkUpdate.repr(); pub const CHAT: u8 = Chat.repr(); - pub const CHAT_SETTINGS_ALL: u8 = ChatSettingsAll.repr(); - pub const CHAT_SETTINGS_GET: u8 = ChatSettingsGet.repr(); - pub const CHAT_SETTINGS_UPDATE: u8 = ChatSettingsUpdate.repr(); + pub const CHATS_GET: u8 = Chats.repr(); + pub const CHATS_SETTINGS_ALL: u8 = ChatsSettingsAll.repr(); + pub const CHATS_SETTINGS_GET: u8 = ChatsSettingsGet.repr(); + pub const CHATS_SETTINGS_UPDATE: u8 = ChatsSettingsUpdate.repr(); + pub const CHATS_SETTINGS_DELETE: u8 = ChatsSettingsDelete.repr(); } diff --git a/crates/meilisearch/src/routes/chat.rs b/crates/meilisearch/src/routes/chats/chat_completions.rs similarity index 96% rename from crates/meilisearch/src/routes/chat.rs rename to crates/meilisearch/src/routes/chats/chat_completions.rs index 1cc5f1012..ee16a33cd 100644 --- a/crates/meilisearch/src/routes/chat.rs +++ b/crates/meilisearch/src/routes/chats/chat_completions.rs @@ -44,7 +44,8 @@ use tokio::runtime::Handle; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; -use super::settings::chat::{ChatPrompts, GlobalChatSettings}; +use super::settings::{ChatPrompts, GlobalChatSettings}; +use super::ChatsParam; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::ActionPolicy; use crate::extractors::authentication::{extract_token_from_request, GuardedData, Policy as _}; @@ -60,13 +61,14 @@ const MEILI_REPORT_ERRORS_NAME: &str = "_meiliReportErrors"; const MEILI_SEARCH_IN_INDEX_FUNCTION_NAME: &str = "_meiliSearchInIndex"; pub fn configure(cfg: &mut web::ServiceConfig) { - cfg.service(web::resource("/completions").route(web::post().to(chat))); + cfg.service(web::resource("").route(web::post().to(chat))); } /// Get a chat completion async fn chat( index_scheduler: GuardedData, Data>, auth_ctrl: web::Data, + chats_param: web::Path, req: HttpRequest, search_queue: web::Data, web::Json(chat_completion): web::Json, @@ -74,6 +76,8 @@ async fn chat( // To enable later on, when the feature will be experimental // index_scheduler.features().check_chat("Using the /chat route")?; + let ChatsParam { workspace_uid } = chats_param.into_inner(); + assert_eq!( chat_completion.n.unwrap_or(1), 1, @@ -82,11 +86,27 @@ async fn chat( if chat_completion.stream.unwrap_or(false) { Either::Right( - streamed_chat(index_scheduler, auth_ctrl, search_queue, req, chat_completion).await, + streamed_chat( + index_scheduler, + auth_ctrl, + search_queue, + &workspace_uid, + req, + chat_completion, + ) + .await, ) } else { Either::Left( - non_streamed_chat(index_scheduler, auth_ctrl, search_queue, req, chat_completion).await, + non_streamed_chat( + index_scheduler, + auth_ctrl, + search_queue, + &workspace_uid, + req, + chat_completion, + ) + .await, ) } } @@ -294,12 +314,14 @@ async fn non_streamed_chat( index_scheduler: GuardedData, Data>, auth_ctrl: web::Data, search_queue: web::Data, + workspace_uid: &str, req: HttpRequest, mut chat_completion: CreateChatCompletionRequest, ) -> Result { let filters = index_scheduler.filters(); - let chat_settings = match index_scheduler.chat_settings().unwrap() { + let rtxn = index_scheduler.read_txn()?; + let chat_settings = match index_scheduler.chat_settings(&rtxn, workspace_uid).unwrap() { Some(value) => serde_json::from_value(value).unwrap(), None => GlobalChatSettings::default(), }; @@ -387,15 +409,18 @@ async fn streamed_chat( index_scheduler: GuardedData, Data>, auth_ctrl: web::Data, search_queue: web::Data, + workspace_uid: &str, req: HttpRequest, mut chat_completion: CreateChatCompletionRequest, ) -> Result { let filters = index_scheduler.filters(); - let chat_settings = match index_scheduler.chat_settings().unwrap() { + let rtxn = index_scheduler.read_txn()?; + let chat_settings = match index_scheduler.chat_settings(&rtxn, workspace_uid).unwrap() { Some(value) => serde_json::from_value(value.clone()).unwrap(), None => GlobalChatSettings::default(), }; + drop(rtxn); let mut config = OpenAIConfig::default(); if let Setting::Set(api_key) = chat_settings.api_key.as_ref() { @@ -662,6 +687,7 @@ impl SseEventSender { function_name: String, function_arguments: String, ) -> Result<(), SendError> { + #[allow(deprecated)] let message = ChatCompletionRequestMessage::Assistant(ChatCompletionRequestAssistantMessage { content: None, @@ -698,6 +724,7 @@ impl SseEventSender { resp.choices[0] = ChatChoiceStream { index: 0, + #[allow(deprecated)] delta: ChatCompletionStreamResponseDelta { content: None, function_call: None, @@ -744,6 +771,7 @@ impl SseEventSender { resp.choices[0] = ChatChoiceStream { index: 0, + #[allow(deprecated)] delta: ChatCompletionStreamResponseDelta { content: None, function_call: None, @@ -788,6 +816,7 @@ impl SseEventSender { resp.choices[0] = ChatChoiceStream { index: 0, + #[allow(deprecated)] delta: ChatCompletionStreamResponseDelta { content: None, function_call: None, diff --git a/crates/meilisearch/src/routes/chats/mod.rs b/crates/meilisearch/src/routes/chats/mod.rs new file mode 100644 index 000000000..00e3b28b5 --- /dev/null +++ b/crates/meilisearch/src/routes/chats/mod.rs @@ -0,0 +1,87 @@ +use actix_web::{ + web::{self, Data}, + HttpResponse, +}; +use deserr::{actix_web::AwebQueryParameter, Deserr}; +use index_scheduler::IndexScheduler; +use meilisearch_types::{ + deserr::{query_params::Param, DeserrQueryParamError}, + error::{ + deserr_codes::{InvalidIndexLimit, InvalidIndexOffset}, + ResponseError, + }, + keys::actions, +}; +use serde::{Deserialize, Serialize}; +use tracing::debug; +use utoipa::{IntoParams, ToSchema}; + +use crate::{ + extractors::authentication::{policies::ActionPolicy, GuardedData}, + routes::PAGINATION_DEFAULT_LIMIT, +}; + +use super::Pagination; + +// TODO supports chats/$workspace/settings + /chats/$workspace/chat/completions +pub mod chat_completions; +pub mod settings; + +#[derive(Deserialize)] +pub struct ChatsParam { + workspace_uid: String, +} + +pub fn configure(cfg: &mut web::ServiceConfig) { + cfg.service(web::resource("").route(web::get().to(list_workspaces))).service( + web::scope("/{workspace_uid}") + .service(web::scope("/chat/completions").configure(chat_completions::configure)) + .service(web::scope("/settings").configure(settings::configure)), + ); +} + +#[derive(Deserr, Debug, Clone, Copy, IntoParams)] +#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)] +#[into_params(rename_all = "camelCase", parameter_in = Query)] +pub struct ListChats { + /// The number of chat workspaces to skip before starting to retrieve anything + #[param(value_type = Option, default, example = 100)] + #[deserr(default, error = DeserrQueryParamError)] + pub offset: Param, + /// The number of chat workspaces to retrieve + #[param(value_type = Option, default = 20, example = 1)] + #[deserr(default = Param(PAGINATION_DEFAULT_LIMIT), error = DeserrQueryParamError)] + pub limit: Param, +} + +impl ListChats { + fn as_pagination(self) -> Pagination { + Pagination { offset: self.offset.0, limit: self.limit.0 } + } +} + +#[derive(Debug, Serialize, Clone, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ChatWorkspaceView { + /// Unique identifier for the index + pub uid: String, +} + +pub async fn list_workspaces( + index_scheduler: GuardedData, Data>, + paginate: AwebQueryParameter, +) -> Result { + debug!(parameters = ?paginate, "List chat workspaces"); + let filters = index_scheduler.filters(); + let (total, workspaces) = index_scheduler.paginated_chat_workspace_uids( + filters, + *paginate.offset, + *paginate.limit, + )?; + let workspaces = + workspaces.into_iter().map(|uid| ChatWorkspaceView { uid }).collect::>(); + let ret = paginate.as_pagination().format_with(total, workspaces); + + debug!(returns = ?ret, "List chat workspaces"); + Ok(HttpResponse::Ok().json(ret)) +} diff --git a/crates/meilisearch/src/routes/settings/chat.rs b/crates/meilisearch/src/routes/chats/settings.rs similarity index 80% rename from crates/meilisearch/src/routes/settings/chat.rs rename to crates/meilisearch/src/routes/chats/settings.rs index 60fd01ab6..0d3f88938 100644 --- a/crates/meilisearch/src/routes/settings/chat.rs +++ b/crates/meilisearch/src/routes/chats/settings.rs @@ -10,21 +10,29 @@ use crate::extractors::authentication::policies::ActionPolicy; use crate::extractors::authentication::GuardedData; use crate::extractors::sequential_extractor::SeqHandler; +use super::ChatsParam; + pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::resource("") .route(web::get().to(get_settings)) - .route(web::patch().to(SeqHandler(patch_settings))), + .route(web::patch().to(SeqHandler(patch_settings))) + .route(web::delete().to(SeqHandler(delete_settings))), ); } async fn get_settings( index_scheduler: GuardedData< - ActionPolicy<{ actions::CHAT_SETTINGS_GET }>, + ActionPolicy<{ actions::CHATS_SETTINGS_GET }>, Data, >, + chats_param: web::Path, ) -> Result { - let mut settings = match index_scheduler.chat_settings()? { + let ChatsParam { workspace_uid } = chats_param.into_inner(); + + // TODO do a spawn_blocking here ??? + let rtxn = index_scheduler.read_txn()?; + let mut settings = match index_scheduler.chat_settings(&rtxn, &workspace_uid)? { Some(value) => serde_json::from_value(value).unwrap(), None => GlobalChatSettings::default(), }; @@ -34,12 +42,17 @@ async fn get_settings( async fn patch_settings( index_scheduler: GuardedData< - ActionPolicy<{ actions::CHAT_SETTINGS_UPDATE }>, + ActionPolicy<{ actions::CHATS_SETTINGS_UPDATE }>, Data, >, + chats_param: web::Path, web::Json(new): web::Json, ) -> Result { - let old = match index_scheduler.chat_settings()? { + let ChatsParam { workspace_uid } = chats_param.into_inner(); + + // TODO do a spawn_blocking here + let mut wtxn = index_scheduler.write_txn()?; + let old = match index_scheduler.chat_settings(&mut wtxn, &workspace_uid)? { Some(value) => serde_json::from_value(value).unwrap(), None => GlobalChatSettings::default(), }; @@ -64,16 +77,39 @@ async fn patch_settings( }; let value = serde_json::to_value(settings).unwrap(); - index_scheduler.put_chat_settings(&value)?; + index_scheduler.put_chat_settings(&mut wtxn, &workspace_uid, &value)?; + wtxn.commit()?; + Ok(HttpResponse::Ok().finish()) } +async fn delete_settings( + index_scheduler: GuardedData< + ActionPolicy<{ actions::CHATS_SETTINGS_DELETE }>, + Data, + >, + chats_param: web::Path, +) -> Result { + let ChatsParam { workspace_uid } = chats_param.into_inner(); + + // TODO do a spawn_blocking here + let mut wtxn = index_scheduler.write_txn()?; + if index_scheduler.delete_chat_settings(&mut wtxn, &workspace_uid)? { + wtxn.commit()?; + Ok(HttpResponse::Ok().finish()) + } else { + Ok(HttpResponse::NotFound().finish()) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields, rename_all = "camelCase")] pub enum ChatSource { OpenAi, } +// TODO Implement Deserr on that. +// TODO Declare DbGlobalChatSettings (alias it). #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields, rename_all = "camelCase")] pub struct GlobalChatSettings { @@ -114,6 +150,8 @@ impl GlobalChatSettings { } } +// TODO Implement Deserr on that. +// TODO Declare DbChatPrompts (alias it). #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields, rename_all = "camelCase")] pub struct ChatPrompts { diff --git a/crates/meilisearch/src/routes/indexes/mod.rs b/crates/meilisearch/src/routes/indexes/mod.rs index 48ed1cfb1..04b3e12c4 100644 --- a/crates/meilisearch/src/routes/indexes/mod.rs +++ b/crates/meilisearch/src/routes/indexes/mod.rs @@ -172,7 +172,7 @@ pub async fn list_indexes( debug!(parameters = ?paginate, "List indexes"); let filters = index_scheduler.filters(); let (total, indexes) = - index_scheduler.get_paginated_indexes_stats(filters, *paginate.offset, *paginate.limit)?; + index_scheduler.paginated_indexes_stats(filters, *paginate.offset, *paginate.limit)?; let indexes = indexes .into_iter() .map(|(name, stats)| IndexView { diff --git a/crates/meilisearch/src/routes/mod.rs b/crates/meilisearch/src/routes/mod.rs index 3d56ce8e8..572092fea 100644 --- a/crates/meilisearch/src/routes/mod.rs +++ b/crates/meilisearch/src/routes/mod.rs @@ -52,7 +52,7 @@ const PAGINATION_DEFAULT_LIMIT_FN: fn() -> usize = || 20; mod api_key; pub mod batches; -pub mod chat; +pub mod chats; mod dump; pub mod features; pub mod indexes; @@ -62,7 +62,6 @@ mod multi_search; mod multi_search_analytics; pub mod network; mod open_api_utils; -pub mod settings; mod snapshot; mod swap_indexes; pub mod tasks; @@ -116,8 +115,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::scope("/metrics").configure(metrics::configure)) .service(web::scope("/experimental-features").configure(features::configure)) .service(web::scope("/network").configure(network::configure)) - .service(web::scope("/chat").configure(chat::configure)) - .service(web::scope("/settings/chat").configure(settings::chat::configure)); + .service(web::scope("/chats").configure(chats::settings::configure)); #[cfg(feature = "swagger")] { diff --git a/crates/meilisearch/src/routes/settings/mod.rs b/crates/meilisearch/src/routes/settings/mod.rs deleted file mode 100644 index 30a62fc50..000000000 --- a/crates/meilisearch/src/routes/settings/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod chat;