// Patch notes (annotations only; the diff text that follows is unchanged).
//
// NOTE(review): line breaks and indentation inside the hunks appear to have been
// collapsed in this copy, so it probably cannot be applied as-is -- confirm
// against the upstream commit before using it.
//
// Cargo.lock, crates/milli/Cargo.toml
//   * Bumps `arroy` from 0.6.1 to 0.6.3 (manifest version plus lock checksum).
//   * Re-wraps the `twox-hash` entry over several lines; the version,
//     `default-features = false` and the three features are the same on both sides.
//
// crates/milli/src/progress.rs
//   * Adds `use enum_iterator::Sequence as _;` -- presumably for the
//     `Self::CARDINALITY` used by the new `MainStep` impl; verify.
//   * Adds `Progress::update_progress_from_arroy` (`pub(crate)`): it calls
//     `update_progress` with `progress.main`, then again with `progress.sub`
//     when that field is `Some`.
//   * Adds `impl Step for arroy::MainStep`: `name` maps each listed variant to a
//     fixed lowercase string, `current` is `*self as u32`, `total` is
//     `Self::CARDINALITY as u32`.
//   * Adds `impl Step for arroy::SubStep`: `name` comes from `unit`, `current`
//     is `self.current.load(Ordering::Relaxed)`, `total` is `max`.
//
// crates/milli/src/update/upgrade/new_hannoy.rs
//   * The `vector_store` binding becomes `mut` and the call changes from
//     `convert_from_arroy` to `change_backend`.
//
// crates/milli/src/vector/store.rs
//   * Replaces `convert_from_arroy(&self, ..)` with `change_backend(&mut self, ..)`.
//   * When `self.backend == VectorStoreBackend::Arroy`: sets the backend to
//     `Hannoy`, then runs the same statements the removed function had (read the
//     dimensions from `arroy_readers`, and for every index of
//     `vector_store_range_for_embedder` create a `hannoy::Writer`, call
//     `prepare_arroy_conversion`, then `build`), once for the quantized database
//     and once for the angular one depending on `self.quantized`.
//   * Otherwise: sets the backend to `Arroy`, reads the dimensions from `readers`,
//     and for every index creates an `arroy::Writer` on `arroy_quantized_db()` or
//     `arroy_angular_db()`, forwards progress through a closure that calls
//     `update_progress_from_arroy`, then calls `prepare_hannoy_conversion` and
//     `build`.
//   * Every branch returns `Ok(())` early when the first reader yields no
//     dimensions.
//   * NOTE(review): `self.backend` is assigned before the conversion runs, so it
//     stays switched on the early `Ok(())` return and when a `?` propagates an
//     error -- confirm that callers expect this.
//   * NOTE(review): the hannoy side shows `builder.build::(wtxn)` with an empty
//     turbofish on both the removed and the added lines; this looks like
//     angle-bracketed generic arguments lost in extraction -- confirm upstream.
diff --git a/Cargo.lock b/Cargo.lock index 9ef2372f7..8fec0620c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,9 +444,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arroy" -version = "0.6.1" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08e6111f351d004bd13e95ab540721272136fd3218b39d3ec95a2ea1c4e6a0a6" +checksum = "8578a72223dfa13dfd9fc144d15260d134361789ebdea9b16e85a511edc73c7d" dependencies = [ "bytemuck", "byteorder", diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index e90d96500..94ac1a09f 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -87,7 +87,7 @@ rhai = { version = "1.22.2", features = [ "no_time", "sync", ] } -arroy = "0.6.1" +arroy = "0.6.3" hannoy = "0.0.4" rand = "0.8.5" tracing = "0.1.41" @@ -111,7 +111,11 @@ utoipa = { version = "5.4.0", features = [ "openapi_extensions", ] } lru = "0.14.0" -twox-hash = { version = "2.1.1", default-features = false, features = ["std", "xxhash3_64", "xxhash64"] } +twox-hash = { version = "2.1.1", default-features = false, features = [ + "std", + "xxhash3_64", + "xxhash64", +] } [dev-dependencies] mimalloc = { version = "0.1.47", default-features = false } diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index eb309b0b0..2aab11f05 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; +use enum_iterator::Sequence as _; use indexmap::IndexMap; use itertools::Itertools; use serde::Serialize; @@ -95,6 +96,14 @@ impl Progress { durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect() } + + // TODO: ideally we should expose the progress in a way that let arroy use it directly + pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) { + 
self.update_progress(progress.main); + if let Some(sub) = progress.sub { + self.update_progress(sub); + } + } } /// Generate the names associated with the durations and push them. @@ -291,3 +300,45 @@ impl Step for Compat { self.0.total().try_into().unwrap_or(u32::MAX) } } + +impl Step for arroy::MainStep { + fn name(&self) -> Cow<'static, str> { + match self { + arroy::MainStep::PreProcessingTheItems => "pre processing the items", + arroy::MainStep::WritingTheDescendantsAndMetadata => { + "writing the descendants and metadata" + } + arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items", + arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes", + arroy::MainStep::UpdatingTheTrees => "updating the trees", + arroy::MainStep::CreateNewTrees => "create new trees", + arroy::MainStep::WritingNodesToDatabase => "writing nodes to database", + arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees", + arroy::MainStep::WriteTheMetadata => "write the metadata", + arroy::MainStep::ConvertingHannoyToArroy => "converting hannoy to arroy", + } + .into() + } + + fn current(&self) -> u32 { + *self as u32 + } + + fn total(&self) -> u32 { + Self::CARDINALITY as u32 + } +} + +impl Step for arroy::SubStep { + fn name(&self) -> Cow<'static, str> { + self.unit.into() + } + + fn current(&self) -> u32 { + self.current.load(Ordering::Relaxed) + } + + fn total(&self) -> u32 { + self.max + } +} diff --git a/crates/milli/src/update/upgrade/new_hannoy.rs b/crates/milli/src/update/upgrade/new_hannoy.rs index 29dad98cb..097ec7a7e 100644 --- a/crates/milli/src/update/upgrade/new_hannoy.rs +++ b/crates/milli/src/update/upgrade/new_hannoy.rs @@ -23,9 +23,9 @@ impl UpgradeIndex for Latest_V1_18_New_Hannoy { /// REMOVE THIS FILE, IMPLEMENT CONVERSION AS A SETTING CHANGE let quantized = config.config.quantized(); let embedder_id = embedding_configs.embedder_id(wtxn, &config.name)?.unwrap(); - let vector_store = + let mut vector_store = + 
VectorStore::new(backend, index.vector_store, embedder_id, quantized); - vector_store.convert_from_arroy(wtxn, progress.clone())?; + vector_store.change_backend(wtxn, progress.clone())?; } Ok(false) diff --git a/crates/milli/src/vector/store.rs b/crates/milli/src/vector/store.rs index 7ced91201..cbc11aa77 100644 --- a/crates/milli/src/vector/store.rs +++ b/crates/milli/src/vector/store.rs @@ -173,43 +173,93 @@ impl VectorStore { } } - pub fn convert_from_arroy(&self, wtxn: &mut RwTxn, progress: Progress) -> crate::Result<()> { - if self.quantized { - let dimensions = self - .arroy_readers(wtxn, self.arroy_quantized_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions()); + pub fn change_backend(&mut self, wtxn: &mut RwTxn, progress: Progress) -> crate::Result<()> { + if self.backend == VectorStoreBackend::Arroy { + self.backend = VectorStoreBackend::Hannoy; + if self.quantized { + let dimensions = self + .arroy_readers(wtxn, self.arroy_quantized_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions()); - let Some(dimensions) = dimensions else { return Ok(()) }; + let Some(dimensions) = dimensions else { return Ok(()) }; - for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions); - let mut builder = writer.builder(&mut rng).progress(progress.clone()); - builder.prepare_arroy_conversion(wtxn)?; - builder.build::(wtxn)?; + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = hannoy::Writer::new(self.quantized_db(), index, dimensions); + let mut builder = writer.builder(&mut rng).progress(progress.clone()); + builder.prepare_arroy_conversion(wtxn)?; + builder.build::(wtxn)?; + } + + Ok(()) + } else { + let dimensions = self + .arroy_readers(wtxn, self.arroy_angular_db()) + .next() + .transpose()? 
+ .map(|reader| reader.dimensions()); + + let Some(dimensions) = dimensions else { return Ok(()) }; + + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = hannoy::Writer::new(self.angular_db(), index, dimensions); + let mut builder = writer.builder(&mut rng).progress(progress.clone()); + builder.prepare_arroy_conversion(wtxn)?; + builder.build::(wtxn)?; + } + + Ok(()) } - - Ok(()) } else { - let dimensions = self - .arroy_readers(wtxn, self.arroy_angular_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions()); + self.backend = VectorStoreBackend::Arroy; - let Some(dimensions) = dimensions else { return Ok(()) }; + if self.quantized { + /// FIXME: make sure that all of the functions in the vector store can work both in arroy and hannoy + /// (the existing implementation was assuming that only reads could happen in arroy) + /// bonus points for making it harder to forget switching on arroy on hannoy when modifying the code + let dimensions = self + .readers(wtxn, self.quantized_db()) + .next() + .transpose()? 
+ .map(|reader| reader.dimensions()); - for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = hannoy::Writer::new(self.angular_db(), index, dimensions); - let mut builder = writer.builder(&mut rng).progress(progress.clone()); - builder.prepare_arroy_conversion(wtxn)?; - builder.build::(wtxn)?; + let Some(dimensions) = dimensions else { return Ok(()) }; + + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = arroy::Writer::new(self.arroy_quantized_db(), index, dimensions); + let mut builder = writer.builder(&mut rng); + let builder = + builder.progress(|step| progress.update_progress_from_arroy(step)); + builder.prepare_hannoy_conversion(wtxn)?; + builder.build(wtxn)?; + } + + Ok(()) + } else { + let dimensions = self + .readers(wtxn, self.angular_db()) + .next() + .transpose()? + .map(|reader| reader.dimensions()); + + let Some(dimensions) = dimensions else { return Ok(()) }; + + for index in vector_store_range_for_embedder(self.embedder_index) { + let mut rng = rand::rngs::StdRng::from_entropy(); + let writer = arroy::Writer::new(self.arroy_angular_db(), index, dimensions); + let mut builder = writer.builder(&mut rng); + let builder = + builder.progress(|step| progress.update_progress_from_arroy(step)); + builder.prepare_hannoy_conversion(wtxn)?; + builder.build(wtxn)?; + } + + Ok(()) } - - Ok(()) } }