From c1bacd53a7a4e20dafecc31c2484cbfd8732a723 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 12 Aug 2025 16:00:47 +0200 Subject: [PATCH] Dispatch the vector store based on the index version --- .../src/update/index_documents/transform.rs | 5 +- crates/milli/src/vector/mod.rs | 261 ++++++++++++++---- 2 files changed, 214 insertions(+), 52 deletions(-) diff --git a/crates/milli/src/update/index_documents/transform.rs b/crates/milli/src/update/index_documents/transform.rs index a8e0d318c..e0d8b82f8 100644 --- a/crates/milli/src/update/index_documents/transform.rs +++ b/crates/milli/src/update/index_documents/transform.rs @@ -884,10 +884,7 @@ impl<'a, 'i> Transform<'a, 'i> { InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, )?; - let injected_vectors: std::result::Result< - serde_json::Map, - hannoy::Error, - > = readers + let injected_vectors: crate::Result<_> = readers .iter() .filter_map(|(name, (reader, user_provided))| { if !user_provided.contains(docid) { diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index 7a78f15a4..8b3ba7577 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -67,6 +67,12 @@ impl VectorStore { self.embedder_index } + /// Whether we must use the arroy to read the vector store. + pub fn version_uses_arroy(&self) -> bool { + let (major, minor, _patch) = self.version; + major == 1 && minor < 18 + } + fn arroy_readers<'a, D: arroy::Distance>( &'a self, rtxn: &'a RoTxn<'a>, @@ -111,14 +117,45 @@ impl VectorStore { rtxn: &RoTxn, store_id: u8, with_items: F, - ) -> Result + ) -> crate::Result where F: FnOnce(&RoaringBitmap) -> O, { - if self.quantized { - self._items_in_store(rtxn, self.quantized_db(), store_id, with_items) + if self.version_uses_arroy() { + if self.quantized { + self._arroy_items_in_store(rtxn, self.arroy_quantized_db(), store_id, with_items) + .map_err(Into::into) + } else { + self._arroy_items_in_store(rtxn, self.arroy_angular_db(), store_id, with_items) + .map_err(Into::into) + } } else { - self._items_in_store(rtxn, self.angular_db(), store_id, with_items) + if self.quantized { + self._items_in_store(rtxn, self.quantized_db(), store_id, with_items) + .map_err(Into::into) + } else { + self._items_in_store(rtxn, self.angular_db(), store_id, with_items) + .map_err(Into::into) + } + } + } + + fn _arroy_items_in_store( + &self, + rtxn: &RoTxn, + db: arroy::Database, + store_id: u8, + with_items: F, + ) -> Result + where + F: FnOnce(&RoaringBitmap) -> O, + { + let index = vector_store_for_embedder(self.embedder_index, store_id); + let reader = arroy::Reader::open(rtxn, index, db); + match reader { + Ok(reader) => Ok(with_items(reader.item_ids())), + Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())), + Err(err) => Err(err), } } @@ -132,7 +169,7 @@ impl VectorStore { where F: FnOnce(&RoaringBitmap) -> O, { - let index = hannoy_store_for_embedder(self.embedder_index, store_id); + let index = vector_store_for_embedder(self.embedder_index, store_id); let reader = hannoy::Reader::open(rtxn, index, db); match reader { Ok(reader) => Ok(with_items(reader.item_ids())), @@ -141,19 +178,35 @@ impl VectorStore { } } - pub fn dimensions(&self, rtxn: &RoTxn) -> Result, hannoy::Error> { - if self.quantized { - Ok(self - .readers(rtxn, self.quantized_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions())) + pub fn dimensions(&self, rtxn: &RoTxn) -> crate::Result> { + if self.version_uses_arroy() { + if self.quantized { + Ok(self + .arroy_readers(rtxn, self.arroy_quantized_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } else { + Ok(self + .arroy_readers(rtxn, self.arroy_angular_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } } else { - Ok(self - .readers(rtxn, self.angular_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions())) + if self.quantized { + Ok(self + .readers(rtxn, self.quantized_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } else { + Ok(self + .readers(rtxn, self.angular_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions())) + } } } @@ -336,7 +389,7 @@ impl VectorStore { ) -> Result<(), hannoy::Error> { let dimension = vector.len(); - let index = hannoy_store_for_embedder(self.embedder_index, store_id); + let index = vector_store_for_embedder(self.embedder_index, store_id); let writer = hannoy::Writer::new(db, index, dimension); writer.add_item(wtxn, item_id, vector) } @@ -390,7 +443,7 @@ impl VectorStore { store_id: u8, dimensions: usize, ) -> Result { - let index = hannoy_store_for_embedder(self.embedder_index, store_id); + let index = vector_store_for_embedder(self.embedder_index, store_id); let writer = hannoy::Writer::new(db, index, dimensions); writer.del_item(wtxn, item_id) } @@ -420,7 +473,7 @@ impl VectorStore { store_id: u8, dimensions: usize, ) -> Result<(), hannoy::Error> { - let index = hannoy_store_for_embedder(self.embedder_index, store_id); + let index = vector_store_for_embedder(self.embedder_index, store_id); let writer = hannoy::Writer::new(db, index, dimensions); writer.clear(wtxn) } @@ -481,20 +534,36 @@ impl VectorStore { rtxn: &RoTxn, dimension: usize, item: hannoy::ItemId, - ) -> Result { + ) -> crate::Result { for index in vector_store_range_for_embedder(self.embedder_index) { - let contains = if self.quantized { - let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); - if writer.is_empty(rtxn)? { - continue; + let contains = if self.version_uses_arroy() { + if self.quantized { + let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? + } else { + let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? } - writer.contains_item(rtxn, item)? } else { - let writer = hannoy::Writer::new(self.angular_db(), index, dimension); - if writer.is_empty(rtxn)? { - continue; + if self.quantized { + let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? + } else { + let writer = hannoy::Writer::new(self.angular_db(), index, dimension); + if writer.is_empty(rtxn)? { + continue; + } + writer.contains_item(rtxn, item)? } - writer.contains_item(rtxn, item)? }; if contains { return Ok(contains); @@ -509,14 +578,53 @@ impl VectorStore { item: ItemId, limit: usize, filter: Option<&RoaringBitmap>, - ) -> Result, hannoy::Error> { - if self.quantized { - self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter) + ) -> crate::Result> { + if self.version_uses_arroy() { + if self.quantized { + self._arroy_nns_by_item(rtxn, self.arroy_quantized_db(), item, limit, filter) + .map_err(Into::into) + } else { + self._arroy_nns_by_item(rtxn, self.arroy_angular_db(), item, limit, filter) + .map_err(Into::into) + } } else { - self._nns_by_item(rtxn, self.angular_db(), item, limit, filter) + if self.quantized { + self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter) + .map_err(Into::into) + } else { + self._nns_by_item(rtxn, self.angular_db(), item, limit, filter).map_err(Into::into) + } } } + fn _arroy_nns_by_item( + &self, + rtxn: &RoTxn, + db: arroy::Database, + item: ItemId, + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for reader in self.arroy_readers(rtxn, db) { + let reader = reader?; + let mut searcher = reader.nns(limit); + if let Some(filter) = filter { + if reader.item_ids().is_disjoint(filter) { + continue; + } + searcher.candidates(filter); + } + + if let Some(mut ret) = searcher.by_item(rtxn, item)? { + results.append(&mut ret); + } + } + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + Ok(results) + } + fn _nns_by_item( &self, rtxn: &RoTxn, @@ -552,14 +660,54 @@ impl VectorStore { vector: &[f32], limit: usize, filter: Option<&RoaringBitmap>, - ) -> Result, hannoy::Error> { - if self.quantized { - self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) + ) -> crate::Result> { + if self.version_uses_arroy() { + if self.quantized { + self._arroy_nns_by_vector(rtxn, self.arroy_quantized_db(), vector, limit, filter) + .map_err(Into::into) + } else { + self._arroy_nns_by_vector(rtxn, self.arroy_angular_db(), vector, limit, filter) + .map_err(Into::into) + } } else { - self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter) + if self.quantized { + self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) + .map_err(Into::into) + } else { + self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter) + .map_err(Into::into) + } } } + fn _arroy_nns_by_vector( + &self, + rtxn: &RoTxn, + db: arroy::Database, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for reader in self.arroy_readers(rtxn, db) { + let reader = reader?; + let mut searcher = reader.nns(limit); + if let Some(filter) = filter { + if reader.item_ids().is_disjoint(filter) { + continue; + } + searcher.candidates(filter); + } + + results.append(&mut searcher.by_vector(rtxn, vector)?); + } + + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) + } + fn _nns_by_vector( &self, rtxn: &RoTxn, @@ -589,22 +737,39 @@ impl VectorStore { Ok(results) } - pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result>, hannoy::Error> { + pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result>> { let mut vectors = Vec::new(); - if self.quantized { - for reader in self.readers(rtxn, self.quantized_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); + if self.version_uses_arroy() { + if self.quantized { + for reader in self.arroy_readers(rtxn, self.arroy_quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } else { + for reader in self.arroy_readers(rtxn, self.arroy_angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } } } } else { - for reader in self.readers(rtxn, self.angular_db()) { - if let Some(vec) = reader?.item_vector(rtxn, item_id)? { - vectors.push(vec); + if self.quantized { + for reader in self.readers(rtxn, self.quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } + } + } else { + for reader in self.readers(rtxn, self.angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } } } } + Ok(vectors) } @@ -1300,10 +1465,10 @@ pub const fn is_cuda_enabled() -> bool { } fn vector_store_range_for_embedder(embedder_id: u8) -> impl Iterator { - (0..=u8::MAX).map(move |store_id| hannoy_store_for_embedder(embedder_id, store_id)) + (0..=u8::MAX).map(move |store_id| vector_store_for_embedder(embedder_id, store_id)) } -fn hannoy_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 { +fn vector_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 { let embedder_id = (embedder_id as u16) << 8; embedder_id | (store_id as u16) }