From b5f0c194064836340157ff635b59271daa71e915 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 26 Aug 2025 16:32:17 +0200 Subject: [PATCH] Split the `vector` module in submodules --- crates/dump/src/reader/v6/mod.rs | 2 +- crates/milli/src/error.rs | 2 +- .../src/update/new/extract/vectors/mod.rs | 2 +- crates/milli/src/update/settings.rs | 50 +- crates/milli/src/vector/distribution.rs | 128 ++ .../src/vector/{ => embedder}/composite.rs | 12 +- crates/milli/src/vector/{ => embedder}/hf.rs | 5 +- .../milli/src/vector/{ => embedder}/manual.rs | 5 +- crates/milli/src/vector/embedder/mod.rs | 381 +++++ .../milli/src/vector/{ => embedder}/ollama.rs | 8 +- .../milli/src/vector/{ => embedder}/openai.rs | 7 +- .../milli/src/vector/{ => embedder}/rest.rs | 12 +- crates/milli/src/vector/embeddings.rs | 76 + crates/milli/src/vector/error.rs | 4 +- crates/milli/src/vector/mod.rs | 1446 +---------------- crates/milli/src/vector/runtime.rs | 81 + crates/milli/src/vector/session.rs | 3 +- crates/milli/src/vector/settings.rs | 121 +- crates/milli/src/vector/store.rs | 775 +++++++++ 19 files changed, 1557 insertions(+), 1563 deletions(-) create mode 100644 crates/milli/src/vector/distribution.rs rename crates/milli/src/vector/{ => embedder}/composite.rs (97%) rename crates/milli/src/vector/{ => embedder}/hf.rs (98%) rename crates/milli/src/vector/{ => embedder}/manual.rs (93%) create mode 100644 crates/milli/src/vector/embedder/mod.rs rename crates/milli/src/vector/{ => embedder}/ollama.rs (96%) rename crates/milli/src/vector/{ => embedder}/openai.rs (98%) rename crates/milli/src/vector/{ => embedder}/rest.rs (98%) create mode 100644 crates/milli/src/vector/embeddings.rs create mode 100644 crates/milli/src/vector/runtime.rs create mode 100644 crates/milli/src/vector/store.rs diff --git a/crates/dump/src/reader/v6/mod.rs b/crates/dump/src/reader/v6/mod.rs index 75ff2ebe6..b5549ec65 100644 --- a/crates/dump/src/reader/v6/mod.rs +++ b/crates/dump/src/reader/v6/mod.rs @@ -4,7 +4,7 @@ use std::io::{BufRead, BufReader, ErrorKind}; use std::path::Path; pub use meilisearch_types::milli; -use meilisearch_types::milli::vector::hf::OverridePooling; +use meilisearch_types::milli::vector::embedder::hf::OverridePooling; use tempfile::TempDir; use time::OffsetDateTime; use tracing::debug; diff --git a/crates/milli/src/error.rs b/crates/milli/src/error.rs index 787f42753..11d7756c1 100644 --- a/crates/milli/src/error.rs +++ b/crates/milli/src/error.rs @@ -355,7 +355,7 @@ and can not be more than 511 bytes.", .document_id.to_string() context: crate::vector::settings::NestingContext, field: crate::vector::settings::MetaEmbeddingSetting, }, - #[error("`.embedders.{embedder_name}.model`: Invalid model `{model}` for OpenAI. Supported models: {:?}", crate::vector::openai::EmbeddingModel::supported_models())] + #[error("`.embedders.{embedder_name}.model`: Invalid model `{model}` for OpenAI. Supported models: {:?}", crate::vector::embedder::openai::EmbeddingModel::supported_models())] InvalidOpenAiModel { embedder_name: String, model: String }, #[error("`.embedders.{embedder_name}`: Missing field `{field}` (note: this field is mandatory for source `{source_}`)")] MissingFieldForSource { diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 71fa9bf09..f147de360 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -475,7 +475,7 @@ impl<'doc> OnEmbed<'doc> for OnEmbeddingDocumentUpdates<'doc, '_> { } fn process_embedding_error( &mut self, - error: crate::vector::hf::EmbedError, + error: crate::vector::error::EmbedError, embedder_name: &'doc str, unused_vectors_distribution: &UnusedVectorsDistributionBump, metadata: BVec<'doc, Metadata<'doc>>, diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index bca8fbc59..5530ae718 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -33,6 +33,7 @@ use crate::update::index_documents::IndexDocumentsMethod; use crate::update::new::indexer::reindex; use crate::update::{IndexDocuments, UpdateIndexingStep}; use crate::vector::db::{FragmentConfigs, IndexEmbeddingConfig}; +use crate::vector::embedder::{openai, rest}; use crate::vector::json_template::JsonTemplate; use crate::vector::settings::{ EmbedderAction, EmbedderSource, EmbeddingSettings, EmbeddingValidationContext, NestingContext, @@ -2208,39 +2209,29 @@ pub fn validate_embedding_settings( if let Some(request) = request.as_ref().set() { let request = match with_fragments { WithFragments::Yes { indexing_fragments, search_fragments } => { - crate::vector::rest::RequestData::new( - request.to_owned(), - indexing_fragments, - search_fragments, - ) - .map_err(|error| crate::UserError::VectorEmbeddingError(error.into())) + rest::RequestData::new(request.to_owned(), indexing_fragments, search_fragments) + .map_err(|error| crate::UserError::VectorEmbeddingError(error.into())) + } + WithFragments::No => { + rest::RequestData::new(request.to_owned(), Default::default(), Default::default()) + .map_err(|error| crate::UserError::VectorEmbeddingError(error.into())) } - WithFragments::No => crate::vector::rest::RequestData::new( - request.to_owned(), - Default::default(), - Default::default(), - ) - .map_err(|error| crate::UserError::VectorEmbeddingError(error.into())), WithFragments::Maybe => { let mut indexing_fragments = BTreeMap::new(); indexing_fragments.insert("test".to_string(), serde_json::json!("test")); - crate::vector::rest::RequestData::new( - request.to_owned(), - indexing_fragments, - Default::default(), - ) - .or_else(|_| { - crate::vector::rest::RequestData::new( - request.to_owned(), - Default::default(), - Default::default(), - ) - }) - .map_err(|error| crate::UserError::VectorEmbeddingError(error.into())) + rest::RequestData::new(request.to_owned(), indexing_fragments, Default::default()) + .or_else(|_| { + rest::RequestData::new( + request.to_owned(), + Default::default(), + Default::default(), + ) + }) + .map_err(|error| crate::UserError::VectorEmbeddingError(error.into())) } }?; if let Some(response) = response.as_ref().set() { - crate::vector::rest::Response::new(response.to_owned(), &request) + rest::Response::new(response.to_owned(), &request) .map_err(|error| crate::UserError::VectorEmbeddingError(error.into()))?; } } @@ -2293,11 +2284,12 @@ pub fn validate_embedding_settings( match inferred_source { EmbedderSource::OpenAi => { if let Setting::Set(model) = &model { - let model = crate::vector::openai::EmbeddingModel::from_name(model.as_str()) - .ok_or(crate::error::UserError::InvalidOpenAiModel { + let model = openai::EmbeddingModel::from_name(model.as_str()).ok_or( + crate::error::UserError::InvalidOpenAiModel { embedder_name: name.to_owned(), model: model.clone(), - })?; + }, + )?; if let Setting::Set(dimensions) = dimensions { if !model.supports_overriding_dimensions() && dimensions != model.default_dimensions() diff --git a/crates/milli/src/vector/distribution.rs b/crates/milli/src/vector/distribution.rs new file mode 100644 index 000000000..b17ad9204 --- /dev/null +++ b/crates/milli/src/vector/distribution.rs @@ -0,0 +1,128 @@ +use deserr::{DeserializeError, Deserr}; +use ordered_float::OrderedFloat; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +/// Describes the mean and sigma of distribution of embedding similarity in the embedding space. +/// +/// The intended use is to make the similarity score more comparable to the regular ranking score. +/// This allows to correct effects where results are too "packed" around a certain value. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize, ToSchema)] +#[serde(from = "DistributionShiftSerializable")] +#[serde(into = "DistributionShiftSerializable")] +pub struct DistributionShift { + /// Value where the results are "packed". + /// + /// Similarity scores are translated so that they are packed around 0.5 instead + #[schema(value_type = f32)] + pub current_mean: OrderedFloat, + + /// standard deviation of a similarity score. + /// + /// Set below 0.4 to make the results less packed around the mean, and above 0.4 to make them more packed. + #[schema(value_type = f32)] + pub current_sigma: OrderedFloat, +} + +impl Deserr for DistributionShift +where + E: DeserializeError, +{ + fn deserialize_from_value( + value: deserr::Value, + location: deserr::ValuePointerRef<'_>, + ) -> Result { + let value = DistributionShiftSerializable::deserialize_from_value(value, location)?; + if value.mean < 0. || value.mean > 1. { + return Err(deserr::take_cf_content(E::error::( + None, + deserr::ErrorKind::Unexpected { + msg: format!( + "the distribution mean must be in the range [0, 1], got {}", + value.mean + ), + }, + location, + ))); + } + if value.sigma <= 0. || value.sigma > 1. { + return Err(deserr::take_cf_content(E::error::( + None, + deserr::ErrorKind::Unexpected { + msg: format!( + "the distribution sigma must be in the range ]0, 1], got {}", + value.sigma + ), + }, + location, + ))); + } + + Ok(value.into()) + } +} + +#[derive(Serialize, Deserialize, Deserr)] +#[serde(deny_unknown_fields)] +#[deserr(deny_unknown_fields)] +struct DistributionShiftSerializable { + mean: f32, + sigma: f32, +} + +impl From for DistributionShiftSerializable { + fn from( + DistributionShift { + current_mean: OrderedFloat(current_mean), + current_sigma: OrderedFloat(current_sigma), + }: DistributionShift, + ) -> Self { + Self { mean: current_mean, sigma: current_sigma } + } +} + +impl From for DistributionShift { + fn from(DistributionShiftSerializable { mean, sigma }: DistributionShiftSerializable) -> Self { + Self { current_mean: OrderedFloat(mean), current_sigma: OrderedFloat(sigma) } + } +} + +impl DistributionShift { + /// `None` if sigma <= 0. + pub fn new(mean: f32, sigma: f32) -> Option { + if sigma <= 0.0 { + None + } else { + Some(Self { current_mean: OrderedFloat(mean), current_sigma: OrderedFloat(sigma) }) + } + } + + pub fn shift(&self, score: f32) -> f32 { + let current_mean = self.current_mean.0; + let current_sigma = self.current_sigma.0; + // + // We're somewhat abusively mapping the distribution of distances to a gaussian. + // The parameters we're given is the mean and sigma of the native result distribution. + // We're using them to retarget the distribution to a gaussian centered on 0.5 with a sigma of 0.4. + + let target_mean = 0.5; + let target_sigma = 0.4; + + // a^2 sig1^2 = sig2^2 => a^2 = sig2^2 / sig1^2 => a = sig2 / sig1, assuming a, sig1, and sig2 positive. + let factor = target_sigma / current_sigma; + // a*mu1 + b = mu2 => b = mu2 - a*mu1 + let offset = target_mean - (factor * current_mean); + + let mut score = factor * score + offset; + + // clamp the final score in the ]0, 1] interval. + if score <= 0.0 { + score = f32::EPSILON; + } + if score > 1.0 { + score = 1.0; + } + + score + } +} diff --git a/crates/milli/src/vector/composite.rs b/crates/milli/src/vector/embedder/composite.rs similarity index 97% rename from crates/milli/src/vector/composite.rs rename to crates/milli/src/vector/embedder/composite.rs index 539e92ba8..c34c31b41 100644 --- a/crates/milli/src/vector/composite.rs +++ b/crates/milli/src/vector/embedder/composite.rs @@ -2,14 +2,14 @@ use std::time::Instant; use hannoy::Distance; -use super::error::CompositeEmbedderContainsHuggingFace; -use super::{ - hf, manual, ollama, openai, rest, DistributionShift, EmbedError, Embedding, EmbeddingCache, - NewEmbedderError, -}; +use super::{hf, manual, ollama, openai, rest, Embedding, EmbeddingCache}; use crate::progress::EmbedderStats; +use crate::vector::error::{CompositeEmbedderContainsHuggingFace, EmbedError, NewEmbedderError}; +use crate::vector::DistributionShift; use crate::ThreadPoolNoAbort; +pub(in crate::vector) const MAX_COMPOSITE_DISTANCE: f32 = 0.01; + #[derive(Debug)] pub enum SubEmbedder { /// An embedder based on running local models, fetched from the Hugging Face Hub. @@ -336,7 +336,7 @@ fn check_similarity( }; let distance = hannoy::distances::Cosine::distance(&left, &right); - if distance > super::MAX_COMPOSITE_DISTANCE { + if distance > crate::vector::embedder::composite::MAX_COMPOSITE_DISTANCE { return Err(NewEmbedderError::composite_embedding_value_mismatch(distance, hint)); } } diff --git a/crates/milli/src/vector/hf.rs b/crates/milli/src/vector/embedder/hf.rs similarity index 98% rename from crates/milli/src/vector/hf.rs rename to crates/milli/src/vector/embedder/hf.rs index 1e5c7bd1c..18f80dec1 100644 --- a/crates/milli/src/vector/hf.rs +++ b/crates/milli/src/vector/embedder/hf.rs @@ -6,8 +6,9 @@ use hf_hub::api::sync::Api; use hf_hub::{Repo, RepoType}; use tokenizers::{PaddingParams, Tokenizer}; -pub use super::error::{EmbedError, Error, NewEmbedderError}; -use super::{DistributionShift, Embedding, EmbeddingCache}; +use super::EmbeddingCache; +use crate::vector::error::{EmbedError, NewEmbedderError}; +use crate::vector::{DistributionShift, Embedding}; #[derive( Debug, diff --git a/crates/milli/src/vector/manual.rs b/crates/milli/src/vector/embedder/manual.rs similarity index 93% rename from crates/milli/src/vector/manual.rs rename to crates/milli/src/vector/embedder/manual.rs index b95bf0ea2..132aab0bf 100644 --- a/crates/milli/src/vector/manual.rs +++ b/crates/milli/src/vector/embedder/manual.rs @@ -1,6 +1,5 @@ -use super::error::EmbedError; -use super::DistributionShift; -use crate::vector::Embedding; +use crate::vector::error::EmbedError; +use crate::vector::{DistributionShift, Embedding}; #[derive(Debug, Clone, Copy)] pub struct Embedder { diff --git a/crates/milli/src/vector/embedder/mod.rs b/crates/milli/src/vector/embedder/mod.rs new file mode 100644 index 000000000..b7f7b1de4 --- /dev/null +++ b/crates/milli/src/vector/embedder/mod.rs @@ -0,0 +1,381 @@ +pub mod composite; +pub mod hf; +pub mod manual; +pub mod ollama; +pub mod openai; +pub mod rest; + +use std::num::NonZeroUsize; +use std::sync::Mutex; +use std::time::Instant; + +use composite::SubEmbedderOptions; + +use crate::progress::EmbedderStats; +use crate::prompt::PromptData; +use crate::vector::error::{EmbedError, NewEmbedderError}; +use crate::vector::{DistributionShift, Embedding}; +use crate::ThreadPoolNoAbort; + +/// An embedder can be used to transform text into embeddings. +#[derive(Debug)] +pub enum Embedder { + /// An embedder based on running local models, fetched from the Hugging Face Hub. + HuggingFace(hf::Embedder), + /// An embedder based on making embedding queries against the OpenAI API. + OpenAi(openai::Embedder), + /// An embedder based on the user providing the embeddings in the documents and queries. + UserProvided(manual::Embedder), + /// An embedder based on making embedding queries against an embedding server. + Ollama(ollama::Embedder), + /// An embedder based on making embedding queries against a generic JSON/REST embedding server. + Rest(rest::Embedder), + /// An embedder composed of an embedder at search time and an embedder at indexing time. + Composite(composite::Embedder), +} + +/// Configuration for an embedder. +#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)] +pub struct EmbeddingConfig { + /// Options of the embedder, specific to each kind of embedder + pub embedder_options: EmbedderOptions, + /// Document template + pub prompt: PromptData, + /// If this embedder is binary quantized + pub quantized: Option, + // TODO: add metrics and anything needed +} + +impl EmbeddingConfig { + pub fn quantized(&self) -> bool { + self.quantized.unwrap_or_default() + } +} + +/// Options of an embedder, specific to each kind of embedder. +#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +pub enum EmbedderOptions { + HuggingFace(hf::EmbedderOptions), + OpenAi(openai::EmbedderOptions), + Ollama(ollama::EmbedderOptions), + UserProvided(manual::EmbedderOptions), + Rest(rest::EmbedderOptions), + Composite(composite::EmbedderOptions), +} + +impl EmbedderOptions { + pub fn fragment(&self, name: &str) -> Option<&serde_json::Value> { + match &self { + EmbedderOptions::HuggingFace(_) + | EmbedderOptions::OpenAi(_) + | EmbedderOptions::Ollama(_) + | EmbedderOptions::UserProvided(_) => None, + EmbedderOptions::Rest(embedder_options) => { + embedder_options.indexing_fragments.get(name) + } + EmbedderOptions::Composite(embedder_options) => { + if let SubEmbedderOptions::Rest(embedder_options) = &embedder_options.index { + embedder_options.indexing_fragments.get(name) + } else { + None + } + } + } + } + + pub fn has_fragments(&self) -> bool { + match &self { + EmbedderOptions::HuggingFace(_) + | EmbedderOptions::OpenAi(_) + | EmbedderOptions::Ollama(_) + | EmbedderOptions::UserProvided(_) => false, + EmbedderOptions::Rest(embedder_options) => { + !embedder_options.indexing_fragments.is_empty() + } + EmbedderOptions::Composite(embedder_options) => { + if let SubEmbedderOptions::Rest(embedder_options) = &embedder_options.index { + !embedder_options.indexing_fragments.is_empty() + } else { + false + } + } + } + } +} + +impl Default for EmbedderOptions { + fn default() -> Self { + Self::HuggingFace(Default::default()) + } +} + +impl Embedder { + /// Spawns a new embedder built from its options. + pub fn new( + options: EmbedderOptions, + cache_cap: usize, + ) -> std::result::Result { + Ok(match options { + EmbedderOptions::HuggingFace(options) => { + Self::HuggingFace(hf::Embedder::new(options, cache_cap)?) + } + EmbedderOptions::OpenAi(options) => { + Self::OpenAi(openai::Embedder::new(options, cache_cap)?) + } + EmbedderOptions::Ollama(options) => { + Self::Ollama(ollama::Embedder::new(options, cache_cap)?) + } + EmbedderOptions::UserProvided(options) => { + Self::UserProvided(manual::Embedder::new(options)) + } + EmbedderOptions::Rest(options) => Self::Rest(rest::Embedder::new( + options, + cache_cap, + rest::ConfigurationSource::User, + )?), + EmbedderOptions::Composite(options) => { + Self::Composite(composite::Embedder::new(options, cache_cap)?) + } + }) + } + + /// Embed in search context + + #[tracing::instrument(level = "debug", skip_all, target = "search")] + pub fn embed_search( + &self, + query: SearchQuery<'_>, + deadline: Option, + ) -> std::result::Result { + match query { + SearchQuery::Text(text) => self.embed_search_text(text, deadline), + SearchQuery::Media { q, media } => self.embed_search_media(q, media, deadline), + } + } + + pub fn embed_search_text( + &self, + text: &str, + deadline: Option, + ) -> std::result::Result { + if let Some(cache) = self.cache() { + if let Some(embedding) = cache.get(text) { + tracing::trace!(text, "embedding found in cache"); + return Ok(embedding); + } + } + let embedding = match self { + Embedder::HuggingFace(embedder) => embedder.embed_one(text), + Embedder::OpenAi(embedder) => embedder + .embed(&[text], deadline, None)? + .pop() + .ok_or_else(EmbedError::missing_embedding), + Embedder::Ollama(embedder) => embedder + .embed(&[text], deadline, None)? + .pop() + .ok_or_else(EmbedError::missing_embedding), + Embedder::UserProvided(embedder) => embedder.embed_one(text), + Embedder::Rest(embedder) => embedder.embed_one(SearchQuery::Text(text), deadline, None), + Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline, None), + }?; + + if let Some(cache) = self.cache() { + cache.put(text.to_owned(), embedding.clone()); + } + + Ok(embedding) + } + + pub fn embed_search_media( + &self, + q: Option<&str>, + media: Option<&serde_json::Value>, + deadline: Option, + ) -> std::result::Result { + let Embedder::Rest(embedder) = self else { + return Err(EmbedError::rest_media_not_a_rest()); + }; + embedder.embed_one(SearchQuery::Media { q, media }, deadline, None) + } + + /// Embed multiple chunks of texts. + /// + /// Each chunk is composed of one or multiple texts. + pub fn embed_index( + &self, + text_chunks: Vec>, + threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, + ) -> std::result::Result>, EmbedError> { + match self { + Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), + Embedder::OpenAi(embedder) => { + embedder.embed_index(text_chunks, threads, embedder_stats) + } + Embedder::Ollama(embedder) => { + embedder.embed_index(text_chunks, threads, embedder_stats) + } + Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks), + Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), + Embedder::Composite(embedder) => { + embedder.index.embed_index(text_chunks, threads, embedder_stats) + } + } + } + + /// Non-owning variant of [`Self::embed_index`]. + pub fn embed_index_ref( + &self, + texts: &[&str], + threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, + ) -> std::result::Result, EmbedError> { + match self { + Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), + Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts), + Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + Embedder::Composite(embedder) => { + embedder.index.embed_index_ref(texts, threads, embedder_stats) + } + } + } + + pub fn embed_index_ref_fragments( + &self, + fragments: &[serde_json::Value], + threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, + ) -> std::result::Result, EmbedError> { + if let Embedder::Rest(embedder) = self { + embedder.embed_index_ref(fragments, threads, embedder_stats) + } else { + let Embedder::Composite(embedder) = self else { + unimplemented!("embedding fragments is only available for rest embedders") + }; + let crate::vector::embedder::composite::SubEmbedder::Rest(embedder) = &embedder.index + else { + unimplemented!("embedding fragments is only available for rest embedders") + }; + + embedder.embed_index_ref(fragments, threads, embedder_stats) + } + } + + /// Indicates the preferred number of chunks to pass to [`Self::embed_chunks`] + pub fn chunk_count_hint(&self) -> usize { + match self { + Embedder::HuggingFace(embedder) => embedder.chunk_count_hint(), + Embedder::OpenAi(embedder) => embedder.chunk_count_hint(), + Embedder::Ollama(embedder) => embedder.chunk_count_hint(), + Embedder::UserProvided(_) => 100, + Embedder::Rest(embedder) => embedder.chunk_count_hint(), + Embedder::Composite(embedder) => embedder.index.chunk_count_hint(), + } + } + + /// Indicates the preferred number of texts in a single chunk passed to [`Self::embed`] + pub fn prompt_count_in_chunk_hint(&self) -> usize { + match self { + Embedder::HuggingFace(embedder) => embedder.prompt_count_in_chunk_hint(), + Embedder::OpenAi(embedder) => embedder.prompt_count_in_chunk_hint(), + Embedder::Ollama(embedder) => embedder.prompt_count_in_chunk_hint(), + Embedder::UserProvided(_) => 1, + Embedder::Rest(embedder) => embedder.prompt_count_in_chunk_hint(), + Embedder::Composite(embedder) => embedder.index.prompt_count_in_chunk_hint(), + } + } + + /// Indicates the dimensions of a single embedding produced by the embedder. + pub fn dimensions(&self) -> usize { + match self { + Embedder::HuggingFace(embedder) => embedder.dimensions(), + Embedder::OpenAi(embedder) => embedder.dimensions(), + Embedder::Ollama(embedder) => embedder.dimensions(), + Embedder::UserProvided(embedder) => embedder.dimensions(), + Embedder::Rest(embedder) => embedder.dimensions(), + Embedder::Composite(embedder) => embedder.dimensions(), + } + } + + /// An optional distribution used to apply an affine transformation to the similarity score of a document. + pub fn distribution(&self) -> Option { + match self { + Embedder::HuggingFace(embedder) => embedder.distribution(), + Embedder::OpenAi(embedder) => embedder.distribution(), + Embedder::Ollama(embedder) => embedder.distribution(), + Embedder::UserProvided(embedder) => embedder.distribution(), + Embedder::Rest(embedder) => embedder.distribution(), + Embedder::Composite(embedder) => embedder.distribution(), + } + } + + pub fn uses_document_template(&self) -> bool { + match self { + Embedder::HuggingFace(_) + | Embedder::OpenAi(_) + | Embedder::Ollama(_) + | Embedder::Rest(_) => true, + Embedder::UserProvided(_) => false, + Embedder::Composite(embedder) => embedder.index.uses_document_template(), + } + } + + fn cache(&self) -> Option<&EmbeddingCache> { + match self { + Embedder::HuggingFace(embedder) => Some(embedder.cache()), + Embedder::OpenAi(embedder) => Some(embedder.cache()), + Embedder::UserProvided(_) => None, + Embedder::Ollama(embedder) => Some(embedder.cache()), + Embedder::Rest(embedder) => Some(embedder.cache()), + Embedder::Composite(embedder) => embedder.search.cache(), + } + } +} + +#[derive(Clone, Copy)] +pub enum SearchQuery<'a> { + Text(&'a str), + Media { q: Option<&'a str>, media: Option<&'a serde_json::Value> }, +} + +#[derive(Debug)] +struct EmbeddingCache { + data: Option>>, +} + +impl EmbeddingCache { + const MAX_TEXT_LEN: usize = 2000; + + pub fn new(cap: usize) -> Self { + let data = NonZeroUsize::new(cap).map(lru::LruCache::new).map(Mutex::new); + Self { data } + } + + /// Get the embedding corresponding to `text`, if any is present in the cache. + pub fn get(&self, text: &str) -> Option { + let data = self.data.as_ref()?; + if text.len() > Self::MAX_TEXT_LEN { + return None; + } + let mut cache = data.lock().unwrap(); + + cache.get(text).cloned() + } + + /// Puts a new embedding for the specified `text` + pub fn put(&self, text: String, embedding: Embedding) { + let Some(data) = self.data.as_ref() else { + return; + }; + if text.len() > Self::MAX_TEXT_LEN { + return; + } + tracing::trace!(text, "embedding added to cache"); + + let mut cache = data.lock().unwrap(); + + cache.put(text, embedding); + } +} diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/embedder/ollama.rs similarity index 96% rename from crates/milli/src/vector/ollama.rs rename to crates/milli/src/vector/embedder/ollama.rs index feec92cc0..6e2dc185f 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/embedder/ollama.rs @@ -3,12 +3,12 @@ use std::time::Instant; use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _}; use rayon::slice::ParallelSlice as _; -use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; -use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM}; +use super::EmbeddingCache; use crate::error::FaultSource; use crate::progress::EmbedderStats; -use crate::vector::Embedding; +use crate::vector::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind}; +use crate::vector::{DistributionShift, Embedding, REQUEST_PARALLELISM}; use crate::ThreadPoolNoAbort; #[derive(Debug)] @@ -88,7 +88,7 @@ impl Embedder { Err(NewEmbedderError { kind: NewEmbedderErrorKind::CouldNotDetermineDimension(EmbedError { - kind: super::error::EmbedErrorKind::RestOtherStatusCode(404, error), + kind: EmbedErrorKind::RestOtherStatusCode(404, error), fault: _, }), fault: _, diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/embedder/openai.rs similarity index 98% rename from crates/milli/src/vector/openai.rs rename to crates/milli/src/vector/embedder/openai.rs index bf6c92978..4fec228e4 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/embedder/openai.rs @@ -5,13 +5,12 @@ use ordered_float::OrderedFloat; use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; use rayon::slice::ParallelSlice as _; -use super::error::{EmbedError, NewEmbedderError}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; -use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM}; +use super::{DistributionShift, EmbeddingCache}; use crate::error::FaultSource; use crate::progress::EmbedderStats; -use crate::vector::error::EmbedErrorKind; -use crate::vector::Embedding; +use crate::vector::error::{EmbedError, EmbedErrorKind, NewEmbedderError}; +use crate::vector::{Embedding, REQUEST_PARALLELISM}; use crate::ThreadPoolNoAbort; #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/embedder/rest.rs similarity index 98% rename from crates/milli/src/vector/rest.rs rename to crates/milli/src/vector/embedder/rest.rs index 64e4e74c7..3e0c5989a 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/embedder/rest.rs @@ -8,14 +8,12 @@ use rayon::slice::ParallelSlice as _; use serde::{Deserialize, Serialize}; use serde_json::Value; -use super::error::EmbedErrorKind; -use super::json_template::{InjectableValue, JsonTemplate}; -use super::{ - DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, SearchQuery, - REQUEST_PARALLELISM, -}; +use super::EmbeddingCache; use crate::error::FaultSource; use crate::progress::EmbedderStats; +use crate::vector::error::{EmbedError, EmbedErrorKind, NewEmbedderError}; +use crate::vector::json_template::{InjectableValue, JsonTemplate}; +use crate::vector::{DistributionShift, Embedding, SearchQuery, REQUEST_PARALLELISM}; use crate::ThreadPoolNoAbort; // retrying in case of failure @@ -315,7 +313,7 @@ impl Embedder { } pub fn chunk_count_hint(&self) -> usize { - super::REQUEST_PARALLELISM + crate::vector::REQUEST_PARALLELISM } pub fn prompt_count_in_chunk_hint(&self) -> usize { diff --git a/crates/milli/src/vector/embeddings.rs b/crates/milli/src/vector/embeddings.rs new file mode 100644 index 000000000..467ebc81e --- /dev/null +++ b/crates/milli/src/vector/embeddings.rs @@ -0,0 +1,76 @@ +/// One or multiple embeddings stored consecutively in a flat vector. +#[derive(Debug, PartialEq)] +pub struct Embeddings { + data: Vec, + dimension: usize, +} + +impl Embeddings { + /// Declares an empty vector of embeddings of the specified dimensions. + pub fn new(dimension: usize) -> Self { + Self { data: Default::default(), dimension } + } + + /// Declares a vector of embeddings containing a single element. + /// + /// The dimension is inferred from the length of the passed embedding. + pub fn from_single_embedding(embedding: Vec) -> Self { + Self { dimension: embedding.len(), data: embedding } + } + + /// Declares a vector of embeddings from its components. + /// + /// `data.len()` must be a multiple of `dimension`, otherwise an error is returned. + pub fn from_inner(data: Vec, dimension: usize) -> Result> { + let mut this = Self::new(dimension); + this.append(data)?; + Ok(this) + } + + /// Returns the number of embeddings in this vector of embeddings. + pub fn embedding_count(&self) -> usize { + self.data.len() / self.dimension + } + + /// Dimension of a single embedding. + pub fn dimension(&self) -> usize { + self.dimension + } + + /// Deconstructs self into the inner flat vector. + pub fn into_inner(self) -> Vec { + self.data + } + + /// A reference to the inner flat vector. + pub fn as_inner(&self) -> &[F] { + &self.data + } + + /// Iterates over the embeddings contained in the flat vector. + pub fn iter(&self) -> impl Iterator + '_ { + self.data.as_slice().chunks_exact(self.dimension) + } + + /// Push an embedding at the end of the embeddings. + /// + /// If `embedding.len() != self.dimension`, then the push operation fails. + pub fn push(&mut self, mut embedding: Vec) -> Result<(), Vec> { + if embedding.len() != self.dimension { + return Err(embedding); + } + self.data.append(&mut embedding); + Ok(()) + } + + /// Append a flat vector of embeddings at the end of the embeddings. + /// + /// If `embeddings.len() % self.dimension != 0`, then the append operation fails. + pub fn append(&mut self, mut embeddings: Vec) -> Result<(), Vec> { + if embeddings.len() % self.dimension != 0 { + return Err(embeddings); + } + self.data.append(&mut embeddings); + Ok(()) + } +} diff --git a/crates/milli/src/vector/error.rs b/crates/milli/src/vector/error.rs index 0d737cbfc..b4b90b24b 100644 --- a/crates/milli/src/vector/error.rs +++ b/crates/milli/src/vector/error.rs @@ -6,10 +6,10 @@ use hf_hub::api::sync::ApiError; use itertools::Itertools as _; use super::parsed_vectors::ParsedVectorsDiff; -use super::rest::ConfigurationSource; -use super::MAX_COMPOSITE_DISTANCE; use crate::error::FaultSource; use crate::update::new::vector_document::VectorDocument; +use crate::vector::embedder::composite::MAX_COMPOSITE_DISTANCE; +use crate::vector::embedder::rest::ConfigurationSource; use crate::{FieldDistribution, PanicCatched}; #[derive(Debug, thiserror::Error)] diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index 9487cd9b1..f9b62d7d8 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -1,1453 +1,29 @@ -use std::collections::HashMap; -use std::num::NonZeroUsize; -use std::sync::{Arc, Mutex}; -use std::time::Instant; - -use deserr::{DeserializeError, Deserr}; -use hannoy::distances::{Cosine, Hamming}; -use hannoy::ItemId; -use heed::{RoTxn, RwTxn, Unspecified}; -use ordered_float::OrderedFloat; -use rand::SeedableRng as _; -use roaring::RoaringBitmap; -use serde::{Deserialize, Serialize}; -use utoipa::ToSchema; - -use self::error::{EmbedError, NewEmbedderError}; -use crate::progress::{EmbedderStats, Progress}; -use crate::prompt::{Prompt, PromptData}; -use crate::vector::composite::SubEmbedderOptions; -use crate::vector::json_template::JsonTemplate; -use crate::ThreadPoolNoAbort; - -pub mod composite; pub mod db; +mod distribution; +pub mod embedder; +mod embeddings; pub mod error; pub mod extractor; -pub mod hf; pub mod json_template; -pub mod manual; -pub mod openai; pub mod parsed_vectors; +mod runtime; pub mod session; pub mod settings; - -pub mod ollama; -pub mod rest; +mod store; pub use self::error::Error; pub type Embedding = Vec; +pub use distribution::DistributionShift; +pub use embedder::{Embedder, EmbedderOptions, EmbeddingConfig, SearchQuery}; +pub use embeddings::Embeddings; +pub use runtime::{RuntimeEmbedder, RuntimeEmbedders, RuntimeFragment}; +pub use store::{HannoyStats, VectorStore}; + pub const REQUEST_PARALLELISM: usize = 40; -pub const MAX_COMPOSITE_DISTANCE: f32 = 0.01; - -const HANNOY_EF_CONSTRUCTION: usize = 125; -const HANNOY_M: usize = 16; -const HANNOY_M0: usize = 32; - -pub struct VectorStore { - version: (u32, u32, u32), - database: hannoy::Database, - embedder_index: u8, - quantized: bool, -} - -impl VectorStore { - pub fn new( - version: (u32, u32, u32), - database: hannoy::Database, - embedder_index: u8, - quantized: bool, - ) -> Self { - Self { version, database, embedder_index, quantized } - } - - pub fn embedder_index(&self) -> u8 { - self.embedder_index - } - - /// Whether we must use the arroy to read the vector store. - pub fn version_uses_arroy(&self) -> bool { - let (major, minor, _patch) = self.version; - major == 1 && minor < 18 - } - - fn arroy_readers<'a, D: arroy::Distance>( - &'a self, - rtxn: &'a RoTxn<'a>, - db: arroy::Database, - ) -> impl Iterator, arroy::Error>> + 'a { - vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { - match arroy::Reader::open(rtxn, index, db) { - Ok(reader) => match reader.is_empty(rtxn) { - Ok(false) => Some(Ok(reader)), - Ok(true) => None, - Err(e) => Some(Err(e)), - }, - Err(arroy::Error::MissingMetadata(_)) => None, - Err(e) => Some(Err(e)), - } - }) - } - - fn readers<'a, D: hannoy::Distance>( - &'a self, - rtxn: &'a RoTxn<'a>, - db: hannoy::Database, - ) -> impl Iterator, hannoy::Error>> + 'a { - vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { - match hannoy::Reader::open(rtxn, index, db) { - Ok(reader) => match reader.is_empty(rtxn) { - Ok(false) => Some(Ok(reader)), - Ok(true) => None, - Err(e) => Some(Err(e)), - }, - Err(hannoy::Error::MissingMetadata(_)) => None, - Err(e) => Some(Err(e)), - } - }) - } - - /// The item ids that are present in the store specified by its id. - /// - /// The ids are accessed via a lambda to avoid lifetime shenanigans. - pub fn items_in_store( - &self, - rtxn: &RoTxn, - store_id: u8, - with_items: F, - ) -> crate::Result - where - F: FnOnce(&RoaringBitmap) -> O, - { - if self.version_uses_arroy() { - if self.quantized { - self._arroy_items_in_store(rtxn, self.arroy_quantized_db(), store_id, with_items) - .map_err(Into::into) - } else { - self._arroy_items_in_store(rtxn, self.arroy_angular_db(), store_id, with_items) - .map_err(Into::into) - } - } else if self.quantized { - self._items_in_store(rtxn, self.quantized_db(), store_id, with_items) - .map_err(Into::into) - } else { - self._items_in_store(rtxn, self.angular_db(), store_id, with_items).map_err(Into::into) - } - } - - fn _arroy_items_in_store( - &self, - rtxn: &RoTxn, - db: arroy::Database, - store_id: u8, - with_items: F, - ) -> Result - where - F: FnOnce(&RoaringBitmap) -> O, - { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let reader = arroy::Reader::open(rtxn, index, db); - match reader { - Ok(reader) => Ok(with_items(reader.item_ids())), - Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), - Err(err) => Err(err), - } - } - - fn _items_in_store( - &self, - rtxn: &RoTxn, - db: hannoy::Database, - store_id: u8, - with_items: F, - ) -> Result - where - F: FnOnce(&RoaringBitmap) -> O, - { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let reader = hannoy::Reader::open(rtxn, index, db); - match reader { - Ok(reader) => Ok(with_items(reader.item_ids())), - Err(hannoy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), - Err(err) => Err(err), - } - } - - pub fn dimensions(&self, rtxn: &RoTxn) -> crate::Result> { - if self.version_uses_arroy() { - if self.quantized { - Ok(self - .arroy_readers(rtxn, self.arroy_quantized_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions())) - } else { - Ok(self - .arroy_readers(rtxn, self.arroy_angular_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions())) - } - } else if self.quantized { - Ok(self - .readers(rtxn, self.quantized_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions())) - } else { - Ok(self - .readers(rtxn, self.angular_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions())) - } - } - - pub fn convert_from_arroy(&self, wtxn: &mut RwTxn, progress: Progress) -> crate::Result<()> { - if self.quantized { - let dimensions = self - .arroy_readers(wtxn, self.arroy_quantized_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions()); - - let Some(dimensions) = dimensions else { return Ok(()) }; - - for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions); - let mut builder = writer.builder(&mut rng).progress(progress.clone()); - builder.prepare_arroy_conversion(wtxn)?; - builder.build::(wtxn)?; - } - - Ok(()) - } else { - let dimensions = self - .arroy_readers(wtxn, self.arroy_angular_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions()); - - let Some(dimensions) = dimensions else { return Ok(()) }; - - for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = hannoy::Writer::new(self.angular_db(), index, dimensions); - let mut builder = writer.builder(&mut rng).progress(progress.clone()); - builder.prepare_arroy_conversion(wtxn)?; - builder.build::(wtxn)?; - } - - Ok(()) - } - } - - #[allow(clippy::too_many_arguments)] - pub fn build_and_quantize( - &mut self, - wtxn: &mut RwTxn, - progress: Progress, - rng: &mut R, - dimension: usize, - quantizing: bool, - hannoy_memory: Option, - cancel: &(impl Fn() -> bool + Sync + Send), - ) -> Result<(), hannoy::Error> { - for index in vector_store_range_for_embedder(self.embedder_index) { - if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); - if writer.need_build(wtxn)? { - let mut builder = writer.builder(rng).progress(progress.clone()); - builder - .available_memory(hannoy_memory.unwrap_or(usize::MAX)) - .cancel(cancel) - .ef_construction(HANNOY_EF_CONSTRUCTION) - .build::(wtxn)?; - } else if writer.is_empty(wtxn)? { - continue; - } - } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); - // If we are quantizing the databases, we can't know from meilisearch - // if the db was empty but still contained the wrong metadata, thus we need - // to quantize everything and can't stop early. Since this operation can - // only happens once in the life of an embedder, it's not very performances - // sensitive. - if quantizing && !self.quantized { - let writer = writer.prepare_changing_distance::(wtxn)?; - let mut builder = writer.builder(rng).progress(progress.clone()); - builder - .available_memory(hannoy_memory.unwrap_or(usize::MAX)) - .cancel(cancel) - .ef_construction(HANNOY_EF_CONSTRUCTION) - .build::(wtxn)?; - } else if writer.need_build(wtxn)? { - let mut builder = writer.builder(rng).progress(progress.clone()); - builder - .available_memory(hannoy_memory.unwrap_or(usize::MAX)) - .cancel(cancel) - .ef_construction(HANNOY_EF_CONSTRUCTION) - .build::(wtxn)?; - } else if writer.is_empty(wtxn)? { - continue; - } - } - } - Ok(()) - } - - /// Overwrite all the embeddings associated with the index and item ID. - /// /!\ It won't remove embeddings after the last passed embedding, which can leave stale embeddings. - /// You should call `del_items` on the `item_id` before calling this method. - /// /!\ Cannot insert more than u8::MAX embeddings; after inserting u8::MAX embeddings, all the remaining ones will be silently ignored. - pub fn add_items( - &self, - wtxn: &mut RwTxn, - item_id: hannoy::ItemId, - embeddings: &Embeddings, - ) -> Result<(), hannoy::Error> { - let dimension = embeddings.dimension(); - for (index, vector) in - vector_store_range_for_embedder(self.embedder_index).zip(embeddings.iter()) - { - if self.quantized { - hannoy::Writer::new(self.quantized_db(), index, dimension) - .add_item(wtxn, item_id, vector)? - } else { - hannoy::Writer::new(self.angular_db(), index, dimension) - .add_item(wtxn, item_id, vector)? - } - } - Ok(()) - } - - /// Add one document int for this index where we can find an empty spot. - pub fn add_item( - &self, - wtxn: &mut RwTxn, - item_id: hannoy::ItemId, - vector: &[f32], - ) -> Result<(), hannoy::Error> { - if self.quantized { - self._add_item(wtxn, self.quantized_db(), item_id, vector) - } else { - self._add_item(wtxn, self.angular_db(), item_id, vector) - } - } - - fn _add_item( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - item_id: hannoy::ItemId, - vector: &[f32], - ) -> Result<(), hannoy::Error> { - let dimension = vector.len(); - - for index in vector_store_range_for_embedder(self.embedder_index) { - let writer = hannoy::Writer::new(db, index, dimension); - if !writer.contains_item(wtxn, item_id)? { - writer.add_item(wtxn, item_id, vector)?; - break; - } - } - Ok(()) - } - - /// Add a vector associated with a document in store specified by its id. - /// - /// Any existing vector associated with the document in the store will be replaced by the new vector. - pub fn add_item_in_store( - &self, - wtxn: &mut RwTxn, - item_id: hannoy::ItemId, - store_id: u8, - vector: &[f32], - ) -> Result<(), hannoy::Error> { - if self.quantized { - self._add_item_in_store(wtxn, self.quantized_db(), item_id, store_id, vector) - } else { - self._add_item_in_store(wtxn, self.angular_db(), item_id, store_id, vector) - } - } - - fn _add_item_in_store( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - item_id: hannoy::ItemId, - store_id: u8, - vector: &[f32], - ) -> Result<(), hannoy::Error> { - let dimension = vector.len(); - - let index = vector_store_for_embedder(self.embedder_index, store_id); - let writer = hannoy::Writer::new(db, index, dimension); - writer.add_item(wtxn, item_id, vector) - } - - /// Delete all embeddings from a specific `item_id` - pub fn del_items( - &self, - wtxn: &mut RwTxn, - dimension: usize, - item_id: hannoy::ItemId, - ) -> Result<(), hannoy::Error> { - for index in vector_store_range_for_embedder(self.embedder_index) { - if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); - writer.del_item(wtxn, item_id)?; - } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); - writer.del_item(wtxn, item_id)?; - } - } - - Ok(()) - } - - /// Removes the item specified by its id from the store specified by its id. - /// - /// Returns whether the item was removed. - /// - /// # Warning - /// - /// - This function will silently fail to remove the item if used against an arroy database that was never built. - pub fn del_item_in_store( - &self, - wtxn: &mut RwTxn, - item_id: hannoy::ItemId, - store_id: u8, - dimensions: usize, - ) -> Result { - if self.quantized { - self._del_item_in_store(wtxn, self.quantized_db(), item_id, store_id, dimensions) - } else { - self._del_item_in_store(wtxn, self.angular_db(), item_id, store_id, dimensions) - } - } - - fn _del_item_in_store( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - item_id: hannoy::ItemId, - store_id: u8, - dimensions: usize, - ) -> Result { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let writer = hannoy::Writer::new(db, index, dimensions); - writer.del_item(wtxn, item_id) - } - - /// Removes all items from the store specified by its id. - /// - /// # Warning - /// - /// - This function will silently fail to remove the items if used against an arroy database that was never built. - pub fn clear_store( - &self, - wtxn: &mut RwTxn, - store_id: u8, - dimensions: usize, - ) -> Result<(), hannoy::Error> { - if self.quantized { - self._clear_store(wtxn, self.quantized_db(), store_id, dimensions) - } else { - self._clear_store(wtxn, self.angular_db(), store_id, dimensions) - } - } - - fn _clear_store( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - store_id: u8, - dimensions: usize, - ) -> Result<(), hannoy::Error> { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let writer = hannoy::Writer::new(db, index, dimensions); - writer.clear(wtxn) - } - - /// Delete one item from its value. - pub fn del_item( - &self, - wtxn: &mut RwTxn, - item_id: hannoy::ItemId, - vector: &[f32], - ) -> Result { - if self.quantized { - self._del_item(wtxn, self.quantized_db(), item_id, vector) - } else { - self._del_item(wtxn, self.angular_db(), item_id, vector) - } - } - - fn _del_item( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - item_id: hannoy::ItemId, - vector: &[f32], - ) -> Result { - let dimension = vector.len(); - - for index in vector_store_range_for_embedder(self.embedder_index) { - let writer = hannoy::Writer::new(db, index, dimension); - if writer.contains_item(wtxn, item_id)? { - return writer.del_item(wtxn, item_id); - } - } - Ok(false) - } - - pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), hannoy::Error> { - for index in vector_store_range_for_embedder(self.embedder_index) { - if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); - if writer.is_empty(wtxn)? { - continue; - } - writer.clear(wtxn)?; - } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); - if writer.is_empty(wtxn)? { - continue; - } - writer.clear(wtxn)?; - } - } - Ok(()) - } - - pub fn contains_item( - &self, - rtxn: &RoTxn, - dimension: usize, - item: hannoy::ItemId, - ) -> crate::Result { - for index in vector_store_range_for_embedder(self.embedder_index) { - let contains = if self.version_uses_arroy() { - if self.quantized { - let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimension); - if writer.is_empty(rtxn)? { - continue; - } - writer.contains_item(rtxn, item)? - } else { - let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimension); - if writer.is_empty(rtxn)? { - continue; - } - writer.contains_item(rtxn, item)? - } - } else if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); - if writer.is_empty(rtxn)? { - continue; - } - writer.contains_item(rtxn, item)? - } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); - if writer.is_empty(rtxn)? { - continue; - } - writer.contains_item(rtxn, item)? - }; - if contains { - return Ok(contains); - } - } - Ok(false) - } - - pub fn nns_by_item( - &self, - rtxn: &RoTxn, - item: ItemId, - limit: usize, - filter: Option<&RoaringBitmap>, - ) -> crate::Result> { - if self.version_uses_arroy() { - if self.quantized { - self._arroy_nns_by_item(rtxn, self.arroy_quantized_db(), item, limit, filter) - .map_err(Into::into) - } else { - self._arroy_nns_by_item(rtxn, self.arroy_angular_db(), item, limit, filter) - .map_err(Into::into) - } - } else if self.quantized { - self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter).map_err(Into::into) - } else { - self._nns_by_item(rtxn, self.angular_db(), item, limit, filter).map_err(Into::into) - } - } - - fn _arroy_nns_by_item( - &self, - rtxn: &RoTxn, - db: arroy::Database, - item: ItemId, - limit: usize, - filter: Option<&RoaringBitmap>, - ) -> Result, arroy::Error> { - let mut results = Vec::new(); - - for reader in self.arroy_readers(rtxn, db) { - let reader = reader?; - let mut searcher = reader.nns(limit); - if let Some(filter) = filter { - if reader.item_ids().is_disjoint(filter) { - continue; - } - searcher.candidates(filter); - } - - if let Some(mut ret) = searcher.by_item(rtxn, item)? { - results.append(&mut ret); - } - } - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); - Ok(results) - } - - fn _nns_by_item( - &self, - rtxn: &RoTxn, - db: hannoy::Database, - item: ItemId, - limit: usize, - filter: Option<&RoaringBitmap>, - ) -> Result, hannoy::Error> { - let mut results = Vec::new(); - - for reader in self.readers(rtxn, db) { - let reader = reader?; - let mut searcher = reader.nns(limit); - searcher.ef_search((limit * 10).max(100)); // TODO find better ef - if let Some(filter) = filter { - searcher.candidates(filter); - } - - if let Some(mut ret) = searcher.by_item(rtxn, item)? { - results.append(&mut ret); - } - } - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); - Ok(results) - } - - pub fn nns_by_vector( - &self, - rtxn: &RoTxn, - vector: &[f32], - limit: usize, - filter: Option<&RoaringBitmap>, - ) -> crate::Result> { - if self.version_uses_arroy() { - if self.quantized { - self._arroy_nns_by_vector(rtxn, self.arroy_quantized_db(), vector, limit, filter) - .map_err(Into::into) - } else { - self._arroy_nns_by_vector(rtxn, self.arroy_angular_db(), vector, limit, filter) - .map_err(Into::into) - } - } else if self.quantized { - self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) - .map_err(Into::into) - } else { - self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter).map_err(Into::into) - } - } - - fn _arroy_nns_by_vector( - &self, - rtxn: &RoTxn, - db: arroy::Database, - vector: &[f32], - limit: usize, - filter: Option<&RoaringBitmap>, - ) -> Result, arroy::Error> { - let mut results = Vec::new(); - - for reader in self.arroy_readers(rtxn, db) { - let reader = reader?; - let mut searcher = reader.nns(limit); - if let Some(filter) = filter { - if reader.item_ids().is_disjoint(filter) { - continue; - } - searcher.candidates(filter); - } - - results.append(&mut searcher.by_vector(rtxn, vector)?); - } - - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); - - Ok(results) - } - - fn _nns_by_vector( - &self, - rtxn: &RoTxn, - db: hannoy::Database, - vector: &[f32], - limit: usize, - filter: Option<&RoaringBitmap>, - ) -> Result, hannoy::Error> { - let mut results = Vec::new(); - - for reader in self.readers(rtxn, db) { - let reader = reader?; - let mut searcher = reader.nns(limit); - searcher.ef_search((limit * 10).max(100)); // TODO find better ef - if let Some(filter) = filter { - searcher.candidates(filter); - } - - results.append(&mut searcher.by_vector(rtxn, vector)?); - } - - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); - - Ok(results) - } - - pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result>> { - let mut vectors = Vec::new(); - - if self.version_uses_arroy() { - if self.quantized { - for reader in self.arroy_readers(rtxn, self.arroy_quantized_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } else { - for reader in self.arroy_readers(rtxn, self.arroy_angular_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } - } else if self.quantized { - for reader in self.readers(rtxn, self.quantized_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } else { - for reader in self.readers(rtxn, self.angular_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } - - Ok(vectors) - } - - fn arroy_angular_db(&self) -> arroy::Database { - self.database.remap_types() - } - - fn arroy_quantized_db(&self) -> arroy::Database { - self.database.remap_types() - } - - fn angular_db(&self) -> hannoy::Database { - self.database.remap_data_type() - } - - fn quantized_db(&self) -> hannoy::Database { - self.database.remap_data_type() - } - - pub fn aggregate_stats( - &self, - rtxn: &RoTxn, - stats: &mut HannoyStats, - ) -> Result<(), hannoy::Error> { - if self.quantized { - for reader in self.readers(rtxn, self.quantized_db()) { - let reader = reader?; - let documents = reader.item_ids(); - stats.documents |= documents; - stats.number_of_embeddings += documents.len(); - } - } else { - for reader in self.readers(rtxn, self.angular_db()) { - let reader = reader?; - let documents = reader.item_ids(); - stats.documents |= documents; - stats.number_of_embeddings += documents.len(); - } - } - - Ok(()) - } -} - -#[derive(Debug, Default, Clone)] -pub struct HannoyStats { - pub number_of_embeddings: u64, - pub documents: RoaringBitmap, -} - -/// One or multiple embeddings stored consecutively in a flat vector. -#[derive(Debug, PartialEq)] -pub struct Embeddings { - data: Vec, - dimension: usize, -} - -impl Embeddings { - /// Declares an empty vector of embeddings of the specified dimensions. - pub fn new(dimension: usize) -> Self { - Self { data: Default::default(), dimension } - } - - /// Declares a vector of embeddings containing a single element. - /// - /// The dimension is inferred from the length of the passed embedding. - pub fn from_single_embedding(embedding: Vec) -> Self { - Self { dimension: embedding.len(), data: embedding } - } - - /// Declares a vector of embeddings from its components. - /// - /// `data.len()` must be a multiple of `dimension`, otherwise an error is returned. - pub fn from_inner(data: Vec, dimension: usize) -> Result> { - let mut this = Self::new(dimension); - this.append(data)?; - Ok(this) - } - - /// Returns the number of embeddings in this vector of embeddings. - pub fn embedding_count(&self) -> usize { - self.data.len() / self.dimension - } - - /// Dimension of a single embedding. - pub fn dimension(&self) -> usize { - self.dimension - } - - /// Deconstructs self into the inner flat vector. - pub fn into_inner(self) -> Vec { - self.data - } - - /// A reference to the inner flat vector. - pub fn as_inner(&self) -> &[F] { - &self.data - } - - /// Iterates over the embeddings contained in the flat vector. - pub fn iter(&self) -> impl Iterator + '_ { - self.data.as_slice().chunks_exact(self.dimension) - } - - /// Push an embedding at the end of the embeddings. - /// - /// If `embedding.len() != self.dimension`, then the push operation fails. - pub fn push(&mut self, mut embedding: Vec) -> Result<(), Vec> { - if embedding.len() != self.dimension { - return Err(embedding); - } - self.data.append(&mut embedding); - Ok(()) - } - - /// Append a flat vector of embeddings at the end of the embeddings. - /// - /// If `embeddings.len() % self.dimension != 0`, then the append operation fails. - pub fn append(&mut self, mut embeddings: Vec) -> Result<(), Vec> { - if embeddings.len() % self.dimension != 0 { - return Err(embeddings); - } - self.data.append(&mut embeddings); - Ok(()) - } -} - -/// An embedder can be used to transform text into embeddings. -#[derive(Debug)] -pub enum Embedder { - /// An embedder based on running local models, fetched from the Hugging Face Hub. - HuggingFace(hf::Embedder), - /// An embedder based on making embedding queries against the OpenAI API. - OpenAi(openai::Embedder), - /// An embedder based on the user providing the embeddings in the documents and queries. - UserProvided(manual::Embedder), - /// An embedder based on making embedding queries against an embedding server. - Ollama(ollama::Embedder), - /// An embedder based on making embedding queries against a generic JSON/REST embedding server. - Rest(rest::Embedder), - /// An embedder composed of an embedder at search time and an embedder at indexing time. - Composite(composite::Embedder), -} - -#[derive(Debug)] -struct EmbeddingCache { - data: Option>>, -} - -impl EmbeddingCache { - const MAX_TEXT_LEN: usize = 2000; - - pub fn new(cap: usize) -> Self { - let data = NonZeroUsize::new(cap).map(lru::LruCache::new).map(Mutex::new); - Self { data } - } - - /// Get the embedding corresponding to `text`, if any is present in the cache. - pub fn get(&self, text: &str) -> Option { - let data = self.data.as_ref()?; - if text.len() > Self::MAX_TEXT_LEN { - return None; - } - let mut cache = data.lock().unwrap(); - - cache.get(text).cloned() - } - - /// Puts a new embedding for the specified `text` - pub fn put(&self, text: String, embedding: Embedding) { - let Some(data) = self.data.as_ref() else { - return; - }; - if text.len() > Self::MAX_TEXT_LEN { - return; - } - tracing::trace!(text, "embedding added to cache"); - - let mut cache = data.lock().unwrap(); - - cache.put(text, embedding); - } -} - -/// Configuration for an embedder. -#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)] -pub struct EmbeddingConfig { - /// Options of the embedder, specific to each kind of embedder - pub embedder_options: EmbedderOptions, - /// Document template - pub prompt: PromptData, - /// If this embedder is binary quantized - pub quantized: Option, - // TODO: add metrics and anything needed -} - -impl EmbeddingConfig { - pub fn quantized(&self) -> bool { - self.quantized.unwrap_or_default() - } -} - -/// Map of runtime embedder data. -#[derive(Clone, Default)] -pub struct RuntimeEmbedders(HashMap>); - -pub struct RuntimeEmbedder { - pub embedder: Arc, - pub document_template: Prompt, - fragments: Vec, - pub is_quantized: bool, -} - -impl RuntimeEmbedder { - pub fn new( - embedder: Arc, - document_template: Prompt, - mut fragments: Vec, - is_quantized: bool, - ) -> Self { - fragments.sort_unstable_by(|left, right| left.name.cmp(&right.name)); - Self { embedder, document_template, fragments, is_quantized } - } - - /// The runtime fragments sorted by name. - pub fn fragments(&self) -> &[RuntimeFragment] { - self.fragments.as_slice() - } -} - -pub struct RuntimeFragment { - pub name: String, - pub id: u8, - pub template: JsonTemplate, -} - -impl RuntimeEmbedders { - /// Create the map from its internal component.s - pub fn new(data: HashMap>) -> Self { - Self(data) - } - - pub fn contains(&self, name: &str) -> bool { - self.0.contains_key(name) - } - - /// Get an embedder configuration and template from its name. - pub fn get(&self, name: &str) -> Option<&Arc> { - self.0.get(name) - } - - pub fn inner_as_ref(&self) -> &HashMap> { - &self.0 - } - - pub fn into_inner(self) -> HashMap> { - self.0 - } - - pub fn len(&self) -> usize { - self.0.len() - } - - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } -} - -impl IntoIterator for RuntimeEmbedders { - type Item = (String, Arc); - - type IntoIter = std::collections::hash_map::IntoIter>; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -/// Options of an embedder, specific to each kind of embedder. -#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] -pub enum EmbedderOptions { - HuggingFace(hf::EmbedderOptions), - OpenAi(openai::EmbedderOptions), - Ollama(ollama::EmbedderOptions), - UserProvided(manual::EmbedderOptions), - Rest(rest::EmbedderOptions), - Composite(composite::EmbedderOptions), -} - -impl EmbedderOptions { - pub fn fragment(&self, name: &str) -> Option<&serde_json::Value> { - match &self { - EmbedderOptions::HuggingFace(_) - | EmbedderOptions::OpenAi(_) - | EmbedderOptions::Ollama(_) - | EmbedderOptions::UserProvided(_) => None, - EmbedderOptions::Rest(embedder_options) => { - embedder_options.indexing_fragments.get(name) - } - EmbedderOptions::Composite(embedder_options) => { - if let SubEmbedderOptions::Rest(embedder_options) = &embedder_options.index { - embedder_options.indexing_fragments.get(name) - } else { - None - } - } - } - } - - pub fn has_fragments(&self) -> bool { - match &self { - EmbedderOptions::HuggingFace(_) - | EmbedderOptions::OpenAi(_) - | EmbedderOptions::Ollama(_) - | EmbedderOptions::UserProvided(_) => false, - EmbedderOptions::Rest(embedder_options) => { - !embedder_options.indexing_fragments.is_empty() - } - EmbedderOptions::Composite(embedder_options) => { - if let SubEmbedderOptions::Rest(embedder_options) = &embedder_options.index { - !embedder_options.indexing_fragments.is_empty() - } else { - false - } - } - } - } -} - -impl Default for EmbedderOptions { - fn default() -> Self { - Self::HuggingFace(Default::default()) - } -} - -impl Embedder { - /// Spawns a new embedder built from its options. - pub fn new( - options: EmbedderOptions, - cache_cap: usize, - ) -> std::result::Result { - Ok(match options { - EmbedderOptions::HuggingFace(options) => { - Self::HuggingFace(hf::Embedder::new(options, cache_cap)?) - } - EmbedderOptions::OpenAi(options) => { - Self::OpenAi(openai::Embedder::new(options, cache_cap)?) - } - EmbedderOptions::Ollama(options) => { - Self::Ollama(ollama::Embedder::new(options, cache_cap)?) - } - EmbedderOptions::UserProvided(options) => { - Self::UserProvided(manual::Embedder::new(options)) - } - EmbedderOptions::Rest(options) => Self::Rest(rest::Embedder::new( - options, - cache_cap, - rest::ConfigurationSource::User, - )?), - EmbedderOptions::Composite(options) => { - Self::Composite(composite::Embedder::new(options, cache_cap)?) - } - }) - } - - /// Embed in search context - - #[tracing::instrument(level = "debug", skip_all, target = "search")] - pub fn embed_search( - &self, - query: SearchQuery<'_>, - deadline: Option, - ) -> std::result::Result { - match query { - SearchQuery::Text(text) => self.embed_search_text(text, deadline), - SearchQuery::Media { q, media } => self.embed_search_media(q, media, deadline), - } - } - - pub fn embed_search_text( - &self, - text: &str, - deadline: Option, - ) -> std::result::Result { - if let Some(cache) = self.cache() { - if let Some(embedding) = cache.get(text) { - tracing::trace!(text, "embedding found in cache"); - return Ok(embedding); - } - } - let embedding = match self { - Embedder::HuggingFace(embedder) => embedder.embed_one(text), - Embedder::OpenAi(embedder) => embedder - .embed(&[text], deadline, None)? - .pop() - .ok_or_else(EmbedError::missing_embedding), - Embedder::Ollama(embedder) => embedder - .embed(&[text], deadline, None)? - .pop() - .ok_or_else(EmbedError::missing_embedding), - Embedder::UserProvided(embedder) => embedder.embed_one(text), - Embedder::Rest(embedder) => embedder.embed_one(SearchQuery::Text(text), deadline, None), - Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline, None), - }?; - - if let Some(cache) = self.cache() { - cache.put(text.to_owned(), embedding.clone()); - } - - Ok(embedding) - } - - pub fn embed_search_media( - &self, - q: Option<&str>, - media: Option<&serde_json::Value>, - deadline: Option, - ) -> std::result::Result { - let Embedder::Rest(embedder) = self else { - return Err(EmbedError::rest_media_not_a_rest()); - }; - embedder.embed_one(SearchQuery::Media { q, media }, deadline, None) - } - - /// Embed multiple chunks of texts. - /// - /// Each chunk is composed of one or multiple texts. - pub fn embed_index( - &self, - text_chunks: Vec>, - threads: &ThreadPoolNoAbort, - embedder_stats: &EmbedderStats, - ) -> std::result::Result>, EmbedError> { - match self { - Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), - Embedder::OpenAi(embedder) => { - embedder.embed_index(text_chunks, threads, embedder_stats) - } - Embedder::Ollama(embedder) => { - embedder.embed_index(text_chunks, threads, embedder_stats) - } - Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks), - Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), - Embedder::Composite(embedder) => { - embedder.index.embed_index(text_chunks, threads, embedder_stats) - } - } - } - - /// Non-owning variant of [`Self::embed_index`]. - pub fn embed_index_ref( - &self, - texts: &[&str], - threads: &ThreadPoolNoAbort, - embedder_stats: &EmbedderStats, - ) -> std::result::Result, EmbedError> { - match self { - Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), - Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), - Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), - Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts), - Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), - Embedder::Composite(embedder) => { - embedder.index.embed_index_ref(texts, threads, embedder_stats) - } - } - } - - pub fn embed_index_ref_fragments( - &self, - fragments: &[serde_json::Value], - threads: &ThreadPoolNoAbort, - embedder_stats: &EmbedderStats, - ) -> std::result::Result, EmbedError> { - if let Embedder::Rest(embedder) = self { - embedder.embed_index_ref(fragments, threads, embedder_stats) - } else { - let Embedder::Composite(embedder) = self else { - unimplemented!("embedding fragments is only available for rest embedders") - }; - let crate::vector::composite::SubEmbedder::Rest(embedder) = &embedder.index else { - unimplemented!("embedding fragments is only available for rest embedders") - }; - - embedder.embed_index_ref(fragments, threads, embedder_stats) - } - } - - /// Indicates the preferred number of chunks to pass to [`Self::embed_chunks`] - pub fn chunk_count_hint(&self) -> usize { - match self { - Embedder::HuggingFace(embedder) => embedder.chunk_count_hint(), - Embedder::OpenAi(embedder) => embedder.chunk_count_hint(), - Embedder::Ollama(embedder) => embedder.chunk_count_hint(), - Embedder::UserProvided(_) => 100, - Embedder::Rest(embedder) => embedder.chunk_count_hint(), - Embedder::Composite(embedder) => embedder.index.chunk_count_hint(), - } - } - - /// Indicates the preferred number of texts in a single chunk passed to [`Self::embed`] - pub fn prompt_count_in_chunk_hint(&self) -> usize { - match self { - Embedder::HuggingFace(embedder) => embedder.prompt_count_in_chunk_hint(), - Embedder::OpenAi(embedder) => embedder.prompt_count_in_chunk_hint(), - Embedder::Ollama(embedder) => embedder.prompt_count_in_chunk_hint(), - Embedder::UserProvided(_) => 1, - Embedder::Rest(embedder) => embedder.prompt_count_in_chunk_hint(), - Embedder::Composite(embedder) => embedder.index.prompt_count_in_chunk_hint(), - } - } - - /// Indicates the dimensions of a single embedding produced by the embedder. - pub fn dimensions(&self) -> usize { - match self { - Embedder::HuggingFace(embedder) => embedder.dimensions(), - Embedder::OpenAi(embedder) => embedder.dimensions(), - Embedder::Ollama(embedder) => embedder.dimensions(), - Embedder::UserProvided(embedder) => embedder.dimensions(), - Embedder::Rest(embedder) => embedder.dimensions(), - Embedder::Composite(embedder) => embedder.dimensions(), - } - } - - /// An optional distribution used to apply an affine transformation to the similarity score of a document. - pub fn distribution(&self) -> Option { - match self { - Embedder::HuggingFace(embedder) => embedder.distribution(), - Embedder::OpenAi(embedder) => embedder.distribution(), - Embedder::Ollama(embedder) => embedder.distribution(), - Embedder::UserProvided(embedder) => embedder.distribution(), - Embedder::Rest(embedder) => embedder.distribution(), - Embedder::Composite(embedder) => embedder.distribution(), - } - } - - pub fn uses_document_template(&self) -> bool { - match self { - Embedder::HuggingFace(_) - | Embedder::OpenAi(_) - | Embedder::Ollama(_) - | Embedder::Rest(_) => true, - Embedder::UserProvided(_) => false, - Embedder::Composite(embedder) => embedder.index.uses_document_template(), - } - } - - fn cache(&self) -> Option<&EmbeddingCache> { - match self { - Embedder::HuggingFace(embedder) => Some(embedder.cache()), - Embedder::OpenAi(embedder) => Some(embedder.cache()), - Embedder::UserProvided(_) => None, - Embedder::Ollama(embedder) => Some(embedder.cache()), - Embedder::Rest(embedder) => Some(embedder.cache()), - Embedder::Composite(embedder) => embedder.search.cache(), - } - } -} - -#[derive(Clone, Copy)] -pub enum SearchQuery<'a> { - Text(&'a str), - Media { q: Option<&'a str>, media: Option<&'a serde_json::Value> }, -} - -/// Describes the mean and sigma of distribution of embedding similarity in the embedding space. -/// -/// The intended use is to make the similarity score more comparable to the regular ranking score. -/// This allows to correct effects where results are too "packed" around a certain value. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize, ToSchema)] -#[serde(from = "DistributionShiftSerializable")] -#[serde(into = "DistributionShiftSerializable")] -pub struct DistributionShift { - /// Value where the results are "packed". - /// - /// Similarity scores are translated so that they are packed around 0.5 instead - #[schema(value_type = f32)] - pub current_mean: OrderedFloat, - - /// standard deviation of a similarity score. - /// - /// Set below 0.4 to make the results less packed around the mean, and above 0.4 to make them more packed. - #[schema(value_type = f32)] - pub current_sigma: OrderedFloat, -} - -impl Deserr for DistributionShift -where - E: DeserializeError, -{ - fn deserialize_from_value( - value: deserr::Value, - location: deserr::ValuePointerRef<'_>, - ) -> Result { - let value = DistributionShiftSerializable::deserialize_from_value(value, location)?; - if value.mean < 0. || value.mean > 1. { - return Err(deserr::take_cf_content(E::error::( - None, - deserr::ErrorKind::Unexpected { - msg: format!( - "the distribution mean must be in the range [0, 1], got {}", - value.mean - ), - }, - location, - ))); - } - if value.sigma <= 0. || value.sigma > 1. { - return Err(deserr::take_cf_content(E::error::( - None, - deserr::ErrorKind::Unexpected { - msg: format!( - "the distribution sigma must be in the range ]0, 1], got {}", - value.sigma - ), - }, - location, - ))); - } - - Ok(value.into()) - } -} - -#[derive(Serialize, Deserialize, Deserr)] -#[serde(deny_unknown_fields)] -#[deserr(deny_unknown_fields)] -struct DistributionShiftSerializable { - mean: f32, - sigma: f32, -} - -impl From for DistributionShiftSerializable { - fn from( - DistributionShift { - current_mean: OrderedFloat(current_mean), - current_sigma: OrderedFloat(current_sigma), - }: DistributionShift, - ) -> Self { - Self { mean: current_mean, sigma: current_sigma } - } -} - -impl From for DistributionShift { - fn from(DistributionShiftSerializable { mean, sigma }: DistributionShiftSerializable) -> Self { - Self { current_mean: OrderedFloat(mean), current_sigma: OrderedFloat(sigma) } - } -} - -impl DistributionShift { - /// `None` if sigma <= 0. - pub fn new(mean: f32, sigma: f32) -> Option { - if sigma <= 0.0 { - None - } else { - Some(Self { current_mean: OrderedFloat(mean), current_sigma: OrderedFloat(sigma) }) - } - } - - pub fn shift(&self, score: f32) -> f32 { - let current_mean = self.current_mean.0; - let current_sigma = self.current_sigma.0; - // - // We're somewhat abusively mapping the distribution of distances to a gaussian. - // The parameters we're given is the mean and sigma of the native result distribution. - // We're using them to retarget the distribution to a gaussian centered on 0.5 with a sigma of 0.4. - - let target_mean = 0.5; - let target_sigma = 0.4; - - // a^2 sig1^2 = sig2^2 => a^2 = sig2^2 / sig1^2 => a = sig2 / sig1, assuming a, sig1, and sig2 positive. - let factor = target_sigma / current_sigma; - // a*mu1 + b = mu2 => b = mu2 - a*mu1 - let offset = target_mean - (factor * current_mean); - - let mut score = factor * score + offset; - - // clamp the final score in the ]0, 1] interval. - if score <= 0.0 { - score = f32::EPSILON; - } - if score > 1.0 { - score = 1.0; - } - - score - } -} /// Whether CUDA is supported in this version of Meilisearch. pub const fn is_cuda_enabled() -> bool { cfg!(feature = "cuda") } - -fn vector_store_range_for_embedder(embedder_id: u8) -> impl Iterator { - (0..=u8::MAX).map(move |store_id| vector_store_for_embedder(embedder_id, store_id)) -} - -fn vector_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 { - let embedder_id = (embedder_id as u16) << 8; - embedder_id | (store_id as u16) -} diff --git a/crates/milli/src/vector/runtime.rs b/crates/milli/src/vector/runtime.rs new file mode 100644 index 000000000..5a653f1b1 --- /dev/null +++ b/crates/milli/src/vector/runtime.rs @@ -0,0 +1,81 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use super::Embedder; +use crate::prompt::Prompt; +use crate::vector::json_template::JsonTemplate; + +/// Map of runtime embedder data. +#[derive(Clone, Default)] +pub struct RuntimeEmbedders(HashMap>); + +pub struct RuntimeEmbedder { + pub embedder: Arc, + pub document_template: Prompt, + fragments: Vec, + pub is_quantized: bool, +} + +impl RuntimeEmbedder { + pub fn new( + embedder: Arc, + document_template: Prompt, + mut fragments: Vec, + is_quantized: bool, + ) -> Self { + fragments.sort_unstable_by(|left, right| left.name.cmp(&right.name)); + Self { embedder, document_template, fragments, is_quantized } + } + + /// The runtime fragments sorted by name. + pub fn fragments(&self) -> &[RuntimeFragment] { + self.fragments.as_slice() + } +} +pub struct RuntimeFragment { + pub name: String, + pub id: u8, + pub template: JsonTemplate, +} + +impl RuntimeEmbedders { + /// Create the map from its internal component.s + pub fn new(data: HashMap>) -> Self { + Self(data) + } + + pub fn contains(&self, name: &str) -> bool { + self.0.contains_key(name) + } + + /// Get an embedder configuration and template from its name. + pub fn get(&self, name: &str) -> Option<&Arc> { + self.0.get(name) + } + + pub fn inner_as_ref(&self) -> &HashMap> { + &self.0 + } + + pub fn into_inner(self) -> HashMap> { + self.0 + } + + pub fn len(&self) -> usize { + self.0.len() + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl IntoIterator for RuntimeEmbedders { + type Item = (String, Arc); + + type IntoIter = std::collections::hash_map::IntoIter>; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} diff --git a/crates/milli/src/vector/session.rs b/crates/milli/src/vector/session.rs index b582bd840..b7ee7262b 100644 --- a/crates/milli/src/vector/session.rs +++ b/crates/milli/src/vector/session.rs @@ -2,7 +2,8 @@ use bumpalo::collections::Vec as BVec; use bumpalo::Bump; use serde_json::Value; -use super::{EmbedError, Embedder, Embedding}; +use super::error::EmbedError; +use super::{Embedder, Embedding}; use crate::progress::EmbedderStats; use crate::{DocumentId, Result, ThreadPoolNoAbort}; type ExtractorId = u8; diff --git a/crates/milli/src/vector/settings.rs b/crates/milli/src/vector/settings.rs index 1b85dd503..499ab3955 100644 --- a/crates/milli/src/vector/settings.rs +++ b/crates/milli/src/vector/settings.rs @@ -8,12 +8,12 @@ use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; -use super::composite::SubEmbedderOptions; -use super::hf::OverridePooling; -use super::{ollama, openai, DistributionShift, EmbedderOptions}; use crate::prompt::{default_max_bytes, PromptData}; use crate::update::Setting; -use crate::vector::EmbeddingConfig; +use crate::vector::embedder::composite::{self, SubEmbedderOptions}; +use crate::vector::embedder::hf::{self, OverridePooling}; +use crate::vector::embedder::{manual, ollama, openai, rest, EmbedderOptions}; +use crate::vector::{DistributionShift, EmbeddingConfig}; use crate::UserError; #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, Deserr, ToSchema)] @@ -1789,12 +1789,7 @@ pub struct Fragment { impl EmbeddingSettings { fn from_hugging_face( - super::hf::EmbedderOptions { - model, - revision, - distribution, - pooling, - }: super::hf::EmbedderOptions, + hf::EmbedderOptions { model, revision, distribution, pooling }: hf::EmbedderOptions, document_template: Setting, document_template_max_bytes: Setting, quantized: Option, @@ -1822,13 +1817,13 @@ impl EmbeddingSettings { } fn from_openai( - super::openai::EmbedderOptions { + openai::EmbedderOptions { url, api_key, embedding_model, dimensions, distribution, - }: super::openai::EmbedderOptions, + }: openai::EmbedderOptions, document_template: Setting, document_template_max_bytes: Setting, quantized: Option, @@ -1856,13 +1851,13 @@ impl EmbeddingSettings { } fn from_ollama( - super::ollama::EmbedderOptions { - embedding_model, - url, - api_key, - distribution, - dimensions, - }: super::ollama::EmbedderOptions, + ollama::EmbedderOptions { + embedding_model, + url, + api_key, + distribution, + dimensions, + }: ollama::EmbedderOptions, document_template: Setting, document_template_max_bytes: Setting, quantized: Option, @@ -1890,7 +1885,7 @@ impl EmbeddingSettings { } fn from_user_provided( - super::manual::EmbedderOptions { dimensions, distribution }: super::manual::EmbedderOptions, + manual::EmbedderOptions { dimensions, distribution }: manual::EmbedderOptions, quantized: Option, ) -> Self { Self { @@ -1916,7 +1911,7 @@ impl EmbeddingSettings { } fn from_rest( - super::rest::EmbedderOptions { + rest::EmbedderOptions { api_key, dimensions, url, @@ -1926,7 +1921,7 @@ impl EmbeddingSettings { response, distribution, headers, - }: super::rest::EmbedderOptions, + }: rest::EmbedderOptions, document_template: Setting, document_template_max_bytes: Setting, quantized: Option, @@ -2015,37 +2010,36 @@ impl From for EmbeddingSettings { document_template_max_bytes, quantized, ), - super::EmbedderOptions::Composite(super::composite::EmbedderOptions { - search, - index, - }) => Self { - source: Setting::Set(EmbedderSource::Composite), - model: Setting::NotSet, - revision: Setting::NotSet, - pooling: Setting::NotSet, - api_key: Setting::NotSet, - dimensions: Setting::NotSet, - binary_quantized: Setting::some_or_not_set(quantized), - document_template: Setting::NotSet, - document_template_max_bytes: Setting::NotSet, - url: Setting::NotSet, - indexing_fragments: Setting::NotSet, - search_fragments: Setting::NotSet, - request: Setting::NotSet, - response: Setting::NotSet, - headers: Setting::NotSet, - distribution: Setting::some_or_not_set(search.distribution()), - search_embedder: Setting::Set(SubEmbeddingSettings::from_options( - search, - Setting::NotSet, - Setting::NotSet, - )), - indexing_embedder: Setting::Set(SubEmbeddingSettings::from_options( - index, - Setting::Set(prompt.template), - document_template_max_bytes, - )), - }, + super::EmbedderOptions::Composite(composite::EmbedderOptions { search, index }) => { + Self { + source: Setting::Set(EmbedderSource::Composite), + model: Setting::NotSet, + revision: Setting::NotSet, + pooling: Setting::NotSet, + api_key: Setting::NotSet, + dimensions: Setting::NotSet, + binary_quantized: Setting::some_or_not_set(quantized), + document_template: Setting::NotSet, + document_template_max_bytes: Setting::NotSet, + url: Setting::NotSet, + indexing_fragments: Setting::NotSet, + search_fragments: Setting::NotSet, + request: Setting::NotSet, + response: Setting::NotSet, + headers: Setting::NotSet, + distribution: Setting::some_or_not_set(search.distribution()), + search_embedder: Setting::Set(SubEmbeddingSettings::from_options( + search, + Setting::NotSet, + Setting::NotSet, + )), + indexing_embedder: Setting::Set(SubEmbeddingSettings::from_options( + index, + Setting::Set(prompt.template), + document_template_max_bytes, + )), + } + } } } } @@ -2212,7 +2206,7 @@ impl From for EmbeddingConfig { ) .into(), EmbedderSource::Composite => { - super::EmbedderOptions::Composite(super::composite::EmbedderOptions { + super::EmbedderOptions::Composite(composite::EmbedderOptions { // it is important to give the distribution to the search here, as this is from where we'll retrieve it search: SubEmbedderOptions::from_settings( search_embedder.set().unwrap(), @@ -2290,9 +2284,9 @@ impl SubEmbedderOptions { dimensions: Setting, distribution: Setting, ) -> Self { - let mut options = super::openai::EmbedderOptions::with_default_model(None); + let mut options = openai::EmbedderOptions::with_default_model(None); if let Some(model) = model.set() { - if let Some(model) = super::openai::EmbeddingModel::from_name(&model) { + if let Some(model) = openai::EmbeddingModel::from_name(&model) { options.embedding_model = model; } } @@ -2314,7 +2308,7 @@ impl SubEmbedderOptions { pooling: Setting, distribution: Setting, ) -> Self { - let mut options = super::hf::EmbedderOptions::default(); + let mut options = hf::EmbedderOptions::default(); if let Some(model) = model.set() { options.model = model; // Reset the revision if we are setting the model. @@ -2334,10 +2328,7 @@ impl SubEmbedderOptions { SubEmbedderOptions::HuggingFace(options) } fn user_provided(dimensions: usize, distribution: Setting) -> Self { - Self::UserProvided(super::manual::EmbedderOptions { - dimensions, - distribution: distribution.set(), - }) + Self::UserProvided(manual::EmbedderOptions { dimensions, distribution: distribution.set() }) } #[allow(clippy::too_many_arguments)] @@ -2352,7 +2343,7 @@ impl SubEmbedderOptions { dimensions: Setting, distribution: Setting, ) -> Self { - Self::Rest(super::rest::EmbedderOptions { + Self::Rest(rest::EmbedderOptions { api_key: api_key.set(), dimensions: dimensions.set(), url, @@ -2386,11 +2377,7 @@ impl SubEmbedderOptions { distribution: Setting, ) -> Self { let mut options: ollama::EmbedderOptions = - super::ollama::EmbedderOptions::with_default_model( - api_key.set(), - url.set(), - dimensions.set(), - ); + ollama::EmbedderOptions::with_default_model(api_key.set(), url.set(), dimensions.set()); if let Some(model) = model.set() { options.embedding_model = model; } diff --git a/crates/milli/src/vector/store.rs b/crates/milli/src/vector/store.rs new file mode 100644 index 000000000..72b2985d4 --- /dev/null +++ b/crates/milli/src/vector/store.rs @@ -0,0 +1,775 @@ +use hannoy::distances::{Cosine, Hamming}; +use hannoy::ItemId; +use heed::{RoTxn, RwTxn, Unspecified}; +use ordered_float::OrderedFloat; +use rand::SeedableRng as _; +use roaring::RoaringBitmap; + +use crate::progress::Progress; +use crate::vector::Embeddings; + +const HANNOY_EF_CONSTRUCTION: usize = 125; +const HANNOY_M: usize = 16; +const HANNOY_M0: usize = 32; + +pub struct VectorStore { + version: (u32, u32, u32), + database: hannoy::Database, + embedder_index: u8, + quantized: bool, +} + +impl VectorStore { + pub fn new( + version: (u32, u32, u32), + database: hannoy::Database, + embedder_index: u8, + quantized: bool, + ) -> Self { + Self { version, database, embedder_index, quantized } + } + + pub fn embedder_index(&self) -> u8 { + self.embedder_index + } + + /// Whether we must use the arroy to read the vector store. + pub fn version_uses_arroy(&self) -> bool { + let (major, minor, _patch) = self.version; + major == 1 && minor < 18 + } + + fn arroy_readers<'a, D: arroy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: arroy::Database, + ) -> impl Iterator, arroy::Error>> + 'a { + vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { + match arroy::Reader::open(rtxn, index, db) { + Ok(reader) => match reader.is_empty(rtxn) { + Ok(false) => Some(Ok(reader)), + Ok(true) => None, + Err(e) => Some(Err(e)), + }, + Err(arroy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) + } + + fn readers<'a, D: hannoy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: hannoy::Database, + ) -> impl Iterator, hannoy::Error>> + 'a { + vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { + match hannoy::Reader::open(rtxn, index, db) { + Ok(reader) => match reader.is_empty(rtxn) { + Ok(false) => Some(Ok(reader)), + Ok(true) => None, + Err(e) => Some(Err(e)), + }, + Err(hannoy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) + } + + /// The item ids that are present in the store specified by its id. + /// + /// The ids are accessed via a lambda to avoid lifetime shenanigans. + pub fn items_in_store( + &self, + rtxn: &RoTxn, + store_id: u8, + with_items: F, + ) -> crate::Result + where + F: FnOnce(&RoaringBitmap) -> O, + { + if self.version_uses_arroy() { + if self.quantized { + self._arroy_items_in_store(rtxn, self.arroy_quantized_db(), store_id, with_items) + .map_err(Into::into) + } else { + self._arroy_items_in_store(rtxn, self.arroy_angular_db(), store_id, with_items) + .map_err(Into::into) + } + } else if self.quantized { + self._items_in_store(rtxn, self.quantized_db(), store_id, with_items) + .map_err(Into::into) + } else { + self._items_in_store(rtxn, self.angular_db(), store_id, with_items).map_err(Into::into) + } + } + + fn _arroy_items_in_store( + &self, + rtxn: &RoTxn, + db: arroy::Database, + store_id: u8, + with_items: F, + ) -> Result + where + F: FnOnce(&RoaringBitmap) -> O, + { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let reader = arroy::Reader::open(rtxn, index, db); + match reader { + Ok(reader) => Ok(with_items(reader.item_ids())), + Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), + Err(err) => Err(err), + } + } + + fn _items_in_store( + &self, + rtxn: &RoTxn, + db: hannoy::Database, + store_id: u8, + with_items: F, + ) -> Result + where + F: FnOnce(&RoaringBitmap) -> O, + { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let reader = hannoy::Reader::open(rtxn, index, db); + match reader { + Ok(reader) => Ok(with_items(reader.item_ids())), + Err(hannoy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), + Err(err) => Err(err), + } + } + + pub fn dimensions(&self, rtxn: &RoTxn) -> crate::Result> { + if self.version_uses_arroy() { + if self.quantized { + Ok(self + .arroy_readers(rtxn, self.arroy_quantized_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } else { + Ok(self + .arroy_readers(rtxn, self.arroy_angular_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } + } else if self.quantized { + Ok(self + .readers(rtxn, self.quantized_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } else { + Ok(self + .readers(rtxn, self.angular_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } + } + + pub fn convert_from_arroy(&self, wtxn: &mut RwTxn, progress: Progress) -> crate::Result<()> { + if self.quantized { + let dimensions = self + .arroy_readers(wtxn, self.arroy_quantized_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions()); + + let Some(dimensions) = dimensions else { return Ok(()) }; + + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions); + let mut builder = writer.builder(&mut rng).progress(progress.clone()); + builder.prepare_arroy_conversion(wtxn)?; + builder.build::(wtxn)?; + } + + Ok(()) + } else { + let dimensions = self + .arroy_readers(wtxn, self.arroy_angular_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions()); + + let Some(dimensions) = dimensions else { return Ok(()) }; + + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = hannoy::Writer::new(self.angular_db(), index, dimensions); + let mut builder = writer.builder(&mut rng).progress(progress.clone()); + builder.prepare_arroy_conversion(wtxn)?; + builder.build::(wtxn)?; + } + + Ok(()) + } + } + + #[allow(clippy::too_many_arguments)] + pub fn build_and_quantize( + &mut self, + wtxn: &mut RwTxn, + progress: Progress, + rng: &mut R, + dimension: usize, + quantizing: bool, + hannoy_memory: Option, + cancel: &(impl Fn() -> bool + Sync + Send), + ) -> Result<(), hannoy::Error> { + for index in vector_store_range_for_embedder(self.embedder_index) { + if self.quantized { + let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + if writer.need_build(wtxn)? { + let mut builder = writer.builder(rng).progress(progress.clone()); + builder + .available_memory(hannoy_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .ef_construction(HANNOY_EF_CONSTRUCTION) + .build::(wtxn)?; + } else if writer.is_empty(wtxn)? { + continue; + } + } else { + let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + // If we are quantizing the databases, we can't know from meilisearch + // if the db was empty but still contained the wrong metadata, thus we need + // to quantize everything and can't stop early. Since this operation can + // only happens once in the life of an embedder, it's not very performances + // sensitive. + if quantizing && !self.quantized { + let writer = writer.prepare_changing_distance::(wtxn)?; + let mut builder = writer.builder(rng).progress(progress.clone()); + builder + .available_memory(hannoy_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .ef_construction(HANNOY_EF_CONSTRUCTION) + .build::(wtxn)?; + } else if writer.need_build(wtxn)? { + let mut builder = writer.builder(rng).progress(progress.clone()); + builder + .available_memory(hannoy_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .ef_construction(HANNOY_EF_CONSTRUCTION) + .build::(wtxn)?; + } else if writer.is_empty(wtxn)? { + continue; + } + } + } + Ok(()) + } + + /// Overwrite all the embeddings associated with the index and item ID. + /// /!\ It won't remove embeddings after the last passed embedding, which can leave stale embeddings. + /// You should call `del_items` on the `item_id` before calling this method. + /// /!\ Cannot insert more than u8::MAX embeddings; after inserting u8::MAX embeddings, all the remaining ones will be silently ignored. + pub fn add_items( + &self, + wtxn: &mut RwTxn, + item_id: hannoy::ItemId, + embeddings: &Embeddings, + ) -> Result<(), hannoy::Error> { + let dimension = embeddings.dimension(); + for (index, vector) in + vector_store_range_for_embedder(self.embedder_index).zip(embeddings.iter()) + { + if self.quantized { + hannoy::Writer::new(self.quantized_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } else { + hannoy::Writer::new(self.angular_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } + } + Ok(()) + } + + /// Add one document int for this index where we can find an empty spot. + pub fn add_item( + &self, + wtxn: &mut RwTxn, + item_id: hannoy::ItemId, + vector: &[f32], + ) -> Result<(), hannoy::Error> { + if self.quantized { + self._add_item(wtxn, self.quantized_db(), item_id, vector) + } else { + self._add_item(wtxn, self.angular_db(), item_id, vector) + } + } + + fn _add_item( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + vector: &[f32], + ) -> Result<(), hannoy::Error> { + let dimension = vector.len(); + + for index in vector_store_range_for_embedder(self.embedder_index) { + let writer = hannoy::Writer::new(db, index, dimension); + if !writer.contains_item(wtxn, item_id)? { + writer.add_item(wtxn, item_id, vector)?; + break; + } + } + Ok(()) + } + + /// Add a vector associated with a document in store specified by its id. + /// + /// Any existing vector associated with the document in the store will be replaced by the new vector. + pub fn add_item_in_store( + &self, + wtxn: &mut RwTxn, + item_id: hannoy::ItemId, + store_id: u8, + vector: &[f32], + ) -> Result<(), hannoy::Error> { + if self.quantized { + self._add_item_in_store(wtxn, self.quantized_db(), item_id, store_id, vector) + } else { + self._add_item_in_store(wtxn, self.angular_db(), item_id, store_id, vector) + } + } + + fn _add_item_in_store( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + store_id: u8, + vector: &[f32], + ) -> Result<(), hannoy::Error> { + let dimension = vector.len(); + + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = hannoy::Writer::new(db, index, dimension); + writer.add_item(wtxn, item_id, vector) + } + + /// Delete all embeddings from a specific `item_id` + pub fn del_items( + &self, + wtxn: &mut RwTxn, + dimension: usize, + item_id: hannoy::ItemId, + ) -> Result<(), hannoy::Error> { + for index in vector_store_range_for_embedder(self.embedder_index) { + if self.quantized { + let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + writer.del_item(wtxn, item_id)?; + } else { + let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + writer.del_item(wtxn, item_id)?; + } + } + + Ok(()) + } + + /// Removes the item specified by its id from the store specified by its id. + /// + /// Returns whether the item was removed. + /// + /// # Warning + /// + /// - This function will silently fail to remove the item if used against an arroy database that was never built. + pub fn del_item_in_store( + &self, + wtxn: &mut RwTxn, + item_id: hannoy::ItemId, + store_id: u8, + dimensions: usize, + ) -> Result { + if self.quantized { + self._del_item_in_store(wtxn, self.quantized_db(), item_id, store_id, dimensions) + } else { + self._del_item_in_store(wtxn, self.angular_db(), item_id, store_id, dimensions) + } + } + + fn _del_item_in_store( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + store_id: u8, + dimensions: usize, + ) -> Result { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = hannoy::Writer::new(db, index, dimensions); + writer.del_item(wtxn, item_id) + } + + /// Removes all items from the store specified by its id. + /// + /// # Warning + /// + /// - This function will silently fail to remove the items if used against an arroy database that was never built. + pub fn clear_store( + &self, + wtxn: &mut RwTxn, + store_id: u8, + dimensions: usize, + ) -> Result<(), hannoy::Error> { + if self.quantized { + self._clear_store(wtxn, self.quantized_db(), store_id, dimensions) + } else { + self._clear_store(wtxn, self.angular_db(), store_id, dimensions) + } + } + + fn _clear_store( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + store_id: u8, + dimensions: usize, + ) -> Result<(), hannoy::Error> { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = hannoy::Writer::new(db, index, dimensions); + writer.clear(wtxn) + } + + /// Delete one item from its value. + pub fn del_item( + &self, + wtxn: &mut RwTxn, + item_id: hannoy::ItemId, + vector: &[f32], + ) -> Result { + if self.quantized { + self._del_item(wtxn, self.quantized_db(), item_id, vector) + } else { + self._del_item(wtxn, self.angular_db(), item_id, vector) + } + } + + fn _del_item( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + vector: &[f32], + ) -> Result { + let dimension = vector.len(); + + for index in vector_store_range_for_embedder(self.embedder_index) { + let writer = hannoy::Writer::new(db, index, dimension); + if writer.contains_item(wtxn, item_id)? { + return writer.del_item(wtxn, item_id); + } + } + Ok(false) + } + + pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), hannoy::Error> { + for index in vector_store_range_for_embedder(self.embedder_index) { + if self.quantized { + let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + if writer.is_empty(wtxn)? { + continue; + } + writer.clear(wtxn)?; + } else { + let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + if writer.is_empty(wtxn)? { + continue; + } + writer.clear(wtxn)?; + } + } + Ok(()) + } + + pub fn contains_item( + &self, + rtxn: &RoTxn, + dimension: usize, + item: hannoy::ItemId, + ) -> crate::Result { + for index in vector_store_range_for_embedder(self.embedder_index) { + let contains = if self.version_uses_arroy() { + if self.quantized { + let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? + } else { + let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? + } + } else if self.quantized { + let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? + } else { + let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? + }; + if contains { + return Ok(contains); + } + } + Ok(false) + } + + pub fn nns_by_item( + &self, + rtxn: &RoTxn, + item: ItemId, + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> crate::Result> { + if self.version_uses_arroy() { + if self.quantized { + self._arroy_nns_by_item(rtxn, self.arroy_quantized_db(), item, limit, filter) + .map_err(Into::into) + } else { + self._arroy_nns_by_item(rtxn, self.arroy_angular_db(), item, limit, filter) + .map_err(Into::into) + } + } else if self.quantized { + self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter).map_err(Into::into) + } else { + self._nns_by_item(rtxn, self.angular_db(), item, limit, filter).map_err(Into::into) + } + } + + fn _arroy_nns_by_item( + &self, + rtxn: &RoTxn, + db: arroy::Database, + item: ItemId, + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for reader in self.arroy_readers(rtxn, db) { + let reader = reader?; + let mut searcher = reader.nns(limit); + if let Some(filter) = filter { + if reader.item_ids().is_disjoint(filter) { + continue; + } + searcher.candidates(filter); + } + + if let Some(mut ret) = searcher.by_item(rtxn, item)? { + results.append(&mut ret); + } + } + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + Ok(results) + } + + fn _nns_by_item( + &self, + rtxn: &RoTxn, + db: hannoy::Database, + item: ItemId, + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, hannoy::Error> { + let mut results = Vec::new(); + + for reader in self.readers(rtxn, db) { + let reader = reader?; + let mut searcher = reader.nns(limit); + searcher.ef_search((limit * 10).max(100)); // TODO find better ef + if let Some(filter) = filter { + searcher.candidates(filter); + } + + if let Some(mut ret) = searcher.by_item(rtxn, item)? { + results.append(&mut ret); + } + } + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + Ok(results) + } + + pub fn nns_by_vector( + &self, + rtxn: &RoTxn, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> crate::Result> { + if self.version_uses_arroy() { + if self.quantized { + self._arroy_nns_by_vector(rtxn, self.arroy_quantized_db(), vector, limit, filter) + .map_err(Into::into) + } else { + self._arroy_nns_by_vector(rtxn, self.arroy_angular_db(), vector, limit, filter) + .map_err(Into::into) + } + } else if self.quantized { + self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) + .map_err(Into::into) + } else { + self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter).map_err(Into::into) + } + } + + fn _arroy_nns_by_vector( + &self, + rtxn: &RoTxn, + db: arroy::Database, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for reader in self.arroy_readers(rtxn, db) { + let reader = reader?; + let mut searcher = reader.nns(limit); + if let Some(filter) = filter { + if reader.item_ids().is_disjoint(filter) { + continue; + } + searcher.candidates(filter); + } + + results.append(&mut searcher.by_vector(rtxn, vector)?); + } + + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) + } + + fn _nns_by_vector( + &self, + rtxn: &RoTxn, + db: hannoy::Database, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, hannoy::Error> { + let mut results = Vec::new(); + + for reader in self.readers(rtxn, db) { + let reader = reader?; + let mut searcher = reader.nns(limit); + searcher.ef_search((limit * 10).max(100)); // TODO find better ef + if let Some(filter) = filter { + searcher.candidates(filter); + } + + results.append(&mut searcher.by_vector(rtxn, vector)?); + } + + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) + } + + pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result>> { + let mut vectors = Vec::new(); + + if self.version_uses_arroy() { + if self.quantized { + for reader in self.arroy_readers(rtxn, self.arroy_quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } else { + for reader in self.arroy_readers(rtxn, self.arroy_angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } + } else if self.quantized { + for reader in self.readers(rtxn, self.quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } else { + for reader in self.readers(rtxn, self.angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } + + Ok(vectors) + } + + fn arroy_angular_db(&self) -> arroy::Database { + self.database.remap_types() + } + + fn arroy_quantized_db(&self) -> arroy::Database { + self.database.remap_types() + } + + fn angular_db(&self) -> hannoy::Database { + self.database.remap_data_type() + } + + fn quantized_db(&self) -> hannoy::Database { + self.database.remap_data_type() + } + + pub fn aggregate_stats( + &self, + rtxn: &RoTxn, + stats: &mut HannoyStats, + ) -> Result<(), hannoy::Error> { + if self.quantized { + for reader in self.readers(rtxn, self.quantized_db()) { + let reader = reader?; + let documents = reader.item_ids(); + stats.documents |= documents; + stats.number_of_embeddings += documents.len(); + } + } else { + for reader in self.readers(rtxn, self.angular_db()) { + let reader = reader?; + let documents = reader.item_ids(); + stats.documents |= documents; + stats.number_of_embeddings += documents.len(); + } + } + + Ok(()) + } +} + +#[derive(Debug, Default, Clone)] +pub struct HannoyStats { + pub number_of_embeddings: u64, + pub documents: RoaringBitmap, +} + +fn vector_store_range_for_embedder(embedder_id: u8) -> impl Iterator { + (0..=u8::MAX).map(move |store_id| vector_store_for_embedder(embedder_id, store_id)) +} + +fn vector_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 { + let embedder_id = (embedder_id as u16) << 8; + embedder_id | (store_id as u16) +}