Adopt neutral terminology where arroy/hannoy would be confusing

Louis Dureuil
2025-09-03 15:08:40 +02:00
parent 0faf495173
commit 13df964564
10 changed files with 75 additions and 78 deletions
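
Every rename below follows the same pattern: call sites speak of a neutral "vector store" instead of naming the concrete ANN backend, because the index can sit on either arroy or hannoy. A minimal sketch of that kind of facade, where every identifier is illustrative rather than the actual milli API:

#[derive(Clone, Copy, Debug)]
enum VectorStoreBackend {
    Arroy,  // the historical backend
    Hannoy, // the newer backend
}

// Callers depend on this neutral type and never name the backend.
struct VectorStore {
    backend: VectorStoreBackend,
    dimensions: usize,
}

impl VectorStore {
    fn new(backend: VectorStoreBackend, dimensions: usize) -> Self {
        Self { backend, dimensions }
    }

    fn dimensions(&self) -> usize {
        self.dimensions
    }

    fn backend_name(&self) -> &'static str {
        match self.backend {
            VectorStoreBackend::Arroy => "arroy",
            VectorStoreBackend::Hannoy => "hannoy",
        }
    }
}

fn main() {
    let store = VectorStore::new(VectorStoreBackend::Hannoy, 768);
    assert_eq!(store.dimensions(), 768);
    assert_eq!(store.backend_name(), "hannoy");
}

Only the facade matches on the backend; everything above it keeps compiling when a backend is added or swapped, which is exactly why the neutral names matter.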

View File

@@ -320,7 +320,7 @@ async fn binary_quantize_clear_documents() {
     }
     "###);

-    // Make sure the hannoy DB has been cleared
+    // Make sure the vector DB has been cleared
     let (documents, _code) =
         index.search_post(json!({ "hybrid": { "embedder": "manual" }, "vector": [1, 1, 1] })).await;
     snapshot!(documents, @r#"

View File

@@ -10,11 +10,11 @@ use std::str::FromStr;
 use meili_snap::{json_string, snapshot};
 use meilisearch::option::MaxThreads;
+pub use rest::create_mock;

 use crate::common::index::Index;
 use crate::common::{default_settings, GetAllDocumentsOptions, Server};
 use crate::json;
-pub use rest::create_mock;

 pub async fn get_server_vector() -> Server {
     Server::new().await
@@ -684,7 +684,7 @@ async fn clear_documents() {
     }
     "###);

-    // Make sure the hannoy DB has been cleared
+    // Make sure the vector DB has been cleared
     let (documents, _code) =
         index.search_post(json!({ "vector": [1, 1, 1], "hybrid": {"embedder": "manual"} })).await;
     snapshot!(documents, @r#"

View File

@@ -236,7 +236,7 @@ async fn reset_embedder_documents() {
     }
     "###);

-    // Make sure the hannoy DB has been cleared
+    // Make sure the vector DB has been cleared
    let (documents, _code) =
        index.search_post(json!({ "vector": [1, 1, 1], "hybrid": {"embedder": "default"} })).await;
    snapshot!(json_string!(documents), @r###"

View File

@@ -142,7 +142,7 @@ enum Command {

 #[derive(Clone, ValueEnum)]
 enum IndexPart {
-    /// Will make the hannoy index hot.
+    /// Will make the vector index hot.
     Hannoy,
 }

View File

@@ -178,7 +178,7 @@ pub struct Index {
     /// Maps the document id, the facet field id and the strings.
     pub field_id_docid_facet_strings: Database<FieldDocIdFacetStringCodec, Str>,
-    /// Maps an embedder name to its id in the hannoy store.
+    /// Maps an embedder name to its id in the vector store.
     pub(crate) embedder_category_id: Database<Unspecified, Unspecified>,
     /// Vector store based on hannoy™.
     pub vector_store: hannoy::Database<Unspecified>,
@@ -1881,7 +1881,7 @@ impl Index {
             facet_id_is_empty_docids,
             field_id_docid_facet_f64s,
             field_id_docid_facet_strings,
-            vector_store: vector_hannoy,
+            vector_store,
             embedder_category_id,
             documents,
         } = self;
@@ -1952,7 +1952,7 @@ impl Index {
             "field_id_docid_facet_strings",
             field_id_docid_facet_strings.stat(rtxn).map(compute_size)?,
         );
-        sizes.insert("vector_hannoy", vector_hannoy.stat(rtxn).map(compute_size)?);
+        sizes.insert("vector_store", vector_store.stat(rtxn).map(compute_size)?);
         sizes.insert("embedder_category_id", embedder_category_id.stat(rtxn).map(compute_size)?);
         sizes.insert("documents", documents.stat(rtxn).map(compute_size)?);

View File

@@ -505,7 +505,7 @@ where
         for (embedder_name, dimension) in dimension {
             let wtxn = &mut *self.wtxn;
-            let vector_hannoy = self.index.vector_store;
+            let vector_store = self.index.vector_store;
             let cancel = &self.should_abort;

             let embedder_index =
@@ -525,7 +525,7 @@ where
             pool.install(|| {
                 let mut writer =
-                    VectorStore::new(backend, vector_hannoy, embedder_index, was_quantized);
+                    VectorStore::new(backend, vector_store, embedder_index, was_quantized);
                 writer.build_and_quantize(
                     wtxn,
                     // In the settings we don't have any progress to share

View File

@@ -948,13 +948,13 @@ impl<'a, 'i> Transform<'a, 'i> {
                 else {
                     continue;
                 };
-                let hannoy = VectorStore::new(
+                let vector_store = VectorStore::new(
                     backend,
                     self.index.vector_store,
                     infos.embedder_id,
                     was_quantized,
                 );
-                let Some(dimensions) = hannoy.dimensions(wtxn)? else {
+                let Some(dimensions) = vector_store.dimensions(wtxn)? else {
                     continue;
                 };
                 for fragment_id in fragment_ids {
@@ -962,17 +962,17 @@ impl<'a, 'i> Transform<'a, 'i> {
                     if infos.embedding_status.user_provided_docids().is_empty() {
                         // no user provided: clear store
-                        hannoy.clear_store(wtxn, *fragment_id, dimensions)?;
+                        vector_store.clear_store(wtxn, *fragment_id, dimensions)?;
                         continue;
                     }
                     // some user provided, remove only the ids that are not user provided
-                    let to_delete = hannoy.items_in_store(wtxn, *fragment_id, |items| {
+                    let to_delete = vector_store.items_in_store(wtxn, *fragment_id, |items| {
                         items - infos.embedding_status.user_provided_docids()
                     })?;
                     for to_delete in to_delete {
-                        hannoy.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?;
+                        vector_store.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?;
                     }
                 }
             }
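
The deletion rule in the hunk above is a plain set difference over document-id bitmaps: everything sitting in the fragment store minus the user-provided ids gets deleted. A standalone sketch with the roaring crate:

use roaring::RoaringBitmap;

// Same shape as `items - infos.embedding_status.user_provided_docids()`:
// ids present in the store but not user-provided are scheduled for deletion.
fn to_delete(in_store: &RoaringBitmap, user_provided: &RoaringBitmap) -> RoaringBitmap {
    in_store - user_provided
}

fn main() {
    let in_store: RoaringBitmap = (0..5).collect();
    let user_provided: RoaringBitmap = [1, 3].iter().copied().collect();
    let expected: RoaringBitmap = [0, 2, 4].iter().copied().collect();
    assert_eq!(to_delete(&in_store, &user_provided), expected);
}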

View File

@@ -255,9 +255,9 @@ impl<'a> From<FrameGrantR<'a>> for FrameWithHeader<'a> {
 #[repr(u8)]
 pub enum EntryHeader {
     DbOperation(DbOperation),
-    HannoyDeleteVector(HannoyDeleteVector),
-    HannoySetVectors(HannoySetVectors),
-    HannoySetVector(HannoySetVector),
+    DeleteVector(DeleteVector),
+    SetVectors(SetVectors),
+    SetVector(SetVector),
 }

 impl EntryHeader {
@@ -268,9 +268,9 @@ impl EntryHeader {
     const fn variant_id(&self) -> u8 {
         match self {
             EntryHeader::DbOperation(_) => 0,
-            EntryHeader::HannoyDeleteVector(_) => 1,
-            EntryHeader::HannoySetVectors(_) => 2,
-            EntryHeader::HannoySetVector(_) => 3,
+            EntryHeader::DeleteVector(_) => 1,
+            EntryHeader::SetVectors(_) => 2,
+            EntryHeader::SetVector(_) => 3,
         }
     }
@@ -286,26 +286,26 @@ impl EntryHeader {
     }

     const fn total_delete_vector_size() -> usize {
-        Self::variant_size() + mem::size_of::<HannoyDeleteVector>()
+        Self::variant_size() + mem::size_of::<DeleteVector>()
     }

     /// The `dimensions` corresponds to the number of `f32` in the embedding.
     fn total_set_vectors_size(count: usize, dimensions: usize) -> usize {
         let embedding_size = dimensions * mem::size_of::<f32>();
-        Self::variant_size() + mem::size_of::<HannoySetVectors>() + embedding_size * count
+        Self::variant_size() + mem::size_of::<SetVectors>() + embedding_size * count
     }

     fn total_set_vector_size(dimensions: usize) -> usize {
         let embedding_size = dimensions * mem::size_of::<f32>();
-        Self::variant_size() + mem::size_of::<HannoySetVector>() + embedding_size
+        Self::variant_size() + mem::size_of::<SetVector>() + embedding_size
     }

     fn header_size(&self) -> usize {
         let payload_size = match self {
             EntryHeader::DbOperation(op) => mem::size_of_val(op),
-            EntryHeader::HannoyDeleteVector(adv) => mem::size_of_val(adv),
-            EntryHeader::HannoySetVectors(asvs) => mem::size_of_val(asvs),
-            EntryHeader::HannoySetVector(asv) => mem::size_of_val(asv),
+            EntryHeader::DeleteVector(adv) => mem::size_of_val(adv),
+            EntryHeader::SetVectors(asvs) => mem::size_of_val(asvs),
+            EntryHeader::SetVector(asv) => mem::size_of_val(asv),
         };
         Self::variant_size() + payload_size
     }
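
As a worked example of the size arithmetic above (the struct layout comes from the definitions later in this diff; the count and dimensions are made up): a SetVectors entry carrying two 768-dimensional embeddings asks the queue for 1 tag byte + 8 header bytes + 2 × 768 × 4 payload bytes = 6153 bytes.

use std::mem;

// Mirror of the SetVectors payload header shown further down: with
// #[repr(C)], u32 + u8 + [u8; 3] packs into exactly 8 bytes.
#[repr(C)]
struct SetVectors {
    docid: u32, // DocumentId is a u32
    embedder_id: u8,
    _padding: [u8; 3],
}

fn main() {
    let variant_size = mem::size_of::<u8>(); // the one-byte variant tag
    let (count, dimensions) = (2, 768); // example values only
    let embedding_size = dimensions * mem::size_of::<f32>(); // 768 * 4 = 3072
    let total = variant_size + mem::size_of::<SetVectors>() + embedding_size * count;
    assert_eq!(total, 1 + 8 + 6144); // 6153 bytes requested from the queue
}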
@@ -319,19 +319,19 @@ impl EntryHeader {
                 EntryHeader::DbOperation(header)
             }
             1 => {
-                let header_bytes = &remaining[..mem::size_of::<HannoyDeleteVector>()];
+                let header_bytes = &remaining[..mem::size_of::<DeleteVector>()];
                 let header = checked::pod_read_unaligned(header_bytes);
-                EntryHeader::HannoyDeleteVector(header)
+                EntryHeader::DeleteVector(header)
             }
             2 => {
-                let header_bytes = &remaining[..mem::size_of::<HannoySetVectors>()];
+                let header_bytes = &remaining[..mem::size_of::<SetVectors>()];
                 let header = checked::pod_read_unaligned(header_bytes);
-                EntryHeader::HannoySetVectors(header)
+                EntryHeader::SetVectors(header)
             }
             3 => {
-                let header_bytes = &remaining[..mem::size_of::<HannoySetVector>()];
+                let header_bytes = &remaining[..mem::size_of::<SetVector>()];
                 let header = checked::pod_read_unaligned(header_bytes);
-                EntryHeader::HannoySetVector(header)
+                EntryHeader::SetVector(header)
             }
             id => panic!("invalid variant id: {id}"),
         }
@@ -341,9 +341,9 @@ impl EntryHeader {
         let (first, remaining) = header_bytes.split_first_mut().unwrap();
         let payload_bytes = match self {
             EntryHeader::DbOperation(op) => bytemuck::bytes_of(op),
-            EntryHeader::HannoyDeleteVector(adv) => bytemuck::bytes_of(adv),
-            EntryHeader::HannoySetVectors(asvs) => bytemuck::bytes_of(asvs),
-            EntryHeader::HannoySetVector(asv) => bytemuck::bytes_of(asv),
+            EntryHeader::DeleteVector(adv) => bytemuck::bytes_of(adv),
+            EntryHeader::SetVectors(asvs) => bytemuck::bytes_of(asvs),
+            EntryHeader::SetVector(asv) => bytemuck::bytes_of(asv),
         };
         *first = self.variant_id();
         remaining.copy_from_slice(payload_bytes);
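
Both directions of this serialization lean on bytemuck's checked casts: one tag byte selects the variant, and `pod_read_unaligned` copies the payload out of the frame so the queue bytes never need to be aligned. A self-contained round-trip sketch of the same pattern for the delete-vector case, assuming bytemuck with its derive feature:

use bytemuck::checked;
use bytemuck::{CheckedBitPattern, NoUninit};

// Same derives and layout as the DeleteVector payload in this diff.
#[derive(Debug, Clone, Copy, PartialEq, NoUninit, CheckedBitPattern)]
#[repr(transparent)]
struct DeleteVector {
    docid: u32,
}

fn decode(bytes: &[u8]) -> Option<DeleteVector> {
    let (&variant_id, remaining) = bytes.split_first()?;
    (variant_id == 1).then(|| {
        let payload = &remaining[..std::mem::size_of::<DeleteVector>()];
        // Copies rather than borrows, so `bytes` may be unaligned.
        checked::pod_read_unaligned(payload)
    })
}

fn main() {
    let entry = DeleteVector { docid: 42 };
    let mut bytes = vec![1u8]; // variant id 1 = DeleteVector
    bytes.extend_from_slice(bytemuck::bytes_of(&entry));
    assert_eq!(decode(&bytes), Some(entry));
}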
@@ -378,7 +378,7 @@ impl DbOperation {
 #[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
 #[repr(transparent)]
-pub struct HannoyDeleteVector {
+pub struct DeleteVector {
     pub docid: DocumentId,
 }
@@ -386,13 +386,13 @@ pub struct HannoyDeleteVector {
 #[repr(C)]
 /// The embeddings are in the remaining space and represents
 /// non-aligned [f32] each with dimensions f32s.
-pub struct HannoySetVectors {
+pub struct SetVectors {
     pub docid: DocumentId,
     pub embedder_id: u8,
     _padding: [u8; 3],
 }

-impl HannoySetVectors {
+impl SetVectors {
     fn embeddings_bytes<'a>(frame: &'a FrameGrantR<'_>) -> &'a [u8] {
         let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
         &frame[skip..]
@@ -416,14 +416,14 @@ impl HannoySetVectors {
 #[repr(C)]
 /// The embeddings are in the remaining space and represents
 /// non-aligned [f32] each with dimensions f32s.
-pub struct HannoySetVector {
+pub struct SetVector {
     pub docid: DocumentId,
     pub embedder_id: u8,
     pub extractor_id: u8,
     _padding: [u8; 2],
 }

-impl HannoySetVector {
+impl SetVector {
     fn embeddings_bytes<'a>(frame: &'a FrameGrantR<'_>) -> &'a [u8] {
         let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
         &frame[skip..]
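
`embeddings_bytes` only hands back the raw tail of the frame; turning that tail into `f32`s takes a copy, since it is generally not 4-byte aligned. A minimal sketch (function name assumed) using bytemuck's copying collector:

// `pod_collect_to_vec` copies into a freshly allocated, aligned Vec<f32>,
// so the source slice may start at any byte offset.
fn read_embeddings_from_tail(tail: &[u8], dimensions: usize) -> Vec<f32> {
    let floats: Vec<f32> = bytemuck::pod_collect_to_vec(tail);
    debug_assert_eq!(floats.len() % dimensions, 0);
    floats
}

fn main() {
    let embedding = [0.5f32, 1.5, 2.5];
    let bytes: &[u8] = bytemuck::cast_slice(&embedding);
    assert_eq!(read_embeddings_from_tail(bytes, 3), embedding);
}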
@@ -553,7 +553,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
         let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

-        let payload_header = EntryHeader::HannoyDeleteVector(HannoyDeleteVector { docid });
+        let payload_header = EntryHeader::DeleteVector(DeleteVector { docid });
         let total_length = EntryHeader::total_delete_vector_size();
         if total_length > max_grant {
             panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
@@ -589,8 +589,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
         // to zero to allocate no extra space at all
         let dimensions = embeddings.first().map_or(0, |emb| emb.len());

-        let hannoy_set_vector = HannoySetVectors { docid, embedder_id, _padding: [0; 3] };
-        let payload_header = EntryHeader::HannoySetVectors(hannoy_set_vector);
+        let set_vectors = SetVectors { docid, embedder_id, _padding: [0; 3] };
+        let payload_header = EntryHeader::SetVectors(set_vectors);
         let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
         if total_length > max_grant {
             let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
@@ -650,9 +650,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
         // to zero to allocate no extra space at all
         let dimensions = embedding.as_ref().map_or(0, |emb| emb.len());

-        let hannoy_set_vector =
-            HannoySetVector { docid, embedder_id, extractor_id, _padding: [0; 2] };
-        let payload_header = EntryHeader::HannoySetVector(hannoy_set_vector);
+        let set_vector = SetVector { docid, embedder_id, extractor_id, _padding: [0; 2] };
+        let payload_header = EntryHeader::SetVector(set_vector);
         let total_length = EntryHeader::total_set_vector_size(dimensions);
         if total_length > max_grant {
             let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
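
When an entry would overflow the queue's maximum grant, the sender spills the raw embedding bytes into an anonymous temporary file instead, as the `value_file` lines above show. A reduced sketch of that fallback, assuming the tempfile and bytemuck crates:

use std::fs::File;
use std::io::{self, BufWriter, Write};

// Write each embedding's native-endian f32 bytes to an unnamed temp file;
// the receiver can stream them back later given the dimensions.
fn spill_embeddings(embeddings: &[Vec<f32>]) -> io::Result<File> {
    let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
    for embedding in embeddings {
        value_file.write_all(bytemuck::cast_slice(embedding))?;
    }
    value_file.into_inner().map_err(io::Error::from)
}

fn main() -> io::Result<()> {
    let file = spill_embeddings(&[vec![0.1, 0.2], vec![0.3, 0.4]])?;
    assert_eq!(file.metadata()?.len(), 16); // 4 f32s * 4 bytes each
    Ok(())
}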

View File

@@ -67,7 +67,7 @@ where
     let mut bbbuffers = Vec::new();
     let finished_extraction = AtomicBool::new(false);

-    let hannoy_memory = grenad_parameters.max_memory;
+    let vector_memory = grenad_parameters.max_memory;

     let (grenad_parameters, total_bbbuffer_capacity) =
         indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
@@ -132,7 +132,7 @@ where
     let vector_arroy = index.vector_store;
     let backend = index.get_vector_store(wtxn)?;
-    let hannoy_writers: Result<HashMap<_, _>> = embedders
+    let vector_stores: Result<HashMap<_, _>> = embedders
         .inner_as_ref()
         .iter()
         .map(|(embedder_name, runtime)| {
@@ -155,10 +155,10 @@ where
         })
         .collect();

-    let mut hannoy_writers = hannoy_writers?;
+    let mut vector_stores = vector_stores?;

     let congestion =
-        write_to_db(writer_receiver, finished_extraction, index, wtxn, &hannoy_writers)?;
+        write_to_db(writer_receiver, finished_extraction, index, wtxn, &vector_stores)?;

     indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
@@ -172,8 +172,8 @@ where
         wtxn,
         indexing_context.progress,
         index_embeddings,
-        hannoy_memory,
-        &mut hannoy_writers,
+        vector_memory,
+        &mut vector_stores,
         None,
         &indexing_context.must_stop_processing,
     )
@@ -229,7 +229,7 @@ where
     let mut bbbuffers = Vec::new();
     let finished_extraction = AtomicBool::new(false);

-    let hannoy_memory = grenad_parameters.max_memory;
+    let vector_memory = grenad_parameters.max_memory;

     let (grenad_parameters, total_bbbuffer_capacity) =
         indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
@@ -286,7 +286,7 @@ where
     let new_embedders = settings_delta.new_embedders();
     let embedder_actions = settings_delta.embedder_actions();
     let index_embedder_category_ids = settings_delta.new_embedder_category_id();
-    let mut hannoy_writers = hannoy_writers_from_embedder_actions(
+    let mut vector_stores = vector_stores_from_embedder_actions(
         index,
         wtxn,
         embedder_actions,
@@ -295,7 +295,7 @@ where
     )?;

     let congestion =
-        write_to_db(writer_receiver, finished_extraction, index, wtxn, &hannoy_writers)?;
+        write_to_db(writer_receiver, finished_extraction, index, wtxn, &vector_stores)?;

     indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
@@ -309,8 +309,8 @@ where
         wtxn,
         indexing_context.progress,
         index_embeddings,
-        hannoy_memory,
-        &mut hannoy_writers,
+        vector_memory,
+        &mut vector_stores,
         Some(embedder_actions),
         &indexing_context.must_stop_processing,
     )
@@ -340,7 +340,7 @@ where
     Ok(congestion)
 }

-fn hannoy_writers_from_embedder_actions<'indexer>(
+fn vector_stores_from_embedder_actions<'indexer>(
     index: &Index,
     rtxn: &RoTxn,
     embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,

View File

@@ -23,7 +23,7 @@ pub fn write_to_db(
     finished_extraction: &AtomicBool,
     index: &Index,
     wtxn: &mut RwTxn<'_>,
-    hannoy_writers: &HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
+    vector_stores: &HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
 ) -> Result<ChannelCongestion> {
     // Used by by the HannoySetVector to copy the embedding into an
     // aligned memory area, required by arroy to accept a new vector.
@@ -56,7 +56,7 @@ pub fn write_to_db(
             ReceiverAction::LargeVectors(large_vectors) => {
                 let LargeVectors { docid, embedder_id, .. } = large_vectors;
                 let (_, _, writer, dimensions) =
-                    hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
+                    vector_stores.get(&embedder_id).expect("requested a missing embedder");
                 let mut embeddings = Embeddings::new(*dimensions);
                 for embedding in large_vectors.read_embeddings(*dimensions) {
                     embeddings.push(embedding.to_vec()).unwrap();
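
`read_embeddings(*dimensions)` walks a single flat buffer in dimension-sized steps; with names assumed, the equivalent is just `chunks_exact` over the decoded floats:

// Sketch: split one flat buffer of floats into per-embedding slices.
fn read_embeddings(flat: &[f32], dimensions: usize) -> impl Iterator<Item = &[f32]> + '_ {
    flat.chunks_exact(dimensions)
}

fn main() {
    let flat = [1.0f32, 2.0, 3.0, 4.0];
    let embeddings: Vec<&[f32]> = read_embeddings(&flat, 2).collect();
    assert_eq!(embeddings, [&[1.0, 2.0][..], &[3.0, 4.0][..]]);
}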
@@ -68,7 +68,7 @@ pub fn write_to_db(
                 large_vector @ LargeVector { docid, embedder_id, extractor_id, .. },
             ) => {
                 let (_, _, writer, dimensions) =
-                    hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
+                    vector_stores.get(&embedder_id).expect("requested a missing embedder");
                 let embedding = large_vector.read_embedding(*dimensions);
                 writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
             }
@@ -80,12 +80,12 @@ pub fn write_to_db(
             &mut writer_receiver,
             index,
            wtxn,
-            hannoy_writers,
+            vector_stores,
             &mut aligned_embedding,
         )?;
     }

-    write_from_bbqueue(&mut writer_receiver, index, wtxn, hannoy_writers, &mut aligned_embedding)?;
+    write_from_bbqueue(&mut writer_receiver, index, wtxn, vector_stores, &mut aligned_embedding)?;

     Ok(ChannelCongestion {
         attempts: writer_receiver.sent_messages_attempts(),
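
`write_to_db` hands these counters back so callers can judge channel pressure. Only `attempts` is visible in this hunk; the second field and the method below are assumptions sketching the obvious blocked-over-total reading:

// Illustrative only: `blocking_attempts` and `congestion_ratio` are assumed.
pub struct ChannelCongestion {
    pub attempts: usize,
    pub blocking_attempts: usize,
}

impl ChannelCongestion {
    /// Fraction of send attempts that found the ring buffer full.
    pub fn congestion_ratio(&self) -> f32 {
        if self.attempts == 0 {
            0.0
        } else {
            self.blocking_attempts as f32 / self.attempts as f32
        }
    }
}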
@@ -115,8 +115,8 @@ pub fn build_vectors<MSP>(
     wtxn: &mut RwTxn<'_>,
     progress: &Progress,
     index_embeddings: Vec<IndexEmbeddingConfig>,
-    hannoy_memory: Option<usize>,
-    hannoy_writers: &mut HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
+    vector_memory: Option<usize>,
+    vector_stores: &mut HashMap<u8, (&str, &Embedder, VectorStore, usize)>,
     embeder_actions: Option<&BTreeMap<String, EmbedderAction>>,
     must_stop_processing: &MSP,
 ) -> Result<()>
@@ -129,7 +129,7 @@ where
     let seed = rand::random();
     let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
-    for (_index, (embedder_name, _embedder, writer, dimensions)) in hannoy_writers {
+    for (_index, (embedder_name, _embedder, writer, dimensions)) in vector_stores {
         let dimensions = *dimensions;
         let is_being_quantized = embeder_actions
             .and_then(|actions| actions.get(*embedder_name).map(|action| action.is_being_quantized))
@@ -140,7 +140,7 @@ where
             &mut rng,
             dimensions,
             is_being_quantized,
-            hannoy_memory,
+            vector_memory,
             must_stop_processing,
         )?;
     }
@@ -181,7 +181,7 @@ pub fn write_from_bbqueue(
     writer_receiver: &mut WriterBbqueueReceiver<'_>,
     index: &Index,
     wtxn: &mut RwTxn<'_>,
-    hannoy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, VectorStore, usize)>,
+    vector_stores: &HashMap<u8, (&str, &crate::vector::Embedder, VectorStore, usize)>,
     aligned_embedding: &mut Vec<f32>,
 ) -> crate::Result<()> {
     while let Some(frame_with_header) = writer_receiver.recv_frame() {
@@ -221,17 +221,17 @@ pub fn write_from_bbqueue(
                     },
                 }
             }
-            EntryHeader::HannoyDeleteVector(HannoyDeleteVector { docid }) => {
-                for (_index, (_name, _embedder, writer, dimensions)) in hannoy_writers {
+            EntryHeader::DeleteVector(DeleteVector { docid }) => {
+                for (_index, (_name, _embedder, writer, dimensions)) in vector_stores {
                     let dimensions = *dimensions;
                     writer.del_items(wtxn, dimensions, docid)?;
                 }
             }
-            EntryHeader::HannoySetVectors(asvs) => {
-                let HannoySetVectors { docid, embedder_id, .. } = asvs;
+            EntryHeader::SetVectors(asvs) => {
+                let SetVectors { docid, embedder_id, .. } = asvs;
                 let frame = frame_with_header.frame();
                 let (_, _, writer, dimensions) =
-                    hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
+                    vector_stores.get(&embedder_id).expect("requested a missing embedder");
                 let mut embeddings = Embeddings::new(*dimensions);
                 let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
                 writer.del_items(wtxn, *dimensions, docid)?;
@@ -245,12 +245,10 @@ pub fn write_from_bbqueue(
                     writer.add_items(wtxn, docid, &embeddings)?;
                 }
             }
-            EntryHeader::HannoySetVector(
-                asv @ HannoySetVector { docid, embedder_id, extractor_id, .. },
-            ) => {
+            EntryHeader::SetVector(asv @ SetVector { docid, embedder_id, extractor_id, .. }) => {
                 let frame = frame_with_header.frame();
                 let (_, _, writer, dimensions) =
-                    hannoy_writers.get(&embedder_id).expect("requested a missing embedder");
+                    vector_stores.get(&embedder_id).expect("requested a missing embedder");
                 let embedding = asv.read_all_embeddings_into_vec(frame, aligned_embedding);
                 if embedding.is_empty() {