From 3f1e172c6f8adf9db2d782e043032c0848b63d45 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 5 Aug 2025 16:47:35 +0200 Subject: [PATCH] fix race condition: take the rtxn before entering the thread so we're sure we won't try to retrieve deleted tasks --- crates/index-scheduler/src/lib.rs | 65 ++++++++++++--------- crates/index-scheduler/src/scheduler/mod.rs | 11 +--- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index d04b8f9e2..419e6f21e 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -784,7 +784,7 @@ impl IndexScheduler { } /// Once the tasks changes have been committed we must send all the tasks that were updated to our webhooks - fn notify_webhooks(&self, webhooks: Webhooks, updated: &RoaringBitmap) -> Result<()> { + fn notify_webhooks(&self, updated: RoaringBitmap) { struct TaskReader<'a, 'b> { rtxn: &'a RoTxn<'a>, index_scheduler: &'a IndexScheduler, @@ -829,33 +829,46 @@ impl IndexScheduler { } } - let rtxn = self.env.read_txn()?; - - for (uuid, Webhook { url, headers }) in webhooks.webhooks.iter() { - let task_reader = TaskReader { - rtxn: &rtxn, - index_scheduler: self, - tasks: &mut updated.into_iter(), - buffer: Vec::with_capacity(page_size::get()), - written: 0, - }; - - let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default()); - - let mut request = ureq::post(url) - .timeout(Duration::from_secs(30)) - .set("Content-Encoding", "gzip") - .set("Content-Type", "application/x-ndjson"); - for (header_name, header_value) in headers.iter() { - request = request.set(header_name, header_value); - } - - if let Err(e) = request.send(reader) { - tracing::error!("While sending data to the webhook {uuid}: {e}"); - } + let webhooks = self.webhooks(); + if webhooks.webhooks.is_empty() { + return; } + let this = self.private_clone(); + // We must take the RoTxn before entering the thread::spawn otherwise another batch may be + // processed before we had the time to take our txn. + let rtxn = match self.env.clone().static_read_txn() { + Ok(rtxn) => rtxn, + Err(e) => { + tracing::error!("Couldn't get an rtxn to notify the webhook: {e}"); + return; + } + }; - Ok(()) + std::thread::spawn(move || { + for (uuid, Webhook { url, headers }) in webhooks.webhooks.iter() { + let task_reader = TaskReader { + rtxn: &rtxn, + index_scheduler: &this, + tasks: &mut updated.iter(), + buffer: Vec::with_capacity(page_size::get()), + written: 0, + }; + + let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default()); + + let mut request = ureq::post(url) + .timeout(Duration::from_secs(30)) + .set("Content-Encoding", "gzip") + .set("Content-Type", "application/x-ndjson"); + for (header_name, header_value) in headers.iter() { + request = request.set(header_name, header_value); + } + + if let Err(e) = request.send(reader) { + tracing::error!("While sending data to the webhook {uuid}: {e}"); + } + } + }); } pub fn index_stats(&self, index_uid: &str) -> Result { diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index bbfc4e058..b2bb90c0b 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -446,16 +446,7 @@ impl IndexScheduler { Ok(()) })?; - // We shouldn't crash the tick function if we can't send data to the webhooks - let webhooks = self.webhooks(); - if !webhooks.webhooks.is_empty() { - let cloned_index_scheduler = self.private_clone(); - std::thread::spawn(move || { - if let Err(e) = cloned_index_scheduler.notify_webhooks(webhooks, &ids) { - tracing::error!("Failure to notify webhooks: {e}"); - } - }); - } + self.notify_webhooks(ids); #[cfg(test)] self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing);