mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 04:56:28 +00:00 
			
		
		
		
	Merge #5044
5044: Adds new metrics to prometheus r=irevoire a=PedroTurik not 100% confident in this solution, especially because i couldn't make the "Search Queue searches waiting" metric give me any value other than 0 with my local testing 😆. But i believe it solves the Issue. # Pull Request ## Related issue Fixes #4998 ## What does this PR do? ### Adds new metrics to prometheus; - SearchQueue size, - SearchQueue searches running, - and Search Queue searches waiting. ## PR checklist Please check if your PR fulfills the following requirements: - [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)? - [x] Have you read the contributing guidelines? - [x] Have you made sure that the title is accurate and descriptive of the changes? Co-authored-by: Pedro Turik Firmino <pedroturik@gmail.com>
This commit is contained in:
		| @@ -49,4 +49,18 @@ lazy_static! { | ||||
|     pub static ref MEILISEARCH_IS_INDEXING: IntGauge = | ||||
|         register_int_gauge!(opts!("meilisearch_is_indexing", "Meilisearch Is Indexing")) | ||||
|             .expect("Can't create a metric"); | ||||
|     pub static ref MEILISEARCH_SEARCH_QUEUE_SIZE: IntGauge = register_int_gauge!(opts!( | ||||
|         "meilisearch_search_queue_size", | ||||
|         "Meilisearch Search Queue Size" | ||||
|     )) | ||||
|     .expect("Can't create a metric"); | ||||
|     pub static ref MEILISEARCH_SEARCHES_RUNNING: IntGauge = | ||||
|         register_int_gauge!(opts!("meilisearch_searches_running", "Meilisearch Searches Running")) | ||||
|             .expect("Can't create a metric"); | ||||
|     pub static ref MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED: IntGauge = | ||||
|         register_int_gauge!(opts!( | ||||
|             "meilisearch_searches_waiting_to_be_processed", | ||||
|             "Meilisearch Searches Being Processed" | ||||
|         )) | ||||
|         .expect("Can't create a metric"); | ||||
| } | ||||
|   | ||||
| @@ -10,6 +10,7 @@ use prometheus::{Encoder, TextEncoder}; | ||||
| use crate::extractors::authentication::policies::ActionPolicy; | ||||
| use crate::extractors::authentication::{AuthenticationError, GuardedData}; | ||||
| use crate::routes::create_all_stats; | ||||
| use crate::search_queue::SearchQueue; | ||||
|  | ||||
| pub fn configure(config: &mut web::ServiceConfig) { | ||||
|     config.service(web::resource("").route(web::get().to(get_metrics))); | ||||
| @@ -18,6 +19,7 @@ pub fn configure(config: &mut web::ServiceConfig) { | ||||
| pub async fn get_metrics( | ||||
|     index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>, | ||||
|     auth_controller: Data<AuthController>, | ||||
|     search_queue: web::Data<SearchQueue>, | ||||
| ) -> Result<HttpResponse, ResponseError> { | ||||
|     index_scheduler.features().check_metrics()?; | ||||
|     let auth_filters = index_scheduler.filters(); | ||||
| @@ -35,6 +37,11 @@ pub async fn get_metrics( | ||||
|     crate::metrics::MEILISEARCH_USED_DB_SIZE_BYTES.set(response.used_database_size as i64); | ||||
|     crate::metrics::MEILISEARCH_INDEX_COUNT.set(response.indexes.len() as i64); | ||||
|  | ||||
|     crate::metrics::MEILISEARCH_SEARCH_QUEUE_SIZE.set(search_queue.capacity() as i64); | ||||
|     crate::metrics::MEILISEARCH_SEARCHES_RUNNING.set(search_queue.searches_running() as i64); | ||||
|     crate::metrics::MEILISEARCH_SEARCHES_WAITING_TO_BE_PROCESSED | ||||
|         .set(search_queue.searches_waiting() as i64); | ||||
|  | ||||
|     for (index, value) in response.indexes.iter() { | ||||
|         crate::metrics::MEILISEARCH_INDEX_DOCS_COUNT | ||||
|             .with_label_values(&[index]) | ||||
|   | ||||
| @@ -18,6 +18,8 @@ | ||||
| //!                         And should drop the Permit only once you have freed all the RAM consumed by the method. | ||||
|  | ||||
| use std::num::NonZeroUsize; | ||||
| use std::sync::atomic::{AtomicUsize, Ordering}; | ||||
| use std::sync::Arc; | ||||
| use std::time::Duration; | ||||
|  | ||||
| use rand::rngs::StdRng; | ||||
| @@ -33,6 +35,8 @@ pub struct SearchQueue { | ||||
|     /// If we have waited longer than this to get a permit, we should abort the search request entirely. | ||||
|     /// The client probably already closed the connection, but we have no way to find out. | ||||
|     time_to_abort: Duration, | ||||
|     searches_running: Arc<AtomicUsize>, | ||||
|     searches_waiting_to_be_processed: Arc<AtomicUsize>, | ||||
| } | ||||
|  | ||||
| /// You should only run search requests while holding this permit. | ||||
| @@ -68,14 +72,41 @@ impl SearchQueue { | ||||
|         // so let's not allocate any RAM and keep a capacity of 1. | ||||
|         let (sender, receiver) = mpsc::channel(1); | ||||
|  | ||||
|         tokio::task::spawn(Self::run(capacity, paralellism, receiver)); | ||||
|         Self { sender, capacity, time_to_abort: Duration::from_secs(60) } | ||||
|         let instance = Self { | ||||
|             sender, | ||||
|             capacity, | ||||
|             time_to_abort: Duration::from_secs(60), | ||||
|             searches_running: Default::default(), | ||||
|             searches_waiting_to_be_processed: Default::default(), | ||||
|         }; | ||||
|  | ||||
|         tokio::task::spawn(Self::run( | ||||
|             capacity, | ||||
|             paralellism, | ||||
|             receiver, | ||||
|             Arc::clone(&instance.searches_running), | ||||
|             Arc::clone(&instance.searches_waiting_to_be_processed), | ||||
|         )); | ||||
|  | ||||
|         instance | ||||
|     } | ||||
|  | ||||
|     pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self { | ||||
|         Self { time_to_abort, ..self } | ||||
|     } | ||||
|  | ||||
|     pub fn capacity(&self) -> usize { | ||||
|         self.capacity | ||||
|     } | ||||
|  | ||||
|     pub fn searches_running(&self) -> usize { | ||||
|         self.searches_running.load(Ordering::Relaxed) | ||||
|     } | ||||
|  | ||||
|     pub fn searches_waiting(&self) -> usize { | ||||
|         self.searches_waiting_to_be_processed.load(Ordering::Relaxed) | ||||
|     } | ||||
|  | ||||
|     /// This function is the main loop, it's in charge on scheduling which search request should execute first and | ||||
|     /// how many should executes at the same time. | ||||
|     /// | ||||
| @@ -84,6 +115,8 @@ impl SearchQueue { | ||||
|         capacity: usize, | ||||
|         parallelism: NonZeroUsize, | ||||
|         mut receive_new_searches: mpsc::Receiver<oneshot::Sender<Permit>>, | ||||
|         metric_searches_running: Arc<AtomicUsize>, | ||||
|         metric_searches_waiting: Arc<AtomicUsize>, | ||||
|     ) { | ||||
|         let mut queue: Vec<oneshot::Sender<Permit>> = Default::default(); | ||||
|         let mut rng: StdRng = StdRng::from_entropy(); | ||||
| @@ -133,6 +166,9 @@ impl SearchQueue { | ||||
|                     queue.push(search_request); | ||||
|                 }, | ||||
|             } | ||||
|  | ||||
|             metric_searches_running.store(searches_running, Ordering::Relaxed); | ||||
|             metric_searches_waiting.store(queue.len(), Ordering::Relaxed); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user