Integrate arroy with conversion capabilities

This commit is contained in:
Louis Dureuil
2025-08-28 14:43:04 +02:00
parent da6fffdf6d
commit b2f2807a94
5 changed files with 141 additions and 36 deletions

View File

@ -87,7 +87,7 @@ rhai = { version = "1.22.2", features = [
"no_time",
"sync",
] }
arroy = "0.6.1"
arroy = "0.6.3"
hannoy = "0.0.4"
rand = "0.8.5"
tracing = "0.1.41"
@ -111,7 +111,11 @@ utoipa = { version = "5.4.0", features = [
"openapi_extensions",
] }
lru = "0.14.0"
twox-hash = { version = "2.1.1", default-features = false, features = ["std", "xxhash3_64", "xxhash64"] }
twox-hash = { version = "2.1.1", default-features = false, features = [
"std",
"xxhash3_64",
"xxhash64",
] }
[dev-dependencies]
mimalloc = { version = "0.1.47", default-features = false }

View File

@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use enum_iterator::Sequence as _;
use indexmap::IndexMap;
use itertools::Itertools;
use serde::Serialize;
@ -95,6 +96,14 @@ impl Progress {
durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
}
// TODO: ideally we should expose the progress in a way that let arroy use it directly
pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) {
self.update_progress(progress.main);
if let Some(sub) = progress.sub {
self.update_progress(sub);
}
}
}
/// Generate the names associated with the durations and push them.
@ -291,3 +300,45 @@ impl<T: steppe::Step> Step for Compat<T> {
self.0.total().try_into().unwrap_or(u32::MAX)
}
}
impl Step for arroy::MainStep {
fn name(&self) -> Cow<'static, str> {
match self {
arroy::MainStep::PreProcessingTheItems => "pre processing the items",
arroy::MainStep::WritingTheDescendantsAndMetadata => {
"writing the descendants and metadata"
}
arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
arroy::MainStep::UpdatingTheTrees => "updating the trees",
arroy::MainStep::CreateNewTrees => "create new trees",
arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
arroy::MainStep::WriteTheMetadata => "write the metadata",
arroy::MainStep::ConvertingHannoyToArroy => "converting hannoy to arroy",
}
.into()
}
fn current(&self) -> u32 {
*self as u32
}
fn total(&self) -> u32 {
Self::CARDINALITY as u32
}
}
impl Step for arroy::SubStep {
fn name(&self) -> Cow<'static, str> {
self.unit.into()
}
fn current(&self) -> u32 {
self.current.load(Ordering::Relaxed)
}
fn total(&self) -> u32 {
self.max
}
}

View File

@ -23,9 +23,9 @@ impl UpgradeIndex for Latest_V1_18_New_Hannoy {
/// REMOVE THIS FILE, IMPLEMENT CONVERSION AS A SETTING CHANGE
let quantized = config.config.quantized();
let embedder_id = embedding_configs.embedder_id(wtxn, &config.name)?.unwrap();
let vector_store =
let mut vector_store =
VectorStore::new(backend, index.vector_store, embedder_id, quantized);
vector_store.convert_from_arroy(wtxn, progress.clone())?;
vector_store.change_backend(wtxn, progress.clone())?;
}
Ok(false)

View File

@ -173,43 +173,93 @@ impl VectorStore {
}
}
pub fn convert_from_arroy(&self, wtxn: &mut RwTxn, progress: Progress) -> crate::Result<()> {
if self.quantized {
let dimensions = self
.arroy_readers(wtxn, self.arroy_quantized_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
pub fn change_backend(&mut self, wtxn: &mut RwTxn, progress: Progress) -> crate::Result<()> {
if self.backend == VectorStoreBackend::Arroy {
self.backend = VectorStoreBackend::Hannoy;
if self.quantized {
let dimensions = self
.arroy_readers(wtxn, self.arroy_quantized_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
let Some(dimensions) = dimensions else { return Ok(()) };
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.prepare_arroy_conversion(wtxn)?;
builder.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.prepare_arroy_conversion(wtxn)?;
builder.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
}
Ok(())
} else {
let dimensions = self
.arroy_readers(wtxn, self.arroy_angular_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = hannoy::Writer::new(self.angular_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.prepare_arroy_conversion(wtxn)?;
builder.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
}
Ok(())
}
Ok(())
} else {
let dimensions = self
.arroy_readers(wtxn, self.arroy_angular_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
self.backend = VectorStoreBackend::Arroy;
let Some(dimensions) = dimensions else { return Ok(()) };
if self.quantized {
/// FIXME: make sure that all of the functions in the vector store can work both in arroy and hannoy
/// (the existing implementation was assuming that only reads could happen in arroy)
/// bonus points for making it harder to forget switching on arroy on hannoy when modifying the code
let dimensions = self
.readers(wtxn, self.quantized_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = hannoy::Writer::new(self.angular_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.prepare_arroy_conversion(wtxn)?;
builder.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimensions);
let mut builder = writer.builder(&mut rng);
let builder =
builder.progress(|step| progress.update_progress_from_arroy(step));
builder.prepare_hannoy_conversion(wtxn)?;
builder.build(wtxn)?;
}
Ok(())
} else {
let dimensions = self
.readers(wtxn, self.angular_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimensions);
let mut builder = writer.builder(&mut rng);
let builder =
builder.progress(|step| progress.update_progress_from_arroy(step));
builder.prepare_hannoy_conversion(wtxn)?;
builder.build(wtxn)?;
}
Ok(())
}
Ok(())
}
}