diff --git a/crates/xtask/src/bench/meili_process.rs b/crates/xtask/src/bench/meili_process.rs
index 14c7e2e47..dc3d98647 100644
--- a/crates/xtask/src/bench/meili_process.rs
+++ b/crates/xtask/src/bench/meili_process.rs
@@ -99,7 +99,7 @@ async fn wait_for_health(
     asset_folder: &str,
 ) -> anyhow::Result<()> {
     for i in 0..100 {
-        let res = run_command(client.clone(), health_command(), assets, asset_folder).await;
+        let res = run_command(client, &health_command(), assets, asset_folder, false).await;
         if res.is_ok() {
             // check that this is actually the current Meilisearch instance that answered us
             if let Some(exit_code) =
diff --git a/crates/xtask/src/bench/mod.rs b/crates/xtask/src/bench/mod.rs
index db31ae255..da8fc4afd 100644
--- a/crates/xtask/src/bench/mod.rs
+++ b/crates/xtask/src/bench/mod.rs
@@ -6,7 +6,7 @@ mod workload;
 use crate::common::args::CommonArgs;
 use crate::common::logs::setup_logs;
 use crate::common::workload::Workload;
-use std::path::PathBuf;
+use std::{path::PathBuf, sync::Arc};
 
 use anyhow::{bail, Context};
 use clap::Parser;
@@ -89,11 +89,11 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
         None,
     )?;
 
-    let meili_client = Client::new(
+    let meili_client = Arc::new(Client::new(
         Some("http://127.0.0.1:7700".into()),
         args.common.master_key.as_deref(),
         Some(std::time::Duration::from_secs(args.common.tasks_queue_timeout_secs)),
-    )?;
+    )?);
 
     // enter runtime
 
diff --git a/crates/xtask/src/bench/workload.rs b/crates/xtask/src/bench/workload.rs
index 9b9d3033c..c88b0c4e3 100644
--- a/crates/xtask/src/bench/workload.rs
+++ b/crates/xtask/src/bench/workload.rs
@@ -2,10 +2,11 @@ use std::collections::BTreeMap;
 use std::fs::File;
 use std::io::{Seek as _, Write as _};
 use std::path::Path;
+use std::sync::Arc;
 
 use anyhow::{bail, Context as _};
 use futures_util::TryStreamExt as _;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use serde_json::json;
 use tokio::task::JoinHandle;
 use uuid::Uuid;
@@ -19,7 +20,7 @@ use crate::common::command::{run_commands, Command};
 
 /// A bench workload.
 /// Not to be confused with [a test workload](crate::test::workload::Workload).
-#[derive(Deserialize)]
+#[derive(Serialize, Deserialize)]
 pub struct BenchWorkload {
     pub name: String,
     pub run_count: u16,
@@ -35,7 +36,7 @@
 async fn run_workload_commands(
     dashboard_client: &DashboardClient,
     logs_client: &Client,
-    meili_client: &Client,
+    meili_client: &Arc<Client>,
     workload_uuid: Uuid,
     workload: &BenchWorkload,
     args: &BenchDeriveArgs,
@@ -43,9 +44,10 @@
 ) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
     let report_folder = &args.report_folder;
     let workload_name = &workload.name;
+    let assets = Arc::new(workload.assets.clone());
+    let asset_folder = args.common.asset_folder.clone().leak();
 
-    run_commands(meili_client, &workload.precommands, &workload.assets, &args.common.asset_folder)
-        .await?;
+    run_commands(meili_client, &workload.precommands, &assets, asset_folder, false).await?;
 
     std::fs::create_dir_all(report_folder)
         .with_context(|| format!("could not create report directory at {report_folder}"))?;
@@ -55,8 +57,7 @@
 
     let report_handle = start_report(logs_client, trace_filename, &workload.target).await?;
 
-    run_commands(meili_client, &workload.commands, &workload.assets, &args.common.asset_folder)
-        .await?;
+    run_commands(meili_client, &workload.commands, &assets, asset_folder, false).await?;
 
     let processor =
         stop_report(dashboard_client, logs_client, workload_uuid, report_filename, report_handle)
@@ -71,7 +72,7 @@ pub async fn execute(
     assets_client: &Client,
     dashboard_client: &DashboardClient,
     logs_client: &Client,
-    meili_client: &Client,
+    meili_client: &Arc<Client>,
     invocation_uuid: Uuid,
     master_key: Option<&str>,
     workload: BenchWorkload,
@@ -119,7 +120,7 @@
 async fn execute_run(
     dashboard_client: &DashboardClient,
     logs_client: &Client,
-    meili_client: &Client,
+    meili_client: &Arc<Client>,
     workload_uuid: Uuid,
     master_key: Option<&str>,
     workload: &BenchWorkload,
diff --git a/crates/xtask/src/common/assets.rs b/crates/xtask/src/common/assets.rs
index 22c2e0992..31eeb70a9 100644
--- a/crates/xtask/src/common/assets.rs
+++ b/crates/xtask/src/common/assets.rs
@@ -3,21 +3,22 @@ use std::io::{Read as _, Seek as _, Write as _};
 
 use anyhow::{bail, Context};
 use futures_util::TryStreamExt as _;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use sha2::Digest;
 
 use super::client::Client;
 
-#[derive(Deserialize, Clone)]
+#[derive(Serialize, Deserialize, Clone)]
 pub struct Asset {
     pub local_location: Option<String>,
     pub remote_location: Option<String>,
-    #[serde(default)]
+    #[serde(default, skip_serializing_if = "AssetFormat::is_default")]
     pub format: AssetFormat,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
     pub sha256: Option<String>,
 }
 
-#[derive(Deserialize, Default, Copy, Clone)]
+#[derive(Serialize, Deserialize, Default, Copy, Clone)]
 pub enum AssetFormat {
     #[default]
     Auto,
@@ -27,6 +28,10 @@ pub enum AssetFormat {
 }
 
 impl AssetFormat {
+    fn is_default(&self) -> bool {
+        matches!(self, AssetFormat::Auto)
+    }
+
     pub fn to_content_type(self, filename: &str) -> &'static str {
         match self {
             AssetFormat::Auto => Self::auto_detect(filename).to_content_type(filename),
diff --git a/crates/xtask/src/common/client.rs b/crates/xtask/src/common/client.rs
index 1c2b743af..453da5c31 100644
--- a/crates/xtask/src/common/client.rs
+++ b/crates/xtask/src/common/client.rs
@@ -1,5 +1,5 @@
 use anyhow::Context;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone)]
 pub struct Client {
@@ -61,7 +61,7 @@ impl Client {
     }
 }
 
-#[derive(Debug, Clone, Copy, Deserialize)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 #[serde(rename_all = "SCREAMING_SNAKE_CASE")]
 pub enum Method {
     Get,
diff --git a/crates/xtask/src/common/command.rs b/crates/xtask/src/common/command.rs
index a99f3b75e..f38d89280 100644
--- a/crates/xtask/src/common/command.rs
+++ b/crates/xtask/src/common/command.rs
@@ -1,24 +1,32 @@
 use std::collections::BTreeMap;
 use std::fmt::Display;
 use std::io::Read as _;
+use std::sync::Arc;
 
 use anyhow::{bail, Context as _};
-use serde::Deserialize;
+use reqwest::StatusCode;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
 
 use crate::common::assets::{fetch_asset, Asset};
 use crate::common::client::{Client, Method};
 
-#[derive(Clone, Deserialize)]
+#[derive(Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
 pub struct Command {
     pub route: String,
     pub method: Method,
     #[serde(default)]
     pub body: Body,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub expected_status: Option<u16>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub expected_response: Option<Value>,
     #[serde(default)]
     synchronous: SyncMode,
 }
 
-#[derive(Default, Clone, Deserialize)]
+#[derive(Default, Clone, Serialize, Deserialize)]
 #[serde(untagged)]
 pub enum Body {
     Inline {
@@ -63,7 +71,7 @@ impl Display for Command {
     }
 }
 
-#[derive(Default, Debug, Clone, Copy, Deserialize)]
+#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize)]
 enum SyncMode {
     DontWait,
     #[default]
@@ -72,41 +80,42 @@ enum SyncMode {
 }
 
 async fn run_batch(
-    client: &Client,
-    batch: &[Command],
-    assets: &BTreeMap<String, Asset>,
-    asset_folder: &str,
-) -> anyhow::Result<()> {
-    let [.., last] = batch else { return Ok(()) };
+    client: &Arc<Client>,
+    batch: Vec<Command>,
+    assets: &Arc<BTreeMap<String, Asset>>,
+    asset_folder: &'static str,
+    return_response: bool,
+) -> anyhow::Result<Vec<(Value, StatusCode)>> {
+    let [.., last] = batch.as_slice() else { return Ok(Vec::new()) };
     let sync = last.synchronous;
+    let batch_len = batch.len();
 
-    let mut tasks = tokio::task::JoinSet::new();
-
-    for command in batch {
-        // FIXME: you probably don't want to copy assets everytime here
-        tasks.spawn({
-            let client = client.clone();
-            let command = command.clone();
-            let assets = assets.clone();
-            let asset_folder = asset_folder.to_owned();
-
-            async move { run(client, command, &assets, &asset_folder).await }
-        });
+    let mut tasks = Vec::with_capacity(batch.len());
+    for batch in batch {
+        let client2 = Arc::clone(&client);
+        let assets2 = Arc::clone(&assets);
+        tasks.push(tokio::spawn(async move {
+            run(&client2, &batch, &assets2, asset_folder, return_response).await
+        }));
     }
 
-    while let Some(result) = tasks.join_next().await {
-        result
-            .context("panicked while executing command")?
-            .context("error while executing command")?;
+    let mut outputs = Vec::with_capacity(if return_response { batch_len } else { 0 });
+    for task in tasks {
+        let output = task.await.context("task panicked")??;
+        if let Some(output) = output {
+            if return_response {
+                outputs.push(output);
+            }
+        }
     }
 
     match sync {
         SyncMode::DontWait => {}
         SyncMode::WaitForResponse => {}
-        SyncMode::WaitForTask => wait_for_tasks(client).await?,
+        SyncMode::WaitForTask => wait_for_tasks(&client).await?,
     }
 
-    Ok(())
+    Ok(outputs)
 }
 
 async fn wait_for_tasks(client: &Client) -> anyhow::Result<()> {
@@ -150,13 +159,16 @@
 
 #[tracing::instrument(skip(client, command, assets, asset_folder), fields(command = %command))]
 pub async fn run(
-    client: Client,
-    mut command: Command,
+    client: &Client,
+    command: &Command,
     assets: &BTreeMap<String, Asset>,
     asset_folder: &str,
-) -> anyhow::Result<()> {
+    return_value: bool,
+) -> anyhow::Result<Option<(Value, StatusCode)>> {
     // memtake the body here to leave an empty body in its place, so that command is not partially moved-out
-    let body = std::mem::take(&mut command.body)
+    let body = command
+        .body
+        .clone()
         .get(assets, asset_folder)
         .with_context(|| format!("while getting body for command {command}"))?;
 
@@ -172,7 +184,17 @@
         request.send().await.with_context(|| format!("error sending command: {}", command))?;
 
     let code = response.status();
-    if code.is_client_error() {
+
+    if let Some(expected_status) = command.expected_status {
+        if code.as_u16() != expected_status {
+            let response = response
+                .text()
+                .await
+                .context("could not read response body as text")
+                .context("reading response body when checking expected status")?;
+            bail!("unexpected status code: got {}, expected {expected_status}, response body: '{response}'", code.as_u16());
+        }
+    } else if code.is_client_error() {
         tracing::error!(%command, %code, "error in workload file");
         let response: serde_json::Value = response
             .json()
@@ -190,22 +212,44 @@
         bail!("server error: server responded with error code {code} and '{response}'")
     }
 
-    Ok(())
+    if return_value {
+        let response: serde_json::Value = response
+            .json()
+            .await
+            .context("could not deserialize response as JSON")
+            .context("parsing response when recording expected response")?;
+        return Ok(Some((response, code)));
+    } else if let Some(expected_response) = &command.expected_response {
+        let response: serde_json::Value = response
+            .json()
+            .await
+            .context("could not deserialize response as JSON")
+            .context("parsing response when checking expected response")?;
+        if &response != expected_response {
+            bail!("unexpected response: got '{response}', expected '{expected_response}'");
+        }
+    }
+
+    Ok(None)
 }
 
 pub async fn run_commands(
-    client: &Client,
+    client: &Arc<Client>,
     commands: &[Command],
-    assets: &BTreeMap<String, Asset>,
-    asset_folder: &str,
-) -> anyhow::Result<()> {
+    assets: &Arc<BTreeMap<String, Asset>>,
+    asset_folder: &'static str,
+    return_response: bool,
+) -> anyhow::Result<Vec<(Value, StatusCode)>> {
+    let mut responses = Vec::new();
     for batch in
         commands.split_inclusive(|command| !matches!(command.synchronous, SyncMode::DontWait))
     {
-        run_batch(client, batch, assets, asset_folder).await?;
+        let mut new_responses =
+            run_batch(client, batch.to_vec(), assets, asset_folder, return_response).await?;
+        responses.append(&mut new_responses);
     }
 
-    Ok(())
+    Ok(responses)
 }
 
 pub fn health_command() -> Command {
@@ -214,5 +258,7 @@
         method: crate::common::client::Method::Get,
         body: Default::default(),
         synchronous: SyncMode::WaitForResponse,
+        expected_status: None,
+        expected_response: None,
     }
 }
diff --git a/crates/xtask/src/common/workload.rs b/crates/xtask/src/common/workload.rs
index 61fa292c1..94d98ec12 100644
--- a/crates/xtask/src/common/workload.rs
+++ b/crates/xtask/src/common/workload.rs
@@ -1,8 +1,8 @@
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 
 use crate::{bench::BenchWorkload, test::TestWorkload};
 
-#[derive(Deserialize)]
+#[derive(Serialize, Deserialize)]
 #[serde(tag = "type")]
 #[serde(rename_all = "camelCase")]
 pub enum Workload {
diff --git a/crates/xtask/src/test/mod.rs b/crates/xtask/src/test/mod.rs
index c6f009f01..21b2f7d88 100644
--- a/crates/xtask/src/test/mod.rs
+++ b/crates/xtask/src/test/mod.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{sync::Arc, time::Duration};
 
 use crate::common::{args::CommonArgs, client::Client, logs::setup_logs, workload::Workload};
 use anyhow::{bail, Context};
@@ -15,6 +15,14 @@ pub struct TestDeriveArgs {
     /// Common arguments shared with other commands
     #[command(flatten)]
     common: CommonArgs,
+
+    /// Enables workloads to be rewritten in place to update expected responses.
+    #[arg(short, long, default_value_t = false)]
+    pub update_responses: bool,
+
+    /// Enables workloads to be rewritten in place to add missing expected responses.
+    #[arg(short, long, default_value_t = false)]
+    pub add_missing_responses: bool,
 }
 
 pub fn run(args: TestDeriveArgs) -> anyhow::Result<()> {
@@ -30,18 +38,19 @@ async fn run_inner(args: TestDeriveArgs) -> anyhow::Result<()> {
     setup_logs(&args.common.log_filter)?;
 
     // setup clients
-    let assets_client = Client::new(
+    let assets_client = Arc::new(Client::new(
         None,
         args.common.assets_key.as_deref(),
         Some(Duration::from_secs(3600)), // 1h
-    )?;
+    )?);
 
-    let meili_client = Client::new(
+    let meili_client = Arc::new(Client::new(
         Some("http://127.0.0.1:7700".into()),
         args.common.master_key.as_deref(),
         Some(Duration::from_secs(args.common.tasks_queue_timeout_secs)),
-    )?;
+    )?);
 
+    let asset_folder = args.common.asset_folder.clone().leak();
     for workload_file in &args.common.workload_file {
         let workload: Workload = serde_json::from_reader(
             std::fs::File::open(workload_file)
@@ -49,16 +58,18 @@ async fn run_inner(args: TestDeriveArgs) -> anyhow::Result<()> {
         )
         .with_context(|| format!("error parsing {} as JSON", workload_file.display()))?;
 
-        let Workload::Test(mut workload) = workload else {
+        let Workload::Test(workload) = workload else {
             bail!("workload file {} is not a test workload", workload_file.display());
         };
 
-        match workload.run(&args, &assets_client, &meili_client).await {
+        let name = workload.name.clone();
+        match workload.run(&args, &assets_client, &meili_client, asset_folder).await {
             Ok(_) => {
-                println!("✅ Workload {} completed successfully", workload.name,);
+                println!("✅ Workload {name} completed successfully");
            }
             Err(error) => {
-                println!("❌ Workload {} failed: {error}", workload.name,);
+                println!("❌ Workload {name} failed: {error}");
+                println!("Is this intentional? If so, rerun with --update-responses to update the workload files.");
                 return Err(error);
             }
         }
diff --git a/crates/xtask/src/test/workload.rs b/crates/xtask/src/test/workload.rs
index 528ff645f..72cd2ddfd 100644
--- a/crates/xtask/src/test/workload.rs
+++ b/crates/xtask/src/test/workload.rs
@@ -1,12 +1,14 @@
+use anyhow::Context;
 use cargo_metadata::semver::Version;
-use serde::Deserialize;
-use std::collections::BTreeMap;
+use serde::{Deserialize, Serialize};
+use std::{collections::BTreeMap, sync::Arc};
 
 use crate::{
     common::{
         assets::{fetch_assets, Asset},
         client::Client,
         command::{run_commands, Command},
+        workload::Workload,
     },
     test::{versions::expand_assets_with_versions, TestDeriveArgs},
 };
@@ -33,21 +35,33 @@ impl<'a> Deserialize<'a> for VersionOrLatest {
     }
 }
 
-#[derive(Deserialize)]
+impl Serialize for VersionOrLatest {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        match self {
+            VersionOrLatest::Version(v) => serializer.serialize_str(&v.to_string()),
+            VersionOrLatest::Latest => serializer.serialize_str("latest"),
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize)]
 #[serde(untagged)]
 pub enum CommandOrUpgrade {
     Command(Command),
     Upgrade { upgrade: VersionOrLatest },
 }
 
-enum CommandOrUpgradeVec {
-    Commands(Vec<Command>),
+enum CommandOrUpgradeVec<'a> {
+    Commands(Vec<&'a mut Command>),
     Upgrade(VersionOrLatest),
 }
 
 /// A test workload.
 /// Not to be confused with [a bench workload](crate::bench::workload::Workload).
-#[derive(Deserialize)]
+#[derive(Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct TestWorkload {
     pub name: String,
@@ -58,18 +72,19 @@
 
 impl TestWorkload {
     pub async fn run(
-        &mut self,
+        mut self,
         args: &TestDeriveArgs,
         assets_client: &Client,
-        meili_client: &Client,
+        meili_client: &Arc<Client>,
+        asset_folder: &'static str,
     ) -> anyhow::Result<()> {
         // Group commands between upgrades
         let mut commands_or_upgrade = Vec::new();
         let mut current_commands = Vec::new();
         let mut all_versions = vec![self.initial_version.clone()];
-        for command_or_upgrade in &self.commands {
+        for command_or_upgrade in &mut self.commands {
             match command_or_upgrade {
-                CommandOrUpgrade::Command(command) => current_commands.push(command.clone()),
+                CommandOrUpgrade::Command(command) => current_commands.push(command),
                 CommandOrUpgrade::Upgrade { upgrade } => {
                     if !current_commands.is_empty() {
                         commands_or_upgrade.push(CommandOrUpgradeVec::Commands(current_commands));
@@ -82,16 +97,40 @@
                 }
             }
         }
+        if !current_commands.is_empty() {
+            commands_or_upgrade.push(CommandOrUpgradeVec::Commands(current_commands));
+        }
 
         // Fetch assets
         expand_assets_with_versions(&mut self.assets, &all_versions).await?;
         fetch_assets(assets_client, &self.assets, &args.common.asset_folder).await?;
+        let assets = Arc::new(self.assets.clone());
 
+        let return_responses = dbg!(args.add_missing_responses || args.update_responses);
         for command_or_upgrade in commands_or_upgrade {
             match command_or_upgrade {
                 CommandOrUpgradeVec::Commands(commands) => {
-                    run_commands(meili_client, &commands, &self.assets, &args.common.asset_folder)
-                        .await?;
+                    let cloned: Vec<_> = commands.iter().map(|c| (*c).clone()).collect();
+                    let responses = run_commands(
+                        meili_client,
+                        &cloned,
+                        &assets,
+                        asset_folder,
+                        return_responses,
+                    )
+                    .await?;
+                    if return_responses {
+                        assert_eq!(responses.len(), cloned.len());
+                        for (command, (response, status)) in commands.into_iter().zip(responses) {
+                            if args.update_responses
+                                || (dbg!(args.add_missing_responses)
+                                    && dbg!(command.expected_response.is_none()))
+                            {
+                                command.expected_response = Some(response);
+                                command.expected_status = Some(status.as_u16());
+                            }
+                        }
+                    }
                 }
                 CommandOrUpgradeVec::Upgrade(version) => {
                     todo!()
@@ -99,6 +138,23 @@
             }
         }
 
+        // Write back the workload if needed
+        if return_responses {
+            // Filter out the assets we added for the versions
+            self.assets.retain(|_, asset| {
+                asset.local_location.as_ref().is_none_or(|a| !a.starts_with("meilisearch-"))
+            });
+
+            let workload = Workload::Test(self);
+            let file = std::fs::File::create(&args.common.workload_file[0]).with_context(|| {
+                format!("could not open {}", args.common.workload_file[0].display())
+            })?;
+            serde_json::to_writer_pretty(file, &workload).with_context(|| {
+                format!("could not write to {}", args.common.workload_file[0].display())
+            })?;
+            tracing::info!("Updated workload file {}", args.common.workload_file[0].display());
+        }
+
         Ok(())
     }
 }
diff --git a/workloads/tests/movies.json b/workloads/tests/movies.json
index f437f7bd5..90e10fa1d 100644
--- a/workloads/tests/movies.json
+++ b/workloads/tests/movies.json
@@ -1,6 +1,6 @@
 {
-  "name": "movies",
   "type": "test",
+  "name": "movies",
   "initialVersion": "1.12.0",
   "assets": {
     "movies.json": {
@@ -9,5 +9,34 @@
       "sha256": "5b6e4cb660bc20327776e8a33ea197b43d9ec84856710ead1cc87ab24df77de1"
     }
   },
-  "commands": []
-}
+  "commands": [
+    {
+      "route": "indexes/movies/settings",
+      "method": "PATCH",
+      "body": {
+        "inline": {
+          "filterableAttributes": [
+            "genres",
+            "release_date"
+          ],
+          "searchableAttributes": [
+            "title",
+            "overview"
+          ],
+          "sortableAttributes": [
+            "release_date"
+          ]
+        }
+      },
+      "synchronous": "DontWait"
+    },
+    {
+      "route": "indexes/movies/documents",
+      "method": "POST",
+      "body": {
+        "asset": "movies.json"
+      },
+      "synchronous": "WaitForTask"
+    }
+  ]
+}
\ No newline at end of file