Make commands common

This commit is contained in:
Mubelotix
2025-08-25 12:07:27 +02:00
parent 3a2ec5f576
commit 72b6b73a91
6 changed files with 36 additions and 18 deletions

View File

@ -1,194 +0,0 @@
use std::collections::BTreeMap;
use std::fmt::Display;
use std::io::Read as _;
use anyhow::{bail, Context as _};
use serde::Deserialize;
use crate::common::assets::{fetch_asset, Asset};
use crate::common::client::{Client, Method};
#[derive(Clone, Deserialize)]
pub struct Command {
pub route: String,
pub method: Method,
#[serde(default)]
pub body: Body,
#[serde(default)]
pub synchronous: SyncMode,
}
#[derive(Default, Clone, Deserialize)]
#[serde(untagged)]
pub enum Body {
Inline {
inline: serde_json::Value,
},
Asset {
asset: String,
},
#[default]
Empty,
}
impl Body {
pub fn get(
self,
assets: &BTreeMap<String, Asset>,
asset_folder: &str,
) -> anyhow::Result<Option<(Vec<u8>, &'static str)>> {
Ok(match self {
Body::Inline { inline: body } => Some((
serde_json::to_vec(&body)
.context("serializing to bytes")
.context("while getting inline body")?,
"application/json",
)),
Body::Asset { asset: name } => Some({
let context = || format!("while getting body from asset '{name}'");
let (mut file, format) =
fetch_asset(&name, assets, asset_folder).with_context(context)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).with_context(context)?;
(buf, format.to_content_type(&name))
}),
Body::Empty => None,
})
}
}
impl Display for Command {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?} {} ({:?})", self.method, self.route, self.synchronous)
}
}
#[derive(Default, Debug, Clone, Copy, Deserialize)]
pub enum SyncMode {
DontWait,
#[default]
WaitForResponse,
WaitForTask,
}
pub async fn run_batch(
client: &Client,
batch: &[Command],
assets: &BTreeMap<String, Asset>,
asset_folder: &str,
) -> anyhow::Result<()> {
let [.., last] = batch else { return Ok(()) };
let sync = last.synchronous;
let mut tasks = tokio::task::JoinSet::new();
for command in batch {
// FIXME: you probably don't want to copy assets everytime here
tasks.spawn({
let client = client.clone();
let command = command.clone();
let assets = assets.clone();
let asset_folder = asset_folder.to_owned();
async move { run(client, command, &assets, &asset_folder).await }
});
}
while let Some(result) = tasks.join_next().await {
result
.context("panicked while executing command")?
.context("error while executing command")?;
}
match sync {
SyncMode::DontWait => {}
SyncMode::WaitForResponse => {}
SyncMode::WaitForTask => wait_for_tasks(client).await?,
}
Ok(())
}
async fn wait_for_tasks(client: &Client) -> anyhow::Result<()> {
loop {
let response = client
.get("tasks?statuses=enqueued,processing")
.send()
.await
.context("could not wait for tasks")?;
let response: serde_json::Value = response
.json()
.await
.context("could not deserialize response to JSON")
.context("could not wait for tasks")?;
match response.get("total") {
Some(serde_json::Value::Number(number)) => {
let number = number.as_u64().with_context(|| {
format!("waiting for tasks: could not parse 'total' as integer, got {}", number)
})?;
if number == 0 {
break;
} else {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
}
Some(thing_else) => {
bail!(format!(
"waiting for tasks: could not parse 'total' as a number, got '{thing_else}'"
))
}
None => {
bail!(format!(
"waiting for tasks: expected response to contain 'total', got '{response}'"
))
}
}
}
Ok(())
}
#[tracing::instrument(skip(client, command, assets, asset_folder), fields(command = %command))]
pub async fn run(
client: Client,
mut command: Command,
assets: &BTreeMap<String, Asset>,
asset_folder: &str,
) -> anyhow::Result<()> {
// memtake the body here to leave an empty body in its place, so that command is not partially moved-out
let body = std::mem::take(&mut command.body)
.get(assets, asset_folder)
.with_context(|| format!("while getting body for command {command}"))?;
let request = client.request(command.method.into(), &command.route);
let request = if let Some((body, content_type)) = body {
request.body(body).header(reqwest::header::CONTENT_TYPE, content_type)
} else {
request
};
let response =
request.send().await.with_context(|| format!("error sending command: {}", command))?;
let code = response.status();
if code.is_client_error() {
tracing::error!(%command, %code, "error in workload file");
let response: serde_json::Value = response
.json()
.await
.context("could not deserialize response as JSON")
.context("parsing error in workload file when sending command")?;
bail!("error in workload file: server responded with error code {code} and '{response}'")
} else if code.is_server_error() {
tracing::error!(%command, %code, "server error");
let response: serde_json::Value = response
.json()
.await
.context("could not deserialize response as JSON")
.context("parsing server error when sending command")?;
bail!("server error: server responded with error code {code} and '{response}'")
}
Ok(())
}

View File

@ -2,17 +2,18 @@ use std::collections::BTreeMap;
use std::time::Duration;
use anyhow::{bail, Context as _};
use tokio::process::Command;
use tokio::process::Command as TokioCommand;
use tokio::time;
use super::workload::BenchWorkload;
use crate::common::assets::Asset;
use crate::common::client::Client;
use crate::common::command::{run as run_command, Command, SyncMode};
pub async fn kill(mut meilisearch: tokio::process::Child) {
let Some(id) = meilisearch.id() else { return };
match Command::new("kill").args(["--signal=TERM", &id.to_string()]).spawn() {
match TokioCommand::new("kill").args(["--signal=TERM", &id.to_string()]).spawn() {
Ok(mut cmd) => {
let Err(error) = cmd.wait().await else { return };
tracing::warn!(
@ -50,7 +51,7 @@ pub async fn kill(mut meilisearch: tokio::process::Child) {
#[tracing::instrument]
pub async fn build() -> anyhow::Result<()> {
let mut command = Command::new("cargo");
let mut command = TokioCommand::new("cargo");
command.arg("build").arg("--release").arg("-p").arg("meilisearch");
command.kill_on_drop(true);
@ -70,7 +71,7 @@ pub async fn start(
master_key: Option<&str>,
workload: &BenchWorkload,
asset_folder: &str,
mut command: Command,
mut command: TokioCommand,
) -> anyhow::Result<tokio::process::Child> {
command.arg("--db-path").arg("./_xtask_benchmark.ms");
if let Some(master_key) = master_key {
@ -98,7 +99,7 @@ async fn wait_for_health(
asset_folder: &str,
) -> anyhow::Result<()> {
for i in 0..100 {
let res = super::command::run(client.clone(), health_command(), assets, asset_folder).await;
let res = run_command(client.clone(), health_command(), assets, asset_folder).await;
if res.is_ok() {
// check that this is actually the current Meilisearch instance that answered us
if let Some(exit_code) =
@ -122,12 +123,12 @@ async fn wait_for_health(
bail!("meilisearch is not responding")
}
fn health_command() -> super::command::Command {
super::command::Command {
fn health_command() -> Command {
Command {
route: "/health".into(),
method: crate::common::client::Method::Get,
body: Default::default(),
synchronous: super::command::SyncMode::WaitForResponse,
synchronous: SyncMode::WaitForResponse,
}
}

View File

@ -1,4 +1,3 @@
mod command;
mod dashboard;
mod env_info;
mod meili_process;
@ -138,7 +137,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> {
.with_context(|| format!("error opening {}", workload_file.display()))?,
)
.with_context(|| format!("error parsing {} as JSON", workload_file.display()))?;
let Workload::Bench(workload) = workload else {
bail!("workload file {} is not a bench workload", workload_file.display());
};

View File

@ -10,12 +10,12 @@ use serde_json::json;
use tokio::task::JoinHandle;
use uuid::Uuid;
use super::command::SyncMode;
use super::dashboard::DashboardClient;
use super::BenchDeriveArgs;
use crate::bench::meili_process;
use crate::common::assets::{self, Asset};
use crate::common::client::Client;
use crate::common::command::{run_batch as run_command_batch, Command, SyncMode};
/// A bench workload.
/// Not to be confused with [a test workload](crate::test::workload::Workload).
@ -28,8 +28,8 @@ pub struct BenchWorkload {
#[serde(default)]
pub target: String,
#[serde(default)]
pub precommands: Vec<super::command::Command>,
pub commands: Vec<super::command::Command>,
pub precommands: Vec<Command>,
pub commands: Vec<Command>,
}
async fn run_commands(
@ -49,8 +49,7 @@ async fn run_commands(
.as_slice()
.split_inclusive(|command| !matches!(command.synchronous, SyncMode::DontWait))
{
super::command::run_batch(meili_client, batch, &workload.assets, &args.common.asset_folder)
.await?;
run_command_batch(meili_client, batch, &workload.assets, &args.common.asset_folder).await?;
}
std::fs::create_dir_all(report_folder)
@ -66,8 +65,7 @@ async fn run_commands(
.as_slice()
.split_inclusive(|command| !matches!(command.synchronous, SyncMode::DontWait))
{
super::command::run_batch(meili_client, batch, &workload.assets, &args.common.asset_folder)
.await?;
run_command_batch(meili_client, batch, &workload.assets, &args.common.asset_folder).await?;
}
let processor =