mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 13:36:27 +00:00 
			
		
		
		
	Allow dry runs for benchmarks where reports are generated but not sent to the dashboard
This commit is contained in:
		| @@ -11,61 +11,30 @@ use super::client::Client; | |||||||
| use super::env_info; | use super::env_info; | ||||||
| use super::workload::Workload; | use super::workload::Workload; | ||||||
|  |  | ||||||
| pub async fn cancel_on_ctrl_c( | #[derive(Debug, Clone)] | ||||||
|     invocation_uuid: Uuid, | pub enum DashboardClient { | ||||||
|     dashboard_client: Client, |     Client(Client), | ||||||
|     abort_handle: AbortHandle, |     Dry, | ||||||
| ) { |  | ||||||
|     tracing::info!("press Ctrl-C to cancel the invocation"); |  | ||||||
|     match ctrl_c().await { |  | ||||||
|         Ok(()) => { |  | ||||||
|             tracing::info!(%invocation_uuid, "received Ctrl-C, cancelling invocation"); |  | ||||||
|             mark_as_failed(dashboard_client, invocation_uuid, None).await; |  | ||||||
|             abort_handle.abort(); |  | ||||||
|         } |  | ||||||
|         Err(error) => tracing::warn!( |  | ||||||
|             error = &error as &dyn std::error::Error, |  | ||||||
|             "failed to listen to Ctrl-C signal, invocation won't be canceled on Ctrl-C" |  | ||||||
|         ), |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|  |  | ||||||
| pub async fn mark_as_failed( | impl DashboardClient { | ||||||
|     dashboard_client: Client, |     pub fn new(dashboard_url: &str, api_key: Option<&str>) -> anyhow::Result<Self> { | ||||||
|     invocation_uuid: Uuid, |         let dashboard_client = Client::new( | ||||||
|     failure_reason: Option<String>, |             Some(format!("{}/api/v1", dashboard_url)), | ||||||
| ) { |             api_key, | ||||||
|     let response = dashboard_client |             Some(std::time::Duration::from_secs(60)), | ||||||
|         .post("cancel-invocation") |         )?; | ||||||
|         .json(&json!({ |  | ||||||
|             "invocation_uuid": invocation_uuid, |  | ||||||
|             "failure_reason": failure_reason, |  | ||||||
|         })) |  | ||||||
|         .send() |  | ||||||
|         .await; |  | ||||||
|     let response = match response { |  | ||||||
|         Ok(response) => response, |  | ||||||
|         Err(response_error) => { |  | ||||||
|             tracing::error!(error = &response_error as &dyn std::error::Error, %invocation_uuid, "could not mark invocation as failed"); |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|     }; |  | ||||||
|  |  | ||||||
|     if !response.status().is_success() { |         Ok(Self::Client(dashboard_client)) | ||||||
|         tracing::error!( |  | ||||||
|             %invocation_uuid, |  | ||||||
|             "could not mark invocation as failed: {}", |  | ||||||
|             response.text().await.unwrap() |  | ||||||
|         ); |  | ||||||
|         return; |  | ||||||
|     } |     } | ||||||
|     tracing::warn!(%invocation_uuid, "marked invocation as failed or canceled"); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| pub async fn send_machine_info( |     pub fn new_dry() -> Self { | ||||||
|     dashboard_client: &Client, |         Self::Dry | ||||||
|     env: &env_info::Environment, |     } | ||||||
| ) -> anyhow::Result<()> { |  | ||||||
|  |     pub async fn send_machine_info(&self, env: &env_info::Environment) -> anyhow::Result<()> { | ||||||
|  |         let Self::Client(dashboard_client) = self else { return Ok(()) }; | ||||||
|  |  | ||||||
|         let response = dashboard_client |         let response = dashboard_client | ||||||
|             .put("machine") |             .put("machine") | ||||||
|             .json(&json!({"hostname": env.hostname})) |             .json(&json!({"hostname": env.hostname})) | ||||||
| @@ -80,16 +49,18 @@ pub async fn send_machine_info( | |||||||
|             ); |             ); | ||||||
|         } |         } | ||||||
|         Ok(()) |         Ok(()) | ||||||
| } |     } | ||||||
|  |  | ||||||
| pub async fn create_invocation( |     pub async fn create_invocation( | ||||||
|     dashboard_client: &Client, |         &self, | ||||||
|         build_info: build_info::BuildInfo, |         build_info: build_info::BuildInfo, | ||||||
|         commit_message: &str, |         commit_message: &str, | ||||||
|         env: env_info::Environment, |         env: env_info::Environment, | ||||||
|         max_workloads: usize, |         max_workloads: usize, | ||||||
|         reason: Option<&str>, |         reason: Option<&str>, | ||||||
| ) -> anyhow::Result<Uuid> { |     ) -> anyhow::Result<Uuid> { | ||||||
|  |         let Self::Client(dashboard_client) = self else { return Ok(Uuid::now_v7()) }; | ||||||
|  |  | ||||||
|         let response = dashboard_client |         let response = dashboard_client | ||||||
|             .put("invocation") |             .put("invocation") | ||||||
|             .json(&json!({ |             .json(&json!({ | ||||||
| @@ -116,13 +87,15 @@ pub async fn create_invocation( | |||||||
|         let invocation_uuid: Uuid = |         let invocation_uuid: Uuid = | ||||||
|             response.json().await.context("could not deserialize invocation response as JSON")?; |             response.json().await.context("could not deserialize invocation response as JSON")?; | ||||||
|         Ok(invocation_uuid) |         Ok(invocation_uuid) | ||||||
| } |     } | ||||||
|  |  | ||||||
| pub async fn create_workload( |     pub async fn create_workload( | ||||||
|     dashboard_client: &Client, |         &self, | ||||||
|         invocation_uuid: Uuid, |         invocation_uuid: Uuid, | ||||||
|         workload: &Workload, |         workload: &Workload, | ||||||
| ) -> anyhow::Result<Uuid> { |     ) -> anyhow::Result<Uuid> { | ||||||
|  |         let Self::Client(dashboard_client) = self else { return Ok(Uuid::now_v7()) }; | ||||||
|  |  | ||||||
|         let response = dashboard_client |         let response = dashboard_client | ||||||
|             .put("workload") |             .put("workload") | ||||||
|             .json(&json!({ |             .json(&json!({ | ||||||
| @@ -141,13 +114,15 @@ pub async fn create_workload( | |||||||
|         let workload_uuid: Uuid = |         let workload_uuid: Uuid = | ||||||
|             response.json().await.context("could not deserialize JSON as UUID")?; |             response.json().await.context("could not deserialize JSON as UUID")?; | ||||||
|         Ok(workload_uuid) |         Ok(workload_uuid) | ||||||
| } |     } | ||||||
|  |  | ||||||
| pub async fn create_run( |     pub async fn create_run( | ||||||
|     dashboard_client: Client, |         &self, | ||||||
|         workload_uuid: Uuid, |         workload_uuid: Uuid, | ||||||
|         report: &BTreeMap<String, CallStats>, |         report: &BTreeMap<String, CallStats>, | ||||||
| ) -> anyhow::Result<()> { |     ) -> anyhow::Result<()> { | ||||||
|  |         let Self::Client(dashboard_client) = self else { return Ok(()) }; | ||||||
|  |  | ||||||
|         let response = dashboard_client |         let response = dashboard_client | ||||||
|             .put("run") |             .put("run") | ||||||
|             .json(&json!({ |             .json(&json!({ | ||||||
| @@ -164,4 +139,51 @@ pub async fn create_run( | |||||||
|             ) |             ) | ||||||
|         } |         } | ||||||
|         Ok(()) |         Ok(()) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub async fn cancel_on_ctrl_c(self, invocation_uuid: Uuid, abort_handle: AbortHandle) { | ||||||
|  |         tracing::info!("press Ctrl-C to cancel the invocation"); | ||||||
|  |         match ctrl_c().await { | ||||||
|  |             Ok(()) => { | ||||||
|  |                 tracing::info!(%invocation_uuid, "received Ctrl-C, cancelling invocation"); | ||||||
|  |                 self.mark_as_failed(invocation_uuid, None).await; | ||||||
|  |                 abort_handle.abort(); | ||||||
|  |             } | ||||||
|  |             Err(error) => tracing::warn!( | ||||||
|  |                 error = &error as &dyn std::error::Error, | ||||||
|  |                 "failed to listen to Ctrl-C signal, invocation won't be canceled on Ctrl-C" | ||||||
|  |             ), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub async fn mark_as_failed(&self, invocation_uuid: Uuid, failure_reason: Option<String>) { | ||||||
|  |         if let DashboardClient::Client(client) = self { | ||||||
|  |             let response = client | ||||||
|  |                 .post("cancel-invocation") | ||||||
|  |                 .json(&json!({ | ||||||
|  |                     "invocation_uuid": invocation_uuid, | ||||||
|  |                     "failure_reason": failure_reason, | ||||||
|  |                 })) | ||||||
|  |                 .send() | ||||||
|  |                 .await; | ||||||
|  |             let response = match response { | ||||||
|  |                 Ok(response) => response, | ||||||
|  |                 Err(response_error) => { | ||||||
|  |                     tracing::error!(error = &response_error as &dyn std::error::Error, %invocation_uuid, "could not mark invocation as failed"); | ||||||
|  |                     return; | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  |  | ||||||
|  |             if !response.status().is_success() { | ||||||
|  |                 tracing::error!( | ||||||
|  |                     %invocation_uuid, | ||||||
|  |                     "could not mark invocation as failed: {}", | ||||||
|  |                     response.text().await.unwrap() | ||||||
|  |                 ); | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         tracing::warn!(%invocation_uuid, "marked invocation as failed or canceled"); | ||||||
|  |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -50,6 +50,10 @@ pub struct BenchDeriveArgs { | |||||||
|     #[arg(long, default_value_t = default_dashboard_url())] |     #[arg(long, default_value_t = default_dashboard_url())] | ||||||
|     dashboard_url: String, |     dashboard_url: String, | ||||||
|  |  | ||||||
|  |     /// Don't actually send results to the dashboard | ||||||
|  |     #[arg(long)] | ||||||
|  |     no_dashboard: bool, | ||||||
|  |  | ||||||
|     /// Directory to output reports. |     /// Directory to output reports. | ||||||
|     #[arg(long, default_value_t = default_report_folder())] |     #[arg(long, default_value_t = default_report_folder())] | ||||||
|     report_folder: String, |     report_folder: String, | ||||||
| @@ -103,11 +107,11 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { | |||||||
|     let assets_client = |     let assets_client = | ||||||
|         Client::new(None, args.assets_key.as_deref(), Some(std::time::Duration::from_secs(3600)))?; // 1h |         Client::new(None, args.assets_key.as_deref(), Some(std::time::Duration::from_secs(3600)))?; // 1h | ||||||
|  |  | ||||||
|     let dashboard_client = Client::new( |     let dashboard_client = if args.no_dashboard { | ||||||
|         Some(format!("{}/api/v1", args.dashboard_url)), |         dashboard::DashboardClient::new_dry() | ||||||
|         args.api_key.as_deref(), |     } else { | ||||||
|         Some(std::time::Duration::from_secs(60)), |         dashboard::DashboardClient::new(&args.dashboard_url, args.api_key.as_deref())? | ||||||
|     )?; |     }; | ||||||
|  |  | ||||||
|     // reporting uses its own client because keeping the stream open to wait for entries |     // reporting uses its own client because keeping the stream open to wait for entries | ||||||
|     // blocks any other requests |     // blocks any other requests | ||||||
| @@ -127,12 +131,12 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { | |||||||
|     // enter runtime |     // enter runtime | ||||||
|  |  | ||||||
|     rt.block_on(async { |     rt.block_on(async { | ||||||
|         dashboard::send_machine_info(&dashboard_client, &env).await?; |         dashboard_client.send_machine_info(&env).await?; | ||||||
|  |  | ||||||
|         let commit_message = build_info.commit_msg.context("missing commit message")?.split('\n').next().unwrap(); |         let commit_message = build_info.commit_msg.context("missing commit message")?.split('\n').next().unwrap(); | ||||||
|         let max_workloads = args.workload_file.len(); |         let max_workloads = args.workload_file.len(); | ||||||
|         let reason: Option<&str> = args.reason.as_deref(); |         let reason: Option<&str> = args.reason.as_deref(); | ||||||
|         let invocation_uuid = dashboard::create_invocation(&dashboard_client, build_info, commit_message, env, max_workloads, reason).await?; |         let invocation_uuid = dashboard_client.create_invocation( build_info, commit_message, env, max_workloads, reason).await?; | ||||||
|  |  | ||||||
|         tracing::info!(workload_count = args.workload_file.len(), "handling workload files"); |         tracing::info!(workload_count = args.workload_file.len(), "handling workload files"); | ||||||
|  |  | ||||||
| @@ -167,7 +171,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { | |||||||
|         let abort_handle = workload_runs.abort_handle(); |         let abort_handle = workload_runs.abort_handle(); | ||||||
|         tokio::spawn({ |         tokio::spawn({ | ||||||
|             let dashboard_client = dashboard_client.clone(); |             let dashboard_client = dashboard_client.clone(); | ||||||
|             dashboard::cancel_on_ctrl_c(invocation_uuid, dashboard_client, abort_handle) |             dashboard_client.cancel_on_ctrl_c(invocation_uuid, abort_handle) | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|         // wait for the end of the main task, handle result |         // wait for the end of the main task, handle result | ||||||
| @@ -178,7 +182,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { | |||||||
|             } |             } | ||||||
|             Ok(Err(error)) => { |             Ok(Err(error)) => { | ||||||
|                 tracing::error!(%invocation_uuid, error = %error, "invocation failed, attempting to report the failure to dashboard"); |                 tracing::error!(%invocation_uuid, error = %error, "invocation failed, attempting to report the failure to dashboard"); | ||||||
|                 dashboard::mark_as_failed(dashboard_client, invocation_uuid, Some(error.to_string())).await; |                 dashboard_client.mark_as_failed(invocation_uuid, Some(error.to_string())).await; | ||||||
|                 tracing::warn!(%invocation_uuid, "invocation marked as failed following error"); |                 tracing::warn!(%invocation_uuid, "invocation marked as failed following error"); | ||||||
|                 Err(error) |                 Err(error) | ||||||
|             }, |             }, | ||||||
| @@ -186,7 +190,7 @@ pub fn run(args: BenchDeriveArgs) -> anyhow::Result<()> { | |||||||
|                 match join_error.try_into_panic() { |                 match join_error.try_into_panic() { | ||||||
|                     Ok(panic) => { |                     Ok(panic) => { | ||||||
|                         tracing::error!("invocation panicked, attempting to report the failure to dashboard"); |                         tracing::error!("invocation panicked, attempting to report the failure to dashboard"); | ||||||
|                         dashboard::mark_as_failed(dashboard_client, invocation_uuid, Some("Panicked".into())).await; |                         dashboard_client.mark_as_failed( invocation_uuid, Some("Panicked".into())).await; | ||||||
|                         std::panic::resume_unwind(panic) |                         std::panic::resume_unwind(panic) | ||||||
|                     } |                     } | ||||||
|                     Err(_) => { |                     Err(_) => { | ||||||
|   | |||||||
| @@ -12,8 +12,9 @@ use uuid::Uuid; | |||||||
| use super::assets::Asset; | use super::assets::Asset; | ||||||
| use super::client::Client; | use super::client::Client; | ||||||
| use super::command::SyncMode; | use super::command::SyncMode; | ||||||
|  | use super::dashboard::DashboardClient; | ||||||
| use super::BenchDeriveArgs; | use super::BenchDeriveArgs; | ||||||
| use crate::bench::{assets, dashboard, meili_process}; | use crate::bench::{assets, meili_process}; | ||||||
|  |  | ||||||
| #[derive(Deserialize)] | #[derive(Deserialize)] | ||||||
| pub struct Workload { | pub struct Workload { | ||||||
| @@ -25,7 +26,7 @@ pub struct Workload { | |||||||
| } | } | ||||||
|  |  | ||||||
| async fn run_commands( | async fn run_commands( | ||||||
|     dashboard_client: &Client, |     dashboard_client: &DashboardClient, | ||||||
|     logs_client: &Client, |     logs_client: &Client, | ||||||
|     meili_client: &Client, |     meili_client: &Client, | ||||||
|     workload_uuid: Uuid, |     workload_uuid: Uuid, | ||||||
| @@ -64,7 +65,7 @@ async fn run_commands( | |||||||
| #[tracing::instrument(skip(assets_client, dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = workload.name))] | #[tracing::instrument(skip(assets_client, dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = workload.name))] | ||||||
| pub async fn execute( | pub async fn execute( | ||||||
|     assets_client: &Client, |     assets_client: &Client, | ||||||
|     dashboard_client: &Client, |     dashboard_client: &DashboardClient, | ||||||
|     logs_client: &Client, |     logs_client: &Client, | ||||||
|     meili_client: &Client, |     meili_client: &Client, | ||||||
|     invocation_uuid: Uuid, |     invocation_uuid: Uuid, | ||||||
| @@ -74,8 +75,7 @@ pub async fn execute( | |||||||
| ) -> anyhow::Result<()> { | ) -> anyhow::Result<()> { | ||||||
|     assets::fetch_assets(assets_client, &workload.assets, &args.asset_folder).await?; |     assets::fetch_assets(assets_client, &workload.assets, &args.asset_folder).await?; | ||||||
|  |  | ||||||
|     let workload_uuid = |     let workload_uuid = dashboard_client.create_workload(invocation_uuid, &workload).await?; | ||||||
|         dashboard::create_workload(dashboard_client, invocation_uuid, &workload).await?; |  | ||||||
|  |  | ||||||
|     let mut tasks = Vec::new(); |     let mut tasks = Vec::new(); | ||||||
|  |  | ||||||
| @@ -113,7 +113,7 @@ pub async fn execute( | |||||||
| #[allow(clippy::too_many_arguments)] // not best code quality, but this is a benchmark runner | #[allow(clippy::too_many_arguments)] // not best code quality, but this is a benchmark runner | ||||||
| #[tracing::instrument(skip(dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = %workload.name))] | #[tracing::instrument(skip(dashboard_client, logs_client, meili_client, workload, master_key, args), fields(workload = %workload.name))] | ||||||
| async fn execute_run( | async fn execute_run( | ||||||
|     dashboard_client: &Client, |     dashboard_client: &DashboardClient, | ||||||
|     logs_client: &Client, |     logs_client: &Client, | ||||||
|     meili_client: &Client, |     meili_client: &Client, | ||||||
|     workload_uuid: Uuid, |     workload_uuid: Uuid, | ||||||
| @@ -202,7 +202,7 @@ async fn start_report( | |||||||
| } | } | ||||||
|  |  | ||||||
| async fn stop_report( | async fn stop_report( | ||||||
|     dashboard_client: &Client, |     dashboard_client: &DashboardClient, | ||||||
|     logs_client: &Client, |     logs_client: &Client, | ||||||
|     workload_uuid: Uuid, |     workload_uuid: Uuid, | ||||||
|     filename: String, |     filename: String, | ||||||
| @@ -232,7 +232,7 @@ async fn stop_report( | |||||||
|             .context("could not convert trace to report")?; |             .context("could not convert trace to report")?; | ||||||
|             let context = || format!("writing report to {filename}"); |             let context = || format!("writing report to {filename}"); | ||||||
|  |  | ||||||
|             dashboard::create_run(dashboard_client, workload_uuid, &report).await?; |             dashboard_client.create_run(workload_uuid, &report).await?; | ||||||
|  |  | ||||||
|             let mut output_file = std::io::BufWriter::new( |             let mut output_file = std::io::BufWriter::new( | ||||||
|                 std::fs::File::options() |                 std::fs::File::options() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user