	wip create a search batcher
@@ -1,5 +1,7 @@
use actix_web::HttpRequest;
use meilisearch_lib::index::SearchQuery;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::fs::read_to_string;

@@ -11,12 +13,14 @@ mod segment {
    use crate::analytics::Analytics;
    use actix_web::http::header::USER_AGENT;
    use actix_web::HttpRequest;
    use meilisearch_lib::index::SearchQuery;
    use meilisearch_lib::index_controller::Stats;
    use meilisearch_lib::MeiliSearch;
    use once_cell::sync::Lazy;
    use segment::message::{Identify, Track, User};
    use segment::{AutoBatcher, Batcher, HttpClient};
    use serde_json::{json, Value};
    use std::collections::{HashMap, HashSet};
    use std::fmt::Display;
    use std::fs;
    use std::time::{Duration, Instant};
@@ -32,6 +36,8 @@ mod segment {
        user: User,
        opt: Opt,
        batcher: Mutex<AutoBatcher>,
        post_search_batcher: Mutex<SearchBatcher>,
        get_search_batcher: Mutex<SearchBatcher>,
    }

    impl SegmentAnalytics {
@@ -103,6 +109,8 @@ mod segment {
                user,
                opt: opt.clone(),
                batcher,
                post_search_batcher: Mutex::new(SearchBatcher::default()),
                get_search_batcher: Mutex::new(SearchBatcher::default()),
            });
            let segment = Box::leak(segment);

@@ -141,6 +149,92 @@ mod segment {
                }
            });
        }

        fn start_search(
            &'static self,
            getter: impl Fn(&'static Self) -> &'static Mutex<SearchBatcher> + Send + Sync + 'static,
            query: &SearchQuery,
            request: &HttpRequest,
        ) {
            let user_agent = SearchBatcher::extract_user_agents(request);
            let sorted = query.sort.is_some() as usize;
            let sort_with_geo_point = query
                .sort
                .as_ref()
                .map_or(false, |s| s.iter().any(|s| s.contains("_geoPoint(")));
            let sort_criteria_terms = query.sort.as_ref().map_or(0, |s| s.len());

            // since there is quite a bit of computation made on the filter we are going to do that in the async task
            let filter = query.filter.clone();
            let queried = query.q.is_some();
            let nb_terms = query.q.as_ref().map_or(0, |s| s.split_whitespace().count());

            let max_limit = query.limit;
            let max_offset = query.offset.unwrap_or_default();

            // to avoid blocking the search we are going to do the heavier computation in an async task
            // and take the mutex in the same task
            tokio::spawn(async move {
                let filtered = filter.is_some() as usize;
                let syntax = match filter.as_ref() {
                    Some(Value::String(_)) => "string".to_string(),
                    Some(Value::Array(values)) => {
                        if values.iter().map(|v| v.to_string()).any(|s| {
                            s.contains(['=', '<', '>', '!'].as_ref())
                                || s.contains("_geoRadius")
                                || s.contains("TO")
                        }) {
                            "mixed".to_string()
                        } else {
                            "array".to_string()
                        }
                    }
                    _ => "none".to_string(),
                };
                let stringified_filters = filter.map_or(String::new(), |v| v.to_string());
                let filter_with_geo_radius = stringified_filters.contains("_geoRadius(");
                let filter_number_of_criteria = stringified_filters
                    .split("!=")
                    .flat_map(|s| s.split("<="))
                    .flat_map(|s| s.split(">="))
                    .flat_map(|s| s.split(['=', '<', '>', '!'].as_ref()))
                    .flat_map(|s| s.split("_geoRadius("))
                    .flat_map(|s| s.split("TO"))
                    .count()
                    - 1;

                println!("Batching a search");
                let mut search_batcher = getter(self).lock().await;
                user_agent.into_iter().for_each(|ua| {
                    search_batcher.user_agents.insert(ua);
                });
                search_batcher.total_received += 1;

                // sort
                search_batcher.sort_with_geo_point |= sort_with_geo_point;
                search_batcher.sort_sum_of_criteria_terms += sort_criteria_terms;
                search_batcher.sort_total_number_of_criteria += sorted;

                // filter
                search_batcher.filter_with_geo_radius |= filter_with_geo_radius;
                search_batcher.filter_sum_of_criteria_terms += filter_number_of_criteria;
                search_batcher.filter_total_number_of_criteria += filtered as usize;
                *search_batcher.used_syntax.entry(syntax).or_insert(0) += 1;

                // q
                search_batcher.sum_of_terms_count += nb_terms;
                search_batcher.total_number_of_q += queried as usize;

                // pagination
                search_batcher.max_limit = search_batcher.max_limit.max(max_limit);
                search_batcher.max_offset = search_batcher.max_offset.max(max_offset);
            });
        }
    }
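
    // For reference (illustration, not part of this commit), the syntax detection in
    // `start_search` above buckets a request's `filter` value as follows:
    //   json!("genres = horror AND price < 10")          -> "string"
    //   json!(["genres = horror", "price < 10"])         -> "mixed"  (array items contain operators)
    //   json!([["genres", "horror"], ["genres", "sf"]])  -> "array"
    //   no filter at all                                 -> "none"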

    #[async_trait::async_trait]
@@ -160,7 +254,7 @@ mod segment {
                    .push(Track {
                        user: self.user.clone(),
                        event: event_name.clone(),
                        context: content_type.map(|user_agent| json!({ "user-agent": user_agent.split(";").map(|u| u.trim()).collect::<Vec<&str>>() })),
                        context: content_type.map(|user_agent| json!({ "user-agent": user_agent.split(";").map(str::trim).collect::<Vec<&str>>() })),
                        properties: send,
                        ..Default::default()
                    })
@@ -168,6 +262,30 @@ mod segment {
                println!("ANALYTICS {} pushed", event_name);
            });
        }

        fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest) {
            self.start_search(|s| &s.get_search_batcher, query, request)
        }

        fn end_get_search(&'static self, process_time: usize) {
            tokio::spawn(async move {
                let mut search_batcher = self.get_search_batcher.lock().await;
                search_batcher.total_succeeded += 1;
                search_batcher.time_spent.push(process_time);
            });
        }

        fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest) {
            self.start_search(|s| &s.post_search_batcher, query, request)
        }

        fn end_post_search(&'static self, process_time: usize) {
            tokio::spawn(async move {
                let mut search_batcher = self.post_search_batcher.lock().await;
                search_batcher.total_succeeded += 1;
                search_batcher.time_spent.push(process_time);
            });
        }
    }

    impl Display for SegmentAnalytics {
@@ -175,6 +293,96 @@ mod segment {
            write!(f, "{}", self.user)
        }
    }

    #[derive(Default)]
    pub struct SearchBatcher {
        // context
        user_agents: HashSet<String>,

        // requests
        total_received: usize,
        total_succeeded: usize,
        time_spent: Vec<usize>,

        // sort
        sort_with_geo_point: bool,
        // every time a request has a sort, this field must be incremented by the number of sort criteria it contains
        sort_sum_of_criteria_terms: usize,
        // every time a request has a sort, this field must be incremented by one
        sort_total_number_of_criteria: usize,

        // filter
        filter_with_geo_radius: bool,
        // every time a request has a filter, this field must be incremented by the number of terms it contains
        filter_sum_of_criteria_terms: usize,
        // every time a request has a filter, this field must be incremented by one
        filter_total_number_of_criteria: usize,
        used_syntax: HashMap<String, usize>,

        // q
        // every time a request has a q field, this field must be incremented by the number of terms
        sum_of_terms_count: usize,
        // every time a request has a q field, this field must be incremented by one
        total_number_of_q: usize,

        // pagination
        max_limit: usize,
        max_offset: usize,
    }
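
    // Worked example (illustration, not from this commit): batching two GET searches,
    // one sorted by ["price:asc", "_geoPoint(48.87, 2.35):asc"] and one without a sort,
    // gives sort_total_number_of_criteria = 1, sort_sum_of_criteria_terms = 2 and
    // sort_with_geo_point = true, i.e. an average of 2.0 sort criteria per sorted request.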

    impl SearchBatcher {
        pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
            request
                .headers()
                .get(USER_AGENT)
                .and_then(|header| header.to_str().ok())
                .unwrap_or("unknown")
                .split(";")
                .map(str::trim)
                .map(ToString::to_string)
                .collect()
        }
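
        // For illustration (not part of this commit): the helper above turns a header such
        // as `User-Agent: Meilisearch Ruby (v0.20.0); Ruby (3.0)` into
        // `["Meilisearch Ruby (v0.20.0)", "Ruby (3.0)"]`, and a missing or non-UTF-8
        // header into `["unknown"]`.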

        pub fn into_event(mut self, user: User, event_name: String) -> Track {
            let context = Some(json!({ "user-agent": self.user_agents}));
            // sort the response times first so that draining really drops the slowest 1%
            self.time_spent.sort_unstable();
            let percentile_99th = 0.99 * (self.total_succeeded as f64 - 1.) + 1.;
            self.time_spent.drain(percentile_99th as usize..);

            let properties = json!({
                "requests": {
                    "99th_response_time": self.time_spent.iter().sum::<usize>() as f64 / self.time_spent.len() as f64, // mean response time of the fastest 99% of requests
                    "total_succeeded": self.total_succeeded,
                    "total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panic
                    "total_received": self.total_received,
                },
                "sort": {
                    "with_geoPoint": self.sort_with_geo_point,
                    "avg_criteria_number": self.sort_sum_of_criteria_terms as f64 / self.sort_total_number_of_criteria as f64,
                },
                "filter": {
                    "with_geoRadius": self.filter_with_geo_radius,
                    "avg_criteria_number": self.filter_sum_of_criteria_terms as f64 / self.filter_total_number_of_criteria as f64,
                    "most_used_syntax": self.used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
                },
                "q": {
                    "avg_terms_number": self.sum_of_terms_count as f64 / self.total_number_of_q as f64,
                },
                "pagination": {
                    "max_limit": self.max_limit,
                    "max_offset": self.max_offset,
                },
            });

            Track {
                user,
                event: event_name,
                context,
                properties,
                ..Default::default()
            }
        }
    }
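
    // Sketch (not part of this commit): this diff never drains the two search batchers,
    // so how and when they are flushed is an assumption. One possible shape, reusing the
    // fields defined above and hypothetical event names:
    impl SegmentAnalytics {
        async fn flush_search_batchers(&'static self) {
            // take the accumulated batchers out, leaving fresh (default) ones behind
            let get_search = std::mem::take(&mut *self.get_search_batcher.lock().await);
            let post_search = std::mem::take(&mut *self.post_search_batcher.lock().await);
            let mut batcher = self.batcher.lock().await;
            for (batch, name) in vec![
                (get_search, "Documents Searched GET"),
                (post_search, "Documents Searched POST"),
            ] {
                // only report a route that was actually used since the last flush
                if batch.total_received != 0 {
                    let _ = batcher
                        .push(batch.into_event(self.user.clone(), name.to_string()))
                        .await;
                }
            }
        }
    }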
}

// if we are in debug mode OR the analytics feature is disabled
@@ -199,8 +407,12 @@ impl MockAnalytics {

#[async_trait::async_trait]
impl Analytics for MockAnalytics {
    /// This is a noop and should be optimized out
    // These methods are no-ops and should be optimized out
    fn publish(&'static self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
    fn start_get_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
    fn end_get_search(&'static self, _process_time: usize) {}
    fn start_post_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
    fn end_post_search(&'static self, _process_time: usize) {}
}

impl Display for MockAnalytics {
@@ -211,5 +423,16 @@ impl Display for MockAnalytics {

#[async_trait::async_trait]
pub trait Analytics: Display + Sync + Send {
    /// The method used to publish most analytics that do not need to be batched every hour
    fn publish(&'static self, event_name: String, send: Value, request: Option<&HttpRequest>);

    /// This method should be called to batch a get search request
    fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest);
    /// This method should be called once a get search request has succeeded
    fn end_get_search(&'static self, process_time: usize);

    /// This method should be called to batch a post search request
    fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest);
    /// This method should be called once a post search request has succeeded
    fn end_post_search(&'static self, process_time: usize);
}
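
// Sketch (not part of this commit) of how a search route could drive the hooks above.
// The function shape, the `do_search` future and the millisecond unit passed to
// `end_get_search` are assumptions made for illustration only.
async fn example_search_route(
    analytics: &'static dyn Analytics,
    query: SearchQuery,
    req: HttpRequest,
    do_search: impl std::future::Future<Output = Value>,
) -> Value {
    analytics.start_get_search(&query, &req);
    let started = std::time::Instant::now();
    let results = do_search.await;
    analytics.end_get_search(started.elapsed().as_millis() as usize);
    results
}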