diff --git a/crates/xtask/src/bench/mod.rs b/crates/xtask/src/bench/mod.rs index 198c106b7..db31ae255 100644 --- a/crates/xtask/src/bench/mod.rs +++ b/crates/xtask/src/bench/mod.rs @@ -29,13 +29,6 @@ pub struct BenchDeriveArgs { #[command(flatten)] common: CommonArgs, - /// Filename of the workload file, pass multiple filenames - /// to run multiple workloads in the specified order. - /// - /// Each workload run will get its own report file. - #[arg(value_name = "WORKLOAD_FILE", last = false)] - workload_file: Vec, - /// URL of the dashboard. #[arg(long, default_value_t = default_dashboard_url())] dashboard_url: String, @@ -52,18 +45,10 @@ pub struct BenchDeriveArgs { #[arg(long)] api_key: Option, - /// Meilisearch master keys - #[arg(long)] - master_key: Option, - /// Reason for the benchmark invocation #[arg(short, long)] reason: Option, - /// The maximum time in seconds we allow for fetching the task queue before timing out. - #[arg(long, default_value_t = 60)] - tasks_queue_timeout_secs: u64, - /// The path to the binary to run. /// /// If unspecified, runs `cargo run` after building Meilisearch with `cargo build`. @@ -100,14 +85,14 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { // Also we don't want any pesky timeout because we don't know how much time it will take to recover the full trace let logs_client = Client::new( Some("http://127.0.0.1:7700/logs/stream".into()), - args.master_key.as_deref(), + args.common.master_key.as_deref(), None, )?; let meili_client = Client::new( Some("http://127.0.0.1:7700".into()), - args.master_key.as_deref(), - Some(std::time::Duration::from_secs(args.tasks_queue_timeout_secs)), + args.common.master_key.as_deref(), + Some(std::time::Duration::from_secs(args.common.tasks_queue_timeout_secs)), )?; // enter runtime @@ -116,11 +101,11 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { dashboard_client.send_machine_info(&env).await?; let commit_message = build_info.commit_msg.unwrap_or_default().split('\n').next().unwrap(); - let max_workloads = args.workload_file.len(); + let max_workloads = args.common.workload_file.len(); let reason: Option<&str> = args.reason.as_deref(); let invocation_uuid = dashboard_client.create_invocation(build_info.clone(), commit_message, env, max_workloads, reason).await?; - tracing::info!(workload_count = args.workload_file.len(), "handling workload files"); + tracing::info!(workload_count = args.common.workload_file.len(), "handling workload files"); // main task let workload_runs = tokio::spawn( @@ -128,7 +113,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { let dashboard_client = dashboard_client.clone(); let mut dashboard_urls = Vec::new(); async move { - for workload_file in args.workload_file.iter() { + for workload_file in args.common.workload_file.iter() { let workload: Workload = serde_json::from_reader( std::fs::File::open(workload_file) .with_context(|| format!("error opening {}", workload_file.display()))?, @@ -147,7 +132,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { &logs_client, &meili_client, invocation_uuid, - args.master_key.as_deref(), + args.common.master_key.as_deref(), workload, &args, args.binary_path.as_deref(), diff --git a/crates/xtask/src/common/args.rs b/crates/xtask/src/common/args.rs index 2df9ce287..7f0113dc1 100644 --- a/crates/xtask/src/common/args.rs +++ b/crates/xtask/src/common/args.rs @@ -1,4 +1,5 @@ use clap::Parser; +use std::path::PathBuf; pub fn default_asset_folder() -> String { "./bench/assets/".into() @@ -10,6 +11,17 @@ pub fn default_log_filter() -> String { #[derive(Parser, Debug, Clone)] pub struct CommonArgs { + /// Filename of the workload file, pass multiple filenames + /// to run multiple workloads in the specified order. + /// + /// For benches, each workload run will get its own report file. + #[arg(value_name = "WORKLOAD_FILE", last = false)] + pub workload_file: Vec, + + /// Meilisearch master keys + #[arg(long)] + pub master_key: Option, + /// Directory to store the remote assets. #[arg(long, default_value_t = default_asset_folder())] pub asset_folder: String, @@ -21,4 +33,8 @@ pub struct CommonArgs { /// Authentication bearer for fetching assets #[arg(long)] pub assets_key: Option, + + /// The maximum time in seconds we allow for fetching the task queue before timing out. + #[arg(long, default_value_t = 60)] + pub tasks_queue_timeout_secs: u64, } diff --git a/crates/xtask/src/test/mod.rs b/crates/xtask/src/test/mod.rs index 4a2c4fb52..3ecd2ce1c 100644 --- a/crates/xtask/src/test/mod.rs +++ b/crates/xtask/src/test/mod.rs @@ -1,4 +1,7 @@ -use crate::common::{args::CommonArgs, client::Client, logs::setup_logs}; +use std::time::Duration; + +use crate::common::{args::CommonArgs, client::Client, logs::setup_logs, workload::Workload}; +use anyhow::{bail, Context}; use cargo_metadata::semver::Version; use clap::Parser; @@ -17,14 +20,59 @@ pub struct TestDeriveArgs { } pub fn run(args: TestDeriveArgs) -> anyhow::Result<()> { + let rt = tokio::runtime::Builder::new_current_thread().enable_io().enable_time().build()?; + let _scope = rt.enter(); + + rt.block_on(async { run_inner(args).await })?; + + Ok(()) +} + +async fn run_inner(args: TestDeriveArgs) -> anyhow::Result<()> { setup_logs(&args.common.log_filter)?; // setup clients let assets_client = Client::new( None, args.common.assets_key.as_deref(), - Some(std::time::Duration::from_secs(3600)), // 1h + Some(Duration::from_secs(3600)), // 1h )?; + let meili_client = Client::new( + Some("http://127.0.0.1:7700".into()), + args.common.master_key.as_deref(), + Some(Duration::from_secs(args.common.tasks_queue_timeout_secs)), + )?; + + for workload_file in &args.common.workload_file { + let workload: Workload = serde_json::from_reader( + std::fs::File::open(workload_file) + .with_context(|| format!("error opening {}", workload_file.display()))?, + ) + .with_context(|| format!("error parsing {} as JSON", workload_file.display()))?; + + let Workload::Test(workload) = workload else { + bail!("workload file {} is not a test workload", workload_file.display()); + }; + + match workload.run(&args, &assets_client, &meili_client).await { + Ok(_) => { + println!( + "✅ Workload {} from file {} completed successfully", + workload.name, + workload_file.display() + ); + } + Err(error) => { + println!( + "❌ Workload {} from file {} failed: {error}", + workload.name, + workload_file.display() + ); + return Err(error); + } + } + } + Ok(()) } diff --git a/crates/xtask/src/test/workload.rs b/crates/xtask/src/test/workload.rs index 7a80cdaa2..faabbd479 100644 --- a/crates/xtask/src/test/workload.rs +++ b/crates/xtask/src/test/workload.rs @@ -2,7 +2,14 @@ use cargo_metadata::semver::Version; use serde::Deserialize; use std::collections::BTreeMap; -use crate::common::{assets::Asset, command::Command}; +use crate::{ + common::{ + assets::{fetch_assets, Asset}, + client::Client, + command::{run_commands, Command}, + }, + test::TestDeriveArgs, +}; #[derive(Deserialize)] #[serde(untagged)] @@ -11,6 +18,11 @@ pub enum CommandOrUpgrade { Upgrade { upgrade: Version }, } +enum CommandOrUpgradeVec { + Commands(Vec), + Upgrade(Version), +} + /// A test workload. /// Not to be confused with [a bench workload](crate::bench::workload::Workload). #[derive(Deserialize)] @@ -19,3 +31,45 @@ pub struct TestWorkload { pub assets: BTreeMap, pub commands: Vec, } + +impl TestWorkload { + pub async fn run( + &self, + args: &TestDeriveArgs, + assets_client: &Client, + meili_client: &Client, + ) -> anyhow::Result<()> { + // Fetch assets + fetch_assets(assets_client, &self.assets, &args.common.asset_folder).await?; + + // Group commands between upgrades + let mut commands_or_upgrade = Vec::new(); + let mut current_commands = Vec::new(); + for command_or_upgrade in &self.commands { + match command_or_upgrade { + CommandOrUpgrade::Command(command) => current_commands.push(command.clone()), + CommandOrUpgrade::Upgrade { upgrade } => { + if !current_commands.is_empty() { + commands_or_upgrade.push(CommandOrUpgradeVec::Commands(current_commands)); + current_commands = Vec::new(); + } + commands_or_upgrade.push(CommandOrUpgradeVec::Upgrade(upgrade.clone())); + } + } + } + + for command_or_upgrade in commands_or_upgrade { + match command_or_upgrade { + CommandOrUpgradeVec::Commands(commands) => { + run_commands(meili_client, &commands, &self.assets, &args.common.asset_folder) + .await?; + } + CommandOrUpgradeVec::Upgrade(version) => { + todo!() + } + } + } + + Ok(()) + } +}