Add test and fix bug

This commit is contained in:
Mubelotix
2025-07-31 16:45:30 +02:00
parent 35537e0b0b
commit ed147f80ac
2 changed files with 77 additions and 13 deletions

View File

@ -828,16 +828,32 @@ impl IndexScheduler {
written: 0,
};
enum EitherRead<T: Read> {
Other(T),
Data(Vec<u8>),
enum EitherRead<'a, T: Read> {
Other(Option<T>),
Data(&'a [u8]),
}
impl<T: Read> Read for &mut EitherRead<T> {
impl<T: Read> EitherRead<'_, T> {
/// A clone that works only once for the Other variant.
fn clone(&mut self) -> Self {
match self {
Self::Other(r) => {
let r = r.take();
Self::Other(r)
}
Self::Data(arg0) => Self::Data(arg0),
}
}
}
impl<T: Read> Read for EitherRead<'_, T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
EitherRead::Other(reader) => reader.read(buf),
EitherRead::Data(data) => data.as_slice().read(buf),
EitherRead::Other(Some(reader)) => reader.read(buf),
EitherRead::Other(None) => {
Err(io::Error::new(io::ErrorKind::Other, "No reader available"))
}
EitherRead::Data(data) => data.read(buf),
}
}
}
@ -845,16 +861,17 @@ impl IndexScheduler {
let mut reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
// When there is more than one webhook, cache the data in memory
let mut data;
let mut reader = match webhooks.webhooks.len() {
1 => EitherRead::Other(reader),
1 => EitherRead::Other(Some(reader)),
_ => {
let mut data = Vec::new();
data = Vec::new();
reader.read_to_end(&mut data)?;
EitherRead::Data(data)
EitherRead::Data(&data)
}
};
for (name, Webhook { url, headers }) in webhooks.webhooks.iter() {
for (uuid, Webhook { url, headers }) in webhooks.webhooks.iter() {
let mut request = ureq::post(url)
.timeout(Duration::from_secs(30))
.set("Content-Encoding", "gzip")
@ -863,8 +880,8 @@ impl IndexScheduler {
request = request.set(header_name, header_value);
}
if let Err(e) = request.send(&mut reader) {
tracing::error!("While sending data to the webhook {name}: {e}");
if let Err(e) = request.send(reader.clone()) {
tracing::error!("While sending data to the webhook {uuid}: {e}");
}
}

View File

@ -115,10 +115,10 @@ async fn single_receives_data() {
}
"#);
let index = server.index("tamo");
// May be flaky: we're relying on the fact that while the first document addition is processed, the other
// operations will be received and will be batched together. If it doesn't happen it's not a problem
// the rest of the test won't assume anything about the number of tasks per batch.
let index = server.index("tamo");
for i in 0..5 {
let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await;
}
@ -164,6 +164,53 @@ async fn single_receives_data() {
server_handle.abort();
}
#[actix_web::test]
async fn multiple_receive_data() {
let server = Server::new().await;
let WebhookHandle { server_handle: handle1, url: url1, receiver: mut receiver1 } =
create_webhook_server().await;
let WebhookHandle { server_handle: handle2, url: url2, receiver: mut receiver2 } =
create_webhook_server().await;
let WebhookHandle { server_handle: handle3, url: url3, receiver: mut receiver3 } =
create_webhook_server().await;
for url in [url1, url2, url3] {
let (value, code) = server.create_webhook(json!({ "url": url })).await;
snapshot!(code, @"201 Created");
snapshot!(json_string!(value, { ".uuid" => "[uuid]", ".url" => "[ignored]" }), @r#"
{
"uuid": "[uuid]",
"isEditable": true,
"url": "[ignored]",
"headers": {}
}
"#);
}
let index = server.index("tamo");
let (_, status) = index.add_documents(json!({ "id": 1, "doggo": "bone" }), None).await;
snapshot!(status, @"202 Accepted");
let mut count1 = 0;
let mut count2 = 0;
let mut count3 = 0;
while count1 == 0 || count2 == 0 || count3 == 0 {
tokio::select! {
msg = receiver1.recv() => { if msg.is_some() { count1 += 1; } },
msg = receiver2.recv() => { if msg.is_some() { count2 += 1; } },
msg = receiver3.recv() => { if msg.is_some() { count3 += 1; } },
}
}
assert_eq!(count1, 1);
assert_eq!(count2, 1);
assert_eq!(count3, 1);
handle1.abort();
handle2.abort();
handle3.abort();
}
#[actix_web::test]
async fn cli_with_dumps() {
let db_path = tempfile::tempdir().unwrap();