mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 04:56:28 +00:00 
			
		
		
		
	stop trying to process searches after one minute
This commit is contained in:
		| @@ -18,6 +18,7 @@ | |||||||
| //!                         And should drop the Permit only once you have freed all the RAM consumed by the method. | //!                         And should drop the Permit only once you have freed all the RAM consumed by the method. | ||||||
|  |  | ||||||
| use std::num::NonZeroUsize; | use std::num::NonZeroUsize; | ||||||
|  | use std::time::Duration; | ||||||
|  |  | ||||||
| use rand::rngs::StdRng; | use rand::rngs::StdRng; | ||||||
| use rand::{Rng, SeedableRng}; | use rand::{Rng, SeedableRng}; | ||||||
| @@ -29,6 +30,9 @@ use crate::error::MeilisearchHttpError; | |||||||
| pub struct SearchQueue { | pub struct SearchQueue { | ||||||
|     sender: mpsc::Sender<oneshot::Sender<Permit>>, |     sender: mpsc::Sender<oneshot::Sender<Permit>>, | ||||||
|     capacity: usize, |     capacity: usize, | ||||||
|  |     /// If we have waited longer than this to get a permit, we should abort the search request entirely. | ||||||
|  |     /// The client probably already closed the connection, but we have no way to find out. | ||||||
|  |     time_to_abort: Duration, | ||||||
| } | } | ||||||
|  |  | ||||||
| /// You should only run search requests while holding this permit. | /// You should only run search requests while holding this permit. | ||||||
| @@ -65,7 +69,11 @@ impl SearchQueue { | |||||||
|         let (sender, receiver) = mpsc::channel(1); |         let (sender, receiver) = mpsc::channel(1); | ||||||
|  |  | ||||||
|         tokio::task::spawn(Self::run(capacity, paralellism, receiver)); |         tokio::task::spawn(Self::run(capacity, paralellism, receiver)); | ||||||
|         Self { sender, capacity } |         Self { sender, capacity, time_to_abort: Duration::from_secs(60) } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self { | ||||||
|  |         Self { time_to_abort, ..self } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// This function is the main loop, it's in charge on scheduling which search request should execute first and |     /// This function is the main loop, it's in charge on scheduling which search request should execute first and | ||||||
| @@ -131,9 +139,23 @@ impl SearchQueue { | |||||||
|     /// Returns a search `Permit`. |     /// Returns a search `Permit`. | ||||||
|     /// It should be dropped as soon as you've freed all the RAM associated with the search request being processed. |     /// It should be dropped as soon as you've freed all the RAM associated with the search request being processed. | ||||||
|     pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> { |     pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> { | ||||||
|  |         let now = std::time::Instant::now(); | ||||||
|         let (sender, receiver) = oneshot::channel(); |         let (sender, receiver) = oneshot::channel(); | ||||||
|         self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; |         self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?; | ||||||
|         receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity)) |         let permit = receiver | ||||||
|  |             .await | ||||||
|  |             .map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?; | ||||||
|  |  | ||||||
|  |         // If we've been for more than one minute to get a search permit, it's better to simply | ||||||
|  |         // abort the search request than spending time processing something were the client | ||||||
|  |         // most certainly exited or got a timeout a long time ago. | ||||||
|  |         // We may find a better solution in https://github.com/actix/actix-web/issues/3462. | ||||||
|  |         if now.elapsed() > self.time_to_abort { | ||||||
|  |             permit.drop().await; | ||||||
|  |             Err(MeilisearchHttpError::TooManySearchRequests(self.capacity)) | ||||||
|  |         } else { | ||||||
|  |             Ok(permit) | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Returns `Ok(())` if everything seems normal. |     /// Returns `Ok(())` if everything seems normal. | ||||||
|   | |||||||
| @@ -56,6 +56,24 @@ async fn search_queue_register_with_explicit_drop() { | |||||||
|     let _permit4 = queue.try_get_search_permit().await.unwrap(); |     let _permit4 = queue.try_get_search_permit().await.unwrap(); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[actix_rt::test] | ||||||
|  | async fn search_queue_register_with_time_to_abort() { | ||||||
|  |     let queue = Arc::new( | ||||||
|  |         SearchQueue::new(1, NonZeroUsize::new(1).unwrap()) | ||||||
|  |             .with_time_to_abort(Duration::from_secs(1)), | ||||||
|  |     ); | ||||||
|  |  | ||||||
|  |     // First, use all the cores | ||||||
|  |     let permit1 = queue.try_get_search_permit().await.unwrap(); | ||||||
|  |     let q = queue.clone(); | ||||||
|  |     let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await }); | ||||||
|  |     tokio::time::sleep(Duration::from_secs(1)).await; | ||||||
|  |     permit1.drop().await; | ||||||
|  |     let ret = permit2.await.unwrap(); | ||||||
|  |  | ||||||
|  |     snapshot!(ret.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s."); | ||||||
|  | } | ||||||
|  |  | ||||||
| #[actix_rt::test] | #[actix_rt::test] | ||||||
| async fn wait_till_cores_are_available() { | async fn wait_till_cores_are_available() { | ||||||
|     let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); |     let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap())); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user