diff --git a/crates/dump/src/reader/mod.rs b/crates/dump/src/reader/mod.rs index 844aadc99..129b01f46 100644 --- a/crates/dump/src/reader/mod.rs +++ b/crates/dump/src/reader/mod.rs @@ -458,6 +458,11 @@ pub(crate) mod test { // webhooks + // Important note: You might be surprised to see the cli webhook in the dump, as it's not supposed to be saved. + // This is because the dump comes from a version that did save it. + // It's no longer the case, but that's not what this test is about. + // It's ok to see the cli webhook disappear when this test gets updated. + let webhooks = dump.webhooks().unwrap(); insta::assert_json_snapshot!(webhooks, @r#" { diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index f3431dd33..addd87be8 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -30,6 +30,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { index_mapper, features: _, + cli_webhook_url: _, + cli_webhook_authorization: _, cached_webhooks: _, test_breakpoint_sdr: _, planned_failures: _, diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index c18efeaf2..d04b8f9e2 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -73,6 +73,7 @@ use queue::Queue; use roaring::RoaringBitmap; use scheduler::Scheduler; use time::OffsetDateTime; +use uuid::Uuid; use versioning::Versioning; use crate::index_mapper::IndexMapper; @@ -107,6 +108,10 @@ pub struct IndexSchedulerOptions { pub snapshots_path: PathBuf, /// The path to the folder containing the dumps. pub dumps_path: PathBuf, + /// The webhook url that was set by the CLI. + pub cli_webhook_url: Option, + /// The Authorization header to send to the webhook URL that was set by the CLI. + pub cli_webhook_authorization: Option, /// The maximum size, in bytes, of the task index. pub task_db_size: usize, /// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index. @@ -179,6 +184,10 @@ pub struct IndexScheduler { /// A database to store single-keyed data that is persisted across restarts. persisted: Database, + /// The webhook url that was set by the CLI. + cli_webhook_url: Option, + /// The Authorization header to send to the webhook URL that was set by the CLI. + cli_webhook_authorization: Option, /// Webhook cached_webhooks: Arc>, @@ -221,7 +230,11 @@ impl IndexScheduler { cleanup_enabled: self.cleanup_enabled, experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps, persisted: self.persisted, + + cli_webhook_url: self.cli_webhook_url.clone(), + cli_webhook_authorization: self.cli_webhook_authorization.clone(), cached_webhooks: self.cached_webhooks.clone(), + embedders: self.embedders.clone(), #[cfg(test)] test_breakpoint_sdr: self.test_breakpoint_sdr.clone(), @@ -299,7 +312,8 @@ impl IndexScheduler { let persisted = env.create_database(&mut wtxn, Some(db_name::PERSISTED))?; let webhooks_db = persisted.remap_data_type::>(); - let webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default(); + let mut webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default(); + webhooks.webhooks.remove(&Uuid::nil()); // remove the cli webhook if it was saved by mistake wtxn.commit()?; @@ -317,7 +331,10 @@ impl IndexScheduler { .indexer_config .experimental_no_edition_2024_for_dumps, persisted, + cached_webhooks: Arc::new(RwLock::new(webhooks)), + cli_webhook_url: options.cli_webhook_url, + cli_webhook_authorization: options.cli_webhook_authorization, embedders: Default::default(), @@ -869,9 +886,10 @@ impl IndexScheduler { self.features.network() } - pub fn put_webhooks(&self, webhooks: Webhooks) -> Result<()> { + pub fn put_webhooks(&self, mut webhooks: Webhooks) -> Result<()> { let mut wtxn = self.env.write_txn()?; let webhooks_db = self.persisted.remap_data_type::>(); + webhooks.webhooks.remove(&Uuid::nil()); // the cli webhook must not be saved webhooks_db.put(&mut wtxn, db_keys::WEBHOOKS, &webhooks)?; wtxn.commit()?; *self.cached_webhooks.write().unwrap() = webhooks; @@ -880,7 +898,15 @@ impl IndexScheduler { pub fn webhooks(&self) -> Webhooks { let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner()); - Webhooks::clone(&*webhooks) + let mut webhooks = Webhooks::clone(&*webhooks); + if let Some(url) = self.cli_webhook_url.as_ref().cloned() { + let mut headers = BTreeMap::new(); + if let Some(auth) = self.cli_webhook_authorization.as_ref().cloned() { + headers.insert(String::from("Authorization"), auth); + } + webhooks.webhooks.insert(Uuid::nil(), Webhook { url, headers }); + } + webhooks } pub fn embedders( diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index b5acf7582..bbfc4e058 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -26,7 +26,6 @@ use meilisearch_types::error::ResponseError; use meilisearch_types::heed::{Env, WithoutTls}; use meilisearch_types::milli; use meilisearch_types::tasks::Status; -use meilisearch_types::webhooks::Webhooks; use process_batch::ProcessBatchInfo; use rayon::current_num_threads; use rayon::iter::{IntoParallelIterator, ParallelIterator}; @@ -448,9 +447,8 @@ impl IndexScheduler { })?; // We shouldn't crash the tick function if we can't send data to the webhooks - let webhooks = self.cached_webhooks.read().unwrap_or_else(|p| p.into_inner()); + let webhooks = self.webhooks(); if !webhooks.webhooks.is_empty() { - let webhooks = Webhooks::clone(&*webhooks); let cloned_index_scheduler = self.private_clone(); std::thread::spawn(move || { if let Err(e) = cloned_index_scheduler.notify_webhooks(webhooks, &ids) { diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index b7d69b5b3..36de0ed9e 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -98,6 +98,8 @@ impl IndexScheduler { indexes_path: tempdir.path().join("indexes"), snapshots_path: tempdir.path().join("snapshots"), dumps_path: tempdir.path().join("dumps"), + cli_webhook_url: None, + cli_webhook_authorization: None, task_db_size: 1000 * 1000 * 10, // 10 MB, we don't use MiB on purpose. index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. enable_mdb_writemap: false, diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 613268936..533f0327f 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -13,7 +13,6 @@ pub mod routes; pub mod search; pub mod search_queue; -use std::collections::BTreeMap; use std::fs::File; use std::io::{BufReader, BufWriter}; use std::path::Path; @@ -49,14 +48,12 @@ use meilisearch_types::tasks::KindWithContent; use meilisearch_types::versioning::{ create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH, }; -use meilisearch_types::webhooks::Webhook; use meilisearch_types::{compression, heed, milli, VERSION_FILE_NAME}; pub use option::Opt; use option::ScheduleSnapshot; use search_queue::SearchQueue; use tracing::{error, info_span}; use tracing_subscriber::filter::Targets; -use uuid::Uuid; use crate::error::MeilisearchHttpError; @@ -226,6 +223,8 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< indexes_path: opt.db_path.join("indexes"), snapshots_path: opt.snapshot_dir.clone(), dumps_path: opt.dump_dir.clone(), + cli_webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()), + cli_webhook_authorization: opt.task_webhook_authorization_header.clone(), task_db_size: opt.max_task_db_size.as_u64() as usize, index_base_map_size: opt.max_index_size.as_u64() as usize, enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, @@ -328,30 +327,6 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< .unwrap(); } - // We set the webhook url - let cli_webhook = opt.task_webhook_url.as_ref().map(|u| Webhook { - url: u.to_string(), - headers: { - let mut headers = BTreeMap::new(); - if let Some(value) = &opt.task_webhook_authorization_header { - headers.insert(String::from("Authorization"), value.to_string()); - } - headers - }, - }); - let mut webhooks = index_scheduler.webhooks(); - if webhooks.webhooks.get(&Uuid::nil()) != cli_webhook.as_ref() { - match cli_webhook { - Some(webhook) => { - webhooks.webhooks.insert(Uuid::nil(), webhook); - } - None => { - webhooks.webhooks.remove(&Uuid::nil()); - } - } - index_scheduler.put_webhooks(webhooks)?; - } - Ok((index_scheduler, auth_controller)) }