fix race condition: take the rtxn before entering the thread so we're sure we won't try to retrieve deleted tasks

This commit is contained in:
Tamo
2025-08-05 16:47:35 +02:00
parent 2b5b41790e
commit 3f1e172c6f
2 changed files with 40 additions and 36 deletions

View File

@ -784,7 +784,7 @@ impl IndexScheduler {
}
/// Once the tasks changes have been committed we must send all the tasks that were updated to our webhooks
fn notify_webhooks(&self, webhooks: Webhooks, updated: &RoaringBitmap) -> Result<()> {
fn notify_webhooks(&self, updated: RoaringBitmap) {
struct TaskReader<'a, 'b> {
rtxn: &'a RoTxn<'a>,
index_scheduler: &'a IndexScheduler,
@ -829,33 +829,46 @@ impl IndexScheduler {
}
}
let rtxn = self.env.read_txn()?;
for (uuid, Webhook { url, headers }) in webhooks.webhooks.iter() {
let task_reader = TaskReader {
rtxn: &rtxn,
index_scheduler: self,
tasks: &mut updated.into_iter(),
buffer: Vec::with_capacity(page_size::get()),
written: 0,
};
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let mut request = ureq::post(url)
.timeout(Duration::from_secs(30))
.set("Content-Encoding", "gzip")
.set("Content-Type", "application/x-ndjson");
for (header_name, header_value) in headers.iter() {
request = request.set(header_name, header_value);
}
if let Err(e) = request.send(reader) {
tracing::error!("While sending data to the webhook {uuid}: {e}");
}
let webhooks = self.webhooks();
if webhooks.webhooks.is_empty() {
return;
}
let this = self.private_clone();
// We must take the RoTxn before entering the thread::spawn otherwise another batch may be
// processed before we had the time to take our txn.
let rtxn = match self.env.clone().static_read_txn() {
Ok(rtxn) => rtxn,
Err(e) => {
tracing::error!("Couldn't get an rtxn to notify the webhook: {e}");
return;
}
};
Ok(())
std::thread::spawn(move || {
for (uuid, Webhook { url, headers }) in webhooks.webhooks.iter() {
let task_reader = TaskReader {
rtxn: &rtxn,
index_scheduler: &this,
tasks: &mut updated.iter(),
buffer: Vec::with_capacity(page_size::get()),
written: 0,
};
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let mut request = ureq::post(url)
.timeout(Duration::from_secs(30))
.set("Content-Encoding", "gzip")
.set("Content-Type", "application/x-ndjson");
for (header_name, header_value) in headers.iter() {
request = request.set(header_name, header_value);
}
if let Err(e) = request.send(reader) {
tracing::error!("While sending data to the webhook {uuid}: {e}");
}
}
});
}
pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {

View File

@ -446,16 +446,7 @@ impl IndexScheduler {
Ok(())
})?;
// We shouldn't crash the tick function if we can't send data to the webhooks
let webhooks = self.webhooks();
if !webhooks.webhooks.is_empty() {
let cloned_index_scheduler = self.private_clone();
std::thread::spawn(move || {
if let Err(e) = cloned_index_scheduler.notify_webhooks(webhooks, &ids) {
tracing::error!("Failure to notify webhooks: {e}");
}
});
}
self.notify_webhooks(ids);
#[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing);