Make notify_webhooks execute in its own thread

This commit is contained in:
Mubelotix
2025-08-04 17:13:05 +02:00
parent ddfcacbb62
commit 7251cccd03
2 changed files with 14 additions and 10 deletions

View File

@ -766,14 +766,8 @@ impl IndexScheduler {
Ok(()) Ok(())
} }
/// Once the tasks changes have been committed we must send all the tasks that were updated to our webhook if there is one. /// Once the tasks changes have been committed we must send all the tasks that were updated to our webhooks
fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> { fn notify_webhooks(&self, webhooks: Webhooks, updated: &RoaringBitmap) -> Result<()> {
let webhooks = self.cached_webhooks.read().unwrap_or_else(|poisoned| poisoned.into_inner());
if webhooks.webhooks.is_empty() {
return Ok(());
}
let webhooks = Webhooks::clone(&*webhooks);
struct TaskReader<'a, 'b> { struct TaskReader<'a, 'b> {
rtxn: &'a RoTxn<'a>, rtxn: &'a RoTxn<'a>,
index_scheduler: &'a IndexScheduler, index_scheduler: &'a IndexScheduler,

View File

@ -26,6 +26,7 @@ use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::{Env, WithoutTls}; use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::milli; use meilisearch_types::milli;
use meilisearch_types::tasks::Status; use meilisearch_types::tasks::Status;
use meilisearch_types::webhooks::Webhooks;
use process_batch::ProcessBatchInfo; use process_batch::ProcessBatchInfo;
use rayon::current_num_threads; use rayon::current_num_threads;
use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelIterator};
@ -446,8 +447,17 @@ impl IndexScheduler {
Ok(()) Ok(())
})?; })?;
// We shouldn't crash the tick function if we can't send data to the webhook. // We shouldn't crash the tick function if we can't send data to the webhooks
let _ = self.notify_webhook(&ids); let webhooks = self.cached_webhooks.read().unwrap_or_else(|p| p.into_inner());
if !webhooks.webhooks.is_empty() {
let webhooks = Webhooks::clone(&*webhooks);
let cloned_index_scheduler = self.private_clone();
std::thread::spawn(move || {
if let Err(e) = cloned_index_scheduler.notify_webhooks(webhooks, &ids) {
tracing::error!("Failure to notify webhooks: {e}");
}
});
}
#[cfg(test)] #[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing); self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing);