From 6e4dfa016841def8ba5d3d367399ef3ac6166f35 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Mon, 18 Aug 2025 16:37:38 +0200 Subject: [PATCH] First version of Hannoy dumpless upgrade --- crates/milli/src/update/upgrade/mod.rs | 35 ++++++++-- crates/milli/src/update/upgrade/v1_18.rs | 34 ++++++++++ crates/milli/src/vector/mod.rs | 85 +++++++++++++++++++++--- 3 files changed, 141 insertions(+), 13 deletions(-) create mode 100644 crates/milli/src/update/upgrade/v1_18.rs diff --git a/crates/milli/src/update/upgrade/mod.rs b/crates/milli/src/update/upgrade/mod.rs index 01ad677c7..40826cbe1 100644 --- a/crates/milli/src/update/upgrade/mod.rs +++ b/crates/milli/src/update/upgrade/mod.rs @@ -3,15 +3,18 @@ mod v1_13; mod v1_14; mod v1_15; mod v1_16; +mod v1_18; + use heed::RwTxn; use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3}; use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Latest_V1_13}; use v1_14::Latest_V1_13_To_Latest_V1_14; use v1_15::Latest_V1_14_To_Latest_V1_15; +use v1_16::Latest_V1_15_To_V1_16_0; +use v1_18::Latest_V1_17_To_V1_18_0; use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use crate::progress::{Progress, VariableNameStep}; -use crate::update::upgrade::v1_16::Latest_V1_15_To_V1_16_0; use crate::{Index, InternalError, Result}; trait UpgradeIndex { @@ -34,6 +37,8 @@ const UPGRADE_FUNCTIONS: &[&dyn UpgradeIndex] = &[ &Latest_V1_13_To_Latest_V1_14 {}, &Latest_V1_14_To_Latest_V1_15 {}, &Latest_V1_15_To_V1_16_0 {}, + &ToTargetNoOp { target: (1, 17, 0) }, + &Latest_V1_17_To_V1_18_0 {}, // This is the last upgrade function, it will be called when the index is up to date. // any other upgrade function should be added before this one. &ToCurrentNoOp {}, @@ -62,9 +67,9 @@ const fn start(from: (u32, u32, u32)) -> Option { // We must handle the current version in the match because in case of a failure some index may have been upgraded but not other. (1, 15, _) => function_index!(6), (1, 16, _) => function_index!(7), - (1, 17, _) => function_index!(7), - (1, 18, _) => function_index!(7), - (1, 19, _) => function_index!(7), + (1, 17, _) => function_index!(8), + (1, 18, _) => function_index!(9), + (1, 19, _) => function_index!(9), // We deliberately don't add a placeholder with (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) here to force manually // considering dumpless upgrade. (_major, _minor, _patch) => return None, @@ -147,3 +152,25 @@ impl UpgradeIndex for ToCurrentNoOp { (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) } } + +/// Perform no operation during the upgrade except changing to the specified target version. +#[allow(non_camel_case_types)] +struct ToTargetNoOp { + pub target: (u32, u32, u32), +} + +impl UpgradeIndex for ToTargetNoOp { + fn upgrade( + &self, + _wtxn: &mut RwTxn, + _index: &Index, + _original: (u32, u32, u32), + _progress: Progress, + ) -> Result { + Ok(false) + } + + fn target_version(&self) -> (u32, u32, u32) { + self.target + } +} diff --git a/crates/milli/src/update/upgrade/v1_18.rs b/crates/milli/src/update/upgrade/v1_18.rs new file mode 100644 index 000000000..e20696b84 --- /dev/null +++ b/crates/milli/src/update/upgrade/v1_18.rs @@ -0,0 +1,34 @@ +use heed::RwTxn; + +use super::UpgradeIndex; +use crate::progress::Progress; +use crate::vector::VectorStore; +use crate::{Index, Result}; + +#[allow(non_camel_case_types)] +pub(super) struct Latest_V1_17_To_V1_18_0(); + +impl UpgradeIndex for Latest_V1_17_To_V1_18_0 { + fn upgrade( + &self, + wtxn: &mut RwTxn, + index: &Index, + _original: (u32, u32, u32), + _progress: Progress, + ) -> Result { + let embedding_configs = index.embedding_configs(); + for config in embedding_configs.embedding_configs(wtxn)? { + // TODO use the embedder name to display progress + let quantized = config.config.quantized(); + let embedder_id = embedding_configs.embedder_id(wtxn, &config.name)?.unwrap(); + let vector_store = VectorStore::new(index.vector_store, embedder_id, quantized); + vector_store.convert_from_arroy(wtxn)?; + } + + Ok(false) + } + + fn target_version(&self) -> (u32, u32, u32) { + (1, 18, 0) + } +} diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index bba617782..0a97a9bde 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -8,6 +8,7 @@ use hannoy::distances::{Cosine, Hamming}; use hannoy::ItemId; use heed::{RoTxn, RwTxn, Unspecified}; use ordered_float::OrderedFloat; +use rand::SeedableRng as _; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; @@ -64,12 +65,30 @@ impl VectorStore { self.embedder_index } + fn arroy_readers<'a, D: arroy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: arroy::Database, + ) -> impl Iterator, arroy::Error>> + 'a { + vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { + match arroy::Reader::open(rtxn, index, db) { + Ok(reader) => match reader.is_empty(rtxn) { + Ok(false) => Some(Ok(reader)), + Ok(true) => None, + Err(e) => Some(Err(e)), + }, + Err(arroy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) + } + fn readers<'a, D: hannoy::Distance>( &'a self, rtxn: &'a RoTxn<'a>, db: hannoy::Database, ) -> impl Iterator, hannoy::Error>> + 'a { - hannoy_store_range_for_embedder(self.embedder_index).filter_map(move |index| { + vector_store_range_for_embedder(self.embedder_index).filter_map(move |index| { match hannoy::Reader::open(rtxn, index, db) { Ok(reader) => match reader.is_empty(rtxn) { Ok(false) => Some(Ok(reader)), @@ -136,6 +155,46 @@ impl VectorStore { } } + pub fn convert_from_arroy(&self, wtxn: &mut RwTxn) -> crate::Result<()> { + if self.quantized { + let dimensions = self + .arroy_readers(wtxn, self.arroy_quantized_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions()); + + let Some(dimensions) = dimensions else { return Ok(()) }; + + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions); + let mut builder = writer.builder(&mut rng); + builder.prepare_arroy_conversion(wtxn)?; + builder.build::(wtxn)?; + } + + Ok(()) + } else { + let dimensions = self + .arroy_readers(wtxn, self.arroy_angular_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions()); + + let Some(dimensions) = dimensions else { return Ok(()) }; + + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = hannoy::Writer::new(self.angular_db(), index, dimensions); + let mut builder = writer.builder(&mut rng); + builder.prepare_arroy_conversion(wtxn)?; + builder.build::(wtxn)?; + } + + Ok(()) + } + } + #[allow(clippy::too_many_arguments)] pub fn build_and_quantize( &mut self, @@ -147,7 +206,7 @@ impl VectorStore { hannoy_memory: Option, cancel: &(impl Fn() -> bool + Sync + Send), ) -> Result<(), hannoy::Error> { - for index in hannoy_store_range_for_embedder(self.embedder_index) { + for index in vector_store_range_for_embedder(self.embedder_index) { if self.quantized { let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); if writer.need_build(wtxn)? { @@ -202,7 +261,7 @@ impl VectorStore { ) -> Result<(), hannoy::Error> { let dimension = embeddings.dimension(); for (index, vector) in - hannoy_store_range_for_embedder(self.embedder_index).zip(embeddings.iter()) + vector_store_range_for_embedder(self.embedder_index).zip(embeddings.iter()) { if self.quantized { hannoy::Writer::new(self.quantized_db(), index, dimension) @@ -238,7 +297,7 @@ impl VectorStore { ) -> Result<(), hannoy::Error> { let dimension = vector.len(); - for index in hannoy_store_range_for_embedder(self.embedder_index) { + for index in vector_store_range_for_embedder(self.embedder_index) { let writer = hannoy::Writer::new(db, index, dimension); if !writer.contains_item(wtxn, item_id)? { writer.add_item(wtxn, item_id, vector)?; @@ -287,7 +346,7 @@ impl VectorStore { dimension: usize, item_id: hannoy::ItemId, ) -> Result<(), hannoy::Error> { - for index in hannoy_store_range_for_embedder(self.embedder_index) { + for index in vector_store_range_for_embedder(self.embedder_index) { if self.quantized { let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); writer.del_item(wtxn, item_id)?; @@ -387,7 +446,7 @@ impl VectorStore { ) -> Result { let dimension = vector.len(); - for index in hannoy_store_range_for_embedder(self.embedder_index) { + for index in vector_store_range_for_embedder(self.embedder_index) { let writer = hannoy::Writer::new(db, index, dimension); if writer.contains_item(wtxn, item_id)? { return writer.del_item(wtxn, item_id); @@ -397,7 +456,7 @@ impl VectorStore { } pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), hannoy::Error> { - for index in hannoy_store_range_for_embedder(self.embedder_index) { + for index in vector_store_range_for_embedder(self.embedder_index) { if self.quantized { let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); if writer.is_empty(wtxn)? { @@ -421,7 +480,7 @@ impl VectorStore { dimension: usize, item: hannoy::ItemId, ) -> Result { - for index in hannoy_store_range_for_embedder(self.embedder_index) { + for index in vector_store_range_for_embedder(self.embedder_index) { let contains = if self.quantized { let writer = hannoy::Writer::new(self.quantized_db(), index, dimension); if writer.is_empty(rtxn)? { @@ -547,6 +606,14 @@ impl VectorStore { Ok(vectors) } + fn arroy_angular_db(&self) -> arroy::Database { + self.database.remap_types() + } + + fn arroy_quantized_db(&self) -> arroy::Database { + self.database.remap_types() + } + fn angular_db(&self) -> hannoy::Database { self.database.remap_data_type() } @@ -1230,7 +1297,7 @@ pub const fn is_cuda_enabled() -> bool { cfg!(feature = "cuda") } -fn hannoy_store_range_for_embedder(embedder_id: u8) -> impl Iterator { +fn vector_store_range_for_embedder(embedder_id: u8) -> impl Iterator { (0..=u8::MAX).map(move |store_id| hannoy_store_for_embedder(embedder_id, store_id)) }