Merge pull request #5767 from meilisearch/arroy-becomes-hannoy

Add index setting to switch from arroy to hannoy
This commit is contained in:
Tamo
2025-09-09 17:59:45 +00:00
committed by GitHub
80 changed files with 3388 additions and 1943 deletions

View File

@ -1,17 +1,13 @@
use crate::{
distance_between_two_points,
heed_codec::facet::{FieldDocIdFacetCodec, OrderedF64Codec},
lat_lng_to_xyz,
search::new::{facet_string_values, facet_values_prefix_key},
GeoPoint, Index,
};
use heed::{
types::{Bytes, Unit},
RoPrefix, RoTxn,
};
use std::collections::VecDeque;
use heed::types::{Bytes, Unit};
use heed::{RoPrefix, RoTxn};
use roaring::RoaringBitmap;
use rstar::RTree;
use std::collections::VecDeque;
use crate::heed_codec::facet::{FieldDocIdFacetCodec, OrderedF64Codec};
use crate::search::new::{facet_string_values, facet_values_prefix_key};
use crate::{distance_between_two_points, lat_lng_to_xyz, GeoPoint, Index};
#[derive(Debug, Clone, Copy)]
pub struct GeoSortParameter {

View File

@ -1,19 +1,16 @@
use std::collections::{BTreeSet, VecDeque};
use crate::{
constants::RESERVED_GEO_FIELD_NAME,
documents::{geo_sort::next_bucket, GeoSortParameter},
heed_codec::{
facet::{FacetGroupKeyCodec, FacetGroupValueCodec},
BytesRefCodec,
},
is_faceted,
search::facet::{ascending_facet_sort, descending_facet_sort},
AscDesc, DocumentId, Member, UserError,
};
use heed::Database;
use roaring::RoaringBitmap;
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::documents::geo_sort::next_bucket;
use crate::documents::GeoSortParameter;
use crate::heed_codec::facet::{FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::BytesRefCodec;
use crate::search::facet::{ascending_facet_sort, descending_facet_sort};
use crate::{is_faceted, AscDesc, DocumentId, Member, UserError};
#[derive(Debug, Clone, Copy)]
enum AscDescId {
Facet { field_id: u16, ascending: bool },

View File

@ -78,6 +78,8 @@ pub enum InternalError {
#[error(transparent)]
ArroyError(#[from] arroy::Error),
#[error(transparent)]
HannoyError(#[from] hannoy::Error),
#[error(transparent)]
VectorEmbeddingError(#[from] crate::vector::Error),
}
@ -353,7 +355,7 @@ and can not be more than 511 bytes.", .document_id.to_string()
context: crate::vector::settings::NestingContext,
field: crate::vector::settings::MetaEmbeddingSetting,
},
#[error("`.embedders.{embedder_name}.model`: Invalid model `{model}` for OpenAI. Supported models: {:?}", crate::vector::openai::EmbeddingModel::supported_models())]
#[error("`.embedders.{embedder_name}.model`: Invalid model `{model}` for OpenAI. Supported models: {:?}", crate::vector::embedder::openai::EmbeddingModel::supported_models())]
InvalidOpenAiModel { embedder_name: String, model: String },
#[error("`.embedders.{embedder_name}`: Missing field `{field}` (note: this field is mandatory for source `{source_}`)")]
MissingFieldForSource {
@ -441,6 +443,29 @@ impl From<arroy::Error> for Error {
}
}
impl From<hannoy::Error> for Error {
fn from(value: hannoy::Error) -> Self {
match value {
hannoy::Error::Heed(heed) => heed.into(),
hannoy::Error::Io(io) => io.into(),
hannoy::Error::InvalidVecDimension { expected, received } => {
Error::UserError(UserError::InvalidVectorDimensions { expected, found: received })
}
hannoy::Error::BuildCancelled => Error::InternalError(InternalError::AbortedIndexation),
hannoy::Error::DatabaseFull
| hannoy::Error::InvalidItemAppend
| hannoy::Error::UnmatchingDistance { .. }
| hannoy::Error::NeedBuild(_)
| hannoy::Error::MissingKey { .. }
| hannoy::Error::MissingMetadata(_)
| hannoy::Error::UnknownVersion { .. }
| hannoy::Error::CannotDecodeKeyMode { .. } => {
Error::InternalError(InternalError::HannoyError(value))
}
}
}
}
#[derive(Error, Debug)]
pub enum GeoError {
#[error("The `_geo` field in the document with the id: `{document_id}` is not an object. Was expecting an object with the `_geo.lat` and `_geo.lng` fields but instead got `{value}`.")]

View File

@ -31,7 +31,7 @@ use crate::prompt::PromptData;
use crate::proximity::ProximityPrecision;
use crate::update::new::StdResult;
use crate::vector::db::IndexEmbeddingConfigs;
use crate::vector::{ArroyStats, ArroyWrapper, Embedding};
use crate::vector::{Embedding, VectorStore, VectorStoreBackend, VectorStoreStats};
use crate::{
default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds,
FacetDistribution, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldIdWordCountCodec,
@ -87,6 +87,7 @@ pub mod main_key {
pub const DOCUMENTS_STATS: &str = "documents_stats";
pub const DISABLED_TYPOS_TERMS: &str = "disabled_typos_terms";
pub const CHAT: &str = "chat";
pub const VECTOR_STORE_BACKEND: &str = "vector_store_backend";
}
pub mod db_name {
@ -113,7 +114,7 @@ pub mod db_name {
pub const FIELD_ID_DOCID_FACET_F64S: &str = "field-id-docid-facet-f64s";
pub const FIELD_ID_DOCID_FACET_STRINGS: &str = "field-id-docid-facet-strings";
pub const VECTOR_EMBEDDER_CATEGORY_ID: &str = "vector-embedder-category-id";
pub const VECTOR_ARROY: &str = "vector-arroy";
pub const VECTOR_STORE: &str = "vector-arroy";
pub const DOCUMENTS: &str = "documents";
}
const NUMBER_OF_DBS: u32 = 25;
@ -177,10 +178,10 @@ pub struct Index {
/// Maps the document id, the facet field id and the strings.
pub field_id_docid_facet_strings: Database<FieldDocIdFacetStringCodec, Str>,
/// Maps an embedder name to its id in the arroy store.
/// Maps an embedder name to its id in the vector store.
pub(crate) embedder_category_id: Database<Unspecified, Unspecified>,
/// Vector store based on arroy™.
pub vector_arroy: arroy::Database<Unspecified>,
/// Vector store based on hannoy™.
pub vector_store: hannoy::Database<Unspecified>,
/// Maps the document id to the document as an obkv store.
pub(crate) documents: Database<BEU32, ObkvCodec>,
@ -237,7 +238,7 @@ impl Index {
// vector stuff
let embedder_category_id =
env.create_database(&mut wtxn, Some(VECTOR_EMBEDDER_CATEGORY_ID))?;
let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?;
let vector_store = env.create_database(&mut wtxn, Some(VECTOR_STORE))?;
let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;
@ -264,7 +265,7 @@ impl Index {
facet_id_is_empty_docids,
field_id_docid_facet_f64s,
field_id_docid_facet_strings,
vector_arroy,
vector_store,
embedder_category_id,
documents,
};
@ -454,6 +455,34 @@ impl Index {
self.main.remap_types::<Str, VersionCodec>().get(rtxn, main_key::VERSION_KEY)
}
/* vector store */
/// Writes the vector store
pub(crate) fn put_vector_store(
&self,
wtxn: &mut RwTxn<'_>,
backend: VectorStoreBackend,
) -> Result<()> {
Ok(self.main.remap_types::<Str, SerdeJson<VectorStoreBackend>>().put(
wtxn,
main_key::VECTOR_STORE_BACKEND,
&backend,
)?)
}
pub fn get_vector_store(&self, rtxn: &RoTxn<'_>) -> Result<Option<VectorStoreBackend>> {
Ok(self
.main
.remap_types::<Str, SerdeJson<VectorStoreBackend>>()
.get(rtxn, main_key::VECTOR_STORE_BACKEND)?)
}
pub(crate) fn delete_vector_store(&self, wtxn: &mut RwTxn<'_>) -> Result<bool> {
Ok(self
.main
.remap_types::<Str, SerdeJson<VectorStoreBackend>>()
.delete(wtxn, main_key::VECTOR_STORE_BACKEND)?)
}
/* documents ids */
/// Writes the documents ids that corresponds to the user-ids-documents-ids FST.
@ -1769,11 +1798,14 @@ impl Index {
) -> Result<BTreeMap<String, EmbeddingsWithMetadata>> {
let mut res = BTreeMap::new();
let embedders = self.embedding_configs();
let backend = self.get_vector_store(rtxn)?.unwrap_or_default();
for config in embedders.embedding_configs(rtxn)? {
let embedder_info = embedders.embedder_info(rtxn, &config.name)?.unwrap();
let has_fragments = config.config.embedder_options.has_fragments();
let reader = ArroyWrapper::new(
self.vector_arroy,
let reader = VectorStore::new(
backend,
self.vector_store,
embedder_info.embedder_id,
config.config.quantized(),
);
@ -1792,13 +1824,19 @@ impl Index {
Ok(PrefixSettings { compute_prefixes, max_prefix_length: 4, prefix_count_threshold: 100 })
}
pub fn arroy_stats(&self, rtxn: &RoTxn<'_>) -> Result<ArroyStats> {
let mut stats = ArroyStats::default();
pub fn vector_store_stats(&self, rtxn: &RoTxn<'_>) -> Result<VectorStoreStats> {
let mut stats = VectorStoreStats::default();
let embedding_configs = self.embedding_configs();
let backend = self.get_vector_store(rtxn)?.unwrap_or_default();
for config in embedding_configs.embedding_configs(rtxn)? {
let embedder_id = embedding_configs.embedder_id(rtxn, &config.name)?.unwrap();
let reader =
ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized());
let reader = VectorStore::new(
backend,
self.vector_store,
embedder_id,
config.config.quantized(),
);
reader.aggregate_stats(rtxn, &mut stats)?;
}
Ok(stats)
@ -1842,7 +1880,7 @@ impl Index {
facet_id_is_empty_docids,
field_id_docid_facet_f64s,
field_id_docid_facet_strings,
vector_arroy,
vector_store,
embedder_category_id,
documents,
} = self;
@ -1913,7 +1951,7 @@ impl Index {
"field_id_docid_facet_strings",
field_id_docid_facet_strings.stat(rtxn).map(compute_size)?,
);
sizes.insert("vector_arroy", vector_arroy.stat(rtxn).map(compute_size)?);
sizes.insert("vector_store", vector_store.stat(rtxn).map(compute_size)?);
sizes.insert("embedder_category_id", embedder_category_id.stat(rtxn).map(compute_size)?);
sizes.insert("documents", documents.stat(rtxn).map(compute_size)?);

View File

@ -53,7 +53,7 @@ pub use search::new::{
};
use serde_json::Value;
pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
pub use {arroy, charabia as tokenizer, heed, rhai};
pub use {arroy, charabia as tokenizer, hannoy, heed, rhai};
pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
pub use self::attribute_patterns::{AttributePatterns, PatternMatch};

View File

@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use enum_iterator::Sequence;
use enum_iterator::Sequence as _;
use indexmap::IndexMap;
use itertools::Itertools;
use serde::Serialize;
@ -278,6 +278,30 @@ impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
}
}
// Integration with steppe
impl steppe::Progress for Progress {
fn update(&self, sub_progress: impl steppe::Step) {
self.update_progress(Compat(sub_progress));
}
}
struct Compat<T: steppe::Step>(T);
impl<T: steppe::Step> Step for Compat<T> {
fn name(&self) -> Cow<'static, str> {
self.0.name()
}
fn current(&self) -> u32 {
self.0.current().try_into().unwrap_or(u32::MAX)
}
fn total(&self) -> u32 {
self.0.total().try_into().unwrap_or(u32::MAX)
}
}
impl Step for arroy::MainStep {
fn name(&self) -> Cow<'static, str> {
match self {
@ -292,6 +316,7 @@ impl Step for arroy::MainStep {
arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
arroy::MainStep::WriteTheMetadata => "write the metadata",
arroy::MainStep::ConvertingHannoyToArroy => "converting hannoy to arroy",
}
.into()
}

View File

@ -3,7 +3,7 @@ use roaring::{MultiOps, RoaringBitmap};
use crate::error::{DidYouMean, Error};
use crate::vector::db::IndexEmbeddingConfig;
use crate::vector::{ArroyStats, ArroyWrapper};
use crate::vector::{VectorStore, VectorStoreStats};
use crate::Index;
#[derive(Debug, thiserror::Error)]
@ -82,6 +82,7 @@ fn evaluate_inner(
embedding_configs: &[IndexEmbeddingConfig],
filter: &VectorFilter<'_>,
) -> crate::Result<RoaringBitmap> {
let backend = index.get_vector_store(rtxn)?.unwrap_or_default();
let embedder_name = embedder.value();
let available_embedders =
|| embedding_configs.iter().map(|c| c.name.clone()).collect::<Vec<_>>();
@ -96,8 +97,9 @@ fn evaluate_inner(
.embedder_info(rtxn, embedder_name)?
.ok_or_else(|| EmbedderDoesNotExist { embedder, available: available_embedders() })?;
let arroy_wrapper = ArroyWrapper::new(
index.vector_arroy,
let vector_store = VectorStore::new(
backend,
index.vector_store,
embedder_info.embedder_id,
embedding_config.config.quantized(),
);
@ -122,7 +124,7 @@ fn evaluate_inner(
})?;
let user_provided_docids = embedder_info.embedding_status.user_provided_docids();
arroy_wrapper.items_in_store(rtxn, fragment_config.id, |bitmap| {
vector_store.items_in_store(rtxn, fragment_config.id, |bitmap| {
bitmap.clone() - user_provided_docids
})?
}
@ -132,8 +134,8 @@ fn evaluate_inner(
}
let user_provided_docids = embedder_info.embedding_status.user_provided_docids();
let mut stats = ArroyStats::default();
arroy_wrapper.aggregate_stats(rtxn, &mut stats)?;
let mut stats = VectorStoreStats::default();
vector_store.aggregate_stats(rtxn, &mut stats)?;
stats.documents - user_provided_docids.clone()
}
VectorFilter::UserProvided => {
@ -141,14 +143,14 @@ fn evaluate_inner(
user_provided_docids.clone()
}
VectorFilter::Regenerate => {
let mut stats = ArroyStats::default();
arroy_wrapper.aggregate_stats(rtxn, &mut stats)?;
let mut stats = VectorStoreStats::default();
vector_store.aggregate_stats(rtxn, &mut stats)?;
let skip_regenerate = embedder_info.embedding_status.skip_regenerate_docids();
stats.documents - skip_regenerate
}
VectorFilter::None => {
let mut stats = ArroyStats::default();
arroy_wrapper.aggregate_stats(rtxn, &mut stats)?;
let mut stats = VectorStoreStats::default();
vector_store.aggregate_stats(rtxn, &mut stats)?;
stats.documents
}
};

View File

@ -6,7 +6,7 @@ use roaring::RoaringBitmap;
use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait};
use super::VectorStoreStats;
use crate::score_details::{self, ScoreDetails};
use crate::vector::{ArroyWrapper, DistributionShift, Embedder};
use crate::vector::{DistributionShift, Embedder, VectorStore};
use crate::{DocumentId, Result, SearchContext, SearchLogger};
pub struct VectorSort<Q: RankingRuleQueryTrait> {
@ -54,9 +54,11 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
vector_candidates: &RoaringBitmap,
) -> Result<()> {
let target = &self.target;
let backend = ctx.index.get_vector_store(ctx.txn)?.unwrap_or_default();
let before = Instant::now();
let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized);
let reader =
VectorStore::new(backend, ctx.index.vector_store, self.embedder_index, self.quantized);
let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
self.cached_sorted_docids = results.into_iter();
*ctx.vector_store_stats.get_or_insert_default() += VectorStoreStats {

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use roaring::RoaringBitmap;
use crate::score_details::{self, ScoreDetails};
use crate::vector::{ArroyWrapper, Embedder};
use crate::vector::{Embedder, VectorStore};
use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult};
pub struct Similar<'a> {
@ -72,7 +72,10 @@ impl<'a> Similar<'a> {
crate::UserError::InvalidSimilarEmbedder(self.embedder_name.to_owned())
})?;
let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_index, self.quantized);
let backend = self.index.get_vector_store(self.rtxn)?.unwrap_or_default();
let reader =
VectorStore::new(backend, self.index.vector_store, embedder_index, self.quantized);
let results = reader.nns_by_item(
self.rtxn,
self.id,

View File

@ -2,7 +2,8 @@ use heed::RwTxn;
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use crate::{database_stats::DatabaseStats, FieldDistribution, Index, Result};
use crate::database_stats::DatabaseStats;
use crate::{FieldDistribution, Index, Result};
pub struct ClearDocuments<'t, 'i> {
wtxn: &'t mut RwTxn<'i>,
@ -45,7 +46,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
facet_id_is_empty_docids,
field_id_docid_facet_f64s,
field_id_docid_facet_strings,
vector_arroy,
vector_store,
embedder_category_id: _,
documents,
} = self.index;
@ -88,7 +89,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
field_id_docid_facet_f64s.clear(self.wtxn)?;
field_id_docid_facet_strings.clear(self.wtxn)?;
// vector
vector_arroy.clear(self.wtxn)?;
vector_store.clear(self.wtxn)?;
documents.clear(self.wtxn)?;

View File

@ -2,9 +2,8 @@ use std::collections::BTreeSet;
use std::fs::File;
use std::io::{self, BufReader};
use heed::{BytesDecode, BytesEncode};
use heed::BytesDecode;
use obkv::KvReaderU16;
use roaring::RoaringBitmap;
use super::helpers::{
create_sorter, create_writer, try_split_array_at, writer_into_reader, GrenadParameters,
@ -16,7 +15,7 @@ use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::helpers::sorter_into_reader;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result};
use crate::{DocumentId, FieldId, Result};
/// Extracts the word and the documents ids where this word appear.
///
@ -201,45 +200,3 @@ fn words_into_sorter(
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
fn docids_into_writers<W>(
word: &str,
deletions: &RoaringBitmap,
additions: &RoaringBitmap,
writer: &mut grenad::Writer<W>,
) -> Result<()>
where
W: std::io::Write,
{
if deletions == additions {
// if the same value is deleted and added, do nothing.
return Ok(());
}
// Write each value in the same KvDelAdd before inserting it in the final writer.
let mut obkv = KvWriterDelAdd::memory();
// deletions:
if !deletions.is_empty() && !deletions.is_subset(additions) {
obkv.insert(
DelAdd::Deletion,
CboRoaringBitmapCodec::bytes_encode(deletions).map_err(|_| {
SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
})?,
)?;
}
// additions:
if !additions.is_empty() {
obkv.insert(
DelAdd::Addition,
CboRoaringBitmapCodec::bytes_encode(additions).map_err(|_| {
SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
})?,
)?;
}
// insert everything in the same writer.
writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?;
Ok(())
}

View File

@ -39,7 +39,7 @@ use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
};
use crate::vector::db::EmbedderInfo;
use crate::vector::{ArroyWrapper, RuntimeEmbedders};
use crate::vector::{RuntimeEmbedders, VectorStore};
use crate::{CboRoaringBitmapCodec, Index, Result, UserError};
static MERGED_DATABASE_COUNT: usize = 7;
@ -485,6 +485,7 @@ where
// If an embedder wasn't used in the typedchunk but must be binary quantized
// we should insert it in `dimension`
let backend = self.index.get_vector_store(self.wtxn)?.unwrap_or_default();
for (name, action) in settings_diff.embedding_config_updates.iter() {
if action.is_being_quantized && !dimension.contains_key(name.as_str()) {
let index = self.index.embedding_configs().embedder_id(self.wtxn, name)?.ok_or(
@ -494,7 +495,7 @@ where
},
)?;
let reader =
ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized);
VectorStore::new(backend, self.index.vector_store, index, action.was_quantized);
let Some(dim) = reader.dimensions(self.wtxn)? else {
continue;
};
@ -504,7 +505,7 @@ where
for (embedder_name, dimension) in dimension {
let wtxn = &mut *self.wtxn;
let vector_arroy = self.index.vector_arroy;
let vector_store = self.index.vector_store;
let cancel = &self.should_abort;
let embedder_index =
@ -523,11 +524,12 @@ where
let is_quantizing = embedder_config.is_some_and(|action| action.is_being_quantized);
pool.install(|| {
let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
let mut writer =
VectorStore::new(backend, vector_store, embedder_index, was_quantized);
writer.build_and_quantize(
wtxn,
// In the settings we don't have any progress to share
&Progress::default(),
Progress::default(),
&mut rng,
dimension,
is_quantizing,

View File

@ -32,7 +32,7 @@ use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::update::{AvailableIds, UpdateIndexingStep};
use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
use crate::vector::settings::{RemoveFragments, WriteBackToDocuments};
use crate::vector::ArroyWrapper;
use crate::vector::VectorStore;
use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, Index, Result};
pub struct TransformOutput {
@ -834,15 +834,17 @@ impl<'a, 'i> Transform<'a, 'i> {
None
};
let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
let backend = self.index.get_vector_store(wtxn)?.unwrap_or_default();
let readers: BTreeMap<&str, (VectorStore, &RoaringBitmap)> = settings_diff
.embedding_config_updates
.iter()
.filter_map(|(name, action)| {
if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
action.write_back()
{
let reader = ArroyWrapper::new(
self.index.vector_arroy,
let reader = VectorStore::new(
backend,
self.index.vector_store,
*embedder_id,
action.was_quantized,
);
@ -882,10 +884,7 @@ impl<'a, 'i> Transform<'a, 'i> {
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
let injected_vectors: std::result::Result<
serde_json::Map<String, serde_json::Value>,
arroy::Error,
> = readers
let injected_vectors: crate::Result<_> = readers
.iter()
.filter_map(|(name, (reader, user_provided))| {
if !user_provided.contains(docid) {
@ -949,9 +948,13 @@ impl<'a, 'i> Transform<'a, 'i> {
else {
continue;
};
let arroy =
ArroyWrapper::new(self.index.vector_arroy, infos.embedder_id, was_quantized);
let Some(dimensions) = arroy.dimensions(wtxn)? else {
let vector_store = VectorStore::new(
backend,
self.index.vector_store,
infos.embedder_id,
was_quantized,
);
let Some(dimensions) = vector_store.dimensions(wtxn)? else {
continue;
};
for fragment_id in fragment_ids {
@ -959,17 +962,17 @@ impl<'a, 'i> Transform<'a, 'i> {
if infos.embedding_status.user_provided_docids().is_empty() {
// no user provided: clear store
arroy.clear_store(wtxn, *fragment_id, dimensions)?;
vector_store.clear_store(wtxn, *fragment_id, dimensions)?;
continue;
}
// some user provided, remove only the ids that are not user provided
let to_delete = arroy.items_in_store(wtxn, *fragment_id, |items| {
let to_delete = vector_store.items_in_store(wtxn, *fragment_id, |items| {
items - infos.embedding_status.user_provided_docids()
})?;
for to_delete in to_delete {
arroy.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?;
vector_store.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?;
}
}
}

View File

@ -27,7 +27,7 @@ use crate::update::index_documents::helpers::{
};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::db::{EmbeddingStatusDelta, IndexEmbeddingConfig};
use crate::vector::ArroyWrapper;
use crate::vector::VectorStore;
use crate::{
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
Result, SerializationError, U8StrStrCodec,
@ -619,6 +619,7 @@ pub(crate) fn write_typed_chunk_into_index(
let _entered = span.enter();
let embedders = index.embedding_configs();
let backend = index.get_vector_store(wtxn)?.unwrap_or_default();
let mut remove_vectors_builder = MergerBuilder::new(KeepFirst);
let mut manual_vectors_builder = MergerBuilder::new(KeepFirst);
@ -677,7 +678,8 @@ pub(crate) fn write_typed_chunk_into_index(
.get(&embedder_name)
.is_some_and(|conf| conf.is_quantized);
// FIXME: allow customizing distance
let writer = ArroyWrapper::new(index.vector_arroy, infos.embedder_id, binary_quantized);
let writer =
VectorStore::new(backend, index.vector_store, infos.embedder_id, binary_quantized);
// remove vectors for docids we want them removed
let merger = remove_vectors_builder.build();

View File

@ -1,7 +1,8 @@
use grenad::CompressionType;
use super::GrenadParameters;
use crate::{thread_pool_no_abort::ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use crate::thread_pool_no_abort::ThreadPoolNoAbort;
use crate::ThreadPoolNoAbortBuilder;
#[derive(Debug)]
pub struct IndexerConfig {

View File

@ -255,9 +255,9 @@ impl<'a> From<FrameGrantR<'a>> for FrameWithHeader<'a> {
#[repr(u8)]
pub enum EntryHeader {
DbOperation(DbOperation),
ArroyDeleteVector(ArroyDeleteVector),
ArroySetVectors(ArroySetVectors),
ArroySetVector(ArroySetVector),
DeleteVector(DeleteVector),
SetVectors(SetVectors),
SetVector(SetVector),
}
impl EntryHeader {
@ -268,9 +268,9 @@ impl EntryHeader {
const fn variant_id(&self) -> u8 {
match self {
EntryHeader::DbOperation(_) => 0,
EntryHeader::ArroyDeleteVector(_) => 1,
EntryHeader::ArroySetVectors(_) => 2,
EntryHeader::ArroySetVector(_) => 3,
EntryHeader::DeleteVector(_) => 1,
EntryHeader::SetVectors(_) => 2,
EntryHeader::SetVector(_) => 3,
}
}
@ -286,26 +286,26 @@ impl EntryHeader {
}
const fn total_delete_vector_size() -> usize {
Self::variant_size() + mem::size_of::<ArroyDeleteVector>()
Self::variant_size() + mem::size_of::<DeleteVector>()
}
/// The `dimensions` corresponds to the number of `f32` in the embedding.
fn total_set_vectors_size(count: usize, dimensions: usize) -> usize {
let embedding_size = dimensions * mem::size_of::<f32>();
Self::variant_size() + mem::size_of::<ArroySetVectors>() + embedding_size * count
Self::variant_size() + mem::size_of::<SetVectors>() + embedding_size * count
}
fn total_set_vector_size(dimensions: usize) -> usize {
let embedding_size = dimensions * mem::size_of::<f32>();
Self::variant_size() + mem::size_of::<ArroySetVector>() + embedding_size
Self::variant_size() + mem::size_of::<SetVector>() + embedding_size
}
fn header_size(&self) -> usize {
let payload_size = match self {
EntryHeader::DbOperation(op) => mem::size_of_val(op),
EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs),
EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv),
EntryHeader::DeleteVector(adv) => mem::size_of_val(adv),
EntryHeader::SetVectors(asvs) => mem::size_of_val(asvs),
EntryHeader::SetVector(asv) => mem::size_of_val(asv),
};
Self::variant_size() + payload_size
}
@ -319,19 +319,19 @@ impl EntryHeader {
EntryHeader::DbOperation(header)
}
1 => {
let header_bytes = &remaining[..mem::size_of::<ArroyDeleteVector>()];
let header_bytes = &remaining[..mem::size_of::<DeleteVector>()];
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::ArroyDeleteVector(header)
EntryHeader::DeleteVector(header)
}
2 => {
let header_bytes = &remaining[..mem::size_of::<ArroySetVectors>()];
let header_bytes = &remaining[..mem::size_of::<SetVectors>()];
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::ArroySetVectors(header)
EntryHeader::SetVectors(header)
}
3 => {
let header_bytes = &remaining[..mem::size_of::<ArroySetVector>()];
let header_bytes = &remaining[..mem::size_of::<SetVector>()];
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::ArroySetVector(header)
EntryHeader::SetVector(header)
}
id => panic!("invalid variant id: {id}"),
}
@ -341,9 +341,9 @@ impl EntryHeader {
let (first, remaining) = header_bytes.split_first_mut().unwrap();
let payload_bytes = match self {
EntryHeader::DbOperation(op) => bytemuck::bytes_of(op),
EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs),
EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv),
EntryHeader::DeleteVector(adv) => bytemuck::bytes_of(adv),
EntryHeader::SetVectors(asvs) => bytemuck::bytes_of(asvs),
EntryHeader::SetVector(asv) => bytemuck::bytes_of(asv),
};
*first = self.variant_id();
remaining.copy_from_slice(payload_bytes);
@ -378,7 +378,7 @@ impl DbOperation {
#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
#[repr(transparent)]
pub struct ArroyDeleteVector {
pub struct DeleteVector {
pub docid: DocumentId,
}
@ -386,13 +386,13 @@ pub struct ArroyDeleteVector {
#[repr(C)]
/// The embeddings are in the remaining space and represents
/// non-aligned [f32] each with dimensions f32s.
pub struct ArroySetVectors {
pub struct SetVectors {
pub docid: DocumentId,
pub embedder_id: u8,
_padding: [u8; 3],
}
impl ArroySetVectors {
impl SetVectors {
fn embeddings_bytes<'a>(frame: &'a FrameGrantR<'_>) -> &'a [u8] {
let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
&frame[skip..]
@ -416,14 +416,14 @@ impl ArroySetVectors {
#[repr(C)]
/// The embeddings are in the remaining space and represents
/// non-aligned [f32] each with dimensions f32s.
pub struct ArroySetVector {
pub struct SetVector {
pub docid: DocumentId,
pub embedder_id: u8,
pub extractor_id: u8,
_padding: [u8; 2],
}
impl ArroySetVector {
impl SetVector {
fn embeddings_bytes<'a>(frame: &'a FrameGrantR<'_>) -> &'a [u8] {
let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
&frame[skip..]
@ -553,7 +553,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
let payload_header = EntryHeader::DeleteVector(DeleteVector { docid });
let total_length = EntryHeader::total_delete_vector_size();
if total_length > max_grant {
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
@ -589,8 +589,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
// to zero to allocate no extra space at all
let dimensions = embeddings.first().map_or(0, |emb| emb.len());
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
let set_vectors = SetVectors { docid, embedder_id, _padding: [0; 3] };
let payload_header = EntryHeader::SetVectors(set_vectors);
let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
if total_length > max_grant {
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
@ -650,9 +650,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
// to zero to allocate no extra space at all
let dimensions = embedding.as_ref().map_or(0, |emb| emb.len());
let arroy_set_vector =
ArroySetVector { docid, embedder_id, extractor_id, _padding: [0; 2] };
let payload_header = EntryHeader::ArroySetVector(arroy_set_vector);
let set_vector = SetVector { docid, embedder_id, extractor_id, _padding: [0; 2] };
let payload_header = EntryHeader::SetVector(set_vector);
let total_length = EntryHeader::total_set_vector_size(dimensions);
if total_length > max_grant {
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;

View File

@ -240,12 +240,12 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
/// modifies them by adding or removing vector fields based on embedder actions,
/// and then updates the database.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
pub fn update_database_documents<'indexer, MSP, SD>(
documents: &'indexer DocumentsIndentifiers<'indexer>,
indexing_context: IndexingContext<MSP>,
extractor_sender: &ExtractorBbqueueSender,
settings_delta: &SD,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,

View File

@ -475,7 +475,7 @@ impl<'doc> OnEmbed<'doc> for OnEmbeddingDocumentUpdates<'doc, '_> {
}
fn process_embedding_error(
&mut self,
error: crate::vector::hf::EmbedError,
error: crate::vector::error::EmbedError,
embedder_name: &'doc str,
unused_vectors_distribution: &UnusedVectorsDistributionBump,
metadata: BVec<'doc, Metadata<'doc>>,

View File

@ -8,7 +8,7 @@ use document_changes::{DocumentChanges, IndexingContext};
pub use document_deletion::DocumentDeletion;
pub use document_operation::{DocumentOperation, PayloadStats};
use hashbrown::HashMap;
use heed::RwTxn;
use heed::{RoTxn, RwTxn};
pub use partial_dump::PartialDump;
pub use post_processing::recompute_word_fst_from_word_docids_database;
pub use update_by_function::UpdateByFunction;
@ -24,7 +24,7 @@ use crate::progress::{EmbedderStats, Progress};
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters;
use crate::vector::settings::{EmbedderAction, RemoveFragments, WriteBackToDocuments};
use crate::vector::{ArroyWrapper, Embedder, RuntimeEmbedders};
use crate::vector::{Embedder, RuntimeEmbedders, VectorStore};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
pub(crate) mod de;
@ -67,7 +67,7 @@ where
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);
let arroy_memory = grenad_parameters.max_memory;
let vector_memory = grenad_parameters.max_memory;
let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
@ -130,8 +130,9 @@ where
let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);
let vector_arroy = index.vector_arroy;
let arroy_writers: Result<HashMap<_, _>> = embedders
let vector_arroy = index.vector_store;
let backend = index.get_vector_store(wtxn)?.unwrap_or_default();
let vector_stores: Result<HashMap<_, _>> = embedders
.inner_as_ref()
.iter()
.map(|(embedder_name, runtime)| {
@ -144,7 +145,8 @@ where
})?;
let dimensions = runtime.embedder.dimensions();
let writer = ArroyWrapper::new(vector_arroy, embedder_index, runtime.is_quantized);
let writer =
VectorStore::new(backend, vector_arroy, embedder_index, runtime.is_quantized);
Ok((
embedder_index,
@ -153,10 +155,10 @@ where
})
.collect();
let mut arroy_writers = arroy_writers?;
let mut vector_stores = vector_stores?;
let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
write_to_db(writer_receiver, finished_extraction, index, wtxn, &vector_stores)?;
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
@ -170,8 +172,8 @@ where
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,
vector_memory,
&mut vector_stores,
None,
&indexing_context.must_stop_processing,
)
@ -227,7 +229,7 @@ where
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);
let arroy_memory = grenad_parameters.max_memory;
let vector_memory = grenad_parameters.max_memory;
let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
@ -284,15 +286,16 @@ where
let new_embedders = settings_delta.new_embedders();
let embedder_actions = settings_delta.embedder_actions();
let index_embedder_category_ids = settings_delta.new_embedder_category_id();
let mut arroy_writers = arroy_writers_from_embedder_actions(
let mut vector_stores = vector_stores_from_embedder_actions(
index,
wtxn,
embedder_actions,
new_embedders,
index_embedder_category_ids,
)?;
let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
write_to_db(writer_receiver, finished_extraction, index, wtxn, &vector_stores)?;
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
@ -306,8 +309,8 @@ where
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,
vector_memory,
&mut vector_stores,
Some(embedder_actions),
&indexing_context.must_stop_processing,
)
@ -337,13 +340,15 @@ where
Ok(congestion)
}
fn arroy_writers_from_embedder_actions<'indexer>(
fn vector_stores_from_embedder_actions<'indexer>(
index: &Index,
rtxn: &RoTxn,
embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
embedders: &'indexer RuntimeEmbedders,
index_embedder_category_ids: &'indexer std::collections::HashMap<String, u8>,
) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, ArroyWrapper, usize)>> {
let vector_arroy = index.vector_arroy;
) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, VectorStore, usize)>> {
let vector_arroy = index.vector_store;
let backend = index.get_vector_store(rtxn)?.unwrap_or_default();
embedders
.inner_as_ref()
@ -361,8 +366,12 @@ fn arroy_writers_from_embedder_actions<'indexer>(
},
)));
};
let writer =
ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized);
let writer = VectorStore::new(
backend,
vector_arroy,
embedder_category_id,
action.was_quantized,
);
let dimensions = runtime.embedder.dimensions();
Some(Ok((
embedder_category_id,
@ -381,11 +390,13 @@ fn delete_old_embedders_and_fragments<SD>(
where
SD: SettingsDelta,
{
let backend = index.get_vector_store(wtxn)?.unwrap_or_default();
for action in settings_delta.embedder_actions().values() {
let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() else {
continue;
};
let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized);
let reader =
VectorStore::new(backend, index.vector_store, *embedder_id, action.was_quantized);
let Some(dimensions) = reader.dimensions(wtxn)? else {
continue;
};
@ -401,7 +412,7 @@ where
let Some(infos) = index.embedding_configs().embedder_info(wtxn, embedder_name)? else {
continue;
};
let arroy = ArroyWrapper::new(index.vector_arroy, infos.embedder_id, was_quantized);
let arroy = VectorStore::new(backend, index.vector_store, infos.embedder_id, was_quantized);
let Some(dimensions) = arroy.dimensions(wtxn)? else {
continue;
};

View File

@ -15,7 +15,7 @@ use crate::progress::Progress;
use crate::update::settings::InnerIndexSettings;
use crate::vector::db::IndexEmbeddingConfig;
use crate::vector::settings::EmbedderAction;
use crate::vector::{ArroyWrapper, Embedder, Embeddings, RuntimeEmbedders};
use crate::vector::{Embedder, Embeddings, RuntimeEmbedders, VectorStore};
use crate::{Error, Index, InternalError, Result, UserError};
pub fn write_to_db(
@ -23,9 +23,9 @@ pub fn write_to_db(
finished_extraction: &AtomicBool,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
vector_stores: &HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
) -> Result<ChannelCongestion> {
// Used by by the ArroySetVector to copy the embedding into an
// Used by by the HannoySetVector to copy the embedding into an
// aligned memory area, required by arroy to accept a new vector.
let mut aligned_embedding = Vec::new();
let span = tracing::trace_span!(target: "indexing::write_db", "all");
@ -56,7 +56,7 @@ pub fn write_to_db(
ReceiverAction::LargeVectors(large_vectors) => {
let LargeVectors { docid, embedder_id, .. } = large_vectors;
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
vector_stores.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
for embedding in large_vectors.read_embeddings(*dimensions) {
embeddings.push(embedding.to_vec()).unwrap();
@ -68,7 +68,7 @@ pub fn write_to_db(
large_vector @ LargeVector { docid, embedder_id, extractor_id, .. },
) => {
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
vector_stores.get(&embedder_id).expect("requested a missing embedder");
let embedding = large_vector.read_embedding(*dimensions);
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
@ -80,12 +80,12 @@ pub fn write_to_db(
&mut writer_receiver,
index,
wtxn,
arroy_writers,
vector_stores,
&mut aligned_embedding,
)?;
}
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
write_from_bbqueue(&mut writer_receiver, index, wtxn, vector_stores, &mut aligned_embedding)?;
Ok(ChannelCongestion {
attempts: writer_receiver.sent_messages_attempts(),
@ -115,8 +115,8 @@ pub fn build_vectors<MSP>(
wtxn: &mut RwTxn<'_>,
progress: &Progress,
index_embeddings: Vec<IndexEmbeddingConfig>,
arroy_memory: Option<usize>,
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
vector_memory: Option<usize>,
vector_stores: &mut HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
embeder_actions: Option<&BTreeMap<String, EmbedderAction>>,
must_stop_processing: &MSP,
) -> Result<()>
@ -129,18 +129,18 @@ where
let seed = rand::random();
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
for (_index, (embedder_name, _embedder, writer, dimensions)) in arroy_writers {
for (_index, (embedder_name, _embedder, writer, dimensions)) in vector_stores {
let dimensions = *dimensions;
let is_being_quantized = embeder_actions
.and_then(|actions| actions.get(*embedder_name).map(|action| action.is_being_quantized))
.unwrap_or(false);
writer.build_and_quantize(
wtxn,
progress,
progress.clone(),
&mut rng,
dimensions,
is_being_quantized,
arroy_memory,
vector_memory,
must_stop_processing,
)?;
}
@ -181,7 +181,7 @@ pub fn write_from_bbqueue(
writer_receiver: &mut WriterBbqueueReceiver<'_>,
index: &Index,
wtxn: &mut RwTxn<'_>,
arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
vector_stores: &HashMap<u8, (&str, &crate::vector::Embedder, VectorStore, usize)>,
aligned_embedding: &mut Vec<f32>,
) -> crate::Result<()> {
while let Some(frame_with_header) = writer_receiver.recv_frame() {
@ -221,17 +221,17 @@ pub fn write_from_bbqueue(
},
}
}
EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
EntryHeader::DeleteVector(DeleteVector { docid }) => {
for (_index, (_name, _embedder, writer, dimensions)) in vector_stores {
let dimensions = *dimensions;
writer.del_items(wtxn, dimensions, docid)?;
}
}
EntryHeader::ArroySetVectors(asvs) => {
let ArroySetVectors { docid, embedder_id, .. } = asvs;
EntryHeader::SetVectors(asvs) => {
let SetVectors { docid, embedder_id, .. } = asvs;
let frame = frame_with_header.frame();
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
vector_stores.get(&embedder_id).expect("requested a missing embedder");
let mut embeddings = Embeddings::new(*dimensions);
let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
writer.del_items(wtxn, *dimensions, docid)?;
@ -245,12 +245,10 @@ pub fn write_from_bbqueue(
writer.add_items(wtxn, docid, &embeddings)?;
}
}
EntryHeader::ArroySetVector(
asv @ ArroySetVector { docid, embedder_id, extractor_id, .. },
) => {
EntryHeader::SetVector(asv @ SetVector { docid, embedder_id, extractor_id, .. }) => {
let frame = frame_with_header.frame();
let (_, _, writer, dimensions) =
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
vector_stores.get(&embedder_id).expect("requested a missing embedder");
let embedding = asv.read_all_embeddings_into_vec(frame, aligned_embedding);
if embedding.is_empty() {

View File

@ -63,8 +63,8 @@ where
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_docids<'extractor, MSP, D>(
mut caches: Vec<BalancedCaches<'extractor>>,
pub fn merge_and_send_docids<MSP, D>(
mut caches: Vec<BalancedCaches<'_>>,
database: Database<Bytes, Bytes>,
index: &Index,
docids_sender: WordDocidsSender<D>,
@ -91,8 +91,8 @@ where
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<BalancedCaches<'extractor>>,
pub fn merge_and_send_facet_docids(
mut caches: Vec<BalancedCaches<'_>>,
database: FacetDatabases,
index: &Index,
rtxn: &RoTxn,

View File

@ -21,6 +21,14 @@ make_enum_progress! {
}
}
make_enum_progress! {
pub enum SettingsIndexerStep {
ChangingVectorStore,
UsingStableIndexer,
UsingExperimentalIndexer,
}
}
make_enum_progress! {
pub enum PostProcessingFacets {
StringsBulk,

View File

@ -14,7 +14,7 @@ use crate::constants::RESERVED_VECTORS_FIELD_NAME;
use crate::documents::FieldIdMapper;
use crate::vector::db::{EmbeddingStatus, IndexEmbeddingConfig};
use crate::vector::parsed_vectors::{RawVectors, RawVectorsError, VectorOrArrayOfVectors};
use crate::vector::{ArroyWrapper, Embedding, RuntimeEmbedders};
use crate::vector::{Embedding, RuntimeEmbedders, VectorStore};
use crate::{DocumentId, Index, InternalError, Result, UserError};
#[derive(Serialize)]
@ -120,8 +120,13 @@ impl<'t> VectorDocumentFromDb<'t> {
config: &IndexEmbeddingConfig,
status: &EmbeddingStatus,
) -> Result<VectorEntry<'t>> {
let reader =
ArroyWrapper::new(self.index.vector_arroy, embedder_id, config.config.quantized());
let backend = self.index.get_vector_store(self.rtxn)?.unwrap_or_default();
let reader = VectorStore::new(
backend,
self.index.vector_store,
embedder_id,
config.config.quantized(),
);
let vectors = reader.item_vectors(self.rtxn, self.docid)?;
Ok(VectorEntry {
@ -149,7 +154,7 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
name,
entry_from_raw_value(value, false).map_err(|_| {
InternalError::Serialization(crate::SerializationError::Decoding {
db_name: Some(crate::index::db_name::VECTOR_ARROY),
db_name: Some(crate::index::db_name::VECTOR_STORE),
})
})?,
))
@ -167,7 +172,7 @@ impl<'t> VectorDocument<'t> for VectorDocumentFromDb<'t> {
Some(embedding_from_doc) => {
Some(entry_from_raw_value(embedding_from_doc, false).map_err(|_| {
InternalError::Serialization(crate::SerializationError::Decoding {
db_name: Some(crate::index::db_name::VECTOR_ARROY),
db_name: Some(crate::index::db_name::VECTOR_STORE),
})
})?)
}

View File

@ -26,13 +26,15 @@ use crate::index::{
DEFAULT_MIN_WORD_LEN_TWO_TYPOS,
};
use crate::order_by_map::OrderByMap;
use crate::progress::{EmbedderStats, Progress};
use crate::progress::{EmbedderStats, Progress, VariableNameStep};
use crate::prompt::{default_max_bytes, default_template_text, PromptData};
use crate::proximity::ProximityPrecision;
use crate::update::index_documents::IndexDocumentsMethod;
use crate::update::new::indexer::reindex;
use crate::update::new::steps::SettingsIndexerStep;
use crate::update::{IndexDocuments, UpdateIndexingStep};
use crate::vector::db::{FragmentConfigs, IndexEmbeddingConfig};
use crate::vector::embedder::{openai, rest};
use crate::vector::json_template::JsonTemplate;
use crate::vector::settings::{
EmbedderAction, EmbedderSource, EmbeddingSettings, EmbeddingValidationContext, NestingContext,
@ -40,6 +42,7 @@ use crate::vector::settings::{
};
use crate::vector::{
Embedder, EmbeddingConfig, RuntimeEmbedder, RuntimeEmbedders, RuntimeFragment,
VectorStoreBackend,
};
use crate::{
ChannelCongestion, FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result,
@ -198,6 +201,7 @@ pub struct Settings<'a, 't, 'i> {
prefix_search: Setting<PrefixSearch>,
facet_search: Setting<bool>,
chat: Setting<ChatSettings>,
vector_store: Setting<VectorStoreBackend>,
}
impl<'a, 't, 'i> Settings<'a, 't, 'i> {
@ -237,6 +241,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
prefix_search: Setting::NotSet,
facet_search: Setting::NotSet,
chat: Setting::NotSet,
vector_store: Setting::NotSet,
indexer_config,
}
}
@ -475,6 +480,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.chat = Setting::Reset;
}
pub fn set_vector_store(&mut self, value: VectorStoreBackend) {
self.vector_store = Setting::Set(value);
}
pub fn reset_vector_store(&mut self) {
self.vector_store = Setting::Reset;
}
#[tracing::instrument(
level = "trace"
skip(self, progress_callback, should_abort, settings_diff, embedder_stats),
@ -1416,7 +1429,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
}
}
pub fn legacy_execute<FP, FA>(
fn legacy_execute<FP, FA>(
mut self,
progress_callback: FP,
should_abort: FA,
@ -1485,6 +1498,70 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
Ok(())
}
fn execute_vector_backend<'indexer, MSP>(
&mut self,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
{
let old_backend = self.index.get_vector_store(self.wtxn)?.unwrap_or_default();
let new_backend = match self.vector_store {
Setting::Set(new_backend) => {
self.index.put_vector_store(self.wtxn, new_backend)?;
new_backend
}
Setting::Reset => {
self.index.delete_vector_store(self.wtxn)?;
VectorStoreBackend::default()
}
Setting::NotSet => return Ok(()),
};
if old_backend == new_backend {
return Ok(());
}
let embedders = self.index.embedding_configs();
let embedding_configs = embedders.embedding_configs(self.wtxn)?;
enum VectorStoreBackendChangeIndex {}
let embedder_count = embedding_configs.len();
let rtxn = self.index.read_txn()?;
for (i, config) in embedding_configs.into_iter().enumerate() {
if must_stop_processing() {
return Err(crate::InternalError::AbortedIndexation.into());
}
let embedder_name = &config.name;
progress.update_progress(VariableNameStep::<VectorStoreBackendChangeIndex>::new(
format!("Changing vector store backend for embedder `{embedder_name}`"),
i as u32,
embedder_count as u32,
));
let quantized = config.config.quantized();
let embedder_id = embedders.embedder_id(self.wtxn, &config.name)?.unwrap();
let vector_store = crate::vector::VectorStore::new(
old_backend,
self.index.vector_store,
embedder_id,
quantized,
);
vector_store.change_backend(
&rtxn,
self.wtxn,
progress.clone(),
must_stop_processing,
self.indexer_config.max_memory,
)?;
}
Ok(())
}
pub fn execute<'indexer, MSP>(
mut self,
must_stop_processing: &'indexer MSP,
@ -1494,8 +1571,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
where
MSP: Fn() -> bool + Sync,
{
progress.update_progress(SettingsIndexerStep::ChangingVectorStore);
// execute any pending vector store backend change
self.execute_vector_backend(must_stop_processing, progress)?;
// force the old indexer if the environment says so
if self.indexer_config.experimental_no_edition_2024_for_settings {
progress.update_progress(SettingsIndexerStep::UsingStableIndexer);
return self
.legacy_execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
@ -1535,11 +1617,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
facet_search: Setting::NotSet,
disable_on_numbers: Setting::NotSet,
chat: Setting::NotSet,
vector_store: Setting::NotSet,
wtxn: _,
index: _,
indexer_config: _,
} = &self
{
progress.update_progress(SettingsIndexerStep::UsingExperimentalIndexer);
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
let old_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
@ -1578,6 +1663,8 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
Ok(None)
}
} else {
progress.update_progress(SettingsIndexerStep::UsingStableIndexer);
self.legacy_execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
must_stop_processing,
@ -2208,39 +2295,29 @@ pub fn validate_embedding_settings(
if let Some(request) = request.as_ref().set() {
let request = match with_fragments {
WithFragments::Yes { indexing_fragments, search_fragments } => {
crate::vector::rest::RequestData::new(
request.to_owned(),
indexing_fragments,
search_fragments,
)
.map_err(|error| crate::UserError::VectorEmbeddingError(error.into()))
rest::RequestData::new(request.to_owned(), indexing_fragments, search_fragments)
.map_err(|error| crate::UserError::VectorEmbeddingError(error.into()))
}
WithFragments::No => {
rest::RequestData::new(request.to_owned(), Default::default(), Default::default())
.map_err(|error| crate::UserError::VectorEmbeddingError(error.into()))
}
WithFragments::No => crate::vector::rest::RequestData::new(
request.to_owned(),
Default::default(),
Default::default(),
)
.map_err(|error| crate::UserError::VectorEmbeddingError(error.into())),
WithFragments::Maybe => {
let mut indexing_fragments = BTreeMap::new();
indexing_fragments.insert("test".to_string(), serde_json::json!("test"));
crate::vector::rest::RequestData::new(
request.to_owned(),
indexing_fragments,
Default::default(),
)
.or_else(|_| {
crate::vector::rest::RequestData::new(
request.to_owned(),
Default::default(),
Default::default(),
)
})
.map_err(|error| crate::UserError::VectorEmbeddingError(error.into()))
rest::RequestData::new(request.to_owned(), indexing_fragments, Default::default())
.or_else(|_| {
rest::RequestData::new(
request.to_owned(),
Default::default(),
Default::default(),
)
})
.map_err(|error| crate::UserError::VectorEmbeddingError(error.into()))
}
}?;
if let Some(response) = response.as_ref().set() {
crate::vector::rest::Response::new(response.to_owned(), &request)
rest::Response::new(response.to_owned(), &request)
.map_err(|error| crate::UserError::VectorEmbeddingError(error.into()))?;
}
}
@ -2293,11 +2370,12 @@ pub fn validate_embedding_settings(
match inferred_source {
EmbedderSource::OpenAi => {
if let Setting::Set(model) = &model {
let model = crate::vector::openai::EmbeddingModel::from_name(model.as_str())
.ok_or(crate::error::UserError::InvalidOpenAiModel {
let model = openai::EmbeddingModel::from_name(model.as_str()).ok_or(
crate::error::UserError::InvalidOpenAiModel {
embedder_name: name.to_owned(),
model: model.clone(),
})?;
},
)?;
if let Setting::Set(dimensions) = dimensions {
if !model.supports_overriding_dimensions()
&& dimensions != model.default_dimensions()

View File

@ -898,6 +898,7 @@ fn test_correct_settings_init() {
facet_search,
disable_on_numbers,
chat,
vector_store,
} = settings;
assert!(matches!(searchable_fields, Setting::NotSet));
assert!(matches!(displayed_fields, Setting::NotSet));
@ -927,6 +928,7 @@ fn test_correct_settings_init() {
assert!(matches!(facet_search, Setting::NotSet));
assert!(matches!(disable_on_numbers, Setting::NotSet));
assert!(matches!(chat, Setting::NotSet));
assert!(matches!(vector_store, Setting::NotSet));
})
.unwrap();
}

View File

@ -3,15 +3,16 @@ mod v1_13;
mod v1_14;
mod v1_15;
mod v1_16;
use heed::RwTxn;
use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3};
use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Latest_V1_13};
use v1_14::Latest_V1_13_To_Latest_V1_14;
use v1_15::Latest_V1_14_To_Latest_V1_15;
use v1_16::Latest_V1_15_To_V1_16_0;
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::{Progress, VariableNameStep};
use crate::update::upgrade::v1_16::Latest_V1_15_To_V1_16_0;
use crate::{Index, InternalError, Result};
trait UpgradeIndex {
@ -34,6 +35,9 @@ const UPGRADE_FUNCTIONS: &[&dyn UpgradeIndex] = &[
&Latest_V1_13_To_Latest_V1_14 {},
&Latest_V1_14_To_Latest_V1_15 {},
&Latest_V1_15_To_V1_16_0 {},
&ToTargetNoOp { target: (1, 18, 0) },
&ToTargetNoOp { target: (1, 19, 0) },
&ToTargetNoOp { target: (1, 20, 0) },
// This is the last upgrade function, it will be called when the index is up to date.
// any other upgrade function should be added before this one.
&ToCurrentNoOp {},
@ -61,11 +65,10 @@ const fn start(from: (u32, u32, u32)) -> Option<usize> {
(1, 14, _) => function_index!(5),
// We must handle the current version in the match because in case of a failure some index may have been upgraded but not other.
(1, 15, _) => function_index!(6),
(1, 16, _) => function_index!(7),
(1, 17, _) => function_index!(7),
(1, 18, _) => function_index!(7),
(1, 19, _) => function_index!(7),
(1, 20, _) => function_index!(7),
(1, 16, _) | (1, 17, _) => function_index!(7),
(1, 18, _) => function_index!(8),
(1, 19, _) => function_index!(9),
(1, 20, _) => function_index!(10),
// We deliberately don't add a placeholder with (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) here to force manually
// considering dumpless upgrade.
(_major, _minor, _patch) => return None,
@ -148,3 +151,25 @@ impl UpgradeIndex for ToCurrentNoOp {
(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
}
}
/// Perform no operation during the upgrade except changing to the specified target version.
#[allow(non_camel_case_types)]
struct ToTargetNoOp {
pub target: (u32, u32, u32),
}
impl UpgradeIndex for ToTargetNoOp {
fn upgrade(
&self,
_wtxn: &mut RwTxn,
_index: &Index,
_original: (u32, u32, u32),
_progress: Progress,
) -> Result<bool> {
Ok(false)
}
fn target_version(&self) -> (u32, u32, u32) {
self.target
}
}

View File

@ -27,9 +27,9 @@ impl UpgradeIndex for Latest_V1_13_To_Latest_V1_14 {
let rtxn = index.read_txn()?;
arroy::upgrade::from_0_5_to_0_6::<Cosine>(
&rtxn,
index.vector_arroy.remap_data_type(),
index.vector_store.remap_types(),
wtxn,
index.vector_arroy.remap_data_type(),
index.vector_store.remap_types(),
)?;
Ok(false)

View File

@ -0,0 +1,128 @@
use deserr::{DeserializeError, Deserr};
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
/// Describes the mean and sigma of distribution of embedding similarity in the embedding space.
///
/// The intended use is to make the similarity score more comparable to the regular ranking score.
/// This allows to correct effects where results are too "packed" around a certain value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize, ToSchema)]
#[serde(from = "DistributionShiftSerializable")]
#[serde(into = "DistributionShiftSerializable")]
pub struct DistributionShift {
/// Value where the results are "packed".
///
/// Similarity scores are translated so that they are packed around 0.5 instead
#[schema(value_type = f32)]
pub current_mean: OrderedFloat<f32>,
/// standard deviation of a similarity score.
///
/// Set below 0.4 to make the results less packed around the mean, and above 0.4 to make them more packed.
#[schema(value_type = f32)]
pub current_sigma: OrderedFloat<f32>,
}
impl<E> Deserr<E> for DistributionShift
where
E: DeserializeError,
{
fn deserialize_from_value<V: deserr::IntoValue>(
value: deserr::Value<V>,
location: deserr::ValuePointerRef<'_>,
) -> Result<Self, E> {
let value = DistributionShiftSerializable::deserialize_from_value(value, location)?;
if value.mean < 0. || value.mean > 1. {
return Err(deserr::take_cf_content(E::error::<std::convert::Infallible>(
None,
deserr::ErrorKind::Unexpected {
msg: format!(
"the distribution mean must be in the range [0, 1], got {}",
value.mean
),
},
location,
)));
}
if value.sigma <= 0. || value.sigma > 1. {
return Err(deserr::take_cf_content(E::error::<std::convert::Infallible>(
None,
deserr::ErrorKind::Unexpected {
msg: format!(
"the distribution sigma must be in the range ]0, 1], got {}",
value.sigma
),
},
location,
)));
}
Ok(value.into())
}
}
#[derive(Serialize, Deserialize, Deserr)]
#[serde(deny_unknown_fields)]
#[deserr(deny_unknown_fields)]
struct DistributionShiftSerializable {
mean: f32,
sigma: f32,
}
impl From<DistributionShift> for DistributionShiftSerializable {
fn from(
DistributionShift {
current_mean: OrderedFloat(current_mean),
current_sigma: OrderedFloat(current_sigma),
}: DistributionShift,
) -> Self {
Self { mean: current_mean, sigma: current_sigma }
}
}
impl From<DistributionShiftSerializable> for DistributionShift {
fn from(DistributionShiftSerializable { mean, sigma }: DistributionShiftSerializable) -> Self {
Self { current_mean: OrderedFloat(mean), current_sigma: OrderedFloat(sigma) }
}
}
impl DistributionShift {
/// `None` if sigma <= 0.
pub fn new(mean: f32, sigma: f32) -> Option<Self> {
if sigma <= 0.0 {
None
} else {
Some(Self { current_mean: OrderedFloat(mean), current_sigma: OrderedFloat(sigma) })
}
}
pub fn shift(&self, score: f32) -> f32 {
let current_mean = self.current_mean.0;
let current_sigma = self.current_sigma.0;
// <https://math.stackexchange.com/a/2894689>
// We're somewhat abusively mapping the distribution of distances to a gaussian.
// The parameters we're given is the mean and sigma of the native result distribution.
// We're using them to retarget the distribution to a gaussian centered on 0.5 with a sigma of 0.4.
let target_mean = 0.5;
let target_sigma = 0.4;
// a^2 sig1^2 = sig2^2 => a^2 = sig2^2 / sig1^2 => a = sig2 / sig1, assuming a, sig1, and sig2 positive.
let factor = target_sigma / current_sigma;
// a*mu1 + b = mu2 => b = mu2 - a*mu1
let offset = target_mean - (factor * current_mean);
let mut score = factor * score + offset;
// clamp the final score in the ]0, 1] interval.
if score <= 0.0 {
score = f32::EPSILON;
}
if score > 1.0 {
score = 1.0;
}
score
}
}

View File

@ -1,15 +1,15 @@
use std::time::Instant;
use arroy::Distance;
use hannoy::Distance;
use super::error::CompositeEmbedderContainsHuggingFace;
use super::{
hf, manual, ollama, openai, rest, DistributionShift, EmbedError, Embedding, EmbeddingCache,
NewEmbedderError,
};
use super::{hf, manual, ollama, openai, rest, Embedding, EmbeddingCache};
use crate::progress::EmbedderStats;
use crate::vector::error::{CompositeEmbedderContainsHuggingFace, EmbedError, NewEmbedderError};
use crate::vector::DistributionShift;
use crate::ThreadPoolNoAbort;
pub(in crate::vector) const MAX_COMPOSITE_DISTANCE: f32 = 0.01;
#[derive(Debug)]
pub enum SubEmbedder {
/// An embedder based on running local models, fetched from the Hugging Face Hub.
@ -324,20 +324,19 @@ fn check_similarity(
}
for (left, right) in left.into_iter().zip(right) {
let left = arroy::internals::UnalignedVector::from_slice(&left);
let right = arroy::internals::UnalignedVector::from_slice(&right);
let left = arroy::internals::Leaf {
header: arroy::distances::Cosine::new_header(&left),
let left = hannoy::internals::UnalignedVector::from_slice(&left);
let right = hannoy::internals::UnalignedVector::from_slice(&right);
let left = hannoy::internals::Item {
header: hannoy::distances::Cosine::new_header(&left),
vector: left,
};
let right = arroy::internals::Leaf {
header: arroy::distances::Cosine::new_header(&right),
let right = hannoy::internals::Item {
header: hannoy::distances::Cosine::new_header(&right),
vector: right,
};
let distance = arroy::distances::Cosine::built_distance(&left, &right);
if distance > super::MAX_COMPOSITE_DISTANCE {
let distance = hannoy::distances::Cosine::distance(&left, &right);
if distance > crate::vector::embedder::composite::MAX_COMPOSITE_DISTANCE {
return Err(NewEmbedderError::composite_embedding_value_mismatch(distance, hint));
}
}

View File

@ -6,8 +6,9 @@ use hf_hub::api::sync::Api;
use hf_hub::{Repo, RepoType};
use tokenizers::{PaddingParams, Tokenizer};
pub use super::error::{EmbedError, Error, NewEmbedderError};
use super::{DistributionShift, Embedding, EmbeddingCache};
use super::EmbeddingCache;
use crate::vector::error::{EmbedError, NewEmbedderError};
use crate::vector::{DistributionShift, Embedding};
#[derive(
Debug,

View File

@ -1,6 +1,5 @@
use super::error::EmbedError;
use super::DistributionShift;
use crate::vector::Embedding;
use crate::vector::error::EmbedError;
use crate::vector::{DistributionShift, Embedding};
#[derive(Debug, Clone, Copy)]
pub struct Embedder {

View File

@ -0,0 +1,381 @@
pub mod composite;
pub mod hf;
pub mod manual;
pub mod ollama;
pub mod openai;
pub mod rest;
use std::num::NonZeroUsize;
use std::sync::Mutex;
use std::time::Instant;
use composite::SubEmbedderOptions;
use crate::progress::EmbedderStats;
use crate::prompt::PromptData;
use crate::vector::error::{EmbedError, NewEmbedderError};
use crate::vector::{DistributionShift, Embedding};
use crate::ThreadPoolNoAbort;
/// An embedder can be used to transform text into embeddings.
#[derive(Debug)]
pub enum Embedder {
/// An embedder based on running local models, fetched from the Hugging Face Hub.
HuggingFace(hf::Embedder),
/// An embedder based on making embedding queries against the OpenAI API.
OpenAi(openai::Embedder),
/// An embedder based on the user providing the embeddings in the documents and queries.
UserProvided(manual::Embedder),
/// An embedder based on making embedding queries against an <https://ollama.com> embedding server.
Ollama(ollama::Embedder),
/// An embedder based on making embedding queries against a generic JSON/REST embedding server.
Rest(rest::Embedder),
/// An embedder composed of an embedder at search time and an embedder at indexing time.
Composite(composite::Embedder),
}
/// Configuration for an embedder.
#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
pub struct EmbeddingConfig {
/// Options of the embedder, specific to each kind of embedder
pub embedder_options: EmbedderOptions,
/// Document template
pub prompt: PromptData,
/// If this embedder is binary quantized
pub quantized: Option<bool>,
// TODO: add metrics and anything needed
}
impl EmbeddingConfig {
pub fn quantized(&self) -> bool {
self.quantized.unwrap_or_default()
}
}
/// Options of an embedder, specific to each kind of embedder.
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub enum EmbedderOptions {
HuggingFace(hf::EmbedderOptions),
OpenAi(openai::EmbedderOptions),
Ollama(ollama::EmbedderOptions),
UserProvided(manual::EmbedderOptions),
Rest(rest::EmbedderOptions),
Composite(composite::EmbedderOptions),
}
impl EmbedderOptions {
pub fn fragment(&self, name: &str) -> Option<&serde_json::Value> {
match &self {
EmbedderOptions::HuggingFace(_)
| EmbedderOptions::OpenAi(_)
| EmbedderOptions::Ollama(_)
| EmbedderOptions::UserProvided(_) => None,
EmbedderOptions::Rest(embedder_options) => {
embedder_options.indexing_fragments.get(name)
}
EmbedderOptions::Composite(embedder_options) => {
if let SubEmbedderOptions::Rest(embedder_options) = &embedder_options.index {
embedder_options.indexing_fragments.get(name)
} else {
None
}
}
}
}
pub fn has_fragments(&self) -> bool {
match &self {
EmbedderOptions::HuggingFace(_)
| EmbedderOptions::OpenAi(_)
| EmbedderOptions::Ollama(_)
| EmbedderOptions::UserProvided(_) => false,
EmbedderOptions::Rest(embedder_options) => {
!embedder_options.indexing_fragments.is_empty()
}
EmbedderOptions::Composite(embedder_options) => {
if let SubEmbedderOptions::Rest(embedder_options) = &embedder_options.index {
!embedder_options.indexing_fragments.is_empty()
} else {
false
}
}
}
}
}
impl Default for EmbedderOptions {
fn default() -> Self {
Self::HuggingFace(Default::default())
}
}
impl Embedder {
/// Spawns a new embedder built from its options.
pub fn new(
options: EmbedderOptions,
cache_cap: usize,
) -> std::result::Result<Self, NewEmbedderError> {
Ok(match options {
EmbedderOptions::HuggingFace(options) => {
Self::HuggingFace(hf::Embedder::new(options, cache_cap)?)
}
EmbedderOptions::OpenAi(options) => {
Self::OpenAi(openai::Embedder::new(options, cache_cap)?)
}
EmbedderOptions::Ollama(options) => {
Self::Ollama(ollama::Embedder::new(options, cache_cap)?)
}
EmbedderOptions::UserProvided(options) => {
Self::UserProvided(manual::Embedder::new(options))
}
EmbedderOptions::Rest(options) => Self::Rest(rest::Embedder::new(
options,
cache_cap,
rest::ConfigurationSource::User,
)?),
EmbedderOptions::Composite(options) => {
Self::Composite(composite::Embedder::new(options, cache_cap)?)
}
})
}
/// Embed in search context
#[tracing::instrument(level = "debug", skip_all, target = "search")]
pub fn embed_search(
&self,
query: SearchQuery<'_>,
deadline: Option<Instant>,
) -> std::result::Result<Embedding, EmbedError> {
match query {
SearchQuery::Text(text) => self.embed_search_text(text, deadline),
SearchQuery::Media { q, media } => self.embed_search_media(q, media, deadline),
}
}
pub fn embed_search_text(
&self,
text: &str,
deadline: Option<Instant>,
) -> std::result::Result<Embedding, EmbedError> {
if let Some(cache) = self.cache() {
if let Some(embedding) = cache.get(text) {
tracing::trace!(text, "embedding found in cache");
return Ok(embedding);
}
}
let embedding = match self {
Embedder::HuggingFace(embedder) => embedder.embed_one(text),
Embedder::OpenAi(embedder) => embedder
.embed(&[text], deadline, None)?
.pop()
.ok_or_else(EmbedError::missing_embedding),
Embedder::Ollama(embedder) => embedder
.embed(&[text], deadline, None)?
.pop()
.ok_or_else(EmbedError::missing_embedding),
Embedder::UserProvided(embedder) => embedder.embed_one(text),
Embedder::Rest(embedder) => embedder.embed_one(SearchQuery::Text(text), deadline, None),
Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline, None),
}?;
if let Some(cache) = self.cache() {
cache.put(text.to_owned(), embedding.clone());
}
Ok(embedding)
}
pub fn embed_search_media(
&self,
q: Option<&str>,
media: Option<&serde_json::Value>,
deadline: Option<Instant>,
) -> std::result::Result<Embedding, EmbedError> {
let Embedder::Rest(embedder) = self else {
return Err(EmbedError::rest_media_not_a_rest());
};
embedder.embed_one(SearchQuery::Media { q, media }, deadline, None)
}
/// Embed multiple chunks of texts.
///
/// Each chunk is composed of one or multiple texts.
pub fn embed_index(
&self,
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
embedder_stats: &EmbedderStats,
) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
match self {
Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
Embedder::OpenAi(embedder) => {
embedder.embed_index(text_chunks, threads, embedder_stats)
}
Embedder::Ollama(embedder) => {
embedder.embed_index(text_chunks, threads, embedder_stats)
}
Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks),
Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
Embedder::Composite(embedder) => {
embedder.index.embed_index(text_chunks, threads, embedder_stats)
}
}
}
/// Non-owning variant of [`Self::embed_index`].
pub fn embed_index_ref(
&self,
texts: &[&str],
threads: &ThreadPoolNoAbort,
embedder_stats: &EmbedderStats,
) -> std::result::Result<Vec<Embedding>, EmbedError> {
match self {
Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts),
Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts),
Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
Embedder::Composite(embedder) => {
embedder.index.embed_index_ref(texts, threads, embedder_stats)
}
}
}
pub fn embed_index_ref_fragments(
&self,
fragments: &[serde_json::Value],
threads: &ThreadPoolNoAbort,
embedder_stats: &EmbedderStats,
) -> std::result::Result<Vec<Embedding>, EmbedError> {
if let Embedder::Rest(embedder) = self {
embedder.embed_index_ref(fragments, threads, embedder_stats)
} else {
let Embedder::Composite(embedder) = self else {
unimplemented!("embedding fragments is only available for rest embedders")
};
let crate::vector::embedder::composite::SubEmbedder::Rest(embedder) = &embedder.index
else {
unimplemented!("embedding fragments is only available for rest embedders")
};
embedder.embed_index_ref(fragments, threads, embedder_stats)
}
}
/// Indicates the preferred number of chunks to pass to [`Self::embed_chunks`]
pub fn chunk_count_hint(&self) -> usize {
match self {
Embedder::HuggingFace(embedder) => embedder.chunk_count_hint(),
Embedder::OpenAi(embedder) => embedder.chunk_count_hint(),
Embedder::Ollama(embedder) => embedder.chunk_count_hint(),
Embedder::UserProvided(_) => 100,
Embedder::Rest(embedder) => embedder.chunk_count_hint(),
Embedder::Composite(embedder) => embedder.index.chunk_count_hint(),
}
}
/// Indicates the preferred number of texts in a single chunk passed to [`Self::embed`]
pub fn prompt_count_in_chunk_hint(&self) -> usize {
match self {
Embedder::HuggingFace(embedder) => embedder.prompt_count_in_chunk_hint(),
Embedder::OpenAi(embedder) => embedder.prompt_count_in_chunk_hint(),
Embedder::Ollama(embedder) => embedder.prompt_count_in_chunk_hint(),
Embedder::UserProvided(_) => 1,
Embedder::Rest(embedder) => embedder.prompt_count_in_chunk_hint(),
Embedder::Composite(embedder) => embedder.index.prompt_count_in_chunk_hint(),
}
}
/// Indicates the dimensions of a single embedding produced by the embedder.
pub fn dimensions(&self) -> usize {
match self {
Embedder::HuggingFace(embedder) => embedder.dimensions(),
Embedder::OpenAi(embedder) => embedder.dimensions(),
Embedder::Ollama(embedder) => embedder.dimensions(),
Embedder::UserProvided(embedder) => embedder.dimensions(),
Embedder::Rest(embedder) => embedder.dimensions(),
Embedder::Composite(embedder) => embedder.dimensions(),
}
}
/// An optional distribution used to apply an affine transformation to the similarity score of a document.
pub fn distribution(&self) -> Option<DistributionShift> {
match self {
Embedder::HuggingFace(embedder) => embedder.distribution(),
Embedder::OpenAi(embedder) => embedder.distribution(),
Embedder::Ollama(embedder) => embedder.distribution(),
Embedder::UserProvided(embedder) => embedder.distribution(),
Embedder::Rest(embedder) => embedder.distribution(),
Embedder::Composite(embedder) => embedder.distribution(),
}
}
pub fn uses_document_template(&self) -> bool {
match self {
Embedder::HuggingFace(_)
| Embedder::OpenAi(_)
| Embedder::Ollama(_)
| Embedder::Rest(_) => true,
Embedder::UserProvided(_) => false,
Embedder::Composite(embedder) => embedder.index.uses_document_template(),
}
}
fn cache(&self) -> Option<&EmbeddingCache> {
match self {
Embedder::HuggingFace(embedder) => Some(embedder.cache()),
Embedder::OpenAi(embedder) => Some(embedder.cache()),
Embedder::UserProvided(_) => None,
Embedder::Ollama(embedder) => Some(embedder.cache()),
Embedder::Rest(embedder) => Some(embedder.cache()),
Embedder::Composite(embedder) => embedder.search.cache(),
}
}
}
#[derive(Clone, Copy)]
pub enum SearchQuery<'a> {
Text(&'a str),
Media { q: Option<&'a str>, media: Option<&'a serde_json::Value> },
}
#[derive(Debug)]
struct EmbeddingCache {
data: Option<Mutex<lru::LruCache<String, Embedding>>>,
}
impl EmbeddingCache {
const MAX_TEXT_LEN: usize = 2000;
pub fn new(cap: usize) -> Self {
let data = NonZeroUsize::new(cap).map(lru::LruCache::new).map(Mutex::new);
Self { data }
}
/// Get the embedding corresponding to `text`, if any is present in the cache.
pub fn get(&self, text: &str) -> Option<Embedding> {
let data = self.data.as_ref()?;
if text.len() > Self::MAX_TEXT_LEN {
return None;
}
let mut cache = data.lock().unwrap();
cache.get(text).cloned()
}
/// Puts a new embedding for the specified `text`
pub fn put(&self, text: String, embedding: Embedding) {
let Some(data) = self.data.as_ref() else {
return;
};
if text.len() > Self::MAX_TEXT_LEN {
return;
}
tracing::trace!(text, "embedding added to cache");
let mut cache = data.lock().unwrap();
cache.put(text, embedding);
}
}

View File

@ -3,12 +3,12 @@ use std::time::Instant;
use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _};
use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM};
use super::EmbeddingCache;
use crate::error::FaultSource;
use crate::progress::EmbedderStats;
use crate::vector::Embedding;
use crate::vector::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
use crate::vector::{DistributionShift, Embedding, REQUEST_PARALLELISM};
use crate::ThreadPoolNoAbort;
#[derive(Debug)]
@ -88,7 +88,7 @@ impl Embedder {
Err(NewEmbedderError {
kind:
NewEmbedderErrorKind::CouldNotDetermineDimension(EmbedError {
kind: super::error::EmbedErrorKind::RestOtherStatusCode(404, error),
kind: EmbedErrorKind::RestOtherStatusCode(404, error),
fault: _,
}),
fault: _,

View File

@ -5,13 +5,12 @@ use ordered_float::OrderedFloat;
use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, NewEmbedderError};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM};
use super::{DistributionShift, EmbeddingCache};
use crate::error::FaultSource;
use crate::progress::EmbedderStats;
use crate::vector::error::EmbedErrorKind;
use crate::vector::Embedding;
use crate::vector::error::{EmbedError, EmbedErrorKind, NewEmbedderError};
use crate::vector::{Embedding, REQUEST_PARALLELISM};
use crate::ThreadPoolNoAbort;
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]

View File

@ -8,14 +8,12 @@ use rayon::slice::ParallelSlice as _;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::error::EmbedErrorKind;
use super::json_template::{InjectableValue, JsonTemplate};
use super::{
DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, SearchQuery,
REQUEST_PARALLELISM,
};
use super::EmbeddingCache;
use crate::error::FaultSource;
use crate::progress::EmbedderStats;
use crate::vector::error::{EmbedError, EmbedErrorKind, NewEmbedderError};
use crate::vector::json_template::{InjectableValue, JsonTemplate};
use crate::vector::{DistributionShift, Embedding, SearchQuery, REQUEST_PARALLELISM};
use crate::ThreadPoolNoAbort;
// retrying in case of failure
@ -315,7 +313,7 @@ impl Embedder {
}
pub fn chunk_count_hint(&self) -> usize {
super::REQUEST_PARALLELISM
crate::vector::REQUEST_PARALLELISM
}
pub fn prompt_count_in_chunk_hint(&self) -> usize {

View File

@ -0,0 +1,76 @@
/// One or multiple embeddings stored consecutively in a flat vector.
#[derive(Debug, PartialEq)]
pub struct Embeddings<F> {
data: Vec<F>,
dimension: usize,
}
impl<F> Embeddings<F> {
/// Declares an empty vector of embeddings of the specified dimensions.
pub fn new(dimension: usize) -> Self {
Self { data: Default::default(), dimension }
}
/// Declares a vector of embeddings containing a single element.
///
/// The dimension is inferred from the length of the passed embedding.
pub fn from_single_embedding(embedding: Vec<F>) -> Self {
Self { dimension: embedding.len(), data: embedding }
}
/// Declares a vector of embeddings from its components.
///
/// `data.len()` must be a multiple of `dimension`, otherwise an error is returned.
pub fn from_inner(data: Vec<F>, dimension: usize) -> Result<Self, Vec<F>> {
let mut this = Self::new(dimension);
this.append(data)?;
Ok(this)
}
/// Returns the number of embeddings in this vector of embeddings.
pub fn embedding_count(&self) -> usize {
self.data.len() / self.dimension
}
/// Dimension of a single embedding.
pub fn dimension(&self) -> usize {
self.dimension
}
/// Deconstructs self into the inner flat vector.
pub fn into_inner(self) -> Vec<F> {
self.data
}
/// A reference to the inner flat vector.
pub fn as_inner(&self) -> &[F] {
&self.data
}
/// Iterates over the embeddings contained in the flat vector.
pub fn iter(&self) -> impl Iterator<Item = &'_ [F]> + '_ {
self.data.as_slice().chunks_exact(self.dimension)
}
/// Push an embedding at the end of the embeddings.
///
/// If `embedding.len() != self.dimension`, then the push operation fails.
pub fn push(&mut self, mut embedding: Vec<F>) -> Result<(), Vec<F>> {
if embedding.len() != self.dimension {
return Err(embedding);
}
self.data.append(&mut embedding);
Ok(())
}
/// Append a flat vector of embeddings at the end of the embeddings.
///
/// If `embeddings.len() % self.dimension != 0`, then the append operation fails.
pub fn append(&mut self, mut embeddings: Vec<F>) -> Result<(), Vec<F>> {
if embeddings.len() % self.dimension != 0 {
return Err(embeddings);
}
self.data.append(&mut embeddings);
Ok(())
}
}

View File

@ -6,10 +6,10 @@ use hf_hub::api::sync::ApiError;
use itertools::Itertools as _;
use super::parsed_vectors::ParsedVectorsDiff;
use super::rest::ConfigurationSource;
use super::MAX_COMPOSITE_DISTANCE;
use crate::error::FaultSource;
use crate::update::new::vector_document::VectorDocument;
use crate::vector::embedder::composite::MAX_COMPOSITE_DISTANCE;
use crate::vector::embedder::rest::ConfigurationSource;
use crate::{FieldDistribution, PanicCatched};
#[derive(Debug, thiserror::Error)]

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,81 @@
use std::collections::HashMap;
use std::sync::Arc;
use super::Embedder;
use crate::prompt::Prompt;
use crate::vector::json_template::JsonTemplate;
/// Map of runtime embedder data.
#[derive(Clone, Default)]
pub struct RuntimeEmbedders(HashMap<String, Arc<RuntimeEmbedder>>);
pub struct RuntimeEmbedder {
pub embedder: Arc<Embedder>,
pub document_template: Prompt,
fragments: Vec<RuntimeFragment>,
pub is_quantized: bool,
}
impl RuntimeEmbedder {
pub fn new(
embedder: Arc<Embedder>,
document_template: Prompt,
mut fragments: Vec<RuntimeFragment>,
is_quantized: bool,
) -> Self {
fragments.sort_unstable_by(|left, right| left.name.cmp(&right.name));
Self { embedder, document_template, fragments, is_quantized }
}
/// The runtime fragments sorted by name.
pub fn fragments(&self) -> &[RuntimeFragment] {
self.fragments.as_slice()
}
}
pub struct RuntimeFragment {
pub name: String,
pub id: u8,
pub template: JsonTemplate,
}
impl RuntimeEmbedders {
/// Create the map from its internal component.s
pub fn new(data: HashMap<String, Arc<RuntimeEmbedder>>) -> Self {
Self(data)
}
pub fn contains(&self, name: &str) -> bool {
self.0.contains_key(name)
}
/// Get an embedder configuration and template from its name.
pub fn get(&self, name: &str) -> Option<&Arc<RuntimeEmbedder>> {
self.0.get(name)
}
pub fn inner_as_ref(&self) -> &HashMap<String, Arc<RuntimeEmbedder>> {
&self.0
}
pub fn into_inner(self) -> HashMap<String, Arc<RuntimeEmbedder>> {
self.0
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl IntoIterator for RuntimeEmbedders {
type Item = (String, Arc<RuntimeEmbedder>);
type IntoIter = std::collections::hash_map::IntoIter<String, Arc<RuntimeEmbedder>>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

View File

@ -2,7 +2,8 @@ use bumpalo::collections::Vec as BVec;
use bumpalo::Bump;
use serde_json::Value;
use super::{EmbedError, Embedder, Embedding};
use super::error::EmbedError;
use super::{Embedder, Embedding};
use crate::progress::EmbedderStats;
use crate::{DocumentId, Result, ThreadPoolNoAbort};
type ExtractorId = u8;

View File

@ -8,12 +8,12 @@ use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use super::composite::SubEmbedderOptions;
use super::hf::OverridePooling;
use super::{ollama, openai, DistributionShift, EmbedderOptions};
use crate::prompt::{default_max_bytes, PromptData};
use crate::update::Setting;
use crate::vector::EmbeddingConfig;
use crate::vector::embedder::composite::{self, SubEmbedderOptions};
use crate::vector::embedder::hf::{self, OverridePooling};
use crate::vector::embedder::{manual, ollama, openai, rest, EmbedderOptions};
use crate::vector::{DistributionShift, EmbeddingConfig};
use crate::UserError;
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, Deserr, ToSchema)]
@ -1789,12 +1789,7 @@ pub struct Fragment {
impl EmbeddingSettings {
fn from_hugging_face(
super::hf::EmbedderOptions {
model,
revision,
distribution,
pooling,
}: super::hf::EmbedderOptions,
hf::EmbedderOptions { model, revision, distribution, pooling }: hf::EmbedderOptions,
document_template: Setting<String>,
document_template_max_bytes: Setting<usize>,
quantized: Option<bool>,
@ -1822,13 +1817,13 @@ impl EmbeddingSettings {
}
fn from_openai(
super::openai::EmbedderOptions {
openai::EmbedderOptions {
url,
api_key,
embedding_model,
dimensions,
distribution,
}: super::openai::EmbedderOptions,
}: openai::EmbedderOptions,
document_template: Setting<String>,
document_template_max_bytes: Setting<usize>,
quantized: Option<bool>,
@ -1856,13 +1851,13 @@ impl EmbeddingSettings {
}
fn from_ollama(
super::ollama::EmbedderOptions {
embedding_model,
url,
api_key,
distribution,
dimensions,
}: super::ollama::EmbedderOptions,
ollama::EmbedderOptions {
embedding_model,
url,
api_key,
distribution,
dimensions,
}: ollama::EmbedderOptions,
document_template: Setting<String>,
document_template_max_bytes: Setting<usize>,
quantized: Option<bool>,
@ -1890,7 +1885,7 @@ impl EmbeddingSettings {
}
fn from_user_provided(
super::manual::EmbedderOptions { dimensions, distribution }: super::manual::EmbedderOptions,
manual::EmbedderOptions { dimensions, distribution }: manual::EmbedderOptions,
quantized: Option<bool>,
) -> Self {
Self {
@ -1916,7 +1911,7 @@ impl EmbeddingSettings {
}
fn from_rest(
super::rest::EmbedderOptions {
rest::EmbedderOptions {
api_key,
dimensions,
url,
@ -1926,7 +1921,7 @@ impl EmbeddingSettings {
response,
distribution,
headers,
}: super::rest::EmbedderOptions,
}: rest::EmbedderOptions,
document_template: Setting<String>,
document_template_max_bytes: Setting<usize>,
quantized: Option<bool>,
@ -2015,37 +2010,36 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
document_template_max_bytes,
quantized,
),
super::EmbedderOptions::Composite(super::composite::EmbedderOptions {
search,
index,
}) => Self {
source: Setting::Set(EmbedderSource::Composite),
model: Setting::NotSet,
revision: Setting::NotSet,
pooling: Setting::NotSet,
api_key: Setting::NotSet,
dimensions: Setting::NotSet,
binary_quantized: Setting::some_or_not_set(quantized),
document_template: Setting::NotSet,
document_template_max_bytes: Setting::NotSet,
url: Setting::NotSet,
indexing_fragments: Setting::NotSet,
search_fragments: Setting::NotSet,
request: Setting::NotSet,
response: Setting::NotSet,
headers: Setting::NotSet,
distribution: Setting::some_or_not_set(search.distribution()),
search_embedder: Setting::Set(SubEmbeddingSettings::from_options(
search,
Setting::NotSet,
Setting::NotSet,
)),
indexing_embedder: Setting::Set(SubEmbeddingSettings::from_options(
index,
Setting::Set(prompt.template),
document_template_max_bytes,
)),
},
super::EmbedderOptions::Composite(composite::EmbedderOptions { search, index }) => {
Self {
source: Setting::Set(EmbedderSource::Composite),
model: Setting::NotSet,
revision: Setting::NotSet,
pooling: Setting::NotSet,
api_key: Setting::NotSet,
dimensions: Setting::NotSet,
binary_quantized: Setting::some_or_not_set(quantized),
document_template: Setting::NotSet,
document_template_max_bytes: Setting::NotSet,
url: Setting::NotSet,
indexing_fragments: Setting::NotSet,
search_fragments: Setting::NotSet,
request: Setting::NotSet,
response: Setting::NotSet,
headers: Setting::NotSet,
distribution: Setting::some_or_not_set(search.distribution()),
search_embedder: Setting::Set(SubEmbeddingSettings::from_options(
search,
Setting::NotSet,
Setting::NotSet,
)),
indexing_embedder: Setting::Set(SubEmbeddingSettings::from_options(
index,
Setting::Set(prompt.template),
document_template_max_bytes,
)),
}
}
}
}
}
@ -2212,7 +2206,7 @@ impl From<EmbeddingSettings> for EmbeddingConfig {
)
.into(),
EmbedderSource::Composite => {
super::EmbedderOptions::Composite(super::composite::EmbedderOptions {
super::EmbedderOptions::Composite(composite::EmbedderOptions {
// it is important to give the distribution to the search here, as this is from where we'll retrieve it
search: SubEmbedderOptions::from_settings(
search_embedder.set().unwrap(),
@ -2290,9 +2284,9 @@ impl SubEmbedderOptions {
dimensions: Setting<usize>,
distribution: Setting<DistributionShift>,
) -> Self {
let mut options = super::openai::EmbedderOptions::with_default_model(None);
let mut options = openai::EmbedderOptions::with_default_model(None);
if let Some(model) = model.set() {
if let Some(model) = super::openai::EmbeddingModel::from_name(&model) {
if let Some(model) = openai::EmbeddingModel::from_name(&model) {
options.embedding_model = model;
}
}
@ -2314,7 +2308,7 @@ impl SubEmbedderOptions {
pooling: Setting<OverridePooling>,
distribution: Setting<DistributionShift>,
) -> Self {
let mut options = super::hf::EmbedderOptions::default();
let mut options = hf::EmbedderOptions::default();
if let Some(model) = model.set() {
options.model = model;
// Reset the revision if we are setting the model.
@ -2334,10 +2328,7 @@ impl SubEmbedderOptions {
SubEmbedderOptions::HuggingFace(options)
}
fn user_provided(dimensions: usize, distribution: Setting<DistributionShift>) -> Self {
Self::UserProvided(super::manual::EmbedderOptions {
dimensions,
distribution: distribution.set(),
})
Self::UserProvided(manual::EmbedderOptions { dimensions, distribution: distribution.set() })
}
#[allow(clippy::too_many_arguments)]
@ -2352,7 +2343,7 @@ impl SubEmbedderOptions {
dimensions: Setting<usize>,
distribution: Setting<DistributionShift>,
) -> Self {
Self::Rest(super::rest::EmbedderOptions {
Self::Rest(rest::EmbedderOptions {
api_key: api_key.set(),
dimensions: dimensions.set(),
url,
@ -2386,11 +2377,7 @@ impl SubEmbedderOptions {
distribution: Setting<DistributionShift>,
) -> Self {
let mut options: ollama::EmbedderOptions =
super::ollama::EmbedderOptions::with_default_model(
api_key.set(),
url.set(),
dimensions.set(),
);
ollama::EmbedderOptions::with_default_model(api_key.set(), url.set(), dimensions.set());
if let Some(model) = model.set() {
options.embedding_model = model;
}

File diff suppressed because it is too large Load Diff