Change approach to arroy <-> hannoy migration after encountering multiple issues

This commit is contained in:
Louis Dureuil
2025-09-02 17:49:22 +02:00
parent 6b6e69b07a
commit 687260bc13
2 changed files with 150 additions and 81 deletions

View File

@ -1529,6 +1529,8 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
enum VectorStoreBackendChangeIndex {}
let embedder_count = embedding_configs.len();
let rtxn = self.index.read_txn()?;
for (i, config) in embedding_configs.into_iter().enumerate() {
if must_stop_processing() {
return Err(crate::InternalError::AbortedIndexation.into());
@ -1541,13 +1543,20 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
));
let quantized = config.config.quantized();
let embedder_id = embedders.embedder_id(self.wtxn, &config.name)?.unwrap();
let mut vector_store = crate::vector::VectorStore::new(
let vector_store = crate::vector::VectorStore::new(
old_backend,
self.index.vector_store,
embedder_id,
quantized,
);
vector_store.change_backend(self.wtxn, progress.clone(), must_stop_processing)?;
vector_store.change_backend(
&rtxn,
self.wtxn,
progress.clone(),
must_stop_processing,
self.indexer_config.max_memory,
)?;
}
Ok(())

View File

@ -120,35 +120,20 @@ impl VectorStore {
}
pub fn change_backend<MSP>(
&mut self,
self,
rtxn: &RoTxn,
wtxn: &mut RwTxn,
progress: Progress,
must_stop_processing: &MSP,
available_memory: Option<usize>,
) -> crate::Result<()>
where
MSP: Fn() -> bool + Sync,
{
let mut rng = rand::rngs::StdRng::from_entropy();
if self.backend == VectorStoreBackend::Arroy {
self.backend = VectorStoreBackend::Hannoy;
if self.quantized {
let dimensions = self
._arroy_readers(wtxn, self._arroy_quantized_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer =
hannoy::Writer::new(self._hannoy_quantized_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.prepare_arroy_conversion(wtxn)?;
builder.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
}
Ok(())
self._arroy_to_hannoy_bq::<arroy::distances::BinaryQuantizedCosine, hannoy::distances::Hamming, _>(rtxn, wtxn, &progress, &mut rng, &must_stop_processing)
} else {
let dimensions = self
._arroy_readers(wtxn, self._arroy_angular_db())
@ -159,7 +144,6 @@ impl VectorStore {
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = hannoy::Writer::new(self._hannoy_angular_db(), index, dimensions);
let mut builder = writer.builder(&mut rng).progress(progress.clone());
builder.cancel(must_stop_processing);
@ -170,28 +154,11 @@ impl VectorStore {
Ok(())
}
} else {
self.backend = VectorStoreBackend::Arroy;
if self.quantized {
let dimensions = self
._hannoy_readers(wtxn, self._hannoy_quantized_db())
.next()
.transpose()?
.map(|reader| reader.dimensions());
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimensions);
let mut builder = writer.builder(&mut rng);
let builder =
builder.progress(|step| progress.update_progress_from_arroy(step));
builder.prepare_hannoy_conversion(wtxn)?;
builder.build(wtxn)?;
}
Ok(())
self._hannoy_to_arroy_bq::<
hannoy::distances::Hamming,
arroy::distances::BinaryQuantizedCosine,
_>(rtxn, wtxn, &progress, &mut rng, available_memory, &must_stop_processing)
} else {
let dimensions = self
._hannoy_readers(wtxn, self._hannoy_angular_db())
@ -202,7 +169,6 @@ impl VectorStore {
let Some(dimensions) = dimensions else { return Ok(()) };
for index in vector_store_range_for_embedder(self.embedder_index) {
let mut rng = rand::rngs::StdRng::from_entropy();
let writer = arroy::Writer::new(self._arroy_angular_db(), index, dimensions);
let mut builder = writer.builder(&mut rng);
let builder =
@ -232,13 +198,7 @@ impl VectorStore {
if self.quantized {
let writer = arroy::Writer::new(self._arroy_quantized_db(), index, dimension);
if writer.need_build(wtxn)? {
let mut builder = writer.builder(rng);
let builder =
builder.progress(|step| progress.update_progress_from_arroy(step));
builder
.available_memory(available_memory.unwrap_or(usize::MAX))
.cancel(cancel)
.build(wtxn)?;
arroy_build(wtxn, &progress, rng, available_memory, cancel, &writer)?;
} else if writer.is_empty(wtxn)? {
continue;
}
@ -254,21 +214,9 @@ impl VectorStore {
.prepare_changing_distance::<arroy::distances::BinaryQuantizedCosine>(
wtxn,
)?;
let mut builder = writer.builder(rng);
let builder =
builder.progress(|step| progress.update_progress_from_arroy(step));
builder
.available_memory(available_memory.unwrap_or(usize::MAX))
.cancel(cancel)
.build(wtxn)?;
arroy_build(wtxn, &progress, rng, available_memory, cancel, &writer)?;
} else if writer.need_build(wtxn)? {
let mut builder = writer.builder(rng);
let builder =
builder.progress(|step| progress.update_progress_from_arroy(step));
builder
.available_memory(available_memory.unwrap_or(usize::MAX))
.cancel(cancel)
.build(wtxn)?;
arroy_build(wtxn, &progress, rng, available_memory, cancel, &writer)?;
} else if writer.is_empty(wtxn)? {
continue;
}
@ -276,11 +224,7 @@ impl VectorStore {
} else if self.quantized {
let writer = hannoy::Writer::new(self._hannoy_quantized_db(), index, dimension);
if writer.need_build(wtxn)? {
let mut builder = writer.builder(rng).progress(progress.clone());
builder
.cancel(cancel)
.ef_construction(HANNOY_EF_CONSTRUCTION)
.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
hannoy_build(wtxn, &progress, rng, cancel, &writer)?;
} else if writer.is_empty(wtxn)? {
continue;
}
@ -293,17 +237,9 @@ impl VectorStore {
// sensitive.
if quantizing && !self.quantized {
let writer = writer.prepare_changing_distance::<Hamming>(wtxn)?;
let mut builder = writer.builder(rng).progress(progress.clone());
builder
.cancel(cancel)
.ef_construction(HANNOY_EF_CONSTRUCTION)
.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
hannoy_build(wtxn, &progress, rng, cancel, &writer)?;
} else if writer.need_build(wtxn)? {
let mut builder = writer.builder(rng).progress(progress.clone());
builder
.cancel(cancel)
.ef_construction(HANNOY_EF_CONSTRUCTION)
.build::<HANNOY_M, HANNOY_M0>(wtxn)?;
hannoy_build(wtxn, &progress, rng, cancel, &writer)?;
} else if writer.is_empty(wtxn)? {
continue;
}
@ -1101,6 +1037,130 @@ impl VectorStore {
/// Returns this store's database handle with its data type remapped, so that it is typed
/// as a hannoy database using the `Hamming` distance.
fn _hannoy_quantized_db(&self) -> hannoy::Database<Hamming> {
self.database.remap_data_type()
}
/// Re-creates this embedder's arroy vector stores as hannoy stores.
///
/// Walks every store index returned by `vector_store_range_for_embedder` for
/// `self.embedder_index`. An index whose arroy reader cannot be opened because of
/// `MissingMetadata` is skipped; any other opening error is returned. For each remaining
/// index, the hannoy writer at the same index is cleared, receives every item read from
/// arroy through `arroy_rtxn`, and is finally built with `hannoy_build`.
///
/// `AD` is the distance used to read from arroy and `HD` the one used to write to hannoy.
/// `progress`, `rng` and `cancel` are handed over to `hannoy_build` unchanged.
fn _arroy_to_hannoy_bq<AD: arroy::Distance, HD: hannoy::Distance, R>(
    self,
    arroy_rtxn: &RoTxn,
    hannoy_wtxn: &mut RwTxn,
    progress: &Progress,
    rng: &mut R,
    cancel: &(impl Fn() -> bool + Sync + Send),
) -> crate::Result<()>
where
    R: rand::Rng + rand::SeedableRng,
{
    for store_index in vector_store_range_for_embedder(self.embedder_index) {
        let source: arroy::Reader<AD> =
            match arroy::Reader::open(arroy_rtxn, store_index, self.database.remap_types()) {
                // Nothing was ever written at this index: there is nothing to convert.
                Err(arroy::Error::MissingMetadata(_)) => continue,
                Err(other) => return Err(other.into()),
                Ok(reader) => reader,
            };
        let dimensions = source.dimensions();

        let target: hannoy::Writer<HD> =
            hannoy::Writer::new(self.database.remap_types(), store_index, dimensions);
        target.clear(hannoy_wtxn)?;

        for entry in source.iter(arroy_rtxn)? {
            let (item_id, mut values) = entry?;
            // The vectors yielded by arroy can be longer than `dimensions` (possibly an
            // arroy bug). Workaround: cut them down to `dimensions`; `truncate` leaves
            // vectors that are already short enough untouched.
            values.truncate(dimensions);
            target.add_item(hannoy_wtxn, item_id, &values)?;
        }

        hannoy_build(hannoy_wtxn, progress, rng, cancel, &target)?;
    }

    Ok(())
}
/// Re-creates this embedder's hannoy vector stores as arroy stores.
///
/// Walks every store index returned by `vector_store_range_for_embedder` for
/// `self.embedder_index`. An index whose hannoy reader cannot be opened because of
/// `MissingMetadata` is skipped; any other opening error is returned. For each remaining
/// index, the arroy writer at the same index is cleared, receives every item read from
/// hannoy through `hannoy_rtxn` (shifted by `-0.5`, see the comment in the body), and is
/// finally built with `arroy_build`.
///
/// `HD` is the distance used to read from hannoy and `AD` the one used to write to arroy.
/// `progress`, `rng`, `available_memory` and `cancel` are handed over to `arroy_build`
/// unchanged.
fn _hannoy_to_arroy_bq<HD: hannoy::Distance, AD: arroy::Distance, R>(
    self,
    hannoy_rtxn: &RoTxn,
    arroy_wtxn: &mut RwTxn,
    progress: &Progress,
    rng: &mut R,
    available_memory: Option<usize>,
    cancel: &(impl Fn() -> bool + Sync + Send),
) -> crate::Result<()>
where
    R: rand::Rng + rand::SeedableRng,
{
    for store_index in vector_store_range_for_embedder(self.embedder_index) {
        let source: hannoy::Reader<HD> =
            match hannoy::Reader::open(hannoy_rtxn, store_index, self.database.remap_types()) {
                // Nothing was ever written at this index: there is nothing to convert.
                Err(hannoy::Error::MissingMetadata(_)) => continue,
                Err(other) => return Err(other.into()),
                Ok(reader) => reader,
            };
        let dimensions = source.dimensions();

        let target: arroy::Writer<AD> =
            arroy::Writer::new(self.database.remap_types(), store_index, dimensions);
        target.clear(arroy_wtxn)?;

        for entry in source.iter(hannoy_rtxn)? {
            let (item_id, mut values) = entry?;
            // The vectors yielded by hannoy can be longer than `dimensions` (possibly a
            // hannoy bug). Workaround: cut them down to `dimensions`; `truncate` leaves
            // vectors that are already short enough untouched.
            values.truncate(dimensions);

            // arroy and hannoy do not binarize the value 0 the same way:
            // - arroy:  x >= 0 => 1,  x < 0  => -1
            // - hannoy: x > 0  => 1,  x <= 0 => 0
            // A 0 coming out of a binary-quantized hannoy store would therefore become
            // a 1 in arroy, destroying the information. Shifting every component by
            // -0.5 turns those zeros into strictly negative values.
            for value in values.iter_mut() {
                *value -= 0.5;
            }

            target.add_item(arroy_wtxn, item_id, &values)?;
        }

        arroy_build(arroy_wtxn, progress, rng, available_memory, cancel, &target)?;
    }

    Ok(())
}
}
/// Builds the arroy index behind `writer` inside `wtxn`.
///
/// Each step reported by the arroy builder is forwarded to `progress` through
/// `update_progress_from_arroy`. The builder is given `available_memory` as its memory
/// setting (`usize::MAX` when `None`) and `cancel` as its cancellation callback.
///
/// # Errors
///
/// Returns the error of the arroy build, converted into `crate::Error`.
fn arroy_build<R, D>(
    wtxn: &mut RwTxn<'_>,
    progress: &Progress,
    rng: &mut R,
    available_memory: Option<usize>,
    cancel: &(impl Fn() -> bool + Sync + Send),
    writer: &arroy::Writer<D>,
) -> Result<(), crate::Error>
where
    R: rand::Rng + rand::SeedableRng,
    D: arroy::Distance,
{
    let memory_budget = available_memory.unwrap_or(usize::MAX);

    writer
        .builder(rng)
        .progress(|step| progress.update_progress_from_arroy(step))
        .available_memory(memory_budget)
        .cancel(cancel)
        .build(wtxn)?;

    Ok(())
}
/// Builds the hannoy index behind `writer` inside `wtxn`.
///
/// The builder reports to a clone of `progress`, uses `cancel` as its cancellation
/// callback, and is configured with `HANNOY_EF_CONSTRUCTION`, `HANNOY_M` and `HANNOY_M0`.
///
/// # Errors
///
/// Returns the error of the hannoy build, converted into `crate::Error`.
fn hannoy_build<R, D>(
    wtxn: &mut RwTxn<'_>,
    progress: &Progress,
    rng: &mut R,
    cancel: &(impl Fn() -> bool + Sync + Send),
    writer: &hannoy::Writer<D>,
) -> Result<(), crate::Error>
where
    R: rand::Rng + rand::SeedableRng,
    D: hannoy::Distance,
{
    writer
        .builder(rng)
        .progress(progress.clone())
        .cancel(cancel)
        .ef_construction(HANNOY_EF_CONSTRUCTION)
        .build::<HANNOY_M, HANNOY_M0>(wtxn)?;

    Ok(())
}
#[derive(Debug, Default, Clone)]