Use the time budget instead of defining a deadline outside the scope

This commit is contained in:
ManyTheFish
2025-11-06 11:02:26 +01:00
parent 3e47201365
commit c29749741b
5 changed files with 122 additions and 51 deletions

View File

@@ -1,9 +1,12 @@
use crate::search::{Personalize, SearchResult}; use crate::search::{Personalize, SearchResult};
use meilisearch_types::error::{Code, ErrorCode, ResponseError}; use meilisearch_types::{
error::{Code, ErrorCode, ResponseError},
milli::TimeBudget,
};
use rand::Rng; use rand::Rng;
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant}; use std::time::Duration;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank"; const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank";
@@ -72,7 +75,7 @@ impl CohereService {
search_result: SearchResult, search_result: SearchResult,
personalize: &Personalize, personalize: &Personalize,
query: Option<&str>, query: Option<&str>,
deadline: Option<Instant>, time_budget: TimeBudget,
) -> Result<SearchResult, ResponseError> { ) -> Result<SearchResult, ResponseError> {
// Extract user context from personalization // Extract user context from personalization
let user_context = personalize.user_context.as_str(); let user_context = personalize.user_context.as_str();
@@ -99,7 +102,7 @@ impl CohereService {
// Call Cohere's rerank API with retry logic // Call Cohere's rerank API with retry logic
let reranked_indices = let reranked_indices =
match self.call_rerank_with_retry(&prompt, &documents, deadline).await { match self.call_rerank_with_retry(&prompt, &documents, time_budget).await {
Ok(indices) => indices, Ok(indices) => indices,
Err(PersonalizationError::DeadlineExceeded) => { Err(PersonalizationError::DeadlineExceeded) => {
// If the deadline is exceeded, return the original search result instead of an error // If the deadline is exceeded, return the original search result instead of an error
@@ -125,7 +128,7 @@ impl CohereService {
&self, &self,
query: &str, query: &str,
documents: &[String], documents: &[String],
deadline: Option<Instant>, time_budget: TimeBudget,
) -> Result<Vec<usize>, PersonalizationError> { ) -> Result<Vec<usize>, PersonalizationError> {
let request_body = CohereRerankRequest { let request_body = CohereRerankRequest {
query: query.to_string(), query: query.to_string(),
@@ -142,18 +145,8 @@ impl CohereService {
Err(retry) => { Err(retry) => {
warn!("Cohere rerank attempt #{} failed: {}", attempt, retry.error); warn!("Cohere rerank attempt #{} failed: {}", attempt, retry.error);
if let Some(deadline) = deadline { if time_budget.exceeded() {
let now = Instant::now(); return Err(PersonalizationError::DeadlineExceeded);
if now > deadline {
warn!("Could not rerank due to deadline");
return Err(PersonalizationError::DeadlineExceeded);
}
let duration_to_deadline = deadline - now;
match retry.into_duration(attempt) {
Ok(d) => d.min(duration_to_deadline),
Err(error) => return Err(error),
}
} else { } else {
match retry.into_duration(attempt) { match retry.into_duration(attempt) {
Ok(d) => d, Ok(d) => d,
@@ -345,12 +338,12 @@ impl PersonalizationService {
search_result: SearchResult, search_result: SearchResult,
personalize: &Personalize, personalize: &Personalize,
query: Option<&str>, query: Option<&str>,
deadline: Option<Instant>, time_budget: TimeBudget,
) -> Result<SearchResult, ResponseError> { ) -> Result<SearchResult, ResponseError> {
match self { match self {
Self::Cohere(cohere_service) => { Self::Cohere(cohere_service) => {
cohere_service cohere_service
.rerank_search_results(search_result, personalize, query, deadline) .rerank_search_results(search_result, personalize, query, time_budget)
.await .await
} }
Self::Disabled => Err(PersonalizationError::FeatureNotEnabled( Self::Disabled => Err(PersonalizationError::FeatureNotEnabled(

View File

@@ -352,16 +352,6 @@ pub async fn search_with_url_query(
// Extract personalization and query string before moving query // Extract personalization and query string before moving query
let personalize = query.personalize.take(); let personalize = query.personalize.take();
// Get search cutoff to create deadline for personalization (before moving index)
let deadline = if personalize.is_some() {
let rtxn = index.read_txn()?;
index.search_cutoff(&rtxn)?.map(|cutoff_ms| {
std::time::Instant::now() + std::time::Duration::from_millis(cutoff_ms)
})
} else {
None
};
let search_kind = let search_kind =
search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index)?; search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors); let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors);
@@ -389,12 +379,12 @@ pub async fn search_with_url_query(
.await; .await;
permit.drop().await; permit.drop().await;
let search_result = search_result?; let search_result = search_result?;
if let Ok(ref search_result) = search_result { if let Ok((search_result, _)) = search_result.as_ref() {
aggregate.succeed(search_result); aggregate.succeed(search_result);
} }
analytics.publish(aggregate, &req); analytics.publish(aggregate, &req);
let mut search_result = search_result?; let (mut search_result, time_budget) = search_result?;
// Apply personalization if requested // Apply personalization if requested
if let Some(personalize) = personalize.as_ref() { if let Some(personalize) = personalize.as_ref() {
@@ -403,7 +393,7 @@ pub async fn search_with_url_query(
search_result, search_result,
personalize, personalize,
personalize_query.as_deref(), personalize_query.as_deref(),
deadline, time_budget,
) )
.await?; .await?;
} }
@@ -495,16 +485,6 @@ pub async fn search_with_post(
// Extract personalization and query string before moving query // Extract personalization and query string before moving query
let personalize = query.personalize.take(); let personalize = query.personalize.take();
// Get search cutoff to create deadline for personalization (before moving index)
let deadline = if personalize.is_some() {
let rtxn = index.read_txn()?;
index.search_cutoff(&rtxn)?.map(|cutoff_ms| {
std::time::Instant::now() + std::time::Duration::from_millis(cutoff_ms)
})
} else {
None
};
let search_kind = let search_kind =
search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index)?; search_kind(&query, index_scheduler.get_ref(), index_uid.to_string(), &index)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors); let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors);
@@ -532,7 +512,7 @@ pub async fn search_with_post(
.await; .await;
permit.drop().await; permit.drop().await;
let search_result = search_result?; let search_result = search_result?;
if let Ok(ref search_result) = search_result { if let Ok((ref search_result, _)) = search_result {
aggregate.succeed(search_result); aggregate.succeed(search_result);
if search_result.degraded { if search_result.degraded {
MEILISEARCH_DEGRADED_SEARCH_REQUESTS.inc(); MEILISEARCH_DEGRADED_SEARCH_REQUESTS.inc();
@@ -540,7 +520,7 @@ pub async fn search_with_post(
} }
analytics.publish(aggregate, &req); analytics.publish(aggregate, &req);
let mut search_result = search_result?; let (mut search_result, time_budget) = search_result?;
// Apply personalization if requested // Apply personalization if requested
if let Some(personalize) = personalize.as_ref() { if let Some(personalize) = personalize.as_ref() {
@@ -549,7 +529,7 @@ pub async fn search_with_post(
search_result, search_result,
personalize, personalize,
personalize_query.as_deref(), personalize_query.as_deref(),
deadline, time_budget,
) )
.await?; .await?;
} }

View File

@@ -146,6 +146,7 @@ pub struct SearchResults {
pub async fn multi_search_with_post( pub async fn multi_search_with_post(
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
search_queue: Data<SearchQueue>, search_queue: Data<SearchQueue>,
personalization_service: web::Data<crate::personalization::PersonalizationService>,
params: AwebJson<FederatedSearch, DeserrJsonError>, params: AwebJson<FederatedSearch, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
analytics: web::Data<Analytics>, analytics: web::Data<Analytics>,
@@ -236,7 +237,7 @@ pub async fn multi_search_with_post(
// changes. // changes.
let search_results: Result<_, (ResponseError, usize)> = async { let search_results: Result<_, (ResponseError, usize)> = async {
let mut search_results = Vec::with_capacity(queries.len()); let mut search_results = Vec::with_capacity(queries.len());
for (query_index, (index_uid, query, federation_options)) in queries for (query_index, (index_uid, mut query, federation_options)) in queries
.into_iter() .into_iter()
.map(SearchQueryWithIndex::into_index_query_federation) .map(SearchQueryWithIndex::into_index_query_federation)
.enumerate() .enumerate()
@@ -269,6 +270,13 @@ pub async fn multi_search_with_post(
}) })
.with_index(query_index)?; .with_index(query_index)?;
// Extract personalization and query string before moving query
let personalize = query.personalize.take();
// Save the query string for personalization if requested
let personalize_query =
personalize.is_some().then(|| query.q.clone()).flatten();
let index_uid_str = index_uid.to_string(); let index_uid_str = index_uid.to_string();
let search_kind = search_kind( let search_kind = search_kind(
@@ -280,7 +288,7 @@ pub async fn multi_search_with_post(
.with_index(query_index)?; .with_index(query_index)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors); let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors);
let search_result = tokio::task::spawn_blocking(move || { let (mut search_result, time_budget) = tokio::task::spawn_blocking(move || {
perform_search( perform_search(
SearchParams { SearchParams {
index_uid: index_uid_str.clone(), index_uid: index_uid_str.clone(),
@@ -295,11 +303,25 @@ pub async fn multi_search_with_post(
) )
}) })
.await .await
.with_index(query_index)?
.with_index(query_index)?; .with_index(query_index)?;
// Apply personalization if requested
if let Some(personalize) = personalize.as_ref() {
search_result = personalization_service
.rerank_search_results(
search_result,
personalize,
personalize_query.as_deref(),
time_budget,
)
.await
.with_index(query_index)?;
}
search_results.push(SearchResultWithIndex { search_results.push(SearchResultWithIndex {
index_uid: index_uid.into_inner(), index_uid: index_uid.into_inner(),
result: search_result.with_index(query_index)?, result: search_result,
}); });
} }
Ok(search_results) Ok(search_results)

View File

@@ -1171,7 +1171,10 @@ pub struct SearchParams {
pub include_metadata: bool, pub include_metadata: bool,
} }
pub fn perform_search(params: SearchParams, index: &Index) -> Result<SearchResult, ResponseError> { pub fn perform_search(
params: SearchParams,
index: &Index,
) -> Result<(SearchResult, TimeBudget), ResponseError> {
let SearchParams { let SearchParams {
index_uid, index_uid,
query, query,
@@ -1190,7 +1193,7 @@ pub fn perform_search(params: SearchParams, index: &Index) -> Result<SearchResul
}; };
let (search, is_finite_pagination, max_total_hits, offset) = let (search, is_finite_pagination, max_total_hits, offset) =
prepare_search(index, &rtxn, &query, &search_kind, time_budget, features)?; prepare_search(index, &rtxn, &query, &search_kind, time_budget.clone(), features)?;
let ( let (
milli::SearchResult { milli::SearchResult {
@@ -1314,7 +1317,7 @@ pub fn perform_search(params: SearchParams, index: &Index) -> Result<SearchResul
request_uid: Some(request_uid), request_uid: Some(request_uid),
metadata, metadata,
}; };
Ok(result) Ok((result, time_budget))
} }
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)] #[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)]

View File

@@ -249,3 +249,76 @@ async fn search_with_personalization_without_enabling_the_feature() {
} }
"###); "###);
} }
/// Checks that a multi-search request containing a `personalize` object is rejected
/// with a `feature_not_enabled` error. This test never turns the feature on.
///
/// Two request shapes are exercised against the same index:
/// 1. a plain multi-search (only `queries`),
/// 2. a federated search (`"federation": {}` plus the same `queries`).
///
/// Both are expected to answer `400 Bad Request` with an error message prefixed by
/// "Inside `.queries[0]`", i.e. the error is attributed to the offending query.
#[actix_rt::test]
async fn multi_search_with_personalization_without_enabling_the_feature() {
let server = Server::new().await;
let index = server.unique_index();
// Create the index and add some documents
let (task, _code) = index.create(None).await;
// Wait for the index creation task and require that it succeeded before going on.
server.wait_task(task.uid()).await.succeeded();
let (task, _code) = index
.add_documents(
json!([
{"id": 1, "title": "The Dark Knight", "genre": "Action"},
{"id": 2, "title": "Inception", "genre": "Sci-Fi"},
{"id": 3, "title": "The Matrix", "genre": "Sci-Fi"}
]),
None,
)
.await;
// Same for the document addition task.
server.wait_task(task.uid()).await.succeeded();
// Try to multi-search with personalization - should return feature_not_enabled error
let (response, code) = server
.multi_search(json!({
"queries": [
{
"indexUid": index.uid,
"q": "movie",
"personalize": {
"userContext": "I love science fiction movies"
}
}
]
}))
.await;
// Both the HTTP status and the full error payload are pinned with inline snapshots.
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Inside `.queries[0]`: reranking search results requires enabling the `personalization` experimental feature. See https://github.com/orgs/meilisearch/discussions/866",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
}
"###);
// Try to federated search with personalization - should return feature_not_enabled error
// The only difference from the request above is the added empty `federation` object.
let (response, code) = server
.multi_search(json!({
"federation": {},
"queries": [
{
"indexUid": index.uid,
"q": "movie",
"personalize": {
"userContext": "I love science fiction movies"
}
}
]
}))
.await;
// The expected status and error payload are the same as for the non-federated request.
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Inside `.queries[0]`: reranking search results requires enabling the `personalization` experimental feature. See https://github.com/orgs/meilisearch/discussions/866",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
}
"###);
}