From 918a6eaec91d38a88c3f7c822b8e62102059f59f Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Wed, 15 Oct 2025 15:36:06 +0200
Subject: [PATCH] Implement for vector store ranking rule

---
 crates/milli/src/search/new/vector_sort.rs | 81 +++++++++++++++++-----
 crates/milli/src/vector/store.rs           | 30 ++++++--
 2 files changed, 88 insertions(+), 23 deletions(-)

diff --git a/crates/milli/src/search/new/vector_sort.rs b/crates/milli/src/search/new/vector_sort.rs
index 5da4c7145..e652e9dc1 100644
--- a/crates/milli/src/search/new/vector_sort.rs
+++ b/crates/milli/src/search/new/vector_sort.rs
@@ -1,4 +1,5 @@
 use std::iter::FromIterator;
+use std::task::Poll;
 use std::time::Instant;
 
 use roaring::RoaringBitmap;
@@ -7,7 +8,7 @@ use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrai
 use super::VectorStoreStats;
 use crate::score_details::{self, ScoreDetails};
 use crate::vector::{DistributionShift, Embedder, VectorStore};
-use crate::{DocumentId, Result, SearchContext, SearchLogger};
+use crate::{DocumentId, Result, SearchContext, SearchLogger, TimeBudget};
 
 pub struct VectorSort<Q: RankingRuleQueryTrait> {
     query: Option<Q>,
@@ -52,6 +53,7 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
         &mut self,
         ctx: &mut SearchContext<'_>,
         vector_candidates: &RoaringBitmap,
+        time_budget: &TimeBudget,
     ) -> Result<()> {
         let target = &self.target;
         let backend = ctx.index.get_vector_store(ctx.txn)?.unwrap_or_default();
@@ -59,7 +61,13 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
         let before = Instant::now();
         let reader =
             VectorStore::new(backend, ctx.index.vector_store, self.embedder_index, self.quantized);
-        let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
+        let results = reader.nns_by_vector(
+            ctx.txn,
+            target,
+            self.limit,
+            Some(vector_candidates),
+            time_budget,
+        )?;
         self.cached_sorted_docids = results.into_iter();
         *ctx.vector_store_stats.get_or_insert_default() += VectorStoreStats {
             total_time: before.elapsed(),
@@ -69,6 +77,20 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
 
         Ok(())
     }
+
+    fn next_result(&mut self, vector_candidates: &RoaringBitmap) -> Option<(DocumentId, f32)> {
+        for (docid, distance) in self.cached_sorted_docids.by_ref() {
+            if vector_candidates.contains(docid) {
+                let score = 1.0 - distance;
+                let score = self
+                    .distribution_shift
+                    .map(|distribution| distribution.shift(score))
+                    .unwrap_or(score);
+                return Some((docid, score));
+            }
+        }
+        None
+    }
 }
 
 impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
@@ -83,12 +105,13 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
         _logger: &mut dyn SearchLogger<Q>,
         universe: &RoaringBitmap,
         query: &Q,
+        time_budget: &TimeBudget,
     ) -> Result<()> {
         assert!(self.query.is_none());
         self.query = Some(query.clone());
         let vector_candidates = &self.vector_candidates & universe;
 
-        self.fill_buffer(ctx, &vector_candidates)?;
+        self.fill_buffer(ctx, &vector_candidates, time_budget)?;
         Ok(())
     }
 
@@ -99,6 +122,7 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
         ctx: &mut SearchContext<'ctx>,
         _logger: &mut dyn SearchLogger<Q>,
         universe: &RoaringBitmap,
+        time_budget: &TimeBudget,
     ) -> Result<Option<RankingRuleOutput<Q>>> {
         let query = self.query.as_ref().unwrap().clone();
         let vector_candidates = &self.vector_candidates & universe;
@@ -111,24 +135,17 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
             }));
         }
 
-        for (docid, distance) in self.cached_sorted_docids.by_ref() {
-            if vector_candidates.contains(docid) {
-                let score = 1.0 - distance;
-                let score = self
-                    .distribution_shift
-                    .map(|distribution| distribution.shift(score))
-                    .unwrap_or(score);
-                return Ok(Some(RankingRuleOutput {
-                    query,
-                    candidates: RoaringBitmap::from_iter([docid]),
-                    score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
-                }));
-            }
+        if let Some((docid, score)) = self.next_result(&vector_candidates) {
+            return Ok(Some(RankingRuleOutput {
+                query,
+                candidates: RoaringBitmap::from_iter([docid]),
+                score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
+            }));
         }
 
         // if we got out of this loop it means we've exhausted our cache.
         // we need to refill it and run the function again.
-        self.fill_buffer(ctx, &vector_candidates)?;
+        self.fill_buffer(ctx, &vector_candidates, time_budget)?;
 
         // we tried filling the buffer, but it remained empty 😢
         // it means we don't actually have any document remaining in the universe with a vector.
@@ -141,11 +158,39 @@ impl<'ctx, Q: RankingRuleQueryTrait> RankingRule<'ctx, Q> for VectorSort<Q> {
             }));
         }
 
-        self.next_bucket(ctx, _logger, universe)
+        self.next_bucket(ctx, _logger, universe, time_budget)
     }
 
     #[tracing::instrument(level = "trace", skip_all, target = "search::vector_sort")]
     fn end_iteration(&mut self, _ctx: &mut SearchContext<'ctx>, _logger: &mut dyn SearchLogger<Q>) {
         self.query = None;
     }
+
+    fn non_blocking_next_bucket(
+        &mut self,
+        _ctx: &mut SearchContext<'ctx>,
+        _logger: &mut dyn SearchLogger<Q>,
+        universe: &RoaringBitmap,
+    ) -> Result<Poll<RankingRuleOutput<Q>>> {
+        let query = self.query.as_ref().unwrap().clone();
+        let vector_candidates = &self.vector_candidates & universe;
+
+        if vector_candidates.is_empty() {
+            return Ok(Poll::Ready(RankingRuleOutput {
+                query,
+                candidates: universe.clone(),
+                score: ScoreDetails::Vector(score_details::Vector { similarity: None }),
+            }));
+        }
+
+        if let Some((docid, score)) = self.next_result(&vector_candidates) {
+            Ok(Poll::Ready(RankingRuleOutput {
+                query,
+                candidates: RoaringBitmap::from_iter([docid]),
+                score: ScoreDetails::Vector(score_details::Vector { similarity: Some(score) }),
+            }))
+        } else {
+            Ok(Poll::Pending)
+        }
+    }
 }
diff --git a/crates/milli/src/vector/store.rs b/crates/milli/src/vector/store.rs
index 76bd6ecfe..100571e85 100644
--- a/crates/milli/src/vector/store.rs
+++ b/crates/milli/src/vector/store.rs
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
 
 use crate::progress::Progress;
 use crate::vector::Embeddings;
+use crate::TimeBudget;
 
 const HANNOY_EF_CONSTRUCTION: usize = 125;
 const HANNOY_M: usize = 16;
@@ -591,6 +592,7 @@ impl VectorStore {
         vector: &[f32],
         limit: usize,
         filter: Option<&RoaringBitmap>,
+        time_budget: &TimeBudget,
     ) -> crate::Result<Vec<(ItemId, f32)>> {
         if self.backend == VectorStoreBackend::Arroy {
             if self.quantized {
@@ -601,11 +603,25 @@ impl VectorStore {
                     .map_err(Into::into)
             }
         } else if self.quantized {
-            self._hannoy_nns_by_vector(rtxn, self._hannoy_quantized_db(), vector, limit, filter)
-                .map_err(Into::into)
+            self._hannoy_nns_by_vector(
+                rtxn,
+                self._hannoy_quantized_db(),
+                vector,
+                limit,
+                filter,
+                time_budget,
+            )
+            .map_err(Into::into)
         } else {
-            self._hannoy_nns_by_vector(rtxn, self._hannoy_angular_db(), vector, limit, filter)
-                .map_err(Into::into)
+            self._hannoy_nns_by_vector(
+                rtxn,
+                self._hannoy_angular_db(),
+                vector,
+                limit,
+                filter,
+                time_budget,
+            )
+            .map_err(Into::into)
         }
     }
     pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result<Vec<Vec<f32>>> {
@@ -1000,6 +1016,7 @@ impl VectorStore {
         vector: &[f32],
         limit: usize,
         filter: Option<&RoaringBitmap>,
+        time_budget: &TimeBudget,
     ) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
         let mut results = Vec::new();
 
@@ -1011,7 +1028,10 @@ impl VectorStore {
                 searcher.candidates(filter);
             }
 
-            results.append(&mut searcher.by_vector(rtxn, vector)?);
+            let (res, _degraded) =
+                &mut searcher
+                    .by_vector_with_cancellation(rtxn, vector, || time_budget.exceeded())?;
+            results.append(res);
         }
 
         results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
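
Note on the cancellation pattern used above: the patch threads a `TimeBudget` from the ranking rule down to hannoy's `by_vector_with_cancellation`, which polls a closure between candidates and returns whatever it has gathered so far once the budget is spent, while `non_blocking_next_bucket` returns `Poll::Pending` instead of refilling the cache synchronously. The sketch below only illustrates the callback shape with stand-in types: `TimeBudget` here is a simplified local struct and `nns_with_cancellation` is a hypothetical toy scan, not milli's or hannoy's actual implementation.

use std::time::{Duration, Instant};

// Simplified stand-in for milli's `TimeBudget`; only `exceeded()` is assumed.
struct TimeBudget {
    started: Instant,
    budget: Duration,
}

impl TimeBudget {
    fn new(budget: Duration) -> Self {
        Self { started: Instant::now(), budget }
    }

    fn exceeded(&self) -> bool {
        self.started.elapsed() >= self.budget
    }
}

// Hypothetical brute-force scan showing the cancellation-callback shape: the
// closure is checked between candidates, and an early return yields a partial
// ("degraded") result instead of blocking past the budget.
fn nns_with_cancellation(
    candidates: &[(u32, f32)], // (docid, distance) pairs
    limit: usize,
    mut cancelled: impl FnMut() -> bool,
) -> (Vec<(u32, f32)>, bool) {
    let mut results = Vec::new();
    for &(docid, distance) in candidates {
        if cancelled() {
            results.sort_unstable_by(|a, b| a.1.total_cmp(&b.1));
            return (results, true); // degraded: stopped early
        }
        results.push((docid, distance));
        if results.len() == limit {
            break;
        }
    }
    results.sort_unstable_by(|a, b| a.1.total_cmp(&b.1));
    (results, false)
}

fn main() {
    let time_budget = TimeBudget::new(Duration::from_millis(150));
    let candidates = [(0, 0.12), (1, 0.48), (2, 0.05)];
    // Mirrors the call shape in the patch: `|| time_budget.exceeded()` is the
    // cancellation callback handed to the vector-store search.
    let (results, degraded) = nns_with_cancellation(&candidates, 2, || time_budget.exceeded());
    println!("results: {results:?}, degraded: {degraded}");
}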