diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index 7b455c42c..bd6e6c301 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -1529,6 +1529,8 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { enum VectorStoreBackendChangeIndex {} let embedder_count = embedding_configs.len(); + let rtxn = self.index.read_txn()?; + for (i, config) in embedding_configs.into_iter().enumerate() { if must_stop_processing() { return Err(crate::InternalError::AbortedIndexation.into()); @@ -1541,13 +1543,20 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { )); let quantized = config.config.quantized(); let embedder_id = embedders.embedder_id(self.wtxn, &config.name)?.unwrap(); - let mut vector_store = crate::vector::VectorStore::new( + let vector_store = crate::vector::VectorStore::new( old_backend, self.index.vector_store, embedder_id, quantized, ); - vector_store.change_backend(self.wtxn, progress.clone(), must_stop_processing)?; + + vector_store.change_backend( + &rtxn, + self.wtxn, + progress.clone(), + must_stop_processing, + self.indexer_config.max_memory, + )?; } Ok(()) diff --git a/crates/milli/src/vector/store.rs b/crates/milli/src/vector/store.rs index 6a18fab1c..d9caeac03 100644 --- a/crates/milli/src/vector/store.rs +++ b/crates/milli/src/vector/store.rs @@ -120,35 +120,20 @@ impl VectorStore { } pub fn change_backend( - &mut self, + self, + rtxn: &RoTxn, wtxn: &mut RwTxn, progress: Progress, must_stop_processing: &MSP, + available_memory: Option, ) -> crate::Result<()> where MSP: Fn() -> bool + Sync, { + let mut rng = rand::rngs::StdRng::from_entropy(); if self.backend == VectorStoreBackend::Arroy { - self.backend = VectorStoreBackend::Hannoy; if self.quantized { - let dimensions = self - ._arroy_readers(wtxn, self._arroy_quantized_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions()); - - let Some(dimensions) = dimensions else { return Ok(()) }; - - for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = - hannoy::Writer::new(self._hannoy_quantized_db(), index, dimensions); - let mut builder = writer.builder(&mut rng).progress(progress.clone()); - builder.prepare_arroy_conversion(wtxn)?; - builder.build::(wtxn)?; - } - - Ok(()) + self._arroy_to_hannoy_bq::(rtxn, wtxn, &progress, &mut rng, &must_stop_processing) } else { let dimensions = self ._arroy_readers(wtxn, self._arroy_angular_db()) @@ -159,7 +144,6 @@ impl VectorStore { let Some(dimensions) = dimensions else { return Ok(()) }; for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); let writer = hannoy::Writer::new(self._hannoy_angular_db(), index, dimensions); let mut builder = writer.builder(&mut rng).progress(progress.clone()); builder.cancel(must_stop_processing); @@ -170,28 +154,11 @@ impl VectorStore { Ok(()) } } else { - self.backend = VectorStoreBackend::Arroy; - if self.quantized { - let dimensions = self - ._hannoy_readers(wtxn, self._hannoy_quantized_db()) - .next() - .transpose()? - .map(|reader| reader.dimensions()); - - let Some(dimensions) = dimensions else { return Ok(()) }; - - for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); - let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimensions); - let mut builder = writer.builder(&mut rng); - let builder = - builder.progress(|step| progress.update_progress_from_arroy(step)); - builder.prepare_hannoy_conversion(wtxn)?; - builder.build(wtxn)?; - } - - Ok(()) + self._hannoy_to_arroy_bq::< + hannoy::distances::Hamming, + arroy::distances::BinaryQuantizedCosine, + _>(rtxn, wtxn, &progress, &mut rng, available_memory, &must_stop_processing) } else { let dimensions = self ._hannoy_readers(wtxn, self._hannoy_angular_db()) @@ -202,7 +169,6 @@ impl VectorStore { let Some(dimensions) = dimensions else { return Ok(()) }; for index in vector_store_range_for_embedder(self.embedder_index) { - let mut rng = rand::rngs::StdRng::from_entropy(); let writer = arroy::Writer::new(self._arroy_angular_db(), index, dimensions); let mut builder = writer.builder(&mut rng); let builder = @@ -232,13 +198,7 @@ impl VectorStore { if self.quantized { let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimension); if writer.need_build(wtxn)? { - let mut builder = writer.builder(rng); - let builder = - builder.progress(|step| progress.update_progress_from_arroy(step)); - builder - .available_memory(available_memory.unwrap_or(usize::MAX)) - .cancel(cancel) - .build(wtxn)?; + arroy_build(wtxn, &progress, rng, available_memory, cancel, &writer)?; } else if writer.is_empty(wtxn)? { continue; } @@ -254,21 +214,9 @@ impl VectorStore { .prepare_changing_distance::( wtxn, )?; - let mut builder = writer.builder(rng); - let builder = - builder.progress(|step| progress.update_progress_from_arroy(step)); - builder - .available_memory(available_memory.unwrap_or(usize::MAX)) - .cancel(cancel) - .build(wtxn)?; + arroy_build(wtxn, &progress, rng, available_memory, cancel, &writer)?; } else if writer.need_build(wtxn)? { - let mut builder = writer.builder(rng); - let builder = - builder.progress(|step| progress.update_progress_from_arroy(step)); - builder - .available_memory(available_memory.unwrap_or(usize::MAX)) - .cancel(cancel) - .build(wtxn)?; + arroy_build(wtxn, &progress, rng, available_memory, cancel, &writer)?; } else if writer.is_empty(wtxn)? { continue; } @@ -276,11 +224,7 @@ impl VectorStore { } else if self.quantized { let writer = hannoy::Writer::new(self._hannoy_quantized_db(), index, dimension); if writer.need_build(wtxn)? { - let mut builder = writer.builder(rng).progress(progress.clone()); - builder - .cancel(cancel) - .ef_construction(HANNOY_EF_CONSTRUCTION) - .build::(wtxn)?; + hannoy_build(wtxn, &progress, rng, cancel, &writer)?; } else if writer.is_empty(wtxn)? { continue; } @@ -293,17 +237,9 @@ impl VectorStore { // sensitive. if quantizing && !self.quantized { let writer = writer.prepare_changing_distance::(wtxn)?; - let mut builder = writer.builder(rng).progress(progress.clone()); - builder - .cancel(cancel) - .ef_construction(HANNOY_EF_CONSTRUCTION) - .build::(wtxn)?; + hannoy_build(wtxn, &progress, rng, cancel, &writer)?; } else if writer.need_build(wtxn)? { - let mut builder = writer.builder(rng).progress(progress.clone()); - builder - .cancel(cancel) - .ef_construction(HANNOY_EF_CONSTRUCTION) - .build::(wtxn)?; + hannoy_build(wtxn, &progress, rng, cancel, &writer)?; } else if writer.is_empty(wtxn)? { continue; } @@ -1101,6 +1037,130 @@ impl VectorStore { fn _hannoy_quantized_db(&self) -> hannoy::Database { self.database.remap_data_type() } + + fn _arroy_to_hannoy_bq( + self, + arroy_rtxn: &RoTxn, + hannoy_wtxn: &mut RwTxn, + progress: &Progress, + rng: &mut R, + cancel: &(impl Fn() -> bool + Sync + Send), + ) -> crate::Result<()> + where + R: rand::Rng + rand::SeedableRng, + { + for index in vector_store_range_for_embedder(self.embedder_index) { + let arroy_reader: arroy::Reader = + match arroy::Reader::open(arroy_rtxn, index, self.database.remap_types()) { + Ok(reader) => reader, + Err(arroy::Error::MissingMetadata(_)) => continue, + Err(err) => return Err(err.into()), + }; + let dimensions = arroy_reader.dimensions(); + let hannoy_writer: hannoy::Writer = + hannoy::Writer::new(self.database.remap_types(), index, dimensions); + hannoy_writer.clear(hannoy_wtxn)?; + for entry in arroy_reader.iter(arroy_rtxn)? { + let (item, mut vector) = entry?; + // arroy bug? the `vector` here can be longer than `dimensions`. + // workaround: truncating. + if vector.len() > dimensions { + vector.truncate(dimensions); + } + hannoy_writer.add_item(hannoy_wtxn, item, &vector)?; + } + hannoy_build(hannoy_wtxn, progress, rng, cancel, &hannoy_writer)?; + } + Ok(()) + } + + fn _hannoy_to_arroy_bq( + self, + hannoy_rtxn: &RoTxn, + arroy_wtxn: &mut RwTxn, + progress: &Progress, + rng: &mut R, + available_memory: Option, + cancel: &(impl Fn() -> bool + Sync + Send), + ) -> crate::Result<()> + where + R: rand::Rng + rand::SeedableRng, + { + for index in vector_store_range_for_embedder(self.embedder_index) { + let hannoy_reader: hannoy::Reader = + match hannoy::Reader::open(hannoy_rtxn, index, self.database.remap_types()) { + Ok(reader) => reader, + Err(hannoy::Error::MissingMetadata(_)) => continue, + Err(err) => return Err(err.into()), + }; + let dimensions = hannoy_reader.dimensions(); + let arroy_writer: arroy::Writer = + arroy::Writer::new(self.database.remap_types(), index, dimensions); + arroy_writer.clear(arroy_wtxn)?; + for entry in hannoy_reader.iter(hannoy_rtxn)? { + let (item, mut vector) = entry?; + // hannoy bug? the `vector` here can be longer than `dimensions`. + // workaround: truncating. + if vector.len() > dimensions { + vector.truncate(dimensions); + } + // arroy and hannoy disagreement over the 0 value + // - arroy does: + // - if x >= 0 => 1 + // - if x < 0 => -1 + // - hannoy does: + // - if x > 0 => 1 + // - if x <= 0 => 0 + // because of this, a 0 from a bq hannoy will be converted to a 1 in arroy, destroying the information. + // to fix that, we subtract 0.5 from the hannoy vector, so that any zero value is translated to a strictly + // negative value. + for x in &mut vector { + *x -= 0.5; + } + + arroy_writer.add_item(arroy_wtxn, item, &vector)?; + } + arroy_build(arroy_wtxn, progress, rng, available_memory, cancel, &arroy_writer)?; + } + Ok(()) + } +} + +fn arroy_build( + wtxn: &mut RwTxn<'_>, + progress: &Progress, + rng: &mut R, + available_memory: Option, + cancel: &(impl Fn() -> bool + Sync + Send), + writer: &arroy::Writer, +) -> Result<(), crate::Error> +where + R: rand::Rng + rand::SeedableRng, + D: arroy::Distance, +{ + let mut builder = writer.builder(rng); + let builder = builder.progress(|step| progress.update_progress_from_arroy(step)); + builder.available_memory(available_memory.unwrap_or(usize::MAX)).cancel(cancel).build(wtxn)?; + Ok(()) +} + +fn hannoy_build( + wtxn: &mut RwTxn<'_>, + progress: &Progress, + rng: &mut R, + cancel: &(impl Fn() -> bool + Sync + Send), + writer: &hannoy::Writer, +) -> Result<(), crate::Error> +where + R: rand::Rng + rand::SeedableRng, + D: hannoy::Distance, +{ + let mut builder = writer.builder(rng).progress(progress.clone()); + builder + .cancel(cancel) + .ef_construction(HANNOY_EF_CONSTRUCTION) + .build::(wtxn)?; + Ok(()) } #[derive(Debug, Default, Clone)]