mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-05 20:26:31 +00:00
Stop storing the cli webhook in the db
This commit is contained in:
@ -458,6 +458,11 @@ pub(crate) mod test {
|
|||||||
|
|
||||||
// webhooks
|
// webhooks
|
||||||
|
|
||||||
|
// Important note: You might be surprised to see the cli webhook in the dump, as it's not supposed to be saved.
|
||||||
|
// This is because the dump comes from a version that did save it.
|
||||||
|
// It's no longer the case, but that's not what this test is about.
|
||||||
|
// It's ok to see the cli webhook disappear when this test gets updated.
|
||||||
|
|
||||||
let webhooks = dump.webhooks().unwrap();
|
let webhooks = dump.webhooks().unwrap();
|
||||||
insta::assert_json_snapshot!(webhooks, @r#"
|
insta::assert_json_snapshot!(webhooks, @r#"
|
||||||
{
|
{
|
||||||
|
@ -30,6 +30,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
|||||||
|
|
||||||
index_mapper,
|
index_mapper,
|
||||||
features: _,
|
features: _,
|
||||||
|
cli_webhook_url: _,
|
||||||
|
cli_webhook_authorization: _,
|
||||||
cached_webhooks: _,
|
cached_webhooks: _,
|
||||||
test_breakpoint_sdr: _,
|
test_breakpoint_sdr: _,
|
||||||
planned_failures: _,
|
planned_failures: _,
|
||||||
|
@ -73,6 +73,7 @@ use queue::Queue;
|
|||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use scheduler::Scheduler;
|
use scheduler::Scheduler;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
use uuid::Uuid;
|
||||||
use versioning::Versioning;
|
use versioning::Versioning;
|
||||||
|
|
||||||
use crate::index_mapper::IndexMapper;
|
use crate::index_mapper::IndexMapper;
|
||||||
@ -107,6 +108,10 @@ pub struct IndexSchedulerOptions {
|
|||||||
pub snapshots_path: PathBuf,
|
pub snapshots_path: PathBuf,
|
||||||
/// The path to the folder containing the dumps.
|
/// The path to the folder containing the dumps.
|
||||||
pub dumps_path: PathBuf,
|
pub dumps_path: PathBuf,
|
||||||
|
/// The webhook url that was set by the CLI.
|
||||||
|
pub cli_webhook_url: Option<String>,
|
||||||
|
/// The Authorization header to send to the webhook URL that was set by the CLI.
|
||||||
|
pub cli_webhook_authorization: Option<String>,
|
||||||
/// The maximum size, in bytes, of the task index.
|
/// The maximum size, in bytes, of the task index.
|
||||||
pub task_db_size: usize,
|
pub task_db_size: usize,
|
||||||
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
|
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
|
||||||
@ -179,6 +184,10 @@ pub struct IndexScheduler {
|
|||||||
/// A database to store single-keyed data that is persisted across restarts.
|
/// A database to store single-keyed data that is persisted across restarts.
|
||||||
persisted: Database<Str, Str>,
|
persisted: Database<Str, Str>,
|
||||||
|
|
||||||
|
/// The webhook url that was set by the CLI.
|
||||||
|
cli_webhook_url: Option<String>,
|
||||||
|
/// The Authorization header to send to the webhook URL that was set by the CLI.
|
||||||
|
cli_webhook_authorization: Option<String>,
|
||||||
/// Webhook
|
/// Webhook
|
||||||
cached_webhooks: Arc<RwLock<Webhooks>>,
|
cached_webhooks: Arc<RwLock<Webhooks>>,
|
||||||
|
|
||||||
@ -221,7 +230,11 @@ impl IndexScheduler {
|
|||||||
cleanup_enabled: self.cleanup_enabled,
|
cleanup_enabled: self.cleanup_enabled,
|
||||||
experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps,
|
experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps,
|
||||||
persisted: self.persisted,
|
persisted: self.persisted,
|
||||||
|
|
||||||
|
cli_webhook_url: self.cli_webhook_url.clone(),
|
||||||
|
cli_webhook_authorization: self.cli_webhook_authorization.clone(),
|
||||||
cached_webhooks: self.cached_webhooks.clone(),
|
cached_webhooks: self.cached_webhooks.clone(),
|
||||||
|
|
||||||
embedders: self.embedders.clone(),
|
embedders: self.embedders.clone(),
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
|
||||||
@ -299,7 +312,8 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
let persisted = env.create_database(&mut wtxn, Some(db_name::PERSISTED))?;
|
let persisted = env.create_database(&mut wtxn, Some(db_name::PERSISTED))?;
|
||||||
let webhooks_db = persisted.remap_data_type::<SerdeJson<Webhooks>>();
|
let webhooks_db = persisted.remap_data_type::<SerdeJson<Webhooks>>();
|
||||||
let webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default();
|
let mut webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default();
|
||||||
|
webhooks.webhooks.remove(&Uuid::nil()); // remove the cli webhook if it was saved by mistake
|
||||||
|
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
|
|
||||||
@ -317,7 +331,10 @@ impl IndexScheduler {
|
|||||||
.indexer_config
|
.indexer_config
|
||||||
.experimental_no_edition_2024_for_dumps,
|
.experimental_no_edition_2024_for_dumps,
|
||||||
persisted,
|
persisted,
|
||||||
|
|
||||||
cached_webhooks: Arc::new(RwLock::new(webhooks)),
|
cached_webhooks: Arc::new(RwLock::new(webhooks)),
|
||||||
|
cli_webhook_url: options.cli_webhook_url,
|
||||||
|
cli_webhook_authorization: options.cli_webhook_authorization,
|
||||||
|
|
||||||
embedders: Default::default(),
|
embedders: Default::default(),
|
||||||
|
|
||||||
@ -869,9 +886,10 @@ impl IndexScheduler {
|
|||||||
self.features.network()
|
self.features.network()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_webhooks(&self, webhooks: Webhooks) -> Result<()> {
|
pub fn put_webhooks(&self, mut webhooks: Webhooks) -> Result<()> {
|
||||||
let mut wtxn = self.env.write_txn()?;
|
let mut wtxn = self.env.write_txn()?;
|
||||||
let webhooks_db = self.persisted.remap_data_type::<SerdeJson<Webhooks>>();
|
let webhooks_db = self.persisted.remap_data_type::<SerdeJson<Webhooks>>();
|
||||||
|
webhooks.webhooks.remove(&Uuid::nil()); // the cli webhook must not be saved
|
||||||
webhooks_db.put(&mut wtxn, db_keys::WEBHOOKS, &webhooks)?;
|
webhooks_db.put(&mut wtxn, db_keys::WEBHOOKS, &webhooks)?;
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
*self.cached_webhooks.write().unwrap() = webhooks;
|
*self.cached_webhooks.write().unwrap() = webhooks;
|
||||||
@ -880,7 +898,15 @@ impl IndexScheduler {
|
|||||||
|
|
||||||
pub fn webhooks(&self) -> Webhooks {
|
pub fn webhooks(&self) -> Webhooks {
|
||||||
let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner());
|
let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner());
|
||||||
Webhooks::clone(&*webhooks)
|
let mut webhooks = Webhooks::clone(&*webhooks);
|
||||||
|
if let Some(url) = self.cli_webhook_url.as_ref().cloned() {
|
||||||
|
let mut headers = BTreeMap::new();
|
||||||
|
if let Some(auth) = self.cli_webhook_authorization.as_ref().cloned() {
|
||||||
|
headers.insert(String::from("Authorization"), auth);
|
||||||
|
}
|
||||||
|
webhooks.webhooks.insert(Uuid::nil(), Webhook { url, headers });
|
||||||
|
}
|
||||||
|
webhooks
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn embedders(
|
pub fn embedders(
|
||||||
|
@ -26,7 +26,6 @@ use meilisearch_types::error::ResponseError;
|
|||||||
use meilisearch_types::heed::{Env, WithoutTls};
|
use meilisearch_types::heed::{Env, WithoutTls};
|
||||||
use meilisearch_types::milli;
|
use meilisearch_types::milli;
|
||||||
use meilisearch_types::tasks::Status;
|
use meilisearch_types::tasks::Status;
|
||||||
use meilisearch_types::webhooks::Webhooks;
|
|
||||||
use process_batch::ProcessBatchInfo;
|
use process_batch::ProcessBatchInfo;
|
||||||
use rayon::current_num_threads;
|
use rayon::current_num_threads;
|
||||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||||
@ -448,9 +447,8 @@ impl IndexScheduler {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
// We shouldn't crash the tick function if we can't send data to the webhooks
|
// We shouldn't crash the tick function if we can't send data to the webhooks
|
||||||
let webhooks = self.cached_webhooks.read().unwrap_or_else(|p| p.into_inner());
|
let webhooks = self.webhooks();
|
||||||
if !webhooks.webhooks.is_empty() {
|
if !webhooks.webhooks.is_empty() {
|
||||||
let webhooks = Webhooks::clone(&*webhooks);
|
|
||||||
let cloned_index_scheduler = self.private_clone();
|
let cloned_index_scheduler = self.private_clone();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
if let Err(e) = cloned_index_scheduler.notify_webhooks(webhooks, &ids) {
|
if let Err(e) = cloned_index_scheduler.notify_webhooks(webhooks, &ids) {
|
||||||
|
@ -98,6 +98,8 @@ impl IndexScheduler {
|
|||||||
indexes_path: tempdir.path().join("indexes"),
|
indexes_path: tempdir.path().join("indexes"),
|
||||||
snapshots_path: tempdir.path().join("snapshots"),
|
snapshots_path: tempdir.path().join("snapshots"),
|
||||||
dumps_path: tempdir.path().join("dumps"),
|
dumps_path: tempdir.path().join("dumps"),
|
||||||
|
cli_webhook_url: None,
|
||||||
|
cli_webhook_authorization: None,
|
||||||
task_db_size: 1000 * 1000 * 10, // 10 MB, we don't use MiB on purpose.
|
task_db_size: 1000 * 1000 * 10, // 10 MB, we don't use MiB on purpose.
|
||||||
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||||
enable_mdb_writemap: false,
|
enable_mdb_writemap: false,
|
||||||
|
@ -13,7 +13,6 @@ pub mod routes;
|
|||||||
pub mod search;
|
pub mod search;
|
||||||
pub mod search_queue;
|
pub mod search_queue;
|
||||||
|
|
||||||
use std::collections::BTreeMap;
|
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{BufReader, BufWriter};
|
use std::io::{BufReader, BufWriter};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
@ -49,14 +48,12 @@ use meilisearch_types::tasks::KindWithContent;
|
|||||||
use meilisearch_types::versioning::{
|
use meilisearch_types::versioning::{
|
||||||
create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH,
|
create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH,
|
||||||
};
|
};
|
||||||
use meilisearch_types::webhooks::Webhook;
|
|
||||||
use meilisearch_types::{compression, heed, milli, VERSION_FILE_NAME};
|
use meilisearch_types::{compression, heed, milli, VERSION_FILE_NAME};
|
||||||
pub use option::Opt;
|
pub use option::Opt;
|
||||||
use option::ScheduleSnapshot;
|
use option::ScheduleSnapshot;
|
||||||
use search_queue::SearchQueue;
|
use search_queue::SearchQueue;
|
||||||
use tracing::{error, info_span};
|
use tracing::{error, info_span};
|
||||||
use tracing_subscriber::filter::Targets;
|
use tracing_subscriber::filter::Targets;
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::error::MeilisearchHttpError;
|
use crate::error::MeilisearchHttpError;
|
||||||
|
|
||||||
@ -226,6 +223,8 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
|||||||
indexes_path: opt.db_path.join("indexes"),
|
indexes_path: opt.db_path.join("indexes"),
|
||||||
snapshots_path: opt.snapshot_dir.clone(),
|
snapshots_path: opt.snapshot_dir.clone(),
|
||||||
dumps_path: opt.dump_dir.clone(),
|
dumps_path: opt.dump_dir.clone(),
|
||||||
|
cli_webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()),
|
||||||
|
cli_webhook_authorization: opt.task_webhook_authorization_header.clone(),
|
||||||
task_db_size: opt.max_task_db_size.as_u64() as usize,
|
task_db_size: opt.max_task_db_size.as_u64() as usize,
|
||||||
index_base_map_size: opt.max_index_size.as_u64() as usize,
|
index_base_map_size: opt.max_index_size.as_u64() as usize,
|
||||||
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
||||||
@ -328,30 +327,6 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
// We set the webhook url
|
|
||||||
let cli_webhook = opt.task_webhook_url.as_ref().map(|u| Webhook {
|
|
||||||
url: u.to_string(),
|
|
||||||
headers: {
|
|
||||||
let mut headers = BTreeMap::new();
|
|
||||||
if let Some(value) = &opt.task_webhook_authorization_header {
|
|
||||||
headers.insert(String::from("Authorization"), value.to_string());
|
|
||||||
}
|
|
||||||
headers
|
|
||||||
},
|
|
||||||
});
|
|
||||||
let mut webhooks = index_scheduler.webhooks();
|
|
||||||
if webhooks.webhooks.get(&Uuid::nil()) != cli_webhook.as_ref() {
|
|
||||||
match cli_webhook {
|
|
||||||
Some(webhook) => {
|
|
||||||
webhooks.webhooks.insert(Uuid::nil(), webhook);
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
webhooks.webhooks.remove(&Uuid::nil());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
index_scheduler.put_webhooks(webhooks)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((index_scheduler, auth_controller))
|
Ok((index_scheduler, auth_controller))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user