Write back user provided vectors from deleted embedders

This commit is contained in:
ManyTheFish
2025-06-25 15:56:38 +02:00
parent 31142b3663
commit 51a087b764
3 changed files with 160 additions and 16 deletions

View File

@@ -1,16 +1,25 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use bumpalo::Bump;
use hashbrown::HashMap;
use super::DelAddRoaringBitmap;
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::update::new::channel::DocumentsSender;
use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
use crate::update::new::document::{write_to_obkv, Document as _};
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
use crate::update::new::document_change::DatabaseDocument;
use crate::update::new::indexer::document_changes::{
DocumentChangeContext, Extractor, IndexingContext,
};
use crate::update::new::indexer::settings_changes::{
settings_change_extract, DatabaseDocuments, SettingsChangeExtractor,
};
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::FullySend;
use crate::update::new::thread_local::{FullySend, ThreadLocal};
use crate::update::new::DocumentChange;
use crate::update::settings::SettingsDelta;
use crate::vector::settings::EmbedderAction;
use crate::vector::EmbeddingConfigs;
use crate::Result;
@@ -45,6 +54,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
) -> Result<()> {
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
let mut document_extractor_data = context.data.0.borrow_mut_or_yield();
let embedder_actions = &Default::default();
for change in changes {
let change = change?;
@@ -121,9 +131,11 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
let content = write_to_obkv(
&content,
vector_content.as_ref(),
embedder_actions,
&mut new_fields_ids_map,
&mut document_buffer,
)?;
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
}
DocumentChange::Insertion(insertion) => {
@@ -146,6 +158,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
let content = write_to_obkv(
&content,
inserted_vectors.as_ref(),
embedder_actions,
&mut new_fields_ids_map,
&mut document_buffer,
)?;
@@ -158,3 +171,101 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
Ok(())
}
}
pub struct SettingsChangeDocumentExtractor<'a, 'b> {
document_sender: DocumentsSender<'a, 'b>,
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
}
impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> {
pub fn new(
document_sender: DocumentsSender<'a, 'b>,
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
) -> Self {
Self { document_sender, embedder_actions }
}
}
impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> {
type Data = FullySend<RefCell<DocumentExtractorData>>;
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(FullySend(Default::default()))
}
fn process<'doc>(
&self,
documents: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
context: &DocumentChangeContext<Self::Data>,
) -> Result<()> {
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
for document in documents {
let document = document?;
// **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
// Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let external_docid = document.external_document_id().to_owned();
let content =
document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
let vector_content = document.current_vectors(
&context.rtxn,
context.index,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let content = write_to_obkv(
&content,
Some(&vector_content),
self.embedder_actions,
&mut new_fields_ids_map,
&mut document_buffer,
)?;
self.document_sender.uncompressed(document.docid(), external_docid, content).unwrap();
}
Ok(())
}
}
/// Modify the database documents based on the settings changes.
///
/// This function extracts the documents from the database,
/// modifies them by adding or removing vector fields based on embedder actions,
/// and then updates the database.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
documents: &'indexer DatabaseDocuments<'indexer>,
indexing_context: IndexingContext<MSP>,
extractor_sender: &ExtractorBbqueueSender,
settings_delta: &SD,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta,
{
// skip if no embedder_actions
if settings_delta.embedder_actions().is_empty() {
return Ok(());
}
let document_sender = extractor_sender.documents();
let document_extractor =
SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions());
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
settings_change_extract(
documents,
&document_extractor,
indexing_context,
extractor_allocs,
&datastore,
crate::update::new::steps::IndexingStep::ExtractingDocuments,
)?;
Ok(())
}