Implement test workload running logic

This commit is contained in:
Mubelotix
2025-08-25 13:32:34 +02:00
parent 0d8b2edfb0
commit c98efe18c9
4 changed files with 128 additions and 25 deletions

View File

@ -29,13 +29,6 @@ pub struct BenchDeriveArgs {
#[command(flatten)] #[command(flatten)]
common: CommonArgs, common: CommonArgs,
/// Filename of the workload file, pass multiple filenames
/// to run multiple workloads in the specified order.
///
/// Each workload run will get its own report file.
#[arg(value_name = "WORKLOAD_FILE", last = false)]
workload_file: Vec<PathBuf>,
/// URL of the dashboard. /// URL of the dashboard.
#[arg(long, default_value_t = default_dashboard_url())] #[arg(long, default_value_t = default_dashboard_url())]
dashboard_url: String, dashboard_url: String,
@ -52,18 +45,10 @@ pub struct BenchDeriveArgs {
#[arg(long)] #[arg(long)]
api_key: Option<String>, api_key: Option<String>,
/// Meilisearch master keys
#[arg(long)]
master_key: Option<String>,
/// Reason for the benchmark invocation /// Reason for the benchmark invocation
#[arg(short, long)] #[arg(short, long)]
reason: Option<String>, reason: Option<String>,
/// The maximum time in seconds we allow for fetching the task queue before timing out.
#[arg(long, default_value_t = 60)]
tasks_queue_timeout_secs: u64,
/// The path to the binary to run. /// The path to the binary to run.
/// ///
/// If unspecified, runs `cargo run` after building Meilisearch with `cargo build`. /// If unspecified, runs `cargo run` after building Meilisearch with `cargo build`.
@ -100,14 +85,14 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
// Also we don't want any pesky timeout because we don't know how much time it will take to recover the full trace // Also we don't want any pesky timeout because we don't know how much time it will take to recover the full trace
let logs_client = Client::new( let logs_client = Client::new(
Some("http://127.0.0.1:7700/logs/stream".into()), Some("http://127.0.0.1:7700/logs/stream".into()),
args.master_key.as_deref(), args.common.master_key.as_deref(),
None, None,
)?; )?;
let meili_client = Client::new( let meili_client = Client::new(
Some("http://127.0.0.1:7700".into()), Some("http://127.0.0.1:7700".into()),
args.master_key.as_deref(), args.common.master_key.as_deref(),
Some(std::time::Duration::from_secs(args.tasks_queue_timeout_secs)), Some(std::time::Duration::from_secs(args.common.tasks_queue_timeout_secs)),
)?; )?;
// enter runtime // enter runtime
@ -116,11 +101,11 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
dashboard_client.send_machine_info(&env).await?; dashboard_client.send_machine_info(&env).await?;
let commit_message = build_info.commit_msg.unwrap_or_default().split('\n').next().unwrap(); let commit_message = build_info.commit_msg.unwrap_or_default().split('\n').next().unwrap();
let max_workloads = args.workload_file.len(); let max_workloads = args.common.workload_file.len();
let reason: Option<&str> = args.reason.as_deref(); let reason: Option<&str> = args.reason.as_deref();
let invocation_uuid = dashboard_client.create_invocation(build_info.clone(), commit_message, env, max_workloads, reason).await?; let invocation_uuid = dashboard_client.create_invocation(build_info.clone(), commit_message, env, max_workloads, reason).await?;
tracing::info!(workload_count = args.workload_file.len(), "handling workload files"); tracing::info!(workload_count = args.common.workload_file.len(), "handling workload files");
// main task // main task
let workload_runs = tokio::spawn( let workload_runs = tokio::spawn(
@ -128,7 +113,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
let dashboard_client = dashboard_client.clone(); let dashboard_client = dashboard_client.clone();
let mut dashboard_urls = Vec::new(); let mut dashboard_urls = Vec::new();
async move { async move {
for workload_file in args.workload_file.iter() { for workload_file in args.common.workload_file.iter() {
let workload: Workload = serde_json::from_reader( let workload: Workload = serde_json::from_reader(
std::fs::File::open(workload_file) std::fs::File::open(workload_file)
.with_context(|| format!("error opening {}", workload_file.display()))?, .with_context(|| format!("error opening {}", workload_file.display()))?,
@ -147,7 +132,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
&logs_client, &logs_client,
&meili_client, &meili_client,
invocation_uuid, invocation_uuid,
args.master_key.as_deref(), args.common.master_key.as_deref(),
workload, workload,
&args, &args,
args.binary_path.as_deref(), args.binary_path.as_deref(),

View File

@ -1,4 +1,5 @@
use clap::Parser; use clap::Parser;
use std::path::PathBuf;
pub fn default_asset_folder() -> String { pub fn default_asset_folder() -> String {
"./bench/assets/".into() "./bench/assets/".into()
@ -10,6 +11,17 @@ pub fn default_log_filter() -> String {
#[derive(Parser, Debug, Clone)] #[derive(Parser, Debug, Clone)]
pub struct CommonArgs { pub struct CommonArgs {
/// Filename of the workload file, pass multiple filenames
/// to run multiple workloads in the specified order.
///
/// For benches, each workload run will get its own report file.
#[arg(value_name = "WORKLOAD_FILE", last = false)]
pub workload_file: Vec<PathBuf>,
/// Meilisearch master key
#[arg(long)]
pub master_key: Option<String>,
/// Directory to store the remote assets. /// Directory to store the remote assets.
#[arg(long, default_value_t = default_asset_folder())] #[arg(long, default_value_t = default_asset_folder())]
pub asset_folder: String, pub asset_folder: String,
@ -21,4 +33,8 @@ pub struct CommonArgs {
/// Authentication bearer for fetching assets /// Authentication bearer for fetching assets
#[arg(long)] #[arg(long)]
pub assets_key: Option<String>, pub assets_key: Option<String>,
/// The maximum time in seconds we allow for fetching the task queue before timing out.
#[arg(long, default_value_t = 60)]
pub tasks_queue_timeout_secs: u64,
} }

View File

@ -1,4 +1,7 @@
use crate::common::{args::CommonArgs, client::Client, logs::setup_logs}; use std::time::Duration;
use crate::common::{args::CommonArgs, client::Client, logs::setup_logs, workload::Workload};
use anyhow::{bail, Context};
use cargo_metadata::semver::Version; use cargo_metadata::semver::Version;
use clap::Parser; use clap::Parser;
@ -17,14 +20,59 @@ pub struct TestDeriveArgs {
} }
pub fn run(args: TestDeriveArgs) -> anyhow::Result<()> { pub fn run(args: TestDeriveArgs) -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_current_thread().enable_io().enable_time().build()?;
let _scope = rt.enter();
rt.block_on(async { run_inner(args).await })?;
Ok(())
}
async fn run_inner(args: TestDeriveArgs) -> anyhow::Result<()> {
setup_logs(&args.common.log_filter)?; setup_logs(&args.common.log_filter)?;
// setup clients // setup clients
let assets_client = Client::new( let assets_client = Client::new(
None, None,
args.common.assets_key.as_deref(), args.common.assets_key.as_deref(),
Some(std::time::Duration::from_secs(3600)), // 1h Some(Duration::from_secs(3600)), // 1h
)?; )?;
let meili_client = Client::new(
Some("http://127.0.0.1:7700".into()),
args.common.master_key.as_deref(),
Some(Duration::from_secs(args.common.tasks_queue_timeout_secs)),
)?;
for workload_file in &args.common.workload_file {
let workload: Workload = serde_json::from_reader(
std::fs::File::open(workload_file)
.with_context(|| format!("error opening {}", workload_file.display()))?,
)
.with_context(|| format!("error parsing {} as JSON", workload_file.display()))?;
let Workload::Test(workload) = workload else {
bail!("workload file {} is not a test workload", workload_file.display());
};
match workload.run(&args, &assets_client, &meili_client).await {
Ok(_) => {
println!(
"✅ Workload {} from file {} completed successfully",
workload.name,
workload_file.display()
);
}
Err(error) => {
println!(
"❌ Workload {} from file {} failed: {error}",
workload.name,
workload_file.display()
);
return Err(error);
}
}
}
Ok(()) Ok(())
} }

View File

@ -2,7 +2,14 @@ use cargo_metadata::semver::Version;
use serde::Deserialize; use serde::Deserialize;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use crate::common::{assets::Asset, command::Command}; use crate::{
common::{
assets::{fetch_assets, Asset},
client::Client,
command::{run_commands, Command},
},
test::TestDeriveArgs,
};
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(untagged)] #[serde(untagged)]
@ -11,6 +18,11 @@ pub enum CommandOrUpgrade {
Upgrade { upgrade: Version }, Upgrade { upgrade: Version },
} }
/// Result of grouping a test workload's command list: runs of consecutive
/// commands separated by the upgrades interleaved between them, so each run
/// can be executed against the binary version in effect at that point.
enum CommandOrUpgradeVec {
    // A batch of consecutive commands to run against the current binary.
    Commands(Vec<Command>),
    // An upgrade to the given Meilisearch version, applied between batches.
    Upgrade(Version),
}
/// A test workload. /// A test workload.
/// Not to be confused with [a bench workload](crate::bench::workload::Workload). /// Not to be confused with [a bench workload](crate::bench::workload::Workload).
#[derive(Deserialize)] #[derive(Deserialize)]
@ -19,3 +31,45 @@ pub struct TestWorkload {
pub assets: BTreeMap<String, Asset>, pub assets: BTreeMap<String, Asset>,
pub commands: Vec<CommandOrUpgrade>, pub commands: Vec<CommandOrUpgrade>,
} }
impl TestWorkload {
    /// Runs this test workload: fetches its assets, then executes its commands
    /// in order, honoring any upgrades interleaved in the command list.
    ///
    /// Commands are first grouped into runs separated by upgrades so that each
    /// group executes against the binary version in effect at that point.
    ///
    /// # Errors
    ///
    /// Returns an error if fetching assets fails or if any command fails.
    ///
    /// # Panics
    ///
    /// Panics (`todo!`) when the workload contains an upgrade, which is not
    /// implemented yet.
    pub async fn run(
        &self,
        args: &TestDeriveArgs,
        assets_client: &Client,
        meili_client: &Client,
    ) -> anyhow::Result<()> {
        // Fetch assets
        fetch_assets(assets_client, &self.assets, &args.common.asset_folder).await?;

        // Group consecutive commands between upgrades.
        let mut commands_or_upgrade = Vec::new();
        let mut current_commands = Vec::new();
        for command_or_upgrade in &self.commands {
            match command_or_upgrade {
                CommandOrUpgrade::Command(command) => current_commands.push(command.clone()),
                CommandOrUpgrade::Upgrade { upgrade } => {
                    if !current_commands.is_empty() {
                        commands_or_upgrade.push(CommandOrUpgradeVec::Commands(current_commands));
                        current_commands = Vec::new();
                    }
                    commands_or_upgrade.push(CommandOrUpgradeVec::Upgrade(upgrade.clone()));
                }
            }
        }
        // BUGFIX: flush the trailing command group. Without this, commands after
        // the last upgrade — or all commands, when the workload contains no
        // upgrade at all — were silently never executed.
        if !current_commands.is_empty() {
            commands_or_upgrade.push(CommandOrUpgradeVec::Commands(current_commands));
        }

        for command_or_upgrade in commands_or_upgrade {
            match command_or_upgrade {
                CommandOrUpgradeVec::Commands(commands) => {
                    run_commands(meili_client, &commands, &self.assets, &args.common.asset_folder)
                        .await?;
                }
                CommandOrUpgradeVec::Upgrade(_version) => {
                    // Fail loudly rather than silently skipping the upgrade.
                    todo!("applying an upgrade during a test workload is not implemented yet")
                }
            }
        }
        Ok(())
    }
}