From fe22dbd0e32bdc275bd09bd784e62170c42adaf1 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Tue, 20 May 2025 16:04:39 +0200 Subject: [PATCH] Update documents by reinserting or removing vector fields based on embedders --- .../milli/src/update/new/document_change.rs | 14 ++- .../milli/src/update/new/extract/documents.rs | 110 +++++++++++++++++- .../milli/src/update/new/indexer/extract.rs | 45 +++++++ crates/milli/src/update/new/indexer/mod.rs | 2 +- .../update/new/indexer/settings_changes.rs | 1 + crates/milli/src/update/settings.rs | 11 ++ 6 files changed, 178 insertions(+), 5 deletions(-) diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs index 1eb2ddb44..75b02868b 100644 --- a/crates/milli/src/update/new/document_change.rs +++ b/crates/milli/src/update/new/document_change.rs @@ -319,7 +319,7 @@ impl<'doc> SettingsChangeDocument<'doc> { self.docid } - pub fn external_document_id(&self) -> &'doc str { + pub fn external_docid(&self) -> &'doc str { self.external_document_id } @@ -333,4 +333,16 @@ impl<'doc> SettingsChangeDocument<'doc> { crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, )?) } + + pub fn current_vectors<'a, Mapper: FieldIdMapper>( + &self, + rtxn: &'a RoTxn, + index: &'a Index, + mapper: &'a Mapper, + doc_alloc: &'a Bump, + ) -> Result> { + Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or( + crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, + )?) + } } diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs index 18a1d28e3..446c26069 100644 --- a/crates/milli/src/update/new/extract/documents.rs +++ b/crates/milli/src/update/new/extract/documents.rs @@ -1,16 +1,25 @@ use std::cell::RefCell; +use std::collections::BTreeMap; use bumpalo::Bump; use hashbrown::HashMap; use super::DelAddRoaringBitmap; use crate::constants::RESERVED_GEO_FIELD_NAME; -use crate::update::new::channel::DocumentsSender; +use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender}; use crate::update::new::document::{write_to_obkv, Document as _}; -use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor}; +use crate::update::new::document_change::SettingsChangeDocument; +use crate::update::new::indexer::document_changes::{ + DocumentChangeContext, Extractor, IndexingContext, +}; +use crate::update::new::indexer::settings_changes::{ + settings_change_extract, SettingsChangeDocuments, SettingsChangeExtractor, +}; use crate::update::new::ref_cell_ext::RefCellExt as _; -use crate::update::new::thread_local::FullySend; +use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::DocumentChange; +use crate::update::settings::SettingsDelta; +use crate::vector::settings::EmbedderAction; use crate::vector::EmbeddingConfigs; use crate::Result; @@ -162,3 +171,98 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> { Ok(()) } } + +pub struct SettingsChangeDocumentExtractor<'a, 'b> { + document_sender: DocumentsSender<'a, 'b>, + embedder_actions: &'a BTreeMap, +} + +impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> { + pub fn new( + document_sender: DocumentsSender<'a, 'b>, + embedder_actions: &'a BTreeMap, + ) -> Self { + Self { document_sender, embedder_actions } + } +} + +impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> { + type Data = FullySend>; + + fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result { + Ok(FullySend(Default::default())) + } + + fn process<'doc>( + &self, + documents: impl Iterator>>, + context: &DocumentChangeContext, + ) -> Result<()> { + let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc); + + for document in documents { + let document = document?; + // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop + // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction) + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); + + let external_docid = document.external_docid().to_owned(); + let content = + document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?; + let vector_content = document.current_vectors( + &context.rtxn, + context.index, + &context.db_fields_ids_map, + &context.doc_alloc, + )?; + + let content = write_to_obkv( + &content, + Some(&vector_content), + &self.embedder_actions, + &mut new_fields_ids_map, + &mut document_buffer, + )?; + + /// TODO: avoid sending a document without any modifications + self.document_sender.uncompressed(document.docid(), external_docid, content).unwrap(); + } + + Ok(()) + } +} + +/// Modify the database documents based on the settings changes. +/// +/// This function extracts the documents from the database, +/// modifies them by adding or removing vector fields based on embedder actions, +/// and then updates the database. +#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")] +pub fn update_database_documents<'pl, 'extractor, SCD, MSP, SD>( + document_changes: &SCD, + indexing_context: IndexingContext, + extractor_sender: ExtractorBbqueueSender, + settings_delta: &SD, + extractor_allocs: &'extractor mut ThreadLocal>, +) -> Result<()> +where + MSP: Fn() -> bool + Sync, + SD: SettingsDelta, + SCD: SettingsChangeDocuments<'pl>, +{ + let document_sender = extractor_sender.documents(); + let document_extractor = + SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions()); + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + + settings_change_extract( + document_changes, + &document_extractor, + indexing_context, + extractor_allocs, + &datastore, + crate::update::new::steps::IndexingStep::ExtractingDocuments, + )?; + + Ok(()) +} diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index bb36ddc37..ce0503b83 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -12,12 +12,14 @@ use super::super::steps::IndexingStep; use super::super::thread_local::{FullySend, ThreadLocal}; use super::super::FacetFieldIdsDelta; use super::document_changes::{extract, DocumentChanges, IndexingContext}; +use super::settings_changes::SettingsChangeDocuments; use crate::index::IndexEmbeddingConfig; use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; +use crate::update::settings::SettingsDelta; use crate::vector::EmbeddingConfigs; use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; @@ -312,6 +314,49 @@ where Result::Ok((facet_field_ids_delta, index_embeddings)) } +pub(super) fn extract_all_settings_changes<'pl, 'extractor, SCD, MSP, SD>( + document_changes: &SCD, + indexing_context: IndexingContext, + indexer_span: Span, + extractor_sender: ExtractorBbqueueSender, + settings_delta: &SD, + extractor_allocs: &'extractor mut ThreadLocal>, + finished_extraction: &AtomicBool, + field_distribution: &mut BTreeMap, + mut index_embeddings: Vec, + document_ids: &mut RoaringBitmap, + modified_docids: &mut RoaringBitmap, +) -> Result> +where + SCD: SettingsChangeDocuments<'pl>, + MSP: Fn() -> bool + Sync, + SD: SettingsDelta, +{ + let span = + tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); + let _entered = span.enter(); + + update_database_documents( + document_changes, + indexing_context, + extractor_sender, + settings_delta, + extractor_allocs, + )?; + + 'vectors: { + // TODO: extract embeddings for settings changes + // extract embeddings from new embedders + // remove embeddings for embedders that are no longer in the settings + todo!() + } + + indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); + finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); + + Result::Ok(index_embeddings) +} + fn request_threads() -> &'static ThreadPoolNoAbort { static REQUEST_THREADS: OnceLock = OnceLock::new(); diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index a8348f960..eb55de135 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -32,7 +32,7 @@ mod extract; mod guess_primary_key; mod partial_dump; mod post_processing; -mod settings_changes; +pub mod settings_changes; mod update_by_function; mod write; diff --git a/crates/milli/src/update/new/indexer/settings_changes.rs b/crates/milli/src/update/new/indexer/settings_changes.rs index ac349527d..0cd9cc405 100644 --- a/crates/milli/src/update/new/indexer/settings_changes.rs +++ b/crates/milli/src/update/new/indexer/settings_changes.rs @@ -15,6 +15,7 @@ use crate::update::new::indexer::document_changes::DocumentChangeContext; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; +use crate::update::settings::SettingsDelta; use crate::update::GrenadParameters; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index 7fb589604..5145ff4b2 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -2179,6 +2179,17 @@ fn deserialize_sub_embedder( } } +pub trait SettingsDelta { + fn new_embedding_configs(&self) -> &EmbeddingConfigs; + fn embedder_actions(&self) -> &BTreeMap; +} + +impl SettingsDelta for InnerIndexSettingsDiff { + fn embedder_actions(&self) -> &BTreeMap { + &self.embedding_config_updates + } +} + #[cfg(test)] #[path = "test_settings.rs"] mod tests;