diff --git a/Cargo.lock b/Cargo.lock index af79cac2a..c919caa15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2758,9 +2758,9 @@ dependencies = [ [[package]] name = "hannoy" -version = "0.0.9-nested-rtxns" +version = "0.0.9-nested-rtxns-2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc5a945b92b063e677d658cfcc7cb6dec2502fe44631f017084938f14d6ce30e" +checksum = "06eda090938d9dcd568c8c2a5de383047ed9191578ebf4a342d2975d16e621f2" dependencies = [ "bytemuck", "byteorder", diff --git a/crates/meilisearch/src/search/federated/perform.rs b/crates/meilisearch/src/search/federated/perform.rs index 3098037a3..8cd03cc75 100644 --- a/crates/meilisearch/src/search/federated/perform.rs +++ b/crates/meilisearch/src/search/federated/perform.rs @@ -20,10 +20,10 @@ use tokio::task::JoinHandle; use uuid::Uuid; use super::super::ranking_rules::{self, RankingRules}; -use super::super::SearchMetadata; use super::super::{ compute_facet_distribution_stats, prepare_search, AttributesFormat, ComputedFacets, HitMaker, - HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchQuery, SearchQueryWithIndex, + HitsInfo, RetrieveVectors, SearchHit, SearchKind, SearchMetadata, SearchQuery, + SearchQueryWithIndex, }; use super::proxy::{proxy_search, ProxySearchError, ProxySearchParams}; use super::types::{ @@ -870,6 +870,12 @@ impl SearchByIndex { return Err(error); } let mut results_by_query = Vec::with_capacity(queries.len()); + + // all queries for an index share the same budget + let time_budget = match cutoff { + Some(cutoff) => TimeBudget::new(Duration::from_millis(cutoff)), + None => TimeBudget::default(), + }; for QueryByIndex { query, weight, query_index } in queries { // use an immediately invoked lambda to capture the result without returning from the function @@ -939,17 +945,13 @@ impl SearchByIndex { let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors); - let time_budget = match cutoff { - Some(cutoff) => 
TimeBudget::new(Duration::from_millis(cutoff)), - None => TimeBudget::default(), - }; - let (mut search, _is_finite_pagination, _max_total_hits, _offset) = prepare_search( &index, &rtxn, &query, &search_kind, - time_budget, + // clones of `TimeBudget` share the budget rather than restart it + time_budget.clone(), params.features, )?; diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 517cd102c..938dfe95e 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -90,7 +90,7 @@ rhai = { version = "1.22.2", features = [ "sync", ] } arroy = "0.6.4-nested-rtxns" -hannoy = { version = "0.0.9-nested-rtxns", features = ["arroy"] } +hannoy = { version = "0.0.9-nested-rtxns-2", features = ["arroy"] } rand = "0.8.5" tracing = "0.1.41" ureq = { version = "2.12.1", features = ["json"] } diff --git a/crates/milli/src/search/new/bucket_sort.rs b/crates/milli/src/search/new/bucket_sort.rs index 50362b85b..72c0fed4e 100644 --- a/crates/milli/src/search/new/bucket_sort.rs +++ b/crates/milli/src/search/new/bucket_sort.rs @@ -97,7 +97,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>( logger.start_iteration_ranking_rule(0, ranking_rules[0].as_ref(), query, universe); - ranking_rules[0].start_iteration(ctx, logger, universe, query)?; + ranking_rules[0].start_iteration(ctx, logger, universe, query, &time_budget)?; let mut ranking_rule_scores: Vec = vec![]; @@ -168,42 +168,6 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>( }; while valid_docids.len() < max_len_to_evaluate { - if time_budget.exceeded() { - loop { - let bucket = std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]); - ranking_rule_scores.push(ScoreDetails::Skipped); - - // remove candidates from the universe without adding them to result if their score is below the threshold - let is_below_threshold = - ranking_score_threshold.is_some_and(|ranking_score_threshold| { - let current_score = ScoreDetails::global_score(ranking_rule_scores.iter()); - current_score < 
ranking_score_threshold - }); - - if is_below_threshold { - all_candidates -= &bucket; - all_candidates -= &ranking_rule_universes[cur_ranking_rule_index]; - } else { - maybe_add_to_results!(bucket); - } - - ranking_rule_scores.pop(); - - if cur_ranking_rule_index == 0 { - break; - } - - back!(); - } - - return Ok(BucketSortOutput { - scores: valid_scores, - docids: valid_docids, - all_candidates, - degraded: true, - }); - } - // The universe for this bucket is zero, so we don't need to sort // anything, just go back to the parent ranking rule. if ranking_rule_universes[cur_ranking_rule_index].is_empty() @@ -216,14 +180,63 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>( continue; } - let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket( - ctx, - logger, - &ranking_rule_universes[cur_ranking_rule_index], - )? - else { - back!(); - continue; + let next_bucket = if time_budget.exceeded() { + match ranking_rules[cur_ranking_rule_index].non_blocking_next_bucket( + ctx, + logger, + &ranking_rule_universes[cur_ranking_rule_index], + )? 
{ + std::task::Poll::Ready(bucket) => bucket, + std::task::Poll::Pending => { + loop { + let bucket = + std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]); + ranking_rule_scores.push(ScoreDetails::Skipped); + + // remove candidates from the universe without adding them to result if their score is below the threshold + let is_below_threshold = + ranking_score_threshold.is_some_and(|ranking_score_threshold| { + let current_score = + ScoreDetails::global_score(ranking_rule_scores.iter()); + current_score < ranking_score_threshold + }); + + if is_below_threshold { + all_candidates -= &bucket; + all_candidates -= &ranking_rule_universes[cur_ranking_rule_index]; + } else { + maybe_add_to_results!(bucket); + } + + ranking_rule_scores.pop(); + + if cur_ranking_rule_index == 0 { + break; + } + + back!(); + } + + return Ok(BucketSortOutput { + scores: valid_scores, + docids: valid_docids, + all_candidates, + degraded: true, + }); + } + } + } else { + let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket( + ctx, + logger, + &ranking_rule_universes[cur_ranking_rule_index], + &time_budget, + )? 
+ else { + back!(); + continue; + }; + next_bucket }; ranking_rule_scores.push(next_bucket.score); @@ -275,6 +288,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>( logger, &next_bucket.candidates, &next_bucket.query, + &time_budget, )?; } diff --git a/crates/milli/src/search/new/exact_attribute.rs b/crates/milli/src/search/new/exact_attribute.rs index 269879ad6..4435be8d0 100644 --- a/crates/milli/src/search/new/exact_attribute.rs +++ b/crates/milli/src/search/new/exact_attribute.rs @@ -6,7 +6,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput}; use crate::score_details::{self, ScoreDetails}; use crate::search::new::query_graph::QueryNodeData; use crate::search::new::query_term::ExactTerm; -use crate::{CboRoaringBitmapCodec, Result, SearchContext, SearchLogger}; +use crate::{CboRoaringBitmapCodec, Result, SearchContext, SearchLogger, TimeBudget}; /// A ranking rule that produces 3 disjoint buckets: /// @@ -35,6 +35,7 @@ impl<'ctx> RankingRule<'ctx, QueryGraph> for ExactAttribute { _logger: &mut dyn SearchLogger, universe: &roaring::RoaringBitmap, query: &QueryGraph, + _time_budget: &TimeBudget, ) -> Result<()> { self.state = State::start_iteration(ctx, universe, query)?; Ok(()) @@ -46,6 +47,7 @@ impl<'ctx> RankingRule<'ctx, QueryGraph> for ExactAttribute { _ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger, universe: &roaring::RoaringBitmap, + _time_budget: &TimeBudget, ) -> Result>> { let state = std::mem::take(&mut self.state); let (state, output) = State::next(state, universe); diff --git a/crates/milli/src/search/new/geo_sort.rs b/crates/milli/src/search/new/geo_sort.rs index 6c7d7b03b..b88badf07 100644 --- a/crates/milli/src/search/new/geo_sort.rs +++ b/crates/milli/src/search/new/geo_sort.rs @@ -7,7 +7,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait use crate::documents::geo_sort::{fill_cache, next_bucket}; use crate::documents::{GeoSortParameter, GeoSortStrategy}; use 
crate::score_details::{self, ScoreDetails}; -use crate::{GeoPoint, Result, SearchContext, SearchLogger}; +use crate::{GeoPoint, Result, SearchContext, SearchLogger, TimeBudget}; pub struct GeoSort { query: Option, @@ -84,6 +84,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort { _logger: &mut dyn SearchLogger, universe: &RoaringBitmap, query: &Q, + _time_budget: &TimeBudget, ) -> Result<()> { assert!(self.query.is_none()); @@ -110,6 +111,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for GeoSort { ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger, universe: &RoaringBitmap, + _time_budget: &TimeBudget, ) -> Result>> { let query = self.query.as_ref().unwrap().clone(); diff --git a/crates/milli/src/search/new/graph_based_ranking_rule.rs b/crates/milli/src/search/new/graph_based_ranking_rule.rs index 67ac92e2c..f54fce8c3 100644 --- a/crates/milli/src/search/new/graph_based_ranking_rule.rs +++ b/crates/milli/src/search/new/graph_based_ranking_rule.rs @@ -53,7 +53,7 @@ use super::{QueryGraph, RankingRule, RankingRuleOutput, SearchContext}; use crate::score_details::Rank; use crate::search::new::query_term::LocatedQueryTermSubset; use crate::search::new::ranking_rule_graph::PathVisitor; -use crate::{Result, TermsMatchingStrategy}; +use crate::{Result, TermsMatchingStrategy, TimeBudget}; pub type Words = GraphBasedRankingRule; impl GraphBasedRankingRule { @@ -135,6 +135,7 @@ impl<'ctx, G: RankingRuleGraphTrait> RankingRule<'ctx, QueryGraph> for GraphBase _logger: &mut dyn SearchLogger, _universe: &RoaringBitmap, query_graph: &QueryGraph, + _time_budget: &TimeBudget, ) -> Result<()> { // the `next_max_cost` is the successor integer to the maximum cost of the paths in the graph. 
// @@ -217,6 +218,7 @@ impl<'ctx, G: RankingRuleGraphTrait> RankingRule<'ctx, QueryGraph> for GraphBase ctx: &mut SearchContext<'ctx>, logger: &mut dyn SearchLogger, universe: &RoaringBitmap, + _time_budget: &TimeBudget, ) -> Result>> { // Will crash if `next_bucket` is called before `start_iteration` or after `end_iteration`, // should never happen diff --git a/crates/milli/src/search/new/ranking_rules.rs b/crates/milli/src/search/new/ranking_rules.rs index f54a1b8db..4c4b15b3a 100644 --- a/crates/milli/src/search/new/ranking_rules.rs +++ b/crates/milli/src/search/new/ranking_rules.rs @@ -1,9 +1,11 @@ +use std::task::Poll; + use roaring::RoaringBitmap; use super::logger::SearchLogger; use super::{QueryGraph, SearchContext}; use crate::score_details::ScoreDetails; -use crate::Result; +use crate::{Result, TimeBudget}; /// An internal trait implemented by only [`PlaceholderQuery`] and [`QueryGraph`] pub trait RankingRuleQueryTrait: Sized + Clone + 'static {} @@ -28,12 +30,15 @@ pub trait RankingRule<'ctx, Query: RankingRuleQueryTrait> { /// buckets using [`next_bucket`](RankingRule::next_bucket). /// /// The given universe is the universe that will be given to [`next_bucket`](RankingRule::next_bucket). + /// + /// If this function may take a long time, it should check the `time_budget` and return early if exceeded. fn start_iteration( &mut self, ctx: &mut SearchContext<'ctx>, logger: &mut dyn SearchLogger, universe: &RoaringBitmap, query: &Query, + time_budget: &TimeBudget, ) -> Result<()>; /// Return the next bucket of this ranking rule. @@ -43,13 +48,31 @@ pub trait RankingRule<'ctx, Query: RankingRuleQueryTrait> { /// The universe given as argument is either: /// - a subset of the universe given to the previous call to [`next_bucket`](RankingRule::next_bucket); OR /// - the universe given to [`start_iteration`](RankingRule::start_iteration) + /// + /// If this function may take a long time, it should check the `time_budget` and return early if exceeded. 
fn next_bucket( &mut self, ctx: &mut SearchContext<'ctx>, logger: &mut dyn SearchLogger, universe: &RoaringBitmap, + time_budget: &TimeBudget, ) -> Result>>; + /// Return the next bucket of this ranking rule, if doing so can be done without blocking + /// + /// Even if the time budget is exceeded, when getting the next bucket is a fast operation, this should return `Poll::Ready` + /// to allow Meilisearch to collect the results. + /// + /// Default implementation conservatively returns that it would block. + fn non_blocking_next_bucket( + &mut self, + _ctx: &mut SearchContext<'ctx>, + _logger: &mut dyn SearchLogger, + _universe: &RoaringBitmap, + ) -> Result>> { + Ok(Poll::Pending) + } + /// Finish iterating over the buckets, which yields control to the parent ranking rule /// The next call to this ranking rule, if any, will be [`start_iteration`](RankingRule::start_iteration). fn end_iteration( diff --git a/crates/milli/src/search/new/sort.rs b/crates/milli/src/search/new/sort.rs index cefa2c426..f1564d468 100644 --- a/crates/milli/src/search/new/sort.rs +++ b/crates/milli/src/search/new/sort.rs @@ -7,7 +7,7 @@ use crate::heed_codec::facet::{FacetGroupKeyCodec, OrderedF64Codec}; use crate::heed_codec::{BytesRefCodec, StrRefCodec}; use crate::score_details::{self, ScoreDetails}; use crate::search::facet::{ascending_facet_sort, descending_facet_sort}; -use crate::{FieldId, Index, Result}; +use crate::{FieldId, Index, Result, TimeBudget}; pub trait RankingRuleOutputIter<'ctx, Query> { fn next_bucket(&mut self) -> Result>>; @@ -96,6 +96,7 @@ impl<'ctx, Query: RankingRuleQueryTrait> RankingRule<'ctx, Query> for Sort<'ctx, _logger: &mut dyn SearchLogger, parent_candidates: &RoaringBitmap, parent_query: &Query, + _time_budget: &TimeBudget, ) -> Result<()> { let iter: RankingRuleOutputIterWrapper<'ctx, Query> = match self.field_id { Some(field_id) => { @@ -194,6 +195,7 @@ impl<'ctx, Query: RankingRuleQueryTrait> RankingRule<'ctx, Query> for Sort<'ctx, _ctx: &mut 
SearchContext<'ctx>, _logger: &mut dyn SearchLogger, universe: &RoaringBitmap, + _time_budget: &TimeBudget, ) -> Result>> { let iter = self.iter.as_mut().unwrap(); if let Some(mut bucket) = iter.next_bucket()? { diff --git a/crates/milli/src/search/new/tests/cutoff.rs b/crates/milli/src/search/new/tests/cutoff.rs index f2dfb45d6..6b436c212 100644 --- a/crates/milli/src/search/new/tests/cutoff.rs +++ b/crates/milli/src/search/new/tests/cutoff.rs @@ -3,12 +3,17 @@ //! 2. A test that ensure the filters are affectively applied even with a cutoff of 0 //! 3. A test that ensure the cutoff works well with the ranking scores +use std::collections::BTreeMap; +use std::sync::Arc; use std::time::Duration; use meili_snap::snapshot; use crate::index::tests::TempIndex; use crate::score_details::{ScoreDetails, ScoringStrategy}; +use crate::update::Setting; +use crate::vector::settings::EmbeddingSettings; +use crate::vector::{Embedder, EmbedderOptions}; use crate::{Criterion, Filter, FilterableAttributesRule, Search, TimeBudget}; fn create_index() -> TempIndex { @@ -361,9 +366,8 @@ fn degraded_search_and_score_details() { ] "###); - // After SIX loop iteration. The words ranking rule gave us a new bucket. - // Since we reached the limit we were able to early exit without checking the typo ranking rule. - search.time_budget(TimeBudget::max().with_stop_after(6)); + // After FIVE loop iterations. The words ranking rule gave us a new bucket. + search.time_budget(TimeBudget::max().with_stop_after(5)); let result = search.execute().unwrap(); snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" @@ -424,4 +428,399 @@ fn degraded_search_and_score_details() { ], ] "###); + + // After SIX loop iterations. 
+ // we finished + search.time_budget(TimeBudget::max().with_stop_after(6)); + + let result = search.execute().unwrap(); + snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" + IDs: [4, 1, 0, 3] + Scores: 1.0000 0.9167 0.8333 0.6667 + Score Details: + [ + [ + Words( + Words { + matching_words: 3, + max_matching_words: 3, + }, + ), + Typo( + Typo { + typo_count: 0, + max_typo_count: 3, + }, + ), + ], + [ + Words( + Words { + matching_words: 3, + max_matching_words: 3, + }, + ), + Typo( + Typo { + typo_count: 1, + max_typo_count: 3, + }, + ), + ], + [ + Words( + Words { + matching_words: 3, + max_matching_words: 3, + }, + ), + Typo( + Typo { + typo_count: 2, + max_typo_count: 3, + }, + ), + ], + [ + Words( + Words { + matching_words: 2, + max_matching_words: 3, + }, + ), + Typo( + Typo { + typo_count: 0, + max_typo_count: 2, + }, + ), + ], + ] + "###); +} + +#[test] +fn degraded_search_and_score_details_vector() { + let index = create_index(); + + index + .add_documents(documents!([ + { + "id": 4, + "text": "hella puppo kefir", + "_vectors": { + "default": [0.1, 0.1] + } + }, + { + "id": 3, + "text": "hella puppy kefir", + "_vectors": { + "default": [-0.1, 0.1] + } + }, + { + "id": 2, + "text": "hello", + "_vectors": { + "default": [0.1, -0.1] + } + }, + { + "id": 1, + "text": "hello puppy", + "_vectors": { + "default": [-0.1, -0.1] + } + }, + { + "id": 0, + "text": "hello puppy kefir", + "_vectors": { + "default": null + } + }, + ])) + .unwrap(); + + index + .update_settings(|settings| { + let mut embedders = BTreeMap::new(); + embedders.insert( + "default".into(), + Setting::Set(EmbeddingSettings { + source: Setting::Set(crate::vector::settings::EmbedderSource::UserProvided), + dimensions: Setting::Set(2), + ..Default::default() + }), + ); + 
settings.set_embedder_settings(embedders); + settings.set_vector_store(crate::vector::VectorStoreBackend::Hannoy); + }) + .unwrap(); + + let rtxn = index.read_txn().unwrap(); + let mut search = Search::new(&rtxn, &index); + + let embedder = Arc::new( + Embedder::new( + EmbedderOptions::UserProvided(crate::vector::embedder::manual::EmbedderOptions { + dimensions: 2, + distribution: None, + }), + 0, + ) + .unwrap(), + ); + + search.semantic("default".into(), embedder, false, Some(vec![1., -1.]), None); + + search.limit(4); + search.scoring_strategy(ScoringStrategy::Detailed); + search.time_budget(TimeBudget::max()); + + let result = search.execute().unwrap(); + snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" + IDs: [2, 0, 3, 1] + Scores: 1.0000 0.5000 0.5000 0.0000 + Score Details: + [ + [ + Vector( + Vector { + similarity: Some( + 1.0, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.0, + ), + }, + ), + ], + ] + "###); + + // Do ONE loop iteration. Not much can be deduced, almost everyone matched the words first bucket. 
+ search.time_budget(TimeBudget::max().with_stop_after(1)); + + let result = search.execute().unwrap(); + snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" + IDs: [0, 1, 2, 3] + Scores: 0.5000 0.0000 0.0000 0.0000 + Score Details: + [ + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Skipped, + ], + [ + Skipped, + ], + [ + Skipped, + ], + ] + "###); + + search.time_budget(TimeBudget::max().with_stop_after(2)); + + let result = search.execute().unwrap(); + snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" + IDs: [0, 1, 2, 3] + Scores: 0.5000 0.0000 0.0000 0.0000 + Score Details: + [ + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.0, + ), + }, + ), + ], + [ + Skipped, + ], + [ + Skipped, + ], + ] + "###); + + search.time_budget(TimeBudget::max().with_stop_after(3)); + + let result = search.execute().unwrap(); + snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" + IDs: [2, 0, 1, 3] + Scores: 1.0000 0.5000 0.0000 0.0000 + Score Details: + [ + [ + Vector( + Vector { + similarity: Some( + 1.0, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.0, + ), + }, + ), + ], + [ + Skipped, + ], + ] + "###); + + search.time_budget(TimeBudget::max().with_stop_after(4)); + + let result = search.execute().unwrap(); + snapshot!(format!("IDs: 
{:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" + IDs: [2, 0, 3, 1] + Scores: 1.0000 0.5000 0.5000 0.0000 + Score Details: + [ + [ + Vector( + Vector { + similarity: Some( + 1.0, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.0, + ), + }, + ), + ], + ] + "###); + + search.time_budget(TimeBudget::max().with_stop_after(5)); + + let result = search.execute().unwrap(); + snapshot!(format!("IDs: {:?}\nScores: {}\nScore Details:\n{:#?}", result.documents_ids, result.document_scores.iter().map(|scores| format!("{:.4} ", ScoreDetails::global_score(scores.iter()))).collect::(), result.document_scores), @r###" + IDs: [2, 0, 3, 1] + Scores: 1.0000 0.5000 0.5000 0.0000 + Score Details: + [ + [ + Vector( + Vector { + similarity: Some( + 1.0, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.5, + ), + }, + ), + ], + [ + Vector( + Vector { + similarity: Some( + 0.0, + ), + }, + ), + ], + ] + "###); } diff --git a/crates/milli/src/search/new/vector_sort.rs b/crates/milli/src/search/new/vector_sort.rs index 5da4c7145..e652e9dc1 100644 --- a/crates/milli/src/search/new/vector_sort.rs +++ b/crates/milli/src/search/new/vector_sort.rs @@ -1,4 +1,5 @@ use std::iter::FromIterator; +use std::task::Poll; use std::time::Instant; use roaring::RoaringBitmap; @@ -7,7 +8,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait use super::VectorStoreStats; use crate::score_details::{self, ScoreDetails}; use crate::vector::{DistributionShift, Embedder, VectorStore}; -use crate::{DocumentId, Result, SearchContext, SearchLogger}; +use crate::{DocumentId, 
Result, SearchContext, SearchLogger, TimeBudget}; pub struct VectorSort { query: Option, @@ -52,6 +53,7 @@ impl VectorSort { &mut self, ctx: &mut SearchContext<'_>, vector_candidates: &RoaringBitmap, + time_budget: &TimeBudget, ) -> Result<()> { let target = &self.target; let backend = ctx.index.get_vector_store(ctx.txn)?.unwrap_or_default(); @@ -59,7 +61,13 @@ impl VectorSort { let before = Instant::now(); let reader = VectorStore::new(backend, ctx.index.vector_store, self.embedder_index, self.quantized); - let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?; + let results = reader.nns_by_vector( + ctx.txn, + target, + self.limit, + Some(vector_candidates), + time_budget, + )?; self.cached_sorted_docids = results.into_iter(); *ctx.vector_store_stats.get_or_insert_default() += VectorStoreStats { total_time: before.elapsed(), @@ -69,6 +77,20 @@ impl VectorSort { Ok(()) } + + fn next_result(&mut self, vector_candidates: &RoaringBitmap) -> Option<(DocumentId, f32)> { + for (docid, distance) in self.cached_sorted_docids.by_ref() { + if vector_candidates.contains(docid) { + let score = 1.0 - distance; + let score = self + .distribution_shift + .map(|distribution| distribution.shift(score)) + .unwrap_or(score); + return Some((docid, score)); + } + } + None + } } impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort { @@ -83,12 +105,13 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort { _logger: &mut dyn SearchLogger, universe: &RoaringBitmap, query: &Q, + time_budget: &TimeBudget, ) -> Result<()> { assert!(self.query.is_none()); self.query = Some(query.clone()); let vector_candidates = &self.vector_candidates & universe; - self.fill_buffer(ctx, &vector_candidates)?; + self.fill_buffer(ctx, &vector_candidates, time_budget)?; Ok(()) } @@ -99,6 +122,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort { ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger, 
universe: &RoaringBitmap, + time_budget: &TimeBudget, ) -> Result>> { let query = self.query.as_ref().unwrap().clone(); let vector_candidates = &self.vector_candidates & universe; @@ -111,24 +135,17 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort { })); } - for (docid, distance) in self.cached_sorted_docids.by_ref() { - if vector_candidates.contains(docid) { - let score = 1.0 - distance; - let score = self - .distribution_shift - .map(|distribution| distribution.shift(score)) - .unwrap_or(score); - return Ok(Some(RankingRuleOutput { - query, - candidates: RoaringBitmap::from_iter([docid]), - score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }), - })); - } + if let Some((docid, score)) = self.next_result(&vector_candidates) { + return Ok(Some(RankingRuleOutput { + query, + candidates: RoaringBitmap::from_iter([docid]), + score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }), + })); } // if we got out of this loop it means we've exhausted our cache. // we need to refill it and run the function again. - self.fill_buffer(ctx, &vector_candidates)?; + self.fill_buffer(ctx, &vector_candidates, time_budget)?; // we tried filling the buffer, but it remained empty 😢 // it means we don't actually have any document remaining in the universe with a vector. 
@@ -141,11 +158,39 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort { })); } - self.next_bucket(ctx, _logger, universe) + self.next_bucket(ctx, _logger, universe, time_budget) } #[tracing::instrument(level = "trace", skip_all, target = "search::vector_sort")] fn end_iteration(&mut self, _ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger) { self.query = None; } + + fn non_blocking_next_bucket( + &mut self, + _ctx: &mut SearchContext<'ctx>, + _logger: &mut dyn SearchLogger, + universe: &RoaringBitmap, + ) -> Result>> { + let query = self.query.as_ref().unwrap().clone(); + let vector_candidates = &self.vector_candidates & universe; + + if vector_candidates.is_empty() { + return Ok(Poll::Ready(RankingRuleOutput { + query, + candidates: universe.clone(), + score: ScoreDetails::Vector(score_details::Vector { similarity: None }), + })); + } + + if let Some((docid, score)) = self.next_result(&vector_candidates) { + Ok(Poll::Ready(RankingRuleOutput { + query, + candidates: RoaringBitmap::from_iter([docid]), + score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }), + })) + } else { + Ok(Poll::Pending) + } + } } diff --git a/crates/milli/src/vector/store.rs b/crates/milli/src/vector/store.rs index 76bd6ecfe..100571e85 100644 --- a/crates/milli/src/vector/store.rs +++ b/crates/milli/src/vector/store.rs @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize}; use crate::progress::Progress; use crate::vector::Embeddings; +use crate::TimeBudget; const HANNOY_EF_CONSTRUCTION: usize = 125; const HANNOY_M: usize = 16; @@ -591,6 +592,7 @@ impl VectorStore { vector: &[f32], limit: usize, filter: Option<&RoaringBitmap>, + time_budget: &TimeBudget, ) -> crate::Result> { if self.backend == VectorStoreBackend::Arroy { if self.quantized { @@ -601,11 +603,25 @@ impl VectorStore { .map_err(Into::into) } } else if self.quantized { - self._hannoy_nns_by_vector(rtxn, self._hannoy_quantized_db(), vector, limit, filter) - 
.map_err(Into::into) + self._hannoy_nns_by_vector( + rtxn, + self._hannoy_quantized_db(), + vector, + limit, + filter, + time_budget, + ) + .map_err(Into::into) } else { - self._hannoy_nns_by_vector(rtxn, self._hannoy_angular_db(), vector, limit, filter) - .map_err(Into::into) + self._hannoy_nns_by_vector( + rtxn, + self._hannoy_angular_db(), + vector, + limit, + filter, + time_budget, + ) + .map_err(Into::into) } } pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result>> { @@ -1000,6 +1016,7 @@ impl VectorStore { vector: &[f32], limit: usize, filter: Option<&RoaringBitmap>, + time_budget: &TimeBudget, ) -> Result, hannoy::Error> { let mut results = Vec::new(); @@ -1011,7 +1028,10 @@ impl VectorStore { searcher.candidates(filter); } - results.append(&mut searcher.by_vector(rtxn, vector)?); + let (res, _degraded) = + &mut searcher + .by_vector_with_cancellation(rtxn, vector, || time_budget.exceeded())?; + results.append(res); } results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));