mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-04 19:56:30 +00:00
make sure we NEVER ever write the cli defined webhook to the database or dumps
This commit is contained in:
@ -25,7 +25,7 @@ pub type Key = meilisearch_types::keys::Key;
|
||||
pub type ChatCompletionSettings = meilisearch_types::features::ChatCompletionSettings;
|
||||
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
|
||||
pub type Network = meilisearch_types::features::Network;
|
||||
pub type Webhooks = meilisearch_types::webhooks::Webhooks;
|
||||
pub type Webhooks = meilisearch_types::webhooks::WebhooksDumpView;
|
||||
|
||||
// ===== Other types to clarify the code of the compat module
|
||||
// everything related to the tasks
|
||||
|
@ -8,7 +8,7 @@ use meilisearch_types::batches::Batch;
|
||||
use meilisearch_types::features::{ChatCompletionSettings, Network, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::keys::Key;
|
||||
use meilisearch_types::settings::{Checked, Settings};
|
||||
use meilisearch_types::webhooks::Webhooks;
|
||||
use meilisearch_types::webhooks::WebhooksDumpView;
|
||||
use serde_json::{Map, Value};
|
||||
use tempfile::TempDir;
|
||||
use time::OffsetDateTime;
|
||||
@ -75,11 +75,7 @@ impl DumpWriter {
|
||||
Ok(std::fs::write(self.dir.path().join("network.json"), serde_json::to_string(&network)?)?)
|
||||
}
|
||||
|
||||
pub fn create_webhooks(&self, mut webhooks: Webhooks) -> Result<()> {
|
||||
if webhooks == Webhooks::default() {
|
||||
return Ok(());
|
||||
}
|
||||
webhooks.webhooks.remove(&Uuid::nil()); // Don't store the cli webhook
|
||||
pub fn create_webhooks(&self, webhooks: WebhooksDumpView) -> Result<()> {
|
||||
Ok(std::fs::write(
|
||||
self.dir.path().join("webhooks.json"),
|
||||
serde_json::to_string(&webhooks)?,
|
||||
|
@ -30,9 +30,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||
|
||||
index_mapper,
|
||||
features: _,
|
||||
cli_webhook_url: _,
|
||||
cli_webhook_authorization: _,
|
||||
cached_webhooks: _,
|
||||
webhooks: _,
|
||||
test_breakpoint_sdr: _,
|
||||
planned_failures: _,
|
||||
run_loop_iteration: _,
|
||||
|
@ -65,13 +65,14 @@ use meilisearch_types::milli::vector::{
|
||||
use meilisearch_types::milli::{self, Index};
|
||||
use meilisearch_types::task_view::TaskView;
|
||||
use meilisearch_types::tasks::{KindWithContent, Task};
|
||||
use meilisearch_types::webhooks::{Webhook, Webhooks};
|
||||
use meilisearch_types::webhooks::{Webhook, WebhooksDumpView, WebhooksView};
|
||||
use milli::vector::db::IndexEmbeddingConfig;
|
||||
use processing::ProcessingTasks;
|
||||
pub use queue::Query;
|
||||
use queue::Queue;
|
||||
use roaring::RoaringBitmap;
|
||||
use scheduler::Scheduler;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
use versioning::Versioning;
|
||||
@ -184,12 +185,8 @@ pub struct IndexScheduler {
|
||||
/// A database to store single-keyed data that is persisted across restarts.
|
||||
persisted: Database<Str, Str>,
|
||||
|
||||
/// The webhook url that was set by the CLI.
|
||||
cli_webhook_url: Option<String>,
|
||||
/// The Authorization header to send to the webhook URL that was set by the CLI.
|
||||
cli_webhook_authorization: Option<String>,
|
||||
/// Webhook
|
||||
cached_webhooks: Arc<RwLock<Webhooks>>,
|
||||
/// Webhook, loaded and stored in the `persisted` database
|
||||
webhooks: Arc<Webhooks>,
|
||||
|
||||
/// A map to retrieve the runtime representation of an embedder depending on its configuration.
|
||||
///
|
||||
@ -231,10 +228,7 @@ impl IndexScheduler {
|
||||
experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps,
|
||||
persisted: self.persisted,
|
||||
|
||||
cli_webhook_url: self.cli_webhook_url.clone(),
|
||||
cli_webhook_authorization: self.cli_webhook_authorization.clone(),
|
||||
cached_webhooks: self.cached_webhooks.clone(),
|
||||
|
||||
webhooks: self.webhooks.clone(),
|
||||
embedders: self.embedders.clone(),
|
||||
#[cfg(test)]
|
||||
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
||||
@ -313,7 +307,8 @@ impl IndexScheduler {
|
||||
let persisted = env.create_database(&mut wtxn, Some(db_name::PERSISTED))?;
|
||||
let webhooks_db = persisted.remap_data_type::<SerdeJson<Webhooks>>();
|
||||
let mut webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default();
|
||||
webhooks.webhooks.remove(&Uuid::nil()); // remove the cli webhook if it was saved by mistake
|
||||
webhooks
|
||||
.with_cli(options.cli_webhook_url.clone(), options.cli_webhook_authorization.clone());
|
||||
|
||||
wtxn.commit()?;
|
||||
|
||||
@ -331,11 +326,7 @@ impl IndexScheduler {
|
||||
.indexer_config
|
||||
.experimental_no_edition_2024_for_dumps,
|
||||
persisted,
|
||||
|
||||
cached_webhooks: Arc::new(RwLock::new(webhooks)),
|
||||
cli_webhook_url: options.cli_webhook_url,
|
||||
cli_webhook_authorization: options.cli_webhook_authorization,
|
||||
|
||||
webhooks: Arc::new(webhooks),
|
||||
embedders: Default::default(),
|
||||
|
||||
#[cfg(test)]
|
||||
@ -829,8 +820,8 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
let webhooks = self.webhooks();
|
||||
if webhooks.webhooks.is_empty() {
|
||||
let webhooks = self.webhooks.get_all();
|
||||
if webhooks.is_empty() {
|
||||
return;
|
||||
}
|
||||
let this = self.private_clone();
|
||||
@ -845,7 +836,7 @@ impl IndexScheduler {
|
||||
};
|
||||
|
||||
std::thread::spawn(move || {
|
||||
for (uuid, Webhook { url, headers }) in webhooks.webhooks.iter() {
|
||||
for (uuid, Webhook { url, headers }) in webhooks.iter() {
|
||||
let task_reader = TaskReader {
|
||||
rtxn: &rtxn,
|
||||
index_scheduler: &this,
|
||||
@ -899,27 +890,27 @@ impl IndexScheduler {
|
||||
self.features.network()
|
||||
}
|
||||
|
||||
pub fn put_webhooks(&self, mut webhooks: Webhooks) -> Result<()> {
|
||||
pub fn update_runtime_webhooks(&self, runtime: RuntimeWebhooks) -> Result<()> {
|
||||
let webhooks = Webhooks::from_runtime(runtime);
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let webhooks_db = self.persisted.remap_data_type::<SerdeJson<Webhooks>>();
|
||||
webhooks.webhooks.remove(&Uuid::nil()); // the cli webhook must not be saved
|
||||
webhooks_db.put(&mut wtxn, db_keys::WEBHOOKS, &webhooks)?;
|
||||
wtxn.commit()?;
|
||||
*self.cached_webhooks.write().unwrap() = webhooks;
|
||||
self.webhooks.update_runtime(webhooks.into_runtime());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn webhooks(&self) -> Webhooks {
|
||||
let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||
let mut webhooks = Webhooks::clone(&*webhooks);
|
||||
if let Some(url) = self.cli_webhook_url.as_ref().cloned() {
|
||||
let mut headers = BTreeMap::new();
|
||||
if let Some(auth) = self.cli_webhook_authorization.as_ref().cloned() {
|
||||
headers.insert(String::from("Authorization"), auth);
|
||||
}
|
||||
webhooks.webhooks.insert(Uuid::nil(), Webhook { url, headers });
|
||||
}
|
||||
webhooks
|
||||
pub fn webhooks_dump_view(&self) -> WebhooksDumpView {
|
||||
// We must not dump the cli webhook: only the runtime webhooks are exposed here
|
||||
WebhooksDumpView { webhooks: self.webhooks.get_runtime() }
|
||||
}
|
||||
|
||||
pub fn webhooks_view(&self) -> WebhooksView {
|
||||
WebhooksView { webhooks: self.webhooks.get_all() }
|
||||
}
|
||||
|
||||
pub fn retrieve_runtime_webhooks(&self) -> RuntimeWebhooks {
|
||||
self.webhooks.get_runtime()
|
||||
}
|
||||
|
||||
pub fn embedders(
|
||||
@ -1050,3 +1041,72 @@ pub struct IndexStats {
|
||||
/// Internal stats computed from the index.
|
||||
pub inner_stats: index_mapper::IndexStats,
|
||||
}
|
||||
|
||||
/// These structure are not meant to be exposed to the end user, if needed, use the meilisearch-types::webhooks structure instead.
|
||||
/// /!\ Everytime you deserialize this structure you should fill the cli_webhook later on with the `with_cli` method. /!\
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct Webhooks {
|
||||
// The cli webhook should *never* be stored in a database.
|
||||
// It represent a state that only exists for this execution of meilisearch
|
||||
#[serde(skip)]
|
||||
pub cli: Option<CliWebhook>,
|
||||
|
||||
#[serde(default)]
|
||||
pub runtime: RwLock<RuntimeWebhooks>,
|
||||
}
|
||||
|
||||
/// The map of non-cli webhooks, keyed by their uuid. This is the `runtime` part of `Webhooks`.
type RuntimeWebhooks = BTreeMap<Uuid, Webhook>;
|
||||
|
||||
impl Webhooks {
|
||||
pub fn with_cli(&mut self, url: Option<String>, auth: Option<String>) {
|
||||
if let Some(url) = url {
|
||||
let webhook = CliWebhook { url, auth };
|
||||
self.cli = Some(webhook);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_runtime(webhooks: RuntimeWebhooks) -> Self {
|
||||
Self { cli: None, runtime: RwLock::new(webhooks) }
|
||||
}
|
||||
|
||||
pub fn into_runtime(self) -> RuntimeWebhooks {
|
||||
// safe because we own self and it cannot be cloned
|
||||
self.runtime.into_inner().unwrap()
|
||||
}
|
||||
|
||||
pub fn update_runtime(&self, webhooks: RuntimeWebhooks) {
|
||||
*self.runtime.write().unwrap() = webhooks;
|
||||
}
|
||||
|
||||
/// Returns all the webhooks in an unified view. The cli webhook is represented with an uuid set to 0
|
||||
pub fn get_all(&self) -> BTreeMap<Uuid, Webhook> {
|
||||
self.cli
|
||||
.as_ref()
|
||||
.map(|wh| (Uuid::nil(), Webhook::from(wh)))
|
||||
.into_iter()
|
||||
.chain(self.runtime.read().unwrap().iter().map(|(uuid, wh)| (*uuid, wh.clone())))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns all the runtime webhooks.
|
||||
pub fn get_runtime(&self) -> BTreeMap<Uuid, Webhook> {
|
||||
self.runtime.read().unwrap().iter().map(|(uuid, wh)| (*uuid, wh.clone())).collect()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default, Clone, PartialEq)]
|
||||
struct CliWebhook {
|
||||
pub url: String,
|
||||
pub auth: Option<String>,
|
||||
}
|
||||
|
||||
impl From<&CliWebhook> for Webhook {
|
||||
fn from(webhook: &CliWebhook) -> Self {
|
||||
let mut headers = BTreeMap::new();
|
||||
if let Some(ref auth) = webhook.auth {
|
||||
headers.insert("Authorization".to_string(), auth.to_string());
|
||||
}
|
||||
Self { url: webhook.url.to_string(), headers }
|
||||
}
|
||||
}
|
||||
|
@ -272,7 +272,7 @@ impl IndexScheduler {
|
||||
|
||||
// 7. Dump the webhooks
|
||||
progress.update_progress(DumpCreationProgress::DumpTheWebhooks);
|
||||
let webhooks = self.webhooks();
|
||||
let webhooks = self.webhooks_dump_view();
|
||||
dump.create_webhooks(webhooks)?;
|
||||
|
||||
let dump_uid = started_at.format(format_description!(
|
||||
|
@ -11,9 +11,18 @@ pub struct Webhook {
|
||||
pub headers: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default, Clone, PartialEq)]
|
||||
#[derive(Debug, Serialize, Default, Clone, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Webhooks {
|
||||
pub struct WebhooksView {
|
||||
#[serde(default)]
|
||||
pub webhooks: BTreeMap<Uuid, Webhook>,
|
||||
}
|
||||
|
||||
// Same as the WebhooksView instead it should never contains the CLI webhooks.
|
||||
// It's the right structure to use in the dump
|
||||
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WebhooksDumpView {
|
||||
#[serde(default)]
|
||||
pub webhooks: BTreeMap<Uuid, Webhook>,
|
||||
}
|
||||
|
@ -493,7 +493,7 @@ fn import_dump(
|
||||
|
||||
// 2. Import the webhooks
|
||||
if let Some(webhooks) = dump_reader.webhooks() {
|
||||
index_scheduler.put_webhooks(webhooks.clone())?;
|
||||
index_scheduler.update_runtime_webhooks(webhooks.webhooks.clone())?;
|
||||
}
|
||||
|
||||
// 3. Import the `Key`s.
|
||||
|
@ -146,7 +146,7 @@ pub(super) struct WebhookResults {
|
||||
async fn get_webhooks(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::WEBHOOKS_GET }>, Data<IndexScheduler>>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let webhooks = index_scheduler.webhooks();
|
||||
let webhooks = index_scheduler.webhooks_view();
|
||||
let results = webhooks
|
||||
.webhooks
|
||||
.into_iter()
|
||||
@ -326,7 +326,7 @@ async fn get_webhook(
|
||||
uuid: Path<String>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let uuid = Uuid::from_str(&uuid.into_inner()).map_err(InvalidUuid)?;
|
||||
let mut webhooks = index_scheduler.webhooks();
|
||||
let mut webhooks = index_scheduler.webhooks_view();
|
||||
|
||||
let webhook = webhooks.webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
|
||||
let webhook = WebhookWithMetadata::from(uuid, webhook);
|
||||
@ -368,8 +368,8 @@ async fn post_webhook(
|
||||
return Err(TooManyHeaders(uuid).into());
|
||||
}
|
||||
|
||||
let mut webhooks = index_scheduler.webhooks();
|
||||
if webhooks.webhooks.len() >= 20 {
|
||||
let mut webhooks = index_scheduler.retrieve_runtime_webhooks();
|
||||
if webhooks.len() >= 20 {
|
||||
return Err(TooManyWebhooks.into());
|
||||
}
|
||||
|
||||
@ -383,8 +383,8 @@ async fn post_webhook(
|
||||
};
|
||||
|
||||
check_changed(uuid, &webhook)?;
|
||||
webhooks.webhooks.insert(uuid, webhook.clone());
|
||||
index_scheduler.put_webhooks(webhooks)?;
|
||||
webhooks.insert(uuid, webhook.clone());
|
||||
index_scheduler.update_runtime_webhooks(webhooks)?;
|
||||
|
||||
analytics.publish(PatchWebhooksAnalytics::post_webhook(), &req);
|
||||
|
||||
@ -426,13 +426,17 @@ async fn patch_webhook(
|
||||
let webhook_settings = webhook_settings.into_inner();
|
||||
debug!(parameters = ?(uuid, &webhook_settings), "Patch webhook");
|
||||
|
||||
let mut webhooks = index_scheduler.webhooks();
|
||||
let old_webhook = webhooks.webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
|
||||
if uuid.is_nil() {
|
||||
return Err(ImmutableWebhook(uuid).into());
|
||||
}
|
||||
|
||||
let mut webhooks = index_scheduler.retrieve_runtime_webhooks();
|
||||
let old_webhook = webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
|
||||
let webhook = patch_webhook_inner(&uuid, old_webhook, webhook_settings)?;
|
||||
|
||||
check_changed(uuid, &webhook)?;
|
||||
webhooks.webhooks.insert(uuid, webhook.clone());
|
||||
index_scheduler.put_webhooks(webhooks)?;
|
||||
webhooks.insert(uuid, webhook.clone());
|
||||
index_scheduler.update_runtime_webhooks(webhooks)?;
|
||||
|
||||
analytics.publish(PatchWebhooksAnalytics::patch_webhook(), &req);
|
||||
|
||||
@ -468,9 +472,9 @@ async fn delete_webhook(
|
||||
return Err(ImmutableWebhook(uuid).into());
|
||||
}
|
||||
|
||||
let mut webhooks = index_scheduler.webhooks();
|
||||
webhooks.webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
|
||||
index_scheduler.put_webhooks(webhooks)?;
|
||||
let mut webhooks = index_scheduler.retrieve_runtime_webhooks();
|
||||
webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
|
||||
index_scheduler.update_runtime_webhooks(webhooks)?;
|
||||
|
||||
analytics.publish(PatchWebhooksAnalytics::delete_webhook(), &req);
|
||||
|
||||
|
@ -283,7 +283,6 @@ async fn reserved_names() {
|
||||
let (value, code) = server
|
||||
.patch_webhook(Uuid::nil().to_string(), json!({ "url": "http://localhost:8080" }))
|
||||
.await;
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
snapshot!(value, @r#"
|
||||
{
|
||||
"message": "Webhook `[uuid]` is immutable. The webhook defined from the command line cannot be modified using the API.",
|
||||
@ -292,9 +291,9 @@ async fn reserved_names() {
|
||||
"link": "https://docs.meilisearch.com/errors#immutable_webhook"
|
||||
}
|
||||
"#);
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
|
||||
let (value, code) = server.delete_webhook(Uuid::nil().to_string()).await;
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
snapshot!(value, @r#"
|
||||
{
|
||||
"message": "Webhook `[uuid]` is immutable. The webhook defined from the command line cannot be modified using the API.",
|
||||
@ -303,6 +302,7 @@ async fn reserved_names() {
|
||||
"link": "https://docs.meilisearch.com/errors#immutable_webhook"
|
||||
}
|
||||
"#);
|
||||
snapshot!(code, @"400 Bad Request");
|
||||
}
|
||||
|
||||
#[actix_web::test]
|
||||
|
Reference in New Issue
Block a user