4908: Bring back changes from release v1.10.1 to main r=dureuill a=irevoire

# Pull Request

Following the [latest release](https://github.com/meilisearch/meilisearch/releases/tag/v1.10.1), this PR brings back the changes to main.

Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: irevoire <irevoire@users.noreply.github.com>
This commit is contained in: meili-bors[bot], 2024-09-03 14:28:12 +00:00 (committed by GitHub)
23 changed files with 764 additions and 229 deletions

View File

@@ -13,11 +13,10 @@ pub mod search_queue;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::num::NonZeroUsize;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::thread::{self, available_parallelism};
use std::thread;
use std::time::Duration;
use actix_cors::Cors;
@@ -118,6 +117,7 @@ pub type LogStderrType = tracing_subscriber::filter::Filtered<
pub fn create_app(
index_scheduler: Data<IndexScheduler>,
auth_controller: Data<AuthController>,
search_queue: Data<SearchQueue>,
opt: Opt,
logs: (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
@@ -137,6 +137,7 @@ pub fn create_app(
s,
index_scheduler.clone(),
auth_controller.clone(),
search_queue.clone(),
&opt,
logs,
analytics.clone(),
@@ -469,19 +470,16 @@ pub fn configure_data(
config: &mut web::ServiceConfig,
index_scheduler: Data<IndexScheduler>,
auth: Data<AuthController>,
search_queue: Data<SearchQueue>,
opt: &Opt,
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
) {
let search_queue = SearchQueue::new(
opt.experimental_search_queue_size,
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
);
let http_payload_size_limit = opt.http_payload_size_limit.as_u64() as usize;
config
.app_data(index_scheduler)
.app_data(auth)
.app_data(web::Data::new(search_queue))
.app_data(search_queue)
.app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs_route))
.app_data(web::Data::new(logs_stderr))

View File

@@ -1,8 +1,10 @@
use std::env;
use std::io::{stderr, LineWriter, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::thread::available_parallelism;
use actix_web::http::KeepAlive;
use actix_web::web::Data;
@@ -11,6 +13,7 @@ use index_scheduler::IndexScheduler;
use is_terminal::IsTerminal;
use meilisearch::analytics::Analytics;
use meilisearch::option::LogMode;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{
analytics, create_app, setup_meilisearch, LogRouteHandle, LogRouteType, LogStderrHandle,
LogStderrType, Opt, SubscriberForSecondLayer,
@@ -148,11 +151,17 @@ async fn run_http(
let opt_clone = opt.clone();
let index_scheduler = Data::from(index_scheduler);
let auth_controller = Data::from(auth_controller);
let search_queue = SearchQueue::new(
opt.experimental_search_queue_size,
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
);
let search_queue = Data::new(search_queue);
let http_server = HttpServer::new(move || {
create_app(
index_scheduler.clone(),
auth_controller.clone(),
search_queue.clone(),
opt.clone(),
logs.clone(),
analytics.clone(),

View File

@@ -81,7 +81,7 @@ pub async fn search(
let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features();
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?;
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_facet_search(
&index,
@@ -93,7 +93,9 @@
locales,
)
})
.await?;
.await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);

View File

@@ -233,11 +233,13 @@ pub async fn search_with_url_query(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features())
})
.await?;
.await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}
@@ -276,11 +278,13 @@ pub async fn search_with_post(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features())
})
.await?;
.await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
if search_result.degraded {

View File

@@ -39,7 +39,7 @@ pub async fn multi_search_with_post(
) -> Result<HttpResponse, ResponseError> {
// Since we don't want to process half of the search requests and then get a permit refused
// we're going to get one permit for the whole duration of the multi-search request.
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let federated_search = params.into_inner();
@@ -81,6 +81,7 @@ pub async fn multi_search_with_post(
perform_federated_search(&index_scheduler, queries, federation, features)
})
.await;
permit.drop().await;
if let Ok(Ok(_)) = search_result {
multi_aggregate.succeed();
@@ -143,6 +144,7 @@
Ok(search_results)
}
.await;
permit.drop().await;
if search_results.is_ok() {
multi_aggregate.succeed();

View File

@@ -18,6 +18,7 @@
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
use std::num::NonZeroUsize;
use std::time::Duration;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
@@ -29,16 +30,31 @@ use crate::error::MeilisearchHttpError;
pub struct SearchQueue {
sender: mpsc::Sender<oneshot::Sender<Permit>>,
capacity: usize,
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
/// The client probably already closed the connection, but we have no way to find out.
time_to_abort: Duration,
}
/// You should only run search requests while holding this permit.
/// Once it's dropped, a new search request can be processed.
/// You should always try to drop the permit yourself by calling the `drop` async method on it.
#[derive(Debug)]
pub struct Permit {
sender: mpsc::Sender<()>,
}
impl Permit {
/// Drop the permit, giving one permit back to the search queue.
pub async fn drop(self) {
// if the channel is closed then the whole instance is down
let _ = self.sender.send(()).await;
}
}
impl Drop for Permit {
/// The implicit drop implementation can still be called in multiple cases:
/// - We forgot to call the explicit one somewhere => this should be fixed on our side asap
/// - The future is cancelled while running and the permit dropped with it
fn drop(&mut self) {
let sender = self.sender.clone();
// if the channel is closed then the whole instance is down
@@ -53,7 +69,11 @@ impl SearchQueue {
let (sender, receiver) = mpsc::channel(1);
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
Self { sender, capacity }
Self { sender, capacity, time_to_abort: Duration::from_secs(60) }
}
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
Self { time_to_abort, ..self }
}
/// This function is the main loop; it's in charge of scheduling which search request should execute first and
@@ -119,9 +139,23 @@
/// Returns a search `Permit`.
/// It should be dropped as soon as you've freed all the RAM associated with the search request being processed.
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
let now = std::time::Instant::now();
let (sender, receiver) = oneshot::channel();
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))
let permit = receiver
.await
.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?;
// If we've been waiting longer than `time_to_abort` to get a search permit, it's better to simply
// abort the search request than to spend time processing something where the client
// most certainly exited or timed out a long time ago.
// We may find a better solution in https://github.com/actix/actix-web/issues/3462.
if now.elapsed() > self.time_to_abort {
permit.drop().await;
Err(MeilisearchHttpError::TooManySearchRequests(self.capacity))
} else {
Ok(permit)
}
}
/// Returns `Ok(())` if everything seems normal.

View File

@@ -11,13 +11,11 @@ use actix_web::http::StatusCode;
use byte_unit::{Byte, Unit};
use clap::Parser;
use meilisearch::option::{IndexerOpts, MaxMemory, MaxThreads, Opt};
use meilisearch::{analytics, create_app, setup_meilisearch, SubscriberForSecondLayer};
use meilisearch::setup_meilisearch;
use once_cell::sync::Lazy;
use tempfile::TempDir;
use tokio::sync::OnceCell;
use tokio::time::sleep;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::Layer;
use uuid::Uuid;
use super::index::Index;
@@ -183,7 +181,7 @@ impl Server<Shared> {
let options = default_settings(dir.path());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
let service = Service { index_scheduler, auth, api_key: None, options };
Server { service, _dir: Some(dir), _marker: PhantomData }
}
@@ -263,28 +261,7 @@
Response = ServiceResponse<impl MessageBody>,
Error = actix_web::Error,
> {
let (_route_layer, route_layer_handle) =
tracing_subscriber::reload::Layer::new(None.with_filter(
tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF),
));
let (_stderr_layer, stderr_layer_handle) = tracing_subscriber::reload::Layer::new(
(Box::new(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
)
as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>)
.with_filter(tracing_subscriber::filter::Targets::new()),
);
actix_web::test::init_service(create_app(
self.service.index_scheduler.clone().into(),
self.service.auth.clone().into(),
self.service.options.clone(),
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.service.options),
true,
))
.await
self.service.init_web_app().await
}
pub async fn list_api_keys(&self, params: &str) -> (Value, StatusCode) {

View File

@@ -1,10 +1,15 @@
use std::num::NonZeroUsize;
use std::sync::Arc;
use actix_web::body::MessageBody;
use actix_web::dev::ServiceResponse;
use actix_web::http::header::ContentType;
use actix_web::http::StatusCode;
use actix_web::test;
use actix_web::test::TestRequest;
use actix_web::web::Data;
use index_scheduler::IndexScheduler;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer};
use meilisearch_auth::AuthController;
use tracing::level_filters::LevelFilter;
@@ -106,7 +111,13 @@ impl Service {
self.request(req).await
}
pub async fn request(&self, mut req: test::TestRequest) -> (Value, StatusCode) {
pub async fn init_web_app(
&self,
) -> impl actix_web::dev::Service<
actix_http::Request,
Response = ServiceResponse<impl MessageBody>,
Error = actix_web::Error,
> {
let (_route_layer, route_layer_handle) =
tracing_subscriber::reload::Layer::new(None.with_filter(
tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF),
@@ -119,16 +130,25 @@
as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>)
.with_filter(tracing_subscriber::filter::Targets::new()),
);
let search_queue = SearchQueue::new(
self.options.experimental_search_queue_size,
NonZeroUsize::new(1).unwrap(),
);
let app = test::init_service(create_app(
actix_web::test::init_service(create_app(
self.index_scheduler.clone().into(),
self.auth.clone().into(),
Data::new(search_queue),
self.options.clone(),
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.options),
true,
))
.await;
.await
}
pub async fn request(&self, mut req: test::TestRequest) -> (Value, StatusCode) {
let app = self.init_web_app().await;
if let Some(api_key) = &self.api_key {
req = req.insert_header(("Authorization", ["Bearer ", api_key].concat()));

View File

@@ -6,6 +6,7 @@ use actix_web::test;
use crate::common::{Server, Value};
#[derive(Debug)]
enum HttpVerb {
Put,
Patch,
@@ -80,7 +81,7 @@ async fn error_json_bad_content_type() {
let status_code = res.status();
let body = test::read_body(res).await;
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
assert_eq!(status_code, 415, "calling the route `{}` without content-type is supposed to throw a bad media type error", route);
assert_eq!(status_code, 415, "calling the route `{verb:?} {route}` without content-type is supposed to throw a bad media type error:\n{}", String::from_utf8_lossy(&body));
assert_eq!(
response,
json!({

View File

@@ -1,10 +1,13 @@
mod error;
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::str::FromStr;
use actix_web::http::header::ContentType;
use actix_web::web::Data;
use meili_snap::snapshot;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
@@ -40,10 +43,15 @@ async fn basic_test_log_stream_route() {
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE)
.with_filter(tracing_subscriber::filter::LevelFilter::from_str("OFF").unwrap()),
);
let search_queue = SearchQueue::new(
server.service.options.experimental_search_queue_size,
NonZeroUsize::new(1).unwrap(),
);
let app = actix_web::test::init_service(create_app(
server.service.index_scheduler.clone().into(),
server.service.auth.clone().into(),
Data::new(search_queue),
server.service.options.clone(),
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&server.service.options),

View File

@@ -37,6 +37,43 @@ async fn search_queue_register() {
.unwrap();
}
#[actix_rt::test]
async fn search_queue_register_with_explicit_drop() {
let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap());
// First, use all the cores
let permit1 = queue.try_get_search_permit().await.unwrap();
let _permit2 = queue.try_get_search_permit().await.unwrap();
// If we free one spot we should be able to register one new search
permit1.drop().await;
let permit3 = queue.try_get_search_permit().await.unwrap();
// And again
permit3.drop().await;
let _permit4 = queue.try_get_search_permit().await.unwrap();
}
#[actix_rt::test]
async fn search_queue_register_with_time_to_abort() {
let queue = Arc::new(
SearchQueue::new(1, NonZeroUsize::new(1).unwrap())
.with_time_to_abort(Duration::from_secs(1)),
);
// First, use all the cores
let permit1 = queue.try_get_search_permit().await.unwrap();
let q = queue.clone();
let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await });
tokio::time::sleep(Duration::from_secs(1)).await;
permit1.drop().await;
let ret = permit2.await.unwrap();
snapshot!(ret.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s.");
}
#[actix_rt::test]
async fn wait_till_cores_are_available() {
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));