Add variable registration mechanism

This commit is contained in:
Mubelotix
2025-08-26 15:30:17 +02:00
parent b99410ee6c
commit 5b31960967
4 changed files with 92 additions and 20 deletions

View File

@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{Seek as _, Write as _};
use std::path::Path;
@ -47,7 +47,15 @@ async fn run_workload_commands(
let assets = Arc::new(workload.assets.clone());
let asset_folder = args.common.asset_folder.clone().leak();
run_commands(meili_client, &workload.precommands, &assets, asset_folder, false).await?;
run_commands(
meili_client,
&workload.precommands,
&assets,
asset_folder,
&mut HashMap::new(),
false,
)
.await?;
std::fs::create_dir_all(report_folder)
.with_context(|| format!("could not create report directory at {report_folder}"))?;
@ -57,7 +65,15 @@ async fn run_workload_commands(
let report_handle = start_report(logs_client, trace_filename, &workload.target).await?;
run_commands(meili_client, &workload.commands, &assets, asset_folder, false).await?;
run_commands(
meili_client,
&workload.commands,
&assets,
asset_folder,
&mut HashMap::new(),
false,
)
.await?;
let processor =
stop_report(dashboard_client, logs_client, workload_uuid, report_filename, report_handle)

View File

@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Display;
use std::io::Read as _;
use std::sync::Arc;
@ -23,6 +23,8 @@ pub struct Command {
pub expected_status: Option<u16>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expected_response: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub register: HashMap<String, String>,
#[serde(default)]
synchronous: SyncMode,
}
@ -44,15 +46,46 @@ impl Body {
pub fn get(
self,
assets: &BTreeMap<String, Asset>,
registered: HashMap<String, Value>,
asset_folder: &str,
) -> anyhow::Result<Option<(Vec<u8>, &'static str)>> {
Ok(match self {
Body::Inline { inline: body } => Some((
serde_json::to_vec(&body)
.context("serializing to bytes")
.context("while getting inline body")?,
"application/json",
)),
Body::Inline { inline: mut body } => {
fn insert_variables(value: &mut Value, registered: &HashMap<String, Value>) {
match value {
Value::Null | Value::Bool(_) | Value::Number(_) => (),
Value::String(s) => {
if s.starts_with("{{") && s.ends_with("}}") {
let name = s[2..s.len() - 2].trim();
if let Some(replacement) = registered.get(name) {
*value = replacement.clone();
}
}
}
Value::Array(values) => {
for value in values {
insert_variables(value, registered);
}
}
Value::Object(map) => {
for (_key, value) in map.iter_mut() {
insert_variables(value, registered);
}
}
}
}
if !registered.is_empty() {
insert_variables(&mut body, &registered);
}
Some((
serde_json::to_vec(&body)
.context("serializing to bytes")
.context("while getting inline body")?,
"application/json",
))
}
Body::Asset { asset: name } => Some({
let context = || format!("while getting body from asset '{name}'");
let (mut file, format) =
@ -82,28 +115,40 @@ enum SyncMode {
async fn run_batch(
client: &Arc<Client>,
batch: Vec<Command>,
batch: &[Command],
assets: &Arc<BTreeMap<String, Asset>>,
asset_folder: &'static str,
registered: &mut HashMap<String, Value>,
return_response: bool,
) -> anyhow::Result<Vec<(Value, StatusCode)>> {
let [.., last] = batch.as_slice() else { return Ok(Vec::new()) };
let [.., last] = batch else { return Ok(Vec::new()) };
let sync = last.synchronous;
let batch_len = batch.len();
let mut tasks = Vec::with_capacity(batch.len());
for batch in batch {
for command in batch.iter().cloned() {
let client2 = Arc::clone(client);
let assets2 = Arc::clone(assets);
let needs_response = return_response || !command.register.is_empty();
let registered2 = registered.clone(); // FIXME: cloning the whole map for each command is inefficient
tasks.push(tokio::spawn(async move {
run(&client2, &batch, &assets2, asset_folder, return_response).await
run(&client2, &command, &assets2, registered2, asset_folder, needs_response).await
}));
}
let mut outputs = Vec::with_capacity(if return_response { batch_len } else { 0 });
for task in tasks {
for (task, command) in tasks.into_iter().zip(batch.iter()) {
let output = task.await.context("task panicked")??;
if let Some(output) = output {
for (name, path) in &command.register {
let value = output
.0
.pointer(path)
.with_context(|| format!("could not find path '{path}' in response (required to register '{name}')"))?
.clone();
registered.insert(name.clone(), value);
}
if return_response {
outputs.push(output);
}
@ -203,6 +248,7 @@ pub async fn run(
client: &Client,
command: &Command,
assets: &BTreeMap<String, Asset>,
registered: HashMap<String, Value>,
asset_folder: &str,
return_value: bool,
) -> anyhow::Result<Option<(Value, StatusCode)>> {
@ -210,7 +256,7 @@ pub async fn run(
let body = command
.body
.clone()
.get(assets, asset_folder)
.get(assets, registered, asset_folder)
.with_context(|| format!("while getting body for command {command}"))?;
let request = client.request(command.method.into(), &command.route);
@ -287,6 +333,7 @@ pub async fn run_commands(
commands: &[Command],
assets: &Arc<BTreeMap<String, Asset>>,
asset_folder: &'static str,
registered: &mut HashMap<String, Value>,
return_response: bool,
) -> anyhow::Result<Vec<(Value, StatusCode)>> {
let mut responses = Vec::new();
@ -294,7 +341,7 @@ pub async fn run_commands(
commands.split_inclusive(|command| !matches!(command.synchronous, SyncMode::DontWait))
{
let mut new_responses =
run_batch(client, batch.to_vec(), assets, asset_folder, return_response).await?;
run_batch(client, batch, assets, asset_folder, registered, return_response).await?;
responses.append(&mut new_responses);
}
@ -306,6 +353,7 @@ pub fn health_command() -> Command {
route: "/health".into(),
method: crate::common::client::Method::Get,
body: Default::default(),
register: HashMap::new(),
synchronous: SyncMode::WaitForResponse,
expected_status: None,
expected_response: None,

View File

@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
use std::time::Duration;
@ -128,7 +128,9 @@ async fn wait_for_health(
meilisearch: &mut tokio::process::Child,
) -> anyhow::Result<()> {
for i in 0..100 {
let res = run_command(client, &health_command(), &BTreeMap::new(), "", false).await;
let res =
run_command(client, &health_command(), &BTreeMap::new(), HashMap::new(), "", false)
.await;
if res.is_ok() {
// check that this is actually the current Meilisearch instance that answered us
if let Some(exit_code) =

View File

@ -3,7 +3,11 @@ use cargo_metadata::semver::Version;
use chrono::DateTime;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::BTreeMap, io::Write, sync::Arc};
use std::{
collections::{BTreeMap, HashMap},
io::Write,
sync::Arc,
};
use crate::{
common::{
@ -121,6 +125,7 @@ impl TestWorkload {
let assets = Arc::new(self.assets.clone());
let return_responses = dbg!(args.add_missing_responses || args.update_responses);
let mut registered = HashMap::new();
for command_or_upgrade in commands_or_upgrade {
match command_or_upgrade {
CommandOrUpgradeVec::Commands(commands) => {
@ -130,6 +135,7 @@ impl TestWorkload {
&cloned,
&assets,
asset_folder,
&mut registered,
return_responses,
)
.await?;