Dispatch the vector store based on the index version

This commit is contained in:
Clément Renault
2025-08-12 16:00:47 +02:00
committed by Louis Dureuil
parent e64852208c
commit 24b017e367
2 changed files with 214 additions and 52 deletions

View File

@ -884,10 +884,7 @@ impl<'a, 'i> Transform<'a, 'i> {
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?; )?;
let injected_vectors: std::result::Result< let injected_vectors: crate::Result<_> = readers
serde_json::Map<String, serde_json::Value>,
hannoy::Error,
> = readers
.iter() .iter()
.filter_map(|(name, (reader, user_provided))| { .filter_map(|(name, (reader, user_provided))| {
if !user_provided.contains(docid) { if !user_provided.contains(docid) {

View File

@ -67,6 +67,12 @@ impl VectorStore {
self.embedder_index self.embedder_index
} }
/// Whether we must use the arroy to read the vector store.
pub fn version_uses_arroy(&self) -> bool {
let (major, minor, _patch) = self.version;
major == 1 && minor < 18
}
fn arroy_readers<'a, D: arroy::Distance>( fn arroy_readers<'a, D: arroy::Distance>(
&'a self, &'a self,
rtxn: &'a RoTxn<'a>, rtxn: &'a RoTxn<'a>,
@ -111,14 +117,45 @@ impl VectorStore {
rtxn: &RoTxn, rtxn: &RoTxn,
store_id: u8, store_id: u8,
with_items: F, with_items: F,
) -> Result<O, hannoy::Error> ) -> crate::Result<O>
where where
F: FnOnce(&RoaringBitmap) -> O, F: FnOnce(&RoaringBitmap) -> O,
{ {
if self.quantized { if self.version_uses_arroy() {
self._items_in_store(rtxn, self.quantized_db(), store_id, with_items) if self.quantized {
self._arroy_items_in_store(rtxn, self.arroy_quantized_db(), store_id, with_items)
.map_err(Into::into)
} else {
self._arroy_items_in_store(rtxn, self.arroy_angular_db(), store_id, with_items)
.map_err(Into::into)
}
} else { } else {
self._items_in_store(rtxn, self.angular_db(), store_id, with_items) if self.quantized {
self._items_in_store(rtxn, self.quantized_db(), store_id, with_items)
.map_err(Into::into)
} else {
self._items_in_store(rtxn, self.angular_db(), store_id, with_items)
.map_err(Into::into)
}
}
}
fn _arroy_items_in_store<D: arroy::Distance, F, O>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
store_id: u8,
with_items: F,
) -> Result<O, arroy::Error>
where
F: FnOnce(&RoaringBitmap) -> O,
{
let index = vector_store_for_embedder(self.embedder_index, store_id);
let reader = arroy::Reader::open(rtxn, index, db);
match reader {
Ok(reader) => Ok(with_items(reader.item_ids())),
Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())),
Err(err) => Err(err),
} }
} }
@ -132,7 +169,7 @@ impl VectorStore {
where where
F: FnOnce(&RoaringBitmap) -> O, F: FnOnce(&RoaringBitmap) -> O,
{ {
let index = hannoy_store_for_embedder(self.embedder_index, store_id); let index = vector_store_for_embedder(self.embedder_index, store_id);
let reader = hannoy::Reader::open(rtxn, index, db); let reader = hannoy::Reader::open(rtxn, index, db);
match reader { match reader {
Ok(reader) => Ok(with_items(reader.item_ids())), Ok(reader) => Ok(with_items(reader.item_ids())),
@ -141,19 +178,35 @@ impl VectorStore {
} }
} }
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<Option<usize>, hannoy::Error> { pub fn dimensions(&self, rtxn: &RoTxn) -> crate::Result<Option<usize>> {
if self.quantized { if self.version_uses_arroy() {
Ok(self if self.quantized {
.readers(rtxn, self.quantized_db()) Ok(self
.next() .arroy_readers(rtxn, self.arroy_quantized_db())
.transpose()? .next()
.map(|reader| reader.dimensions())) .transpose()?
.map(|reader| reader.dimensions()))
} else {
Ok(self
.arroy_readers(rtxn, self.arroy_angular_db())
.next()
.transpose()?
.map(|reader| reader.dimensions()))
}
} else { } else {
Ok(self if self.quantized {
.readers(rtxn, self.angular_db()) Ok(self
.next() .readers(rtxn, self.quantized_db())
.transpose()? .next()
.map(|reader| reader.dimensions())) .transpose()?
.map(|reader| reader.dimensions()))
} else {
Ok(self
.readers(rtxn, self.angular_db())
.next()
.transpose()?
.map(|reader| reader.dimensions()))
}
} }
} }
@ -336,7 +389,7 @@ impl VectorStore {
) -> Result<(), hannoy::Error> { ) -> Result<(), hannoy::Error> {
let dimension = vector.len(); let dimension = vector.len();
let index = hannoy_store_for_embedder(self.embedder_index, store_id); let index = vector_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimension); let writer = hannoy::Writer::new(db, index, dimension);
writer.add_item(wtxn, item_id, vector) writer.add_item(wtxn, item_id, vector)
} }
@ -390,7 +443,7 @@ impl VectorStore {
store_id: u8, store_id: u8,
dimensions: usize, dimensions: usize,
) -> Result<bool, hannoy::Error> { ) -> Result<bool, hannoy::Error> {
let index = hannoy_store_for_embedder(self.embedder_index, store_id); let index = vector_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimensions); let writer = hannoy::Writer::new(db, index, dimensions);
writer.del_item(wtxn, item_id) writer.del_item(wtxn, item_id)
} }
@ -420,7 +473,7 @@ impl VectorStore {
store_id: u8, store_id: u8,
dimensions: usize, dimensions: usize,
) -> Result<(), hannoy::Error> { ) -> Result<(), hannoy::Error> {
let index = hannoy_store_for_embedder(self.embedder_index, store_id); let index = vector_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimensions); let writer = hannoy::Writer::new(db, index, dimensions);
writer.clear(wtxn) writer.clear(wtxn)
} }
@ -481,20 +534,36 @@ impl VectorStore {
rtxn: &RoTxn, rtxn: &RoTxn,
dimension: usize, dimension: usize,
item: hannoy::ItemId, item: hannoy::ItemId,
) -> Result<bool, hannoy::Error> { ) -> crate::Result<bool> {
for index in vector_store_range_for_embedder(self.embedder_index) { for index in vector_store_range_for_embedder(self.embedder_index) {
let contains = if self.quantized { let contains = if self.version_uses_arroy() {
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); if self.quantized {
if writer.is_empty(rtxn)? { let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimension);
continue; if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
} else {
let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
} }
writer.contains_item(rtxn, item)?
} else { } else {
let writer = hannoy::Writer::new(self.angular_db(), index, dimension); if self.quantized {
if writer.is_empty(rtxn)? { let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
continue; if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
} else {
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
} }
writer.contains_item(rtxn, item)?
}; };
if contains { if contains {
return Ok(contains); return Ok(contains);
@ -509,14 +578,53 @@ impl VectorStore {
item: ItemId, item: ItemId,
limit: usize, limit: usize,
filter: Option<&RoaringBitmap>, filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> { ) -> crate::Result<Vec<(ItemId, f32)>> {
if self.quantized { if self.version_uses_arroy() {
self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter) if self.quantized {
self._arroy_nns_by_item(rtxn, self.arroy_quantized_db(), item, limit, filter)
.map_err(Into::into)
} else {
self._arroy_nns_by_item(rtxn, self.arroy_angular_db(), item, limit, filter)
.map_err(Into::into)
}
} else { } else {
self._nns_by_item(rtxn, self.angular_db(), item, limit, filter) if self.quantized {
self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter)
.map_err(Into::into)
} else {
self._nns_by_item(rtxn, self.angular_db(), item, limit, filter).map_err(Into::into)
}
} }
} }
fn _arroy_nns_by_item<D: arroy::Distance>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
item: ItemId,
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
let mut results = Vec::new();
for reader in self.arroy_readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
if let Some(filter) = filter {
if reader.item_ids().is_disjoint(filter) {
continue;
}
searcher.candidates(filter);
}
if let Some(mut ret) = searcher.by_item(rtxn, item)? {
results.append(&mut ret);
}
}
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
Ok(results)
}
fn _nns_by_item<D: hannoy::Distance>( fn _nns_by_item<D: hannoy::Distance>(
&self, &self,
rtxn: &RoTxn, rtxn: &RoTxn,
@ -552,14 +660,54 @@ impl VectorStore {
vector: &[f32], vector: &[f32],
limit: usize, limit: usize,
filter: Option<&RoaringBitmap>, filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> { ) -> crate::Result<Vec<(ItemId, f32)>> {
if self.quantized { if self.version_uses_arroy() {
self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) if self.quantized {
self._arroy_nns_by_vector(rtxn, self.arroy_quantized_db(), vector, limit, filter)
.map_err(Into::into)
} else {
self._arroy_nns_by_vector(rtxn, self.arroy_angular_db(), vector, limit, filter)
.map_err(Into::into)
}
} else { } else {
self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter) if self.quantized {
self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter)
.map_err(Into::into)
} else {
self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter)
.map_err(Into::into)
}
} }
} }
fn _arroy_nns_by_vector<D: arroy::Distance>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
let mut results = Vec::new();
for reader in self.arroy_readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
if let Some(filter) = filter {
if reader.item_ids().is_disjoint(filter) {
continue;
}
searcher.candidates(filter);
}
results.append(&mut searcher.by_vector(rtxn, vector)?);
}
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
Ok(results)
}
fn _nns_by_vector<D: hannoy::Distance>( fn _nns_by_vector<D: hannoy::Distance>(
&self, &self,
rtxn: &RoTxn, rtxn: &RoTxn,
@ -589,22 +737,39 @@ impl VectorStore {
Ok(results) Ok(results)
} }
pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result<Vec<Vec<f32>>, hannoy::Error> { pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> crate::Result<Vec<Vec<f32>>> {
let mut vectors = Vec::new(); let mut vectors = Vec::new();
if self.quantized { if self.version_uses_arroy() {
for reader in self.readers(rtxn, self.quantized_db()) { if self.quantized {
if let Some(vec) = reader?.item_vector(rtxn, item_id)? { for reader in self.arroy_readers(rtxn, self.arroy_quantized_db()) {
vectors.push(vec); if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
vectors.push(vec);
}
}
} else {
for reader in self.arroy_readers(rtxn, self.arroy_angular_db()) {
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
vectors.push(vec);
}
} }
} }
} else { } else {
for reader in self.readers(rtxn, self.angular_db()) { if self.quantized {
if let Some(vec) = reader?.item_vector(rtxn, item_id)? { for reader in self.readers(rtxn, self.quantized_db()) {
vectors.push(vec); if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
vectors.push(vec);
}
}
} else {
for reader in self.readers(rtxn, self.angular_db()) {
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
vectors.push(vec);
}
} }
} }
} }
Ok(vectors) Ok(vectors)
} }
@ -1300,10 +1465,10 @@ pub const fn is_cuda_enabled() -> bool {
} }
fn vector_store_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> { fn vector_store_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> {
(0..=u8::MAX).map(move |store_id| hannoy_store_for_embedder(embedder_id, store_id)) (0..=u8::MAX).map(move |store_id| vector_store_for_embedder(embedder_id, store_id))
} }
fn hannoy_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 { fn vector_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 {
let embedder_id = (embedder_id as u16) << 8; let embedder_id = (embedder_id as u16) << 8;
embedder_id | (store_id as u16) embedder_id | (store_id as u16)
} }