Use Hannoy instead of arroy

This commit is contained in:
Kerollmops
2025-07-21 11:42:46 +02:00
committed by Louis Dureuil
parent 580bfb06b4
commit affcaef556
25 changed files with 380 additions and 356 deletions

View File

@ -3,9 +3,9 @@ use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use arroy::distances::{BinaryQuantizedCosine, Cosine};
use arroy::ItemId;
use deserr::{DeserializeError, Deserr};
use hannoy::distances::{BinaryQuantizedCosine, Cosine};
use hannoy::ItemId;
use heed::{RoTxn, RwTxn, Unspecified};
use ordered_float::OrderedFloat;
use roaring::RoaringBitmap;
@ -41,15 +41,15 @@ pub type Embedding = Vec<f32>;
pub const REQUEST_PARALLELISM: usize = 40;
pub const MAX_COMPOSITE_DISTANCE: f32 = 0.01;
pub struct ArroyWrapper {
pub struct HannoyWrapper {
quantized: bool,
embedder_index: u8,
database: arroy::Database<Unspecified>,
database: hannoy::Database<Unspecified>,
}
impl ArroyWrapper {
impl HannoyWrapper {
pub fn new(
database: arroy::Database<Unspecified>,
database: hannoy::Database<Unspecified>,
embedder_index: u8,
quantized: bool,
) -> Self {
@ -60,19 +60,19 @@ impl ArroyWrapper {
self.embedder_index
}
fn readers<'a, D: arroy::Distance>(
fn readers<'a, D: hannoy::Distance>(
&'a self,
rtxn: &'a RoTxn<'a>,
db: arroy::Database<D>,
) -> impl Iterator<Item = Result<arroy::Reader<'a, D>, arroy::Error>> + 'a {
arroy_store_range_for_embedder(self.embedder_index).filter_map(move |index| {
match arroy::Reader::open(rtxn, index, db) {
db: hannoy::Database<D>,
) -> impl Iterator<Item = Result<hannoy::Reader<'a, D>, hannoy::Error>> + 'a {
hannoy_store_range_for_embedder(self.embedder_index).filter_map(move |index| {
match hannoy::Reader::open(rtxn, index, db) {
Ok(reader) => match reader.is_empty(rtxn) {
Ok(false) => Some(Ok(reader)),
Ok(true) => None,
Err(e) => Some(Err(e)),
},
Err(arroy::Error::MissingMetadata(_)) => None,
Err(hannoy::Error::MissingMetadata(_)) => None,
Err(e) => Some(Err(e)),
}
})
@ -86,7 +86,7 @@ impl ArroyWrapper {
rtxn: &RoTxn,
store_id: u8,
with_items: F,
) -> Result<O, arroy::Error>
) -> Result<O, hannoy::Error>
where
F: FnOnce(&RoaringBitmap) -> O,
{
@ -97,26 +97,26 @@ impl ArroyWrapper {
}
}
fn _items_in_store<D: arroy::Distance, F, O>(
fn _items_in_store<D: hannoy::Distance, F, O>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
db: hannoy::Database<D>,
store_id: u8,
with_items: F,
) -> Result<O, arroy::Error>
) -> Result<O, hannoy::Error>
where
F: FnOnce(&RoaringBitmap) -> O,
{
let index = arroy_store_for_embedder(self.embedder_index, store_id);
let reader = arroy::Reader::open(rtxn, index, db);
let index = hannoy_store_for_embedder(self.embedder_index, store_id);
let reader = hannoy::Reader::open(rtxn, index, db);
match reader {
Ok(reader) => Ok(with_items(reader.item_ids())),
Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())),
Err(hannoy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())),
Err(err) => Err(err),
}
}
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<Option<usize>, arroy::Error> {
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<Option<usize>, hannoy::Error> {
if self.quantized {
Ok(self
.readers(rtxn, self.quantized_db())
@ -140,39 +140,41 @@ impl ArroyWrapper {
rng: &mut R,
dimension: usize,
quantizing: bool,
arroy_memory: Option<usize>,
hannoy_memory: Option<usize>,
cancel: &(impl Fn() -> bool + Sync + Send),
) -> Result<(), arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
) -> Result<(), hannoy::Error> {
for index in hannoy_store_range_for_embedder(self.embedder_index) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
if writer.need_build(wtxn)? {
writer.builder(rng).build(wtxn)?
writer.builder(rng).ef_construction(48).build::<16, 32>(wtxn)?
} else if writer.is_empty(wtxn)? {
continue;
}
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
// If we are quantizing the databases, we can't know from meilisearch
// if the db was empty but still contained the wrong metadata, thus we need
// to quantize everything and can't stop early. Since this operation can
// only happen once in the life of an embedder, it's not very
// performance-sensitive.
if quantizing && !self.quantized {
let writer = writer.prepare_changing_distance::<BinaryQuantizedCosine>(wtxn)?;
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
.cancel(cancel)
.build(wtxn)?;
// let writer = writer.prepare_changing_distance::<BinaryQuantizedCosine>(wtxn)?;
// writer
// .builder(rng)
// .available_memory(hannoy_memory.unwrap_or(usize::MAX))
// .progress(|step| progress.update_progress_from_hannoy(step))
// .cancel(cancel)
// .build(wtxn)?;
unimplemented!("switching from quantized to non-quantized");
} else if writer.need_build(wtxn)? {
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
.cancel(cancel)
.build(wtxn)?;
.available_memory(hannoy_memory.unwrap_or(usize::MAX))
// .progress(|step| progress.update_progress_from_hannoy(step))
// .cancel(cancel)
.ef_construction(48)
.build::<16, 32>(wtxn)?;
} else if writer.is_empty(wtxn)? {
continue;
}
@ -188,18 +190,18 @@ impl ArroyWrapper {
pub fn add_items(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
embeddings: &Embeddings<f32>,
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
let dimension = embeddings.dimension();
for (index, vector) in
arroy_store_range_for_embedder(self.embedder_index).zip(embeddings.iter())
hannoy_store_range_for_embedder(self.embedder_index).zip(embeddings.iter())
{
if self.quantized {
arroy::Writer::new(self.quantized_db(), index, dimension)
hannoy::Writer::new(self.quantized_db(), index, dimension)
.add_item(wtxn, item_id, vector)?
} else {
arroy::Writer::new(self.angular_db(), index, dimension)
hannoy::Writer::new(self.angular_db(), index, dimension)
.add_item(wtxn, item_id, vector)?
}
}
@ -210,9 +212,9 @@ impl ArroyWrapper {
pub fn add_item(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
if self.quantized {
self._add_item(wtxn, self.quantized_db(), item_id, vector)
} else {
@ -220,17 +222,17 @@ impl ArroyWrapper {
}
}
fn _add_item<D: arroy::Distance>(
fn _add_item<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
let dimension = vector.len();
for index in arroy_store_range_for_embedder(self.embedder_index) {
let writer = arroy::Writer::new(db, index, dimension);
for index in hannoy_store_range_for_embedder(self.embedder_index) {
let writer = hannoy::Writer::new(db, index, dimension);
if !writer.contains_item(wtxn, item_id)? {
writer.add_item(wtxn, item_id, vector)?;
break;
@ -245,10 +247,10 @@ impl ArroyWrapper {
pub fn add_item_in_store(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
store_id: u8,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
if self.quantized {
self._add_item_in_store(wtxn, self.quantized_db(), item_id, store_id, vector)
} else {
@ -256,18 +258,18 @@ impl ArroyWrapper {
}
}
fn _add_item_in_store<D: arroy::Distance>(
fn _add_item_in_store<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
store_id: u8,
vector: &[f32],
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
let dimension = vector.len();
let index = arroy_store_for_embedder(self.embedder_index, store_id);
let writer = arroy::Writer::new(db, index, dimension);
let index = hannoy_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimension);
writer.add_item(wtxn, item_id, vector)
}
@ -276,14 +278,14 @@ impl ArroyWrapper {
&self,
wtxn: &mut RwTxn,
dimension: usize,
item_id: arroy::ItemId,
) -> Result<(), arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
item_id: hannoy::ItemId,
) -> Result<(), hannoy::Error> {
for index in hannoy_store_range_for_embedder(self.embedder_index) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
writer.del_item(wtxn, item_id)?;
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
writer.del_item(wtxn, item_id)?;
}
}
@ -301,10 +303,10 @@ impl ArroyWrapper {
pub fn del_item_in_store(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
store_id: u8,
dimensions: usize,
) -> Result<bool, arroy::Error> {
) -> Result<bool, hannoy::Error> {
if self.quantized {
self._del_item_in_store(wtxn, self.quantized_db(), item_id, store_id, dimensions)
} else {
@ -312,16 +314,16 @@ impl ArroyWrapper {
}
}
fn _del_item_in_store<D: arroy::Distance>(
fn _del_item_in_store<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
store_id: u8,
dimensions: usize,
) -> Result<bool, arroy::Error> {
let index = arroy_store_for_embedder(self.embedder_index, store_id);
let writer = arroy::Writer::new(db, index, dimensions);
) -> Result<bool, hannoy::Error> {
let index = hannoy_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimensions);
writer.del_item(wtxn, item_id)
}
@ -335,7 +337,7 @@ impl ArroyWrapper {
wtxn: &mut RwTxn,
store_id: u8,
dimensions: usize,
) -> Result<(), arroy::Error> {
) -> Result<(), hannoy::Error> {
if self.quantized {
self._clear_store(wtxn, self.quantized_db(), store_id, dimensions)
} else {
@ -343,15 +345,15 @@ impl ArroyWrapper {
}
}
fn _clear_store<D: arroy::Distance>(
fn _clear_store<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
db: hannoy::Database<D>,
store_id: u8,
dimensions: usize,
) -> Result<(), arroy::Error> {
let index = arroy_store_for_embedder(self.embedder_index, store_id);
let writer = arroy::Writer::new(db, index, dimensions);
) -> Result<(), hannoy::Error> {
let index = hannoy_store_for_embedder(self.embedder_index, store_id);
let writer = hannoy::Writer::new(db, index, dimensions);
writer.clear(wtxn)
}
@ -359,9 +361,9 @@ impl ArroyWrapper {
pub fn del_item(
&self,
wtxn: &mut RwTxn,
item_id: arroy::ItemId,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<bool, arroy::Error> {
) -> Result<bool, hannoy::Error> {
if self.quantized {
self._del_item(wtxn, self.quantized_db(), item_id, vector)
} else {
@ -369,37 +371,34 @@ impl ArroyWrapper {
}
}
fn _del_item<D: arroy::Distance>(
fn _del_item<D: hannoy::Distance>(
&self,
wtxn: &mut RwTxn,
db: arroy::Database<D>,
item_id: arroy::ItemId,
db: hannoy::Database<D>,
item_id: hannoy::ItemId,
vector: &[f32],
) -> Result<bool, arroy::Error> {
) -> Result<bool, hannoy::Error> {
let dimension = vector.len();
for index in arroy_store_range_for_embedder(self.embedder_index) {
let writer = arroy::Writer::new(db, index, dimension);
let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
continue;
};
if candidate == vector {
for index in hannoy_store_range_for_embedder(self.embedder_index) {
let writer = hannoy::Writer::new(db, index, dimension);
if writer.contains_item(wtxn, item_id)? {
return writer.del_item(wtxn, item_id);
}
}
Ok(false)
}
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), hannoy::Error> {
for index in hannoy_store_range_for_embedder(self.embedder_index) {
if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
if writer.is_empty(wtxn)? {
continue;
}
writer.clear(wtxn)?;
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
if writer.is_empty(wtxn)? {
continue;
}
@ -413,17 +412,17 @@ impl ArroyWrapper {
&self,
rtxn: &RoTxn,
dimension: usize,
item: arroy::ItemId,
) -> Result<bool, arroy::Error> {
for index in arroy_store_range_for_embedder(self.embedder_index) {
item: hannoy::ItemId,
) -> Result<bool, hannoy::Error> {
for index in hannoy_store_range_for_embedder(self.embedder_index) {
let contains = if self.quantized {
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
let writer = hannoy::Writer::new(self.quantized_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
writer.contains_item(rtxn, item)?
} else {
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
let writer = hannoy::Writer::new(self.angular_db(), index, dimension);
if writer.is_empty(rtxn)? {
continue;
}
@ -442,7 +441,7 @@ impl ArroyWrapper {
item: ItemId,
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
if self.quantized {
self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter)
} else {
@ -450,19 +449,19 @@ impl ArroyWrapper {
}
}
fn _nns_by_item<D: arroy::Distance>(
fn _nns_by_item<D: hannoy::Distance>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
db: hannoy::Database<D>,
item: ItemId,
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
let mut results = Vec::new();
for reader in self.readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
let mut searcher = reader.nns(limit, limit * 2); // TODO find better ef
if let Some(filter) = filter {
if reader.item_ids().is_disjoint(filter) {
continue;
@ -484,7 +483,7 @@ impl ArroyWrapper {
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
if self.quantized {
self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter)
} else {
@ -492,19 +491,19 @@ impl ArroyWrapper {
}
}
fn _nns_by_vector<D: arroy::Distance>(
fn _nns_by_vector<D: hannoy::Distance>(
&self,
rtxn: &RoTxn,
db: arroy::Database<D>,
db: hannoy::Database<D>,
vector: &[f32],
limit: usize,
filter: Option<&RoaringBitmap>,
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
) -> Result<Vec<(ItemId, f32)>, hannoy::Error> {
let mut results = Vec::new();
for reader in self.readers(rtxn, db) {
let reader = reader?;
let mut searcher = reader.nns(limit);
let mut searcher = reader.nns(limit, limit * 2); // TODO find better ef
if let Some(filter) = filter {
if reader.item_ids().is_disjoint(filter) {
continue;
@ -520,7 +519,7 @@ impl ArroyWrapper {
Ok(results)
}
pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result<Vec<Vec<f32>>, arroy::Error> {
pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result<Vec<Vec<f32>>, hannoy::Error> {
let mut vectors = Vec::new();
if self.quantized {
@ -539,19 +538,19 @@ impl ArroyWrapper {
Ok(vectors)
}
fn angular_db(&self) -> arroy::Database<Cosine> {
fn angular_db(&self) -> hannoy::Database<Cosine> {
self.database.remap_data_type()
}
fn quantized_db(&self) -> arroy::Database<BinaryQuantizedCosine> {
fn quantized_db(&self) -> hannoy::Database<BinaryQuantizedCosine> {
self.database.remap_data_type()
}
pub fn aggregate_stats(
&self,
rtxn: &RoTxn,
stats: &mut ArroyStats,
) -> Result<(), arroy::Error> {
stats: &mut HannoyStats,
) -> Result<(), hannoy::Error> {
if self.quantized {
for reader in self.readers(rtxn, self.quantized_db()) {
let reader = reader?;
@ -573,10 +572,11 @@ impl ArroyWrapper {
}
#[derive(Debug, Default, Clone)]
pub struct ArroyStats {
pub struct HannoyStats {
pub number_of_embeddings: u64,
pub documents: RoaringBitmap,
}
/// One or multiple embeddings stored consecutively in a flat vector.
#[derive(Debug, PartialEq)]
pub struct Embeddings<F> {
@ -1221,11 +1221,11 @@ pub const fn is_cuda_enabled() -> bool {
cfg!(feature = "cuda")
}
fn arroy_store_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> {
(0..=u8::MAX).map(move |store_id| arroy_store_for_embedder(embedder_id, store_id))
fn hannoy_store_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> {
(0..=u8::MAX).map(move |store_id| hannoy_store_for_embedder(embedder_id, store_id))
}
fn arroy_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 {
fn hannoy_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 {
let embedder_id = (embedder_id as u16) << 8;
embedder_id | (store_id as u16)
}