Introduce listing/getting/deleting/updating chat workspace settings

This commit is contained in:
Kerollmops 2025-05-30 12:12:47 +02:00
parent 7e42293a23
commit 5bb0714c5f
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
9 changed files with 241 additions and 51 deletions

View File

@ -54,7 +54,7 @@ use meilisearch_types::batches::Batch;
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::{SerdeJson, Str, I128};
use meilisearch_types::heed::{self, Database, Env, RoTxn, WithoutTls};
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@ -154,7 +154,7 @@ pub struct IndexScheduler {
features: features::FeatureData,
/// Stores the custom chat prompts and other settings of the indexes.
chat_settings: Database<Str, SerdeJson<serde_json::Value>>,
pub(crate) chat_settings: Database<Str, SerdeJson<serde_json::Value>>,
/// Everything related to the processing of the tasks
pub scheduler: scheduler::Scheduler,
@ -308,6 +308,14 @@ impl IndexScheduler {
Ok(this)
}
pub fn write_txn(&self) -> Result<RwTxn> {
self.env.write_txn().map_err(|e| e.into())
}
pub fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
self.env.read_txn().map_err(|e| e.into())
}
/// Return `Ok(())` if the index scheduler is able to access one of its database.
pub fn health(&self) -> Result<()> {
let rtxn = self.env.read_txn()?;
@ -384,10 +392,6 @@ impl IndexScheduler {
}
}
pub fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
self.env.read_txn().map_err(|e| e.into())
}
/// Start the run loop for the given index scheduler.
///
/// This function will execute in a different thread and must be called
@ -505,7 +509,7 @@ impl IndexScheduler {
/// Returns the total number of indexes available for the specified filter.
/// And a `Vec` of the index_uid + its stats
pub fn get_paginated_indexes_stats(
pub fn paginated_indexes_stats(
&self,
filters: &meilisearch_auth::AuthFilter,
from: usize,
@ -546,6 +550,25 @@ impl IndexScheduler {
ret.map(|ret| (total, ret))
}
/// Returns the total number of chat workspaces available ~~for the specified filter~~.
/// And a `Vec` of the workspace_uids
pub fn paginated_chat_workspace_uids(
&self,
_filters: &meilisearch_auth::AuthFilter,
from: usize,
limit: usize,
) -> Result<(usize, Vec<String>)> {
let rtxn = self.read_txn()?;
let total = self.chat_settings.len(&rtxn)?;
let mut iter = self.chat_settings.iter(&rtxn)?.skip(from);
iter.by_ref()
.take(limit)
.map(|ret| ret.map_err(Error::from))
.map(|ret| ret.map(|(uid, _)| uid.to_string()))
.collect::<Result<Vec<_>, Error>>()
.map(|ret| (total as usize, ret))
}
/// The returned structure contains:
/// 1. The name of the property being observed can be `statuses`, `types`, or `indexes`.
/// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example.
@ -875,16 +898,21 @@ impl IndexScheduler {
res.map(EmbeddingConfigs::new)
}
pub fn chat_settings(&self) -> Result<Option<serde_json::Value>> {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
self.chat_settings.get(&rtxn, "main").map_err(Into::into)
pub fn chat_settings(&self, rtxn: &RoTxn, uid: &str) -> Result<Option<serde_json::Value>> {
self.chat_settings.get(rtxn, uid).map_err(Into::into)
}
pub fn put_chat_settings(&self, settings: &serde_json::Value) -> Result<()> {
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
self.chat_settings.put(&mut wtxn, "main", settings)?;
wtxn.commit().map_err(Error::HeedTransaction)?;
Ok(())
pub fn put_chat_settings(
&self,
wtxn: &mut RwTxn,
uid: &str,
settings: &serde_json::Value,
) -> Result<()> {
self.chat_settings.put(wtxn, uid, settings).map_err(Into::into)
}
pub fn delete_chat_settings(&self, wtxn: &mut RwTxn, uid: &str) -> Result<bool> {
self.chat_settings.delete(wtxn, uid).map_err(Into::into)
}
}

View File

@ -894,7 +894,7 @@ fn create_and_list_index() {
let err = index_scheduler.index("kefir").map(|_| ()).unwrap_err();
snapshot!(err, @"Index `kefir` not found.");
let empty = index_scheduler.get_paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap();
let empty = index_scheduler.paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap();
snapshot!(format!("{empty:?}"), @"(0, [])");
// After advancing just once the index should've been created, the wtxn has been released and commited
@ -902,7 +902,7 @@ fn create_and_list_index() {
handle.advance_till([InsideProcessBatch]);
index_scheduler.index("kefir").unwrap();
let list = index_scheduler.get_paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap();
let list = index_scheduler.paginated_indexes_stats(&AuthFilter::default(), 0, 20).unwrap();
snapshot!(json_string!(list, { "[1][0][1].created_at" => "[date]", "[1][0][1].updated_at" => "[date]", "[1][0][1].used_database_size" => "[bytes]", "[1][0][1].database_size" => "[bytes]" }), @r###"
[
1,

View File

@ -323,18 +323,25 @@ pub enum Action {
#[serde(rename = "network.update")]
#[deserr(rename = "network.update")]
NetworkUpdate,
// TODO should we rename it chatCompletions.get ?
#[serde(rename = "chat.get")]
#[deserr(rename = "chat.get")]
Chat,
#[serde(rename = "chatSettings.*")]
#[deserr(rename = "chatSettings.*")]
ChatSettingsAll,
#[serde(rename = "chatSettings.get")]
#[deserr(rename = "chatSettings.get")]
ChatSettingsGet,
#[serde(rename = "chatSettings.update")]
#[deserr(rename = "chatSettings.update")]
ChatSettingsUpdate,
#[serde(rename = "chats.get")]
#[deserr(rename = "chats.get")]
Chats,
#[serde(rename = "chatsSettings.*")]
#[deserr(rename = "chatsSettings.*")]
ChatsSettingsAll,
#[serde(rename = "chatsSettings.get")]
#[deserr(rename = "chatsSettings.get")]
ChatsSettingsGet,
#[serde(rename = "chatsSettings.update")]
#[deserr(rename = "chatsSettings.update")]
ChatsSettingsUpdate,
#[serde(rename = "chatsSettings.delete")]
#[deserr(rename = "chatsSettings.delete")]
ChatsSettingsDelete,
}
impl Action {
@ -360,9 +367,12 @@ impl Action {
SETTINGS_ALL => Some(Self::SettingsAll),
SETTINGS_GET => Some(Self::SettingsGet),
SETTINGS_UPDATE => Some(Self::SettingsUpdate),
CHAT_SETTINGS_ALL => Some(Self::ChatSettingsAll),
CHAT_SETTINGS_GET => Some(Self::ChatSettingsGet),
CHAT_SETTINGS_UPDATE => Some(Self::ChatSettingsUpdate),
CHAT => Some(Self::Chat),
CHATS_GET => Some(Self::Chats),
CHATS_SETTINGS_ALL => Some(Self::ChatsSettingsAll),
CHATS_SETTINGS_GET => Some(Self::ChatsSettingsGet),
CHATS_SETTINGS_UPDATE => Some(Self::ChatsSettingsUpdate),
CHATS_SETTINGS_DELETE => Some(Self::ChatsSettingsDelete),
STATS_ALL => Some(Self::StatsAll),
STATS_GET => Some(Self::StatsGet),
METRICS_ALL => Some(Self::MetricsAll),
@ -379,7 +389,6 @@ impl Action {
EXPERIMENTAL_FEATURES_UPDATE => Some(Self::ExperimentalFeaturesUpdate),
NETWORK_GET => Some(Self::NetworkGet),
NETWORK_UPDATE => Some(Self::NetworkUpdate),
CHAT => Some(Self::Chat),
_otherwise => None,
}
}
@ -430,7 +439,9 @@ pub mod actions {
pub const NETWORK_UPDATE: u8 = NetworkUpdate.repr();
pub const CHAT: u8 = Chat.repr();
pub const CHAT_SETTINGS_ALL: u8 = ChatSettingsAll.repr();
pub const CHAT_SETTINGS_GET: u8 = ChatSettingsGet.repr();
pub const CHAT_SETTINGS_UPDATE: u8 = ChatSettingsUpdate.repr();
pub const CHATS_GET: u8 = Chats.repr();
pub const CHATS_SETTINGS_ALL: u8 = ChatsSettingsAll.repr();
pub const CHATS_SETTINGS_GET: u8 = ChatsSettingsGet.repr();
pub const CHATS_SETTINGS_UPDATE: u8 = ChatsSettingsUpdate.repr();
pub const CHATS_SETTINGS_DELETE: u8 = ChatsSettingsDelete.repr();
}

View File

@ -44,7 +44,8 @@ use tokio::runtime::Handle;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use super::settings::chat::{ChatPrompts, GlobalChatSettings};
use super::settings::{ChatPrompts, GlobalChatSettings};
use super::ChatsParam;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{extract_token_from_request, GuardedData, Policy as _};
@ -60,13 +61,14 @@ const MEILI_REPORT_ERRORS_NAME: &str = "_meiliReportErrors";
const MEILI_SEARCH_IN_INDEX_FUNCTION_NAME: &str = "_meiliSearchInIndex";
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("/completions").route(web::post().to(chat)));
cfg.service(web::resource("").route(web::post().to(chat)));
}
/// Get a chat completion
async fn chat(
index_scheduler: GuardedData<ActionPolicy<{ actions::CHAT }>, Data<IndexScheduler>>,
auth_ctrl: web::Data<AuthController>,
chats_param: web::Path<ChatsParam>,
req: HttpRequest,
search_queue: web::Data<SearchQueue>,
web::Json(chat_completion): web::Json<CreateChatCompletionRequest>,
@ -74,6 +76,8 @@ async fn chat(
// To enable later on, when the feature will be experimental
// index_scheduler.features().check_chat("Using the /chat route")?;
let ChatsParam { workspace_uid } = chats_param.into_inner();
assert_eq!(
chat_completion.n.unwrap_or(1),
1,
@ -82,11 +86,27 @@ async fn chat(
if chat_completion.stream.unwrap_or(false) {
Either::Right(
streamed_chat(index_scheduler, auth_ctrl, search_queue, req, chat_completion).await,
streamed_chat(
index_scheduler,
auth_ctrl,
search_queue,
&workspace_uid,
req,
chat_completion,
)
.await,
)
} else {
Either::Left(
non_streamed_chat(index_scheduler, auth_ctrl, search_queue, req, chat_completion).await,
non_streamed_chat(
index_scheduler,
auth_ctrl,
search_queue,
&workspace_uid,
req,
chat_completion,
)
.await,
)
}
}
@ -294,12 +314,14 @@ async fn non_streamed_chat(
index_scheduler: GuardedData<ActionPolicy<{ actions::CHAT }>, Data<IndexScheduler>>,
auth_ctrl: web::Data<AuthController>,
search_queue: web::Data<SearchQueue>,
workspace_uid: &str,
req: HttpRequest,
mut chat_completion: CreateChatCompletionRequest,
) -> Result<HttpResponse, ResponseError> {
let filters = index_scheduler.filters();
let chat_settings = match index_scheduler.chat_settings().unwrap() {
let rtxn = index_scheduler.read_txn()?;
let chat_settings = match index_scheduler.chat_settings(&rtxn, workspace_uid).unwrap() {
Some(value) => serde_json::from_value(value).unwrap(),
None => GlobalChatSettings::default(),
};
@ -387,15 +409,18 @@ async fn streamed_chat(
index_scheduler: GuardedData<ActionPolicy<{ actions::CHAT }>, Data<IndexScheduler>>,
auth_ctrl: web::Data<AuthController>,
search_queue: web::Data<SearchQueue>,
workspace_uid: &str,
req: HttpRequest,
mut chat_completion: CreateChatCompletionRequest,
) -> Result<impl Responder, ResponseError> {
let filters = index_scheduler.filters();
let chat_settings = match index_scheduler.chat_settings().unwrap() {
let rtxn = index_scheduler.read_txn()?;
let chat_settings = match index_scheduler.chat_settings(&rtxn, workspace_uid).unwrap() {
Some(value) => serde_json::from_value(value.clone()).unwrap(),
None => GlobalChatSettings::default(),
};
drop(rtxn);
let mut config = OpenAIConfig::default();
if let Setting::Set(api_key) = chat_settings.api_key.as_ref() {
@ -662,6 +687,7 @@ impl SseEventSender {
function_name: String,
function_arguments: String,
) -> Result<(), SendError<Event>> {
#[allow(deprecated)]
let message =
ChatCompletionRequestMessage::Assistant(ChatCompletionRequestAssistantMessage {
content: None,
@ -698,6 +724,7 @@ impl SseEventSender {
resp.choices[0] = ChatChoiceStream {
index: 0,
#[allow(deprecated)]
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
@ -744,6 +771,7 @@ impl SseEventSender {
resp.choices[0] = ChatChoiceStream {
index: 0,
#[allow(deprecated)]
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,
@ -788,6 +816,7 @@ impl SseEventSender {
resp.choices[0] = ChatChoiceStream {
index: 0,
#[allow(deprecated)]
delta: ChatCompletionStreamResponseDelta {
content: None,
function_call: None,

View File

@ -0,0 +1,87 @@
use actix_web::{
web::{self, Data},
HttpResponse,
};
use deserr::{actix_web::AwebQueryParameter, Deserr};
use index_scheduler::IndexScheduler;
use meilisearch_types::{
deserr::{query_params::Param, DeserrQueryParamError},
error::{
deserr_codes::{InvalidIndexLimit, InvalidIndexOffset},
ResponseError,
},
keys::actions,
};
use serde::{Deserialize, Serialize};
use tracing::debug;
use utoipa::{IntoParams, ToSchema};
use crate::{
extractors::authentication::{policies::ActionPolicy, GuardedData},
routes::PAGINATION_DEFAULT_LIMIT,
};
use super::Pagination;
// TODO supports chats/$workspace/settings + /chats/$workspace/chat/completions
pub mod chat_completions;
pub mod settings;
#[derive(Deserialize)]
pub struct ChatsParam {
workspace_uid: String,
}
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::get().to(list_workspaces))).service(
web::scope("/{workspace_uid}")
.service(web::scope("/chat/completions").configure(chat_completions::configure))
.service(web::scope("/settings").configure(settings::configure)),
);
}
#[derive(Deserr, Debug, Clone, Copy, IntoParams)]
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
#[into_params(rename_all = "camelCase", parameter_in = Query)]
pub struct ListChats {
/// The number of chat workspaces to skip before starting to retrieve anything
#[param(value_type = Option<usize>, default, example = 100)]
#[deserr(default, error = DeserrQueryParamError<InvalidIndexOffset>)]
pub offset: Param<usize>,
/// The number of chat workspaces to retrieve
#[param(value_type = Option<usize>, default = 20, example = 1)]
#[deserr(default = Param(PAGINATION_DEFAULT_LIMIT), error = DeserrQueryParamError<InvalidIndexLimit>)]
pub limit: Param<usize>,
}
impl ListChats {
fn as_pagination(self) -> Pagination {
Pagination { offset: self.offset.0, limit: self.limit.0 }
}
}
#[derive(Debug, Serialize, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ChatWorkspaceView {
/// Unique identifier for the index
pub uid: String,
}
pub async fn list_workspaces(
index_scheduler: GuardedData<ActionPolicy<{ actions::CHATS_GET }>, Data<IndexScheduler>>,
paginate: AwebQueryParameter<ListChats, DeserrQueryParamError>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?paginate, "List chat workspaces");
let filters = index_scheduler.filters();
let (total, workspaces) = index_scheduler.paginated_chat_workspace_uids(
filters,
*paginate.offset,
*paginate.limit,
)?;
let workspaces =
workspaces.into_iter().map(|uid| ChatWorkspaceView { uid }).collect::<Vec<_>>();
let ret = paginate.as_pagination().format_with(total, workspaces);
debug!(returns = ?ret, "List chat workspaces");
Ok(HttpResponse::Ok().json(ret))
}

View File

@ -10,21 +10,29 @@ use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use super::ChatsParam;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
.route(web::get().to(get_settings))
.route(web::patch().to(SeqHandler(patch_settings))),
.route(web::patch().to(SeqHandler(patch_settings)))
.route(web::delete().to(SeqHandler(delete_settings))),
);
}
async fn get_settings(
index_scheduler: GuardedData<
ActionPolicy<{ actions::CHAT_SETTINGS_GET }>,
ActionPolicy<{ actions::CHATS_SETTINGS_GET }>,
Data<IndexScheduler>,
>,
chats_param: web::Path<ChatsParam>,
) -> Result<HttpResponse, ResponseError> {
let mut settings = match index_scheduler.chat_settings()? {
let ChatsParam { workspace_uid } = chats_param.into_inner();
// TODO do a spawn_blocking here ???
let rtxn = index_scheduler.read_txn()?;
let mut settings = match index_scheduler.chat_settings(&rtxn, &workspace_uid)? {
Some(value) => serde_json::from_value(value).unwrap(),
None => GlobalChatSettings::default(),
};
@ -34,12 +42,17 @@ async fn get_settings(
async fn patch_settings(
index_scheduler: GuardedData<
ActionPolicy<{ actions::CHAT_SETTINGS_UPDATE }>,
ActionPolicy<{ actions::CHATS_SETTINGS_UPDATE }>,
Data<IndexScheduler>,
>,
chats_param: web::Path<ChatsParam>,
web::Json(new): web::Json<GlobalChatSettings>,
) -> Result<HttpResponse, ResponseError> {
let old = match index_scheduler.chat_settings()? {
let ChatsParam { workspace_uid } = chats_param.into_inner();
// TODO do a spawn_blocking here
let mut wtxn = index_scheduler.write_txn()?;
let old = match index_scheduler.chat_settings(&mut wtxn, &workspace_uid)? {
Some(value) => serde_json::from_value(value).unwrap(),
None => GlobalChatSettings::default(),
};
@ -64,16 +77,39 @@ async fn patch_settings(
};
let value = serde_json::to_value(settings).unwrap();
index_scheduler.put_chat_settings(&value)?;
index_scheduler.put_chat_settings(&mut wtxn, &workspace_uid, &value)?;
wtxn.commit()?;
Ok(HttpResponse::Ok().finish())
}
async fn delete_settings(
index_scheduler: GuardedData<
ActionPolicy<{ actions::CHATS_SETTINGS_DELETE }>,
Data<IndexScheduler>,
>,
chats_param: web::Path<ChatsParam>,
) -> Result<HttpResponse, ResponseError> {
let ChatsParam { workspace_uid } = chats_param.into_inner();
// TODO do a spawn_blocking here
let mut wtxn = index_scheduler.write_txn()?;
if index_scheduler.delete_chat_settings(&mut wtxn, &workspace_uid)? {
wtxn.commit()?;
Ok(HttpResponse::Ok().finish())
} else {
Ok(HttpResponse::NotFound().finish())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub enum ChatSource {
OpenAi,
}
// TODO Implement Deserr on that.
// TODO Declare DbGlobalChatSettings (alias it).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct GlobalChatSettings {
@ -114,6 +150,8 @@ impl GlobalChatSettings {
}
}
// TODO Implement Deserr on that.
// TODO Declare DbChatPrompts (alias it).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct ChatPrompts {

View File

@ -172,7 +172,7 @@ pub async fn list_indexes(
debug!(parameters = ?paginate, "List indexes");
let filters = index_scheduler.filters();
let (total, indexes) =
index_scheduler.get_paginated_indexes_stats(filters, *paginate.offset, *paginate.limit)?;
index_scheduler.paginated_indexes_stats(filters, *paginate.offset, *paginate.limit)?;
let indexes = indexes
.into_iter()
.map(|(name, stats)| IndexView {

View File

@ -52,7 +52,7 @@ const PAGINATION_DEFAULT_LIMIT_FN: fn() -> usize = || 20;
mod api_key;
pub mod batches;
pub mod chat;
pub mod chats;
mod dump;
pub mod features;
pub mod indexes;
@ -62,7 +62,6 @@ mod multi_search;
mod multi_search_analytics;
pub mod network;
mod open_api_utils;
pub mod settings;
mod snapshot;
mod swap_indexes;
pub mod tasks;
@ -116,8 +115,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/metrics").configure(metrics::configure))
.service(web::scope("/experimental-features").configure(features::configure))
.service(web::scope("/network").configure(network::configure))
.service(web::scope("/chat").configure(chat::configure))
.service(web::scope("/settings/chat").configure(settings::chat::configure));
.service(web::scope("/chats").configure(chats::settings::configure));
#[cfg(feature = "swagger")]
{

View File

@ -1 +0,0 @@
pub mod chat;