From cc37eb870f7cb3835a49bae2785fadaff4689166 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Wed, 30 Jul 2025 12:01:40 +0200 Subject: [PATCH] Initial implementation --- crates/index-scheduler/src/features.rs | 1 + crates/index-scheduler/src/insta_snapshot.rs | 20 +- crates/index-scheduler/src/lib.rs | 190 +++++++++------ crates/meilisearch-types/src/error.rs | 6 +- crates/meilisearch-types/src/lib.rs | 1 + crates/meilisearch-types/src/webhooks.rs | 18 ++ crates/meilisearch/src/routes/mod.rs | 5 +- crates/meilisearch/src/routes/network.rs | 2 +- crates/meilisearch/src/routes/webhooks.rs | 239 +++++++++++++++++++ 9 files changed, 409 insertions(+), 73 deletions(-) create mode 100644 crates/meilisearch-types/src/webhooks.rs create mode 100644 crates/meilisearch/src/routes/webhooks.rs diff --git a/crates/index-scheduler/src/features.rs b/crates/index-scheduler/src/features.rs index b52a659a6..dacac1a9c 100644 --- a/crates/index-scheduler/src/features.rs +++ b/crates/index-scheduler/src/features.rs @@ -182,6 +182,7 @@ impl FeatureData { ..persisted_features })); + // Once this is stabilized, network should be stored along with webhooks in index-scheduler's persisted database let network_db = runtime_features_db.remap_data_type::>(); let network: Network = network_db.get(wtxn, db_keys::NETWORK)?.unwrap_or_default(); diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index 32ce131b5..21626fb2e 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -26,11 +26,11 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { version, queue, scheduler, + persisted, index_mapper, features: _, - webhook_url: _, - webhook_authorization_header: _, + cached_webhooks: _, test_breakpoint_sdr: _, planned_failures: _, run_loop_iteration: _, @@ -62,6 +62,10 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { } snap.push_str("\n----------------------------------------------------------------------\n"); + snap.push_str("### Persisted:\n"); + snap.push_str(&snapshot_persisted_db(&rtxn, persisted)); + snap.push_str("----------------------------------------------------------------------\n"); + snap.push_str("### All Tasks:\n"); snap.push_str(&snapshot_all_tasks(&rtxn, queue.tasks.all_tasks)); snap.push_str("----------------------------------------------------------------------\n"); @@ -200,6 +204,16 @@ pub fn snapshot_date_db(rtxn: &RoTxn, db: Database) -> String { + let mut snap = String::new(); + let iter = db.iter(rtxn).unwrap(); + for next in iter { + let (key, value) = next.unwrap(); + snap.push_str(&format!("{key}: {value}\n")); + } + snap +} + pub fn snapshot_task(task: &Task) -> String { let mut snap = String::new(); let Task { @@ -311,6 +325,7 @@ pub fn snapshot_status( } snap } + pub fn snapshot_kind(rtxn: &RoTxn, db: Database, RoaringBitmapCodec>) -> String { let mut snap = String::new(); let iter = db.iter(rtxn).unwrap(); @@ -331,6 +346,7 @@ pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database) } snap } + pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database) -> String { let mut snap = String::new(); let iter = db.iter(rtxn).unwrap(); diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 46566b9ba..9e1d8d1a8 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -65,6 +65,7 @@ use meilisearch_types::milli::vector::{ use meilisearch_types::milli::{self, Index}; use meilisearch_types::task_view::TaskView; use meilisearch_types::tasks::{KindWithContent, Task}; +use meilisearch_types::webhooks::{Webhook, Webhooks}; use milli::vector::db::IndexEmbeddingConfig; use processing::ProcessingTasks; pub use queue::Query; @@ -80,7 +81,15 @@ use crate::utils::clamp_to_page_size; pub(crate) type BEI128 = I128; const TASK_SCHEDULER_SIZE_THRESHOLD_PERCENT_INT: u64 = 40; -const CHAT_SETTINGS_DB_NAME: &str = "chat-settings"; + +mod db_name { + pub const CHAT_SETTINGS: &str = "chat-settings"; + pub const PERSISTED: &str = "persisted"; +} + +mod db_keys { + pub const WEBHOOKS: &str = "webhooks"; +} #[derive(Debug)] pub struct IndexSchedulerOptions { @@ -171,10 +180,11 @@ pub struct IndexScheduler { /// Whether we should use the old document indexer or the new one. pub(crate) experimental_no_edition_2024_for_dumps: bool, - /// The webhook url we should send tasks to after processing every batches. - pub(crate) webhook_url: Option, - /// The Authorization header to send to the webhook URL. - pub(crate) webhook_authorization_header: Option, + /// A database to store single-keyed data that is persisted across restarts. + persisted: Database, + + /// Webhook + cached_webhooks: Arc>, /// A map to retrieve the runtime representation of an embedder depending on its configuration. /// @@ -214,8 +224,8 @@ impl IndexScheduler { index_mapper: self.index_mapper.clone(), cleanup_enabled: self.cleanup_enabled, experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps, - webhook_url: self.webhook_url.clone(), - webhook_authorization_header: self.webhook_authorization_header.clone(), + persisted: self.persisted, + cached_webhooks: self.cached_webhooks.clone(), embedders: self.embedders.clone(), #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), @@ -284,10 +294,16 @@ impl IndexScheduler { let version = versioning::Versioning::new(&env, from_db_version)?; let mut wtxn = env.write_txn()?; + let features = features::FeatureData::new(&env, &mut wtxn, options.instance_features)?; let queue = Queue::new(&env, &mut wtxn, &options)?; let index_mapper = IndexMapper::new(&env, &mut wtxn, &options, budget)?; - let chat_settings = env.create_database(&mut wtxn, Some(CHAT_SETTINGS_DB_NAME))?; + let chat_settings = env.create_database(&mut wtxn, Some(db_name::CHAT_SETTINGS))?; + + let persisted = env.create_database(&mut wtxn, Some(db_name::PERSISTED))?; + let webhooks_db = persisted.remap_data_type::>(); + let webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default(); + wtxn.commit()?; // allow unreachable_code to get rids of the warning in the case of a test build. @@ -303,8 +319,9 @@ impl IndexScheduler { experimental_no_edition_2024_for_dumps: options .indexer_config .experimental_no_edition_2024_for_dumps, - webhook_url: options.webhook_url, - webhook_authorization_header: options.webhook_authorization_header, + persisted, + cached_webhooks: Arc::new(RwLock::new(webhooks)), + embedders: Default::default(), #[cfg(test)] @@ -754,80 +771,103 @@ impl IndexScheduler { /// Once the tasks changes have been committed we must send all the tasks that were updated to our webhook if there is one. fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> { - if let Some(ref url) = self.webhook_url { - struct TaskReader<'a, 'b> { - rtxn: &'a RoTxn<'a>, - index_scheduler: &'a IndexScheduler, - tasks: &'b mut roaring::bitmap::Iter<'b>, - buffer: Vec, - written: usize, - } + let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner()); + if webhooks.webhooks.is_empty() { + return Ok(()); + } + let webhooks = Webhooks::clone(&*webhooks); - impl Read for TaskReader<'_, '_> { - fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result { - if self.buffer.is_empty() { - match self.tasks.next() { - None => return Ok(0), - Some(task_id) => { - let task = self - .index_scheduler - .queue - .tasks - .get_task(self.rtxn, task_id) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))? - .ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - Error::CorruptedTaskQueue, - ) - })?; + struct TaskReader<'a, 'b> { + rtxn: &'a RoTxn<'a>, + index_scheduler: &'a IndexScheduler, + tasks: &'b mut roaring::bitmap::Iter<'b>, + buffer: Vec, + written: usize, + } - serde_json::to_writer( - &mut self.buffer, - &TaskView::from_task(&task), - )?; - self.buffer.push(b'\n'); - } + impl Read for TaskReader<'_, '_> { + fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result { + if self.buffer.is_empty() { + match self.tasks.next() { + None => return Ok(0), + Some(task_id) => { + let task = self + .index_scheduler + .queue + .tasks + .get_task(self.rtxn, task_id) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))? + .ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, Error::CorruptedTaskQueue) + })?; + + serde_json::to_writer(&mut self.buffer, &TaskView::from_task(&task))?; + self.buffer.push(b'\n'); } } + } - let mut to_write = &self.buffer[self.written..]; - let wrote = io::copy(&mut to_write, &mut buf)?; - self.written += wrote as usize; + let mut to_write = &self.buffer[self.written..]; + let wrote = io::copy(&mut to_write, &mut buf)?; + self.written += wrote as usize; - // we wrote everything and must refresh our buffer on the next call - if self.written == self.buffer.len() { - self.written = 0; - self.buffer.clear(); - } + // we wrote everything and must refresh our buffer on the next call + if self.written == self.buffer.len() { + self.written = 0; + self.buffer.clear(); + } - Ok(wrote as usize) + Ok(wrote as usize) + } + } + + let rtxn = self.env.read_txn()?; + + let task_reader = TaskReader { + rtxn: &rtxn, + index_scheduler: self, + tasks: &mut updated.into_iter(), + buffer: Vec::with_capacity(800), // on average a task is around ~600 bytes + written: 0, + }; + + enum EitherRead { + Other(T), + Data(Vec), + } + + impl Read for &mut EitherRead { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self { + EitherRead::Other(reader) => reader.read(buf), + EitherRead::Data(data) => data.as_slice().read(buf), } } + } - let rtxn = self.env.read_txn()?; + let mut reader = GzEncoder::new(BufReader::new(task_reader), Compression::default()); - let task_reader = TaskReader { - rtxn: &rtxn, - index_scheduler: self, - tasks: &mut updated.into_iter(), - buffer: Vec::with_capacity(50), // on average a task is around ~100 bytes - written: 0, - }; + // When there is more than one webhook, cache the data in memory + let mut reader = match webhooks.webhooks.len() { + 1 => EitherRead::Other(reader), + _ => { + let mut data = Vec::new(); + reader.read_to_end(&mut data)?; + EitherRead::Data(data) + } + }; - // let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default()); - let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default()); - let request = ureq::post(url) + for (name, Webhook { url, headers }) in webhooks.webhooks.iter() { + let mut request = ureq::post(url) .timeout(Duration::from_secs(30)) .set("Content-Encoding", "gzip") .set("Content-Type", "application/x-ndjson"); - let request = match &self.webhook_authorization_header { - Some(header) => request.set("Authorization", header), - None => request, - }; + for (header_name, header_value) in headers.iter() { + request = request.set(header_name, header_value); + } - if let Err(e) = request.send(reader) { - tracing::error!("While sending data to the webhook: {e}"); + if let Err(e) = request.send(&mut reader) { + tracing::error!("While sending data to the webhook {name}: {e}"); } } @@ -862,6 +902,20 @@ impl IndexScheduler { self.features.network() } + pub fn put_webhooks(&self, webhooks: Webhooks) -> Result<()> { + let mut wtxn = self.env.write_txn()?; + let webhooks_db = self.persisted.remap_data_type::>(); + webhooks_db.put(&mut wtxn, db_keys::WEBHOOKS, &webhooks)?; + wtxn.commit()?; + *self.cached_webhooks.write().unwrap() = webhooks; + Ok(()) + } + + pub fn webhooks(&self) -> Webhooks { + let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner()); + Webhooks::clone(&*webhooks) + } + pub fn embedders( &self, index_uid: String, diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 458034c00..92425d386 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -418,7 +418,11 @@ InvalidChatCompletionSearchDescriptionPrompt , InvalidRequest , BAD_REQU InvalidChatCompletionSearchQueryParamPrompt , InvalidRequest , BAD_REQUEST ; InvalidChatCompletionSearchFilterParamPrompt , InvalidRequest , BAD_REQUEST ; InvalidChatCompletionSearchIndexUidParamPrompt , InvalidRequest , BAD_REQUEST ; -InvalidChatCompletionPreQueryPrompt , InvalidRequest , BAD_REQUEST +InvalidChatCompletionPreQueryPrompt , InvalidRequest , BAD_REQUEST ; +// Webhooks +InvalidWebhooks , InvalidRequest , BAD_REQUEST ; +InvalidWebhooksUrl , InvalidRequest , BAD_REQUEST ; +InvalidWebhooksHeaders , InvalidRequest , BAD_REQUEST } impl ErrorCode for JoinError { diff --git a/crates/meilisearch-types/src/lib.rs b/crates/meilisearch-types/src/lib.rs index fe69da526..9857bfb29 100644 --- a/crates/meilisearch-types/src/lib.rs +++ b/crates/meilisearch-types/src/lib.rs @@ -15,6 +15,7 @@ pub mod star_or; pub mod task_view; pub mod tasks; pub mod versioning; +pub mod webhooks; pub use milli::{heed, Index}; use uuid::Uuid; pub use versioning::VERSION_FILE_NAME; diff --git a/crates/meilisearch-types/src/webhooks.rs b/crates/meilisearch-types/src/webhooks.rs new file mode 100644 index 000000000..c30d32bc6 --- /dev/null +++ b/crates/meilisearch-types/src/webhooks.rs @@ -0,0 +1,18 @@ +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Webhook { + pub url: String, + #[serde(default)] + pub headers: BTreeMap, +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Webhooks { + #[serde(default)] + pub webhooks: BTreeMap, +} diff --git a/crates/meilisearch/src/routes/mod.rs b/crates/meilisearch/src/routes/mod.rs index 260d973a1..4ae72b0bd 100644 --- a/crates/meilisearch/src/routes/mod.rs +++ b/crates/meilisearch/src/routes/mod.rs @@ -70,6 +70,7 @@ mod swap_indexes; pub mod tasks; #[cfg(test)] mod tasks_test; +mod webhooks; #[derive(OpenApi)] #[openapi( @@ -89,6 +90,7 @@ mod tasks_test; (path = "/experimental-features", api = features::ExperimentalFeaturesApi), (path = "/export", api = export::ExportApi), (path = "/network", api = network::NetworkApi), + (path = "/webhooks", api = webhooks::WebhooksApi), ), paths(get_health, get_version, get_stats), tags( @@ -120,7 +122,8 @@ pub fn configure(cfg: &mut web::ServiceConfig) { .service(web::scope("/experimental-features").configure(features::configure)) .service(web::scope("/network").configure(network::configure)) .service(web::scope("/export").configure(export::configure)) - .service(web::scope("/chats").configure(chats::configure)); + .service(web::scope("/chats").configure(chats::configure)) + .service(web::scope("/webhooks").configure(webhooks::configure)); #[cfg(feature = "swagger")] { diff --git a/crates/meilisearch/src/routes/network.rs b/crates/meilisearch/src/routes/network.rs index 6ee68ea33..4afa32c09 100644 --- a/crates/meilisearch/src/routes/network.rs +++ b/crates/meilisearch/src/routes/network.rs @@ -168,7 +168,7 @@ impl Aggregate for PatchNetworkAnalytics { path = "", tag = "Network", request_body = Network, - security(("Bearer" = ["network.update", "network.*", "*"])), + security(("Bearer" = ["network.update", "*"])), responses( (status = OK, description = "New network state is returned", body = Network, content_type = "application/json", example = json!( { diff --git a/crates/meilisearch/src/routes/webhooks.rs b/crates/meilisearch/src/routes/webhooks.rs new file mode 100644 index 000000000..d05c16672 --- /dev/null +++ b/crates/meilisearch/src/routes/webhooks.rs @@ -0,0 +1,239 @@ +use std::collections::BTreeMap; + +use actix_web::web::{self, Data}; +use actix_web::{HttpRequest, HttpResponse}; +use deserr::actix_web::AwebJson; +use deserr::Deserr; +use index_scheduler::IndexScheduler; +use meilisearch_types::deserr::DeserrJsonError; +use meilisearch_types::error::deserr_codes::{ + InvalidWebhooks, InvalidWebhooksHeaders, InvalidWebhooksUrl, +}; +use meilisearch_types::error::{ErrorCode, ResponseError}; +use meilisearch_types::keys::actions; +use meilisearch_types::milli::update::Setting; +use meilisearch_types::webhooks::{Webhook, Webhooks}; +use serde::Serialize; +use tracing::debug; +use utoipa::{OpenApi, ToSchema}; + +use crate::analytics::{Aggregate, Analytics}; +use crate::extractors::authentication::policies::ActionPolicy; +use crate::extractors::authentication::GuardedData; +use crate::extractors::sequential_extractor::SeqHandler; + +#[derive(OpenApi)] +#[openapi( + paths(get_webhooks, patch_webhooks), + tags(( + name = "Webhooks", + description = "The `/webhooks` route allows you to register endpoints to be called once tasks are processed.", + external_docs(url = "https://www.meilisearch.com/docs/reference/api/webhooks"), + )), +)] +pub struct WebhooksApi; + +pub fn configure(cfg: &mut web::ServiceConfig) { + cfg.service( + web::resource("") + .route(web::get().to(get_webhooks)) + .route(web::patch().to(SeqHandler(patch_webhooks))), + ); +} + +#[utoipa::path( + get, + path = "", + tag = "Webhooks", + security(("Bearer" = ["webhooks.get", "*.get", "*"])), + responses( + (status = OK, description = "Webhooks are returned", body = WebhooksSettings, content_type = "application/json", example = json!({ + "webhooks": { + "name": { + "url": "http://example.com/webhook", + }, + "anotherName": { + "url": "https://your.site/on-tasks-completed", + "headers": { + "Authorization": "Bearer a-secret-token" + } + } + } + })), + (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( + { + "message": "The Authorization header is missing. It must use the bearer authorization method.", + "code": "missing_authorization_header", + "type": "auth", + "link": "https://docs.meilisearch.com/errors#missing_authorization_header" + } + )), + ) +)] +async fn get_webhooks( + index_scheduler: GuardedData, Data>, +) -> Result { + let webhooks = index_scheduler.webhooks(); + debug!(returns = ?webhooks, "Get webhooks"); + Ok(HttpResponse::Ok().json(webhooks)) +} + +#[derive(Debug, Deserr, ToSchema)] +#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +#[schema(rename_all = "camelCase")] +struct WebhookSettings { + #[schema(value_type = Option)] + #[deserr(default, error = DeserrJsonError)] + #[serde(default)] + url: Setting, + #[schema(value_type = Option>, example = json!({"Authorization":"Bearer a-secret-token"}))] + #[deserr(default, error = DeserrJsonError)] + #[serde(default)] + headers: Setting>>, +} + +#[derive(Debug, Deserr, ToSchema)] +#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] +#[serde(rename_all = "camelCase")] +#[schema(rename_all = "camelCase")] +struct WebhooksSettings { + #[schema(value_type = Option>)] + #[deserr(default, error = DeserrJsonError)] + #[serde(default)] + webhooks: Setting>>, +} + +#[derive(Serialize)] +pub struct PatchWebhooksAnalytics; + +impl Aggregate for PatchWebhooksAnalytics { + fn event_name(&self) -> &'static str { + "Webhooks Updated" + } + + fn aggregate(self: Box, _new: Box) -> Box { + self + } + + fn into_event(self: Box) -> serde_json::Value { + serde_json::to_value(*self).unwrap_or_default() + } +} + +#[derive(Debug, thiserror::Error)] +enum WebhooksError { + #[error("The URL for the webhook `{0}` is missing.")] + MissingUrl(String), +} + +impl ErrorCode for WebhooksError { + fn error_code(&self) -> meilisearch_types::error::Code { + match self { + WebhooksError::MissingUrl(_) => meilisearch_types::error::Code::InvalidWebhooksUrl, + } + } +} + +#[utoipa::path( + patch, + path = "", + tag = "Webhooks", + request_body = WebhooksSettings, + security(("Bearer" = ["webhooks.update", "*"])), + responses( + (status = 200, description = "Returns the updated webhooks", body = WebhooksSettings, content_type = "application/json", example = json!({ + "webhooks": { + "name": { + "url": "http://example.com/webhook", + }, + "anotherName": { + "url": "https://your.site/on-tasks-completed", + "headers": { + "Authorization": "Bearer a-secret-token" + } + } + } + })), + (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!({ + "message": "The Authorization header is missing. It must use the bearer authorization method.", + "code": "missing_authorization_header", + "type": "auth", + "link": "https://docs.meilisearch.com/errors#missing_authorization_header" + })), + ) +)] +async fn patch_webhooks( + index_scheduler: GuardedData, Data>, + new_webhooks: AwebJson, + req: HttpRequest, + analytics: Data, +) -> Result { + let WebhooksSettings { webhooks: new_webhooks } = new_webhooks.0; + let Webhooks { mut webhooks } = index_scheduler.webhooks(); + debug!(parameters = ?new_webhooks, "Patch webhooks"); + + fn merge_webhook( + name: &str, + old_webhook: Option, + new_webhook: WebhookSettings, + ) -> Result { + let (old_url, mut headers) = + old_webhook.map(|w| (Some(w.url), w.headers)).unwrap_or((None, BTreeMap::new())); + + let url = match new_webhook.url { + Setting::Set(url) => url, + Setting::NotSet => old_url.ok_or_else(|| WebhooksError::MissingUrl(name.to_owned()))?, + Setting::Reset => return Err(WebhooksError::MissingUrl(name.to_owned())), + }; + + let headers = match new_webhook.headers { + Setting::Set(new_headers) => { + for (name, value) in new_headers { + match value { + Setting::Set(value) => { + headers.insert(name, value); + } + Setting::NotSet => continue, + Setting::Reset => { + headers.remove(&name); + continue; + } + } + } + headers + } + Setting::NotSet => headers, + Setting::Reset => BTreeMap::new(), + }; + + Ok(Webhook { url, headers }) + } + + match new_webhooks { + Setting::Set(new_webhooks) => { + for (name, new_webhook) in new_webhooks { + match new_webhook { + Setting::Set(new_webhook) => { + let old_webhook = webhooks.remove(&name); + let webhook = merge_webhook(&name, old_webhook, new_webhook)?; + webhooks.insert(name.clone(), webhook); + } + Setting::Reset => { + webhooks.remove(&name); + } + Setting::NotSet => (), + } + } + } + Setting::Reset => webhooks.clear(), + Setting::NotSet => (), + }; + + analytics.publish(PatchWebhooksAnalytics, &req); + + let webhooks = Webhooks { webhooks }; + index_scheduler.put_webhooks(webhooks.clone())?; + debug!(returns = ?webhooks, "Patch webhooks"); + Ok(HttpResponse::Ok().json(webhooks)) +}