From 5fc7872ab316d8a6a9799e27769ef4bd833f23cc Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Thu, 28 Aug 2025 16:32:47 +0200 Subject: [PATCH] Make sure the vector store works with both arroy and hannoy --- crates/milli/src/vector/store.rs | 905 ++++++++++++++++++++----------- 1 file changed, 588 insertions(+), 317 deletions(-) diff --git a/crates/milli/src/vector/store.rs b/crates/milli/src/vector/store.rs index cbc11aa77..c39596da2 100644 --- a/crates/milli/src/vector/store.rs +++ b/crates/milli/src/vector/store.rs @@ -28,6 +28,8 @@ pub struct VectorStore { } impl VectorStore { + // backend-independent public functions + pub fn new( backend: VectorStoreBackend, database: hannoy::Database, @@ -41,41 +43,7 @@ impl VectorStore { self.embedder_index } - fn arroy_readers<'a, D: arroy::Distance>( - &'a self, - rtxn: &'a RoTxn<'a>, - db: arroy::Database, - ) -> impl Iterator, arroy::Error>> + 'a { - vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { - match arroy::Reader::open(rtxn, index, db) { - Ok(reader) => match reader.is_empty(rtxn) { - Ok(false) => Some(Ok(reader)), - Ok(true) => None, - Err(e) => Some(Err(e)), - }, - Err(arroy::Error::MissingMetadata(_)) => None, - Err(e) => Some(Err(e)), - } - }) - } - - fn readers<'a, D: hannoy::Distance>( - &'a self, - rtxn: &'a RoTxn<'a>, - db: hannoy::Database, - ) -> impl Iterator, hannoy::Error>> + 'a { - vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { - match hannoy::Reader::open(rtxn, index, db) { - Ok(reader) => match reader.is_empty(rtxn) { - Ok(false) => Some(Ok(reader)), - Ok(true) => None, - Err(e) => Some(Err(e)), - }, - Err(hannoy::Error::MissingMetadata(_)) => None, - Err(e) => Some(Err(e)), - } - }) - } + // backend-dependent public functions /// The item ids that are present in the store specified by its id. /// @@ -91,55 +59,18 @@ impl VectorStore { { if self.backend == VectorStoreBackend::Arroy { if self.quantized { - self._arroy_items_in_store(rtxn, self.arroy_quantized_db(), store_id, with_items) + self._arroy_items_in_store(rtxn, self._arroy_quantized_db(), store_id, with_items) .map_err(Into::into) } else { - self._arroy_items_in_store(rtxn, self.arroy_angular_db(), store_id, with_items) + self._arroy_items_in_store(rtxn, self._arroy_angular_db(), store_id, with_items) .map_err(Into::into) } } else if self.quantized { - self._items_in_store(rtxn, self.quantized_db(), store_id, with_items) + self._hannoy_items_in_store(rtxn, self._hannoy_quantized_db(), store_id, with_items) .map_err(Into::into) } else { - self._items_in_store(rtxn, self.angular_db(), store_id, with_items).map_err(Into::into) - } - } - - fn _arroy_items_in_store( - &self, - rtxn: &RoTxn, - db: arroy::Database, - store_id: u8, - with_items: F, - ) -> Result - where - F: FnOnce(&RoaringBitmap) -> O, - { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let reader = arroy::Reader::open(rtxn, index, db); - match reader { - Ok(reader) => Ok(with_items(reader.item_ids())), - Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), - Err(err) => Err(err), - } - } - - fn _items_in_store( - &self, - rtxn: &RoTxn, - db: hannoy::Database, - store_id: u8, - with_items: F, - ) -> Result - where - F: FnOnce(&RoaringBitmap) -> O, - { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let reader = hannoy::Reader::open(rtxn, index, db); - match reader { - Ok(reader) => Ok(with_items(reader.item_ids())), - Err(hannoy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), - Err(err) => Err(err), + self._hannoy_items_in_store(rtxn, self._hannoy_angular_db(), store_id, with_items) + .map_err(Into::into) } } @@ -147,26 +78,26 @@ impl VectorStore { if self.backend == VectorStoreBackend::Arroy { if self.quantized { Ok(self - .arroy_readers(rtxn, self.arroy_quantized_db()) + ._arroy_readers(rtxn, self._arroy_quantized_db()) .next() .transpose()? .map(|reader| reader.dimensions())) } else { Ok(self - .arroy_readers(rtxn, self.arroy_angular_db()) + ._arroy_readers(rtxn, self._arroy_angular_db()) .next() .transpose()? .map(|reader| reader.dimensions())) } } else if self.quantized { Ok(self - .readers(rtxn, self.quantized_db()) + ._hannoy_readers(rtxn, self._hannoy_quantized_db()) .next() .transpose()? .map(|reader| reader.dimensions())) } else { Ok(self - .readers(rtxn, self.angular_db()) + ._hannoy_readers(rtxn, self._hannoy_angular_db()) .next() .transpose()? .map(|reader| reader.dimensions())) @@ -178,7 +109,7 @@ impl VectorStore { self.backend = VectorStoreBackend::Hannoy; if self.quantized { let dimensions = self - .arroy_readers(wtxn, self.arroy_quantized_db()) + ._arroy_readers(wtxn, self._arroy_quantized_db()) .next() .transpose()? .map(|reader| reader.dimensions()); @@ -187,7 +118,8 @@ impl VectorStore { for index in vector_store_range_for_embedder(self.embedder_index) { let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions); + let writer = + hannoy::Writer::new(self._hannoy_quantized_db(), index, dimensions); let mut builder = writer.builder(&mut rng).progress(progress.clone()); builder.prepare_arroy_conversion(wtxn)?; builder.build::(wtxn)?; @@ -196,7 +128,7 @@ impl VectorStore { Ok(()) } else { let dimensions = self - .arroy_readers(wtxn, self.arroy_angular_db()) + ._arroy_readers(wtxn, self._arroy_angular_db()) .next() .transpose()? .map(|reader| reader.dimensions()); @@ -205,7 +137,7 @@ impl VectorStore { for index in vector_store_range_for_embedder(self.embedder_index) { let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = hannoy::Writer::new(self.angular_db(), index, dimensions); + let writer = hannoy::Writer::new(self._hannoy_angular_db(), index, dimensions); let mut builder = writer.builder(&mut rng).progress(progress.clone()); builder.prepare_arroy_conversion(wtxn)?; builder.build::(wtxn)?; @@ -217,11 +149,8 @@ impl VectorStore { self.backend = VectorStoreBackend::Arroy; if self.quantized { - /// FIXME: make sure that all of the functions in the vector store can work both in arroy and hannoy - /// (the existing implementation was assuming that only reads could happen in arroy) - /// bonus points for making it harder to forget switching on arroy on hannoy when modifying the code let dimensions = self - .readers(wtxn, self.quantized_db()) + ._hannoy_readers(wtxn, self._hannoy_quantized_db()) .next() .transpose()? .map(|reader| reader.dimensions()); @@ -230,7 +159,7 @@ impl VectorStore { for index in vector_store_range_for_embedder(self.embedder_index) { let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimensions); + let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimensions); let mut builder = writer.builder(&mut rng); let builder = builder.progress(|step| progress.update_progress_from_arroy(step)); @@ -241,7 +170,7 @@ impl VectorStore { Ok(()) } else { let dimensions = self - .readers(wtxn, self.angular_db()) + ._hannoy_readers(wtxn, self._hannoy_angular_db()) .next() .transpose()? .map(|reader| reader.dimensions()); @@ -250,7 +179,7 @@ impl VectorStore { for index in vector_store_range_for_embedder(self.embedder_index) { let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimensions); + let writer = arroy::Writer::new(self._arroy_angular_db(), index, dimensions); let mut builder = writer.builder(&mut rng); let builder = builder.progress(|step| progress.update_progress_from_arroy(step)); @@ -271,16 +200,61 @@ impl VectorStore { rng: &mut R, dimension: usize, quantizing: bool, - hannoy_memory: Option, + available_memory: Option, cancel: &(impl Fn() -> bool + Sync + Send), - ) -> Result<(), hannoy::Error> { + ) -> Result<(), crate::Error> { for index in vector_store_range_for_embedder(self.embedder_index) { - if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimension); + if writer.need_build(wtxn)? { + let mut builder = writer.builder(rng); + let builder = + builder.progress(|step| progress.update_progress_from_arroy(step)); + builder + .available_memory(available_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .build(wtxn)?; + } else if writer.is_empty(wtxn)? { + continue; + } + } else { + let writer = arroy::Writer::new(self._arroy_angular_db(), index, dimension); + // If we are quantizing the databases, we can't know from meilisearch + // if the db was empty but still contained the wrong metadata, thus we need + // to quantize everything and can't stop early. Since this operation can + // only happens once in the life of an embedder, it's not very performance + // sensitive. + if quantizing && !self.quantized { + let writer = writer + .prepare_changing_distance::( + wtxn, + )?; + let mut builder = writer.builder(rng); + let builder = + builder.progress(|step| progress.update_progress_from_arroy(step)); + builder + .available_memory(available_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .build(wtxn)?; + } else if writer.need_build(wtxn)? { + let mut builder = writer.builder(rng); + let builder = + builder.progress(|step| progress.update_progress_from_arroy(step)); + builder + .available_memory(available_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .build(wtxn)?; + } else if writer.is_empty(wtxn)? { + continue; + } + } + } else if self.quantized { + let writer = hannoy::Writer::new(self._hannoy_quantized_db(), index, dimension); if writer.need_build(wtxn)? { let mut builder = writer.builder(rng).progress(progress.clone()); builder - .available_memory(hannoy_memory.unwrap_or(usize::MAX)) + .available_memory(available_memory.unwrap_or(usize::MAX)) .cancel(cancel) .ef_construction(HANNOY_EF_CONSTRUCTION) .build::(wtxn)?; @@ -288,24 +262,24 @@ impl VectorStore { continue; } } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + let writer = hannoy::Writer::new(self._hannoy_angular_db(), index, dimension); // If we are quantizing the databases, we can't know from meilisearch // if the db was empty but still contained the wrong metadata, thus we need // to quantize everything and can't stop early. Since this operation can - // only happens once in the life of an embedder, it's not very performances + // only happens once in the life of an embedder, it's not very performance // sensitive. if quantizing && !self.quantized { let writer = writer.prepare_changing_distance::(wtxn)?; let mut builder = writer.builder(rng).progress(progress.clone()); builder - .available_memory(hannoy_memory.unwrap_or(usize::MAX)) + .available_memory(available_memory.unwrap_or(usize::MAX)) .cancel(cancel) .ef_construction(HANNOY_EF_CONSTRUCTION) .build::(wtxn)?; } else if writer.need_build(wtxn)? { let mut builder = writer.builder(rng).progress(progress.clone()); builder - .available_memory(hannoy_memory.unwrap_or(usize::MAX)) + .available_memory(available_memory.unwrap_or(usize::MAX)) .cancel(cancel) .ef_construction(HANNOY_EF_CONSTRUCTION) .build::(wtxn)?; @@ -326,16 +300,24 @@ impl VectorStore { wtxn: &mut RwTxn, item_id: hannoy::ItemId, embeddings: &Embeddings, - ) -> Result<(), hannoy::Error> { + ) -> Result<(), crate::Error> { let dimension = embeddings.dimension(); for (index, vector) in vector_store_range_for_embedder(self.embedder_index).zip(embeddings.iter()) { - if self.quantized { - hannoy::Writer::new(self.quantized_db(), index, dimension) + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + arroy::Writer::new(self._arroy_quantized_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } else { + arroy::Writer::new(self._arroy_angular_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } + } else if self.quantized { + hannoy::Writer::new(self._hannoy_quantized_db(), index, dimension) .add_item(wtxn, item_id, vector)? } else { - hannoy::Writer::new(self.angular_db(), index, dimension) + hannoy::Writer::new(self._hannoy_angular_db(), index, dimension) .add_item(wtxn, item_id, vector)? } } @@ -348,31 +330,22 @@ impl VectorStore { wtxn: &mut RwTxn, item_id: hannoy::ItemId, vector: &[f32], - ) -> Result<(), hannoy::Error> { - if self.quantized { - self._add_item(wtxn, self.quantized_db(), item_id, vector) - } else { - self._add_item(wtxn, self.angular_db(), item_id, vector) - } - } - - fn _add_item( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - item_id: hannoy::ItemId, - vector: &[f32], - ) -> Result<(), hannoy::Error> { - let dimension = vector.len(); - - for index in vector_store_range_for_embedder(self.embedder_index) { - let writer = hannoy::Writer::new(db, index, dimension); - if !writer.contains_item(wtxn, item_id)? { - writer.add_item(wtxn, item_id, vector)?; - break; + ) -> Result<(), crate::Error> { + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + self._arroy_add_item(wtxn, self._arroy_quantized_db(), item_id, vector) + .map_err(Into::into) + } else { + self._arroy_add_item(wtxn, self._arroy_angular_db(), item_id, vector) + .map_err(Into::into) } + } else if self.quantized { + self._hannoy_add_item(wtxn, self._hannoy_quantized_db(), item_id, vector) + .map_err(Into::into) + } else { + self._hannoy_add_item(wtxn, self._hannoy_angular_db(), item_id, vector) + .map_err(Into::into) } - Ok(()) } /// Add a vector associated with a document in store specified by its id. @@ -384,27 +357,70 @@ impl VectorStore { item_id: hannoy::ItemId, store_id: u8, vector: &[f32], - ) -> Result<(), hannoy::Error> { - if self.quantized { - self._add_item_in_store(wtxn, self.quantized_db(), item_id, store_id, vector) + ) -> Result<(), crate::Error> { + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + self._arroy_add_item_in_store( + wtxn, + self._arroy_quantized_db(), + item_id, + store_id, + vector, + ) + .map_err(Into::into) + } else { + self._arroy_add_item_in_store( + wtxn, + self._arroy_angular_db(), + item_id, + store_id, + vector, + ) + .map_err(Into::into) + } + } else if self.quantized { + self._hannoy_add_item_in_store( + wtxn, + self._hannoy_quantized_db(), + item_id, + store_id, + vector, + ) + .map_err(Into::into) } else { - self._add_item_in_store(wtxn, self.angular_db(), item_id, store_id, vector) + self._hannoy_add_item_in_store( + wtxn, + self._hannoy_angular_db(), + item_id, + store_id, + vector, + ) + .map_err(Into::into) } } - fn _add_item_in_store( + /// Delete one item from its value. + pub fn del_item( &self, wtxn: &mut RwTxn, - db: hannoy::Database, item_id: hannoy::ItemId, - store_id: u8, vector: &[f32], - ) -> Result<(), hannoy::Error> { - let dimension = vector.len(); - - let index = vector_store_for_embedder(self.embedder_index, store_id); - let writer = hannoy::Writer::new(db, index, dimension); - writer.add_item(wtxn, item_id, vector) + ) -> Result { + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + self._arroy_del_item(wtxn, self._arroy_quantized_db(), item_id, vector) + .map_err(Into::into) + } else { + self._arroy_del_item(wtxn, self._arroy_angular_db(), item_id, vector) + .map_err(Into::into) + } + } else if self.quantized { + self._hannoy_del_item(wtxn, self._hannoy_quantized_db(), item_id, vector) + .map_err(Into::into) + } else { + self._hannoy_del_item(wtxn, self._hannoy_angular_db(), item_id, vector) + .map_err(Into::into) + } } /// Delete all embeddings from a specific `item_id` @@ -413,13 +429,21 @@ impl VectorStore { wtxn: &mut RwTxn, dimension: usize, item_id: hannoy::ItemId, - ) -> Result<(), hannoy::Error> { + ) -> Result<(), crate::Error> { for index in vector_store_range_for_embedder(self.embedder_index) { - if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimension); + writer.del_item(wtxn, item_id)?; + } else { + let writer = arroy::Writer::new(self._arroy_angular_db(), index, dimension); + writer.del_item(wtxn, item_id)?; + } + } else if self.quantized { + let writer = hannoy::Writer::new(self._hannoy_quantized_db(), index, dimension); writer.del_item(wtxn, item_id)?; } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + let writer = hannoy::Writer::new(self._hannoy_angular_db(), index, dimension); writer.del_item(wtxn, item_id)?; } } @@ -440,27 +464,48 @@ impl VectorStore { item_id: hannoy::ItemId, store_id: u8, dimensions: usize, - ) -> Result { - if self.quantized { - self._del_item_in_store(wtxn, self.quantized_db(), item_id, store_id, dimensions) + ) -> Result { + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + self._arroy_del_item_in_store( + wtxn, + self._arroy_quantized_db(), + item_id, + store_id, + dimensions, + ) + .map_err(Into::into) + } else { + self._arroy_del_item_in_store( + wtxn, + self._arroy_angular_db(), + item_id, + store_id, + dimensions, + ) + .map_err(Into::into) + } + } else if self.quantized { + self._hannoy_del_item_in_store( + wtxn, + self._hannoy_quantized_db(), + item_id, + store_id, + dimensions, + ) + .map_err(Into::into) } else { - self._del_item_in_store(wtxn, self.angular_db(), item_id, store_id, dimensions) + self._hannoy_del_item_in_store( + wtxn, + self._hannoy_angular_db(), + item_id, + store_id, + dimensions, + ) + .map_err(Into::into) } } - fn _del_item_in_store( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - item_id: hannoy::ItemId, - store_id: u8, - dimensions: usize, - ) -> Result { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let writer = hannoy::Writer::new(db, index, dimensions); - writer.del_item(wtxn, item_id) - } - /// Removes all items from the store specified by its id. /// /// # Warning @@ -471,68 +516,48 @@ impl VectorStore { wtxn: &mut RwTxn, store_id: u8, dimensions: usize, - ) -> Result<(), hannoy::Error> { - if self.quantized { - self._clear_store(wtxn, self.quantized_db(), store_id, dimensions) - } else { - self._clear_store(wtxn, self.angular_db(), store_id, dimensions) - } - } - - fn _clear_store( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - store_id: u8, - dimensions: usize, - ) -> Result<(), hannoy::Error> { - let index = vector_store_for_embedder(self.embedder_index, store_id); - let writer = hannoy::Writer::new(db, index, dimensions); - writer.clear(wtxn) - } - - /// Delete one item from its value. - pub fn del_item( - &self, - wtxn: &mut RwTxn, - item_id: hannoy::ItemId, - vector: &[f32], - ) -> Result { - if self.quantized { - self._del_item(wtxn, self.quantized_db(), item_id, vector) - } else { - self._del_item(wtxn, self.angular_db(), item_id, vector) - } - } - - fn _del_item( - &self, - wtxn: &mut RwTxn, - db: hannoy::Database, - item_id: hannoy::ItemId, - vector: &[f32], - ) -> Result { - let dimension = vector.len(); - - for index in vector_store_range_for_embedder(self.embedder_index) { - let writer = hannoy::Writer::new(db, index, dimension); - if writer.contains_item(wtxn, item_id)? { - return writer.del_item(wtxn, item_id); - } - } - Ok(false) - } - - pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), hannoy::Error> { - for index in vector_store_range_for_embedder(self.embedder_index) { + ) -> Result<(), crate::Error> { + if self.backend == VectorStoreBackend::Arroy { if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + self._arroy_clear_store(wtxn, self._arroy_quantized_db(), store_id, dimensions) + .map_err(Into::into) + } else { + self._arroy_clear_store(wtxn, self._arroy_angular_db(), store_id, dimensions) + .map_err(Into::into) + } + } else if self.quantized { + self._hannoy_clear_store(wtxn, self._hannoy_quantized_db(), store_id, dimensions) + .map_err(Into::into) + } else { + self._hannoy_clear_store(wtxn, self._hannoy_angular_db(), store_id, dimensions) + .map_err(Into::into) + } + } + + pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), crate::Error> { + for index in vector_store_range_for_embedder(self.embedder_index) { + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimension); + if writer.is_empty(wtxn)? { + continue; + } + writer.clear(wtxn)?; + } else { + let writer = arroy::Writer::new(self._arroy_angular_db(), index, dimension); + if writer.is_empty(wtxn)? { + continue; + } + writer.clear(wtxn)?; + } + } else if self.quantized { + let writer = hannoy::Writer::new(self._hannoy_quantized_db(), index, dimension); if writer.is_empty(wtxn)? { continue; } writer.clear(wtxn)?; } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + let writer = hannoy::Writer::new(self._hannoy_angular_db(), index, dimension); if writer.is_empty(wtxn)? { continue; } @@ -551,26 +576,26 @@ impl VectorStore { for index in vector_store_range_for_embedder(self.embedder_index) { let contains = if self.backend == VectorStoreBackend::Arroy { if self.quantized { - let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimension); + let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimension); if writer.is_empty(rtxn)? { continue; } writer.contains_item(rtxn, item)? } else { - let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimension); + let writer = arroy::Writer::new(self._arroy_angular_db(), index, dimension); if writer.is_empty(rtxn)? { continue; } writer.contains_item(rtxn, item)? } } else if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + let writer = hannoy::Writer::new(self._hannoy_quantized_db(), index, dimension); if writer.is_empty(rtxn)? { continue; } writer.contains_item(rtxn, item)? } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + let writer = hannoy::Writer::new(self._hannoy_angular_db(), index, dimension); if writer.is_empty(rtxn)? { continue; } @@ -592,18 +617,345 @@ impl VectorStore { ) -> crate::Result> { if self.backend == VectorStoreBackend::Arroy { if self.quantized { - self._arroy_nns_by_item(rtxn, self.arroy_quantized_db(), item, limit, filter) + self._arroy_nns_by_item(rtxn, self._arroy_quantized_db(), item, limit, filter) .map_err(Into::into) } else { - self._arroy_nns_by_item(rtxn, self.arroy_angular_db(), item, limit, filter) + self._arroy_nns_by_item(rtxn, self._arroy_angular_db(), item, limit, filter) .map_err(Into::into) } } else if self.quantized { - self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter).map_err(Into::into) + self._hannoy_nns_by_item(rtxn, self._hannoy_quantized_db(), item, limit, filter) + .map_err(Into::into) } else { - self._nns_by_item(rtxn, self.angular_db(), item, limit, filter).map_err(Into::into) + self._hannoy_nns_by_item(rtxn, self._hannoy_angular_db(), item, limit, filter) + .map_err(Into::into) } } + pub fn nns_by_vector( + &self, + rtxn: &RoTxn, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> crate::Result> { + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + self._arroy_nns_by_vector(rtxn, self._arroy_quantized_db(), vector, limit, filter) + .map_err(Into::into) + } else { + self._arroy_nns_by_vector(rtxn, self._arroy_angular_db(), vector, limit, filter) + .map_err(Into::into) + } + } else if self.quantized { + self._hannoy_nns_by_vector(rtxn, self._hannoy_quantized_db(), vector, limit, filter) + .map_err(Into::into) + } else { + self._hannoy_nns_by_vector(rtxn, self._hannoy_angular_db(), vector, limit, filter) + .map_err(Into::into) + } + } + pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result>> { + let mut vectors = Vec::new(); + + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + for reader in self._arroy_readers(rtxn, self._arroy_quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } else { + for reader in self._arroy_readers(rtxn, self._arroy_angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } + } else if self.quantized { + for reader in self._hannoy_readers(rtxn, self._hannoy_quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } else { + for reader in self._hannoy_readers(rtxn, self._hannoy_angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } + + Ok(vectors) + } + + pub fn aggregate_stats( + &self, + rtxn: &RoTxn, + stats: &mut HannoyStats, + ) -> Result<(), crate::Error> { + if self.backend == VectorStoreBackend::Arroy { + if self.quantized { + for reader in self._arroy_readers(rtxn, self._arroy_quantized_db()) { + let reader = reader?; + let documents = reader.item_ids(); + stats.documents |= documents; + stats.number_of_embeddings += documents.len(); + } + } else { + for reader in self._arroy_readers(rtxn, self._arroy_angular_db()) { + let reader = reader?; + let documents = reader.item_ids(); + stats.documents |= documents; + stats.number_of_embeddings += documents.len(); + } + } + } else if self.quantized { + for reader in self._hannoy_readers(rtxn, self._hannoy_quantized_db()) { + let reader = reader?; + let documents = reader.item_ids(); + stats.documents |= documents; + stats.number_of_embeddings += documents.len(); + } + } else { + for reader in self._hannoy_readers(rtxn, self._hannoy_angular_db()) { + let reader = reader?; + let documents = reader.item_ids(); + stats.documents |= documents; + stats.number_of_embeddings += documents.len(); + } + } + + Ok(()) + } + + // private functions + fn _arroy_readers<'a, D: arroy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: arroy::Database, + ) -> impl Iterator, arroy::Error>> + 'a { + vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { + match arroy::Reader::open(rtxn, index, db) { + Ok(reader) => match reader.is_empty(rtxn) { + Ok(false) => Some(Ok(reader)), + Ok(true) => None, + Err(e) => Some(Err(e)), + }, + Err(arroy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) + } + + fn _hannoy_readers<'a, D: hannoy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: hannoy::Database, + ) -> impl Iterator, hannoy::Error>> + 'a { + vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { + match hannoy::Reader::open(rtxn, index, db) { + Ok(reader) => match reader.is_empty(rtxn) { + Ok(false) => Some(Ok(reader)), + Ok(true) => None, + Err(e) => Some(Err(e)), + }, + Err(hannoy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) + } + + fn _arroy_items_in_store( + &self, + rtxn: &RoTxn, + db: arroy::Database, + store_id: u8, + with_items: F, + ) -> Result + where + F: FnOnce(&RoaringBitmap) -> O, + { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let reader = arroy::Reader::open(rtxn, index, db); + match reader { + Ok(reader) => Ok(with_items(reader.item_ids())), + Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), + Err(err) => Err(err), + } + } + + fn _hannoy_items_in_store( + &self, + rtxn: &RoTxn, + db: hannoy::Database, + store_id: u8, + with_items: F, + ) -> Result + where + F: FnOnce(&RoaringBitmap) -> O, + { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let reader = hannoy::Reader::open(rtxn, index, db); + match reader { + Ok(reader) => Ok(with_items(reader.item_ids())), + Err(hannoy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), + Err(err) => Err(err), + } + } + + fn _arroy_add_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + vector: &[f32], + ) -> Result<(), arroy::Error> { + let dimension = vector.len(); + + for index in vector_store_range_for_embedder(self.embedder_index) { + let writer = arroy::Writer::new(db, index, dimension); + if !writer.contains_item(wtxn, item_id)? { + writer.add_item(wtxn, item_id, vector)?; + break; + } + } + Ok(()) + } + + fn _hannoy_add_item( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + vector: &[f32], + ) -> Result<(), hannoy::Error> { + let dimension = vector.len(); + + for index in vector_store_range_for_embedder(self.embedder_index) { + let writer = hannoy::Writer::new(db, index, dimension); + if !writer.contains_item(wtxn, item_id)? { + writer.add_item(wtxn, item_id, vector)?; + break; + } + } + Ok(()) + } + + fn _arroy_add_item_in_store( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + store_id: u8, + vector: &[f32], + ) -> Result<(), arroy::Error> { + let dimension = vector.len(); + + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = arroy::Writer::new(db, index, dimension); + writer.add_item(wtxn, item_id, vector) + } + + fn _hannoy_add_item_in_store( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + store_id: u8, + vector: &[f32], + ) -> Result<(), hannoy::Error> { + let dimension = vector.len(); + + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = hannoy::Writer::new(db, index, dimension); + writer.add_item(wtxn, item_id, vector) + } + + fn _arroy_del_item_in_store( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + store_id: u8, + dimensions: usize, + ) -> Result { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = arroy::Writer::new(db, index, dimensions); + writer.del_item(wtxn, item_id) + } + + fn _hannoy_del_item_in_store( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + store_id: u8, + dimensions: usize, + ) -> Result { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = hannoy::Writer::new(db, index, dimensions); + writer.del_item(wtxn, item_id) + } + + fn _arroy_clear_store( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + store_id: u8, + dimensions: usize, + ) -> Result<(), arroy::Error> { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = arroy::Writer::new(db, index, dimensions); + writer.clear(wtxn) + } + + fn _hannoy_clear_store( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + store_id: u8, + dimensions: usize, + ) -> Result<(), hannoy::Error> { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let writer = hannoy::Writer::new(db, index, dimensions); + writer.clear(wtxn) + } + + fn _arroy_del_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + vector: &[f32], + ) -> Result { + let dimension = vector.len(); + + for index in vector_store_range_for_embedder(self.embedder_index) { + let writer = arroy::Writer::new(db, index, dimension); + if writer.contains_item(wtxn, item_id)? { + return writer.del_item(wtxn, item_id); + } + } + Ok(false) + } + + fn _hannoy_del_item( + &self, + wtxn: &mut RwTxn, + db: hannoy::Database, + item_id: hannoy::ItemId, + vector: &[f32], + ) -> Result { + let dimension = vector.len(); + + for index in vector_store_range_for_embedder(self.embedder_index) { + let writer = hannoy::Writer::new(db, index, dimension); + if writer.contains_item(wtxn, item_id)? { + return writer.del_item(wtxn, item_id); + } + } + Ok(false) + } fn _arroy_nns_by_item( &self, @@ -615,7 +967,7 @@ impl VectorStore { ) -> Result, arroy::Error> { let mut results = Vec::new(); - for reader in self.arroy_readers(rtxn, db) { + for reader in self._arroy_readers(rtxn, db) { let reader = reader?; let mut searcher = reader.nns(limit); if let Some(filter) = filter { @@ -633,7 +985,7 @@ impl VectorStore { Ok(results) } - fn _nns_by_item( + fn _hannoy_nns_by_item( &self, rtxn: &RoTxn, db: hannoy::Database, @@ -643,7 +995,7 @@ impl VectorStore { ) -> Result, hannoy::Error> { let mut results = Vec::new(); - for reader in self.readers(rtxn, db) { + for reader in self._hannoy_readers(rtxn, db) { let reader = reader?; let mut searcher = reader.nns(limit); searcher.ef_search((limit * 10).max(100)); // TODO find better ef @@ -659,29 +1011,6 @@ impl VectorStore { Ok(results) } - pub fn nns_by_vector( - &self, - rtxn: &RoTxn, - vector: &[f32], - limit: usize, - filter: Option<&RoaringBitmap>, - ) -> crate::Result> { - if self.backend == VectorStoreBackend::Arroy { - if self.quantized { - self._arroy_nns_by_vector(rtxn, self.arroy_quantized_db(), vector, limit, filter) - .map_err(Into::into) - } else { - self._arroy_nns_by_vector(rtxn, self.arroy_angular_db(), vector, limit, filter) - .map_err(Into::into) - } - } else if self.quantized { - self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) - .map_err(Into::into) - } else { - self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter).map_err(Into::into) - } - } - fn _arroy_nns_by_vector( &self, rtxn: &RoTxn, @@ -692,7 +1021,7 @@ impl VectorStore { ) -> Result, arroy::Error> { let mut results = Vec::new(); - for reader in self.arroy_readers(rtxn, db) { + for reader in self._arroy_readers(rtxn, db) { let reader = reader?; let mut searcher = reader.nns(limit); if let Some(filter) = filter { @@ -710,7 +1039,7 @@ impl VectorStore { Ok(results) } - fn _nns_by_vector( + fn _hannoy_nns_by_vector( &self, rtxn: &RoTxn, db: hannoy::Database, @@ -720,7 +1049,7 @@ impl VectorStore { ) -> Result, hannoy::Error> { let mut results = Vec::new(); - for reader in self.readers(rtxn, db) { + for reader in self._hannoy_readers(rtxn, db) { let reader = reader?; let mut searcher = reader.nns(limit); searcher.ef_search((limit * 10).max(100)); // TODO find better ef @@ -736,79 +1065,21 @@ impl VectorStore { Ok(results) } - pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result>> { - let mut vectors = Vec::new(); - - if self.backend == VectorStoreBackend::Arroy { - if self.quantized { - for reader in self.arroy_readers(rtxn, self.arroy_quantized_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } else { - for reader in self.arroy_readers(rtxn, self.arroy_angular_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } - } else if self.quantized { - for reader in self.readers(rtxn, self.quantized_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } else { - for reader in self.readers(rtxn, self.angular_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); - } - } - } - - Ok(vectors) - } - - fn arroy_angular_db(&self) -> arroy::Database { + fn _arroy_angular_db(&self) -> arroy::Database { self.database.remap_types() } - fn arroy_quantized_db(&self) -> arroy::Database { + fn _arroy_quantized_db(&self) -> arroy::Database { self.database.remap_types() } - fn angular_db(&self) -> hannoy::Database { + fn _hannoy_angular_db(&self) -> hannoy::Database { self.database.remap_data_type() } - fn quantized_db(&self) -> hannoy::Database { + fn _hannoy_quantized_db(&self) -> hannoy::Database { self.database.remap_data_type() } - - pub fn aggregate_stats( - &self, - rtxn: &RoTxn, - stats: &mut HannoyStats, - ) -> Result<(), hannoy::Error> { - if self.quantized { - for reader in self.readers(rtxn, self.quantized_db()) { - let reader = reader?; - let documents = reader.item_ids(); - stats.documents |= documents; - stats.number_of_embeddings += documents.len(); - } - } else { - for reader in self.readers(rtxn, self.angular_db()) { - let reader = reader?; - let documents = reader.item_ids(); - stats.documents |= documents; - stats.number_of_embeddings += documents.len(); - } - } - - Ok(()) - } } #[derive(Debug, Default, Clone)]