From ed147f80ac782a0fade13f009b4365859b037ec0 Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Thu, 31 Jul 2025 16:45:30 +0200 Subject: [PATCH] Add test and fix bug --- crates/index-scheduler/src/lib.rs | 41 +++++++++++++------ crates/meilisearch/tests/tasks/webhook.rs | 49 ++++++++++++++++++++++- 2 files changed, 77 insertions(+), 13 deletions(-) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index ce8791a63..8d7617b6c 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -828,16 +828,32 @@ impl IndexScheduler { written: 0, }; - enum EitherRead { - Other(T), - Data(Vec), + enum EitherRead<'a, T: Read> { + Other(Option), + Data(&'a [u8]), } - impl Read for &mut EitherRead { + impl EitherRead<'_, T> { + /// A clone that works only once for the Other variant. + fn clone(&mut self) -> Self { + match self { + Self::Other(r) => { + let r = r.take(); + Self::Other(r) + } + Self::Data(arg0) => Self::Data(arg0), + } + } + } + + impl Read for EitherRead<'_, T> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { match self { - EitherRead::Other(reader) => reader.read(buf), - EitherRead::Data(data) => data.as_slice().read(buf), + EitherRead::Other(Some(reader)) => reader.read(buf), + EitherRead::Other(None) => { + Err(io::Error::new(io::ErrorKind::Other, "No reader available")) + } + EitherRead::Data(data) => data.read(buf), } } } @@ -845,16 +861,17 @@ impl IndexScheduler { let mut reader = GzEncoder::new(BufReader::new(task_reader), Compression::default()); // When there is more than one webhook, cache the data in memory + let mut data; let mut reader = match webhooks.webhooks.len() { - 1 => EitherRead::Other(reader), + 1 => EitherRead::Other(Some(reader)), _ => { - let mut data = Vec::new(); + data = Vec::new(); reader.read_to_end(&mut data)?; - EitherRead::Data(data) + EitherRead::Data(&data) } }; - for (name, Webhook { url, headers }) in webhooks.webhooks.iter() { + for (uuid, Webhook { url, headers }) in webhooks.webhooks.iter() { let mut request = ureq::post(url) .timeout(Duration::from_secs(30)) .set("Content-Encoding", "gzip") @@ -863,8 +880,8 @@ impl IndexScheduler { request = request.set(header_name, header_value); } - if let Err(e) = request.send(&mut reader) { - tracing::error!("While sending data to the webhook {name}: {e}"); + if let Err(e) = request.send(reader.clone()) { + tracing::error!("While sending data to the webhook {uuid}: {e}"); } } diff --git a/crates/meilisearch/tests/tasks/webhook.rs b/crates/meilisearch/tests/tasks/webhook.rs index 14946e415..4caa7df92 100644 --- a/crates/meilisearch/tests/tasks/webhook.rs +++ b/crates/meilisearch/tests/tasks/webhook.rs @@ -115,10 +115,10 @@ async fn single_receives_data() { } "#); - let index = server.index("tamo"); // May be flaky: we're relying on the fact that while the first document addition is processed, the other // operations will be received and will be batched together. If it doesn't happen it's not a problem // the rest of the test won't assume anything about the number of tasks per batch. + let index = server.index("tamo"); for i in 0..5 { let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await; } @@ -164,6 +164,53 @@ async fn single_receives_data() { server_handle.abort(); } +#[actix_web::test] +async fn multiple_receive_data() { + let server = Server::new().await; + + let WebhookHandle { server_handle: handle1, url: url1, receiver: mut receiver1 } = + create_webhook_server().await; + let WebhookHandle { server_handle: handle2, url: url2, receiver: mut receiver2 } = + create_webhook_server().await; + let WebhookHandle { server_handle: handle3, url: url3, receiver: mut receiver3 } = + create_webhook_server().await; + + for url in [url1, url2, url3] { + let (value, code) = server.create_webhook(json!({ "url": url })).await; + snapshot!(code, @"201 Created"); + snapshot!(json_string!(value, { ".uuid" => "[uuid]", ".url" => "[ignored]" }), @r#" + { + "uuid": "[uuid]", + "isEditable": true, + "url": "[ignored]", + "headers": {} + } + "#); + } + let index = server.index("tamo"); + let (_, status) = index.add_documents(json!({ "id": 1, "doggo": "bone" }), None).await; + snapshot!(status, @"202 Accepted"); + + let mut count1 = 0; + let mut count2 = 0; + let mut count3 = 0; + while count1 == 0 || count2 == 0 || count3 == 0 { + tokio::select! { + msg = receiver1.recv() => { if msg.is_some() { count1 += 1; } }, + msg = receiver2.recv() => { if msg.is_some() { count2 += 1; } }, + msg = receiver3.recv() => { if msg.is_some() { count3 += 1; } }, + } + } + + assert_eq!(count1, 1); + assert_eq!(count2, 1); + assert_eq!(count3, 1); + + handle1.abort(); + handle2.abort(); + handle3.abort(); +} + #[actix_web::test] async fn cli_with_dumps() { let db_path = tempfile::tempdir().unwrap();