From 202f1b0c1e07697f286abfdb0f2ab6469b964c82 Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Wed, 21 May 2025 10:43:27 +0200 Subject: [PATCH] wip --- .../src/scheduler/process_index_operation.rs | 4 +- crates/milli/src/update/new/document.rs | 32 +- .../milli/src/update/new/document_change.rs | 38 +-- .../milli/src/update/new/extract/documents.rs | 22 +- crates/milli/src/update/new/extract/mod.rs | 2 +- .../src/update/new/extract/vectors/mod.rs | 201 ++++++++++++ .../update/new/indexer/document_deletion.rs | 4 +- .../update/new/indexer/document_operation.rs | 4 +- .../milli/src/update/new/indexer/extract.rs | 89 +++++- crates/milli/src/update/new/indexer/mod.rs | 287 +++++++++++++++--- .../update/new/indexer/settings_changes.rs | 90 +++--- .../update/new/indexer/update_by_function.rs | 9 +- crates/milli/src/update/new/indexer/write.rs | 14 +- crates/milli/src/update/new/mod.rs | 2 +- crates/milli/src/update/settings.rs | 70 ++++- 15 files changed, 684 insertions(+), 184 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 093c6209d..f3119d264 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -468,14 +468,14 @@ impl IndexScheduler { } progress.update_progress(SettingsProgress::ApplyTheSettings); - builder + let congestion = builder .execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; - Ok((tasks, None)) + Ok((tasks, congestion)) } IndexOperation::DocumentClearAndSetting { index_uid, diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs index 720b83780..f320ac31f 100644 --- a/crates/milli/src/update/new/document.rs +++ b/crates/milli/src/update/new/document.rs @@ -353,21 +353,25 @@ where } } } else { - if embedder_actions.contains_key(name) { - continue; + match embedder_actions.get(name) { + Some(action) if action.write_back().is_none() => { + continue; + } + _ => { + vectors.insert( + name, + if entry.implicit { + serde_json::json!(entry.embeddings) + } else { + serde_json::json!({ + "regenerate": entry.regenerate, + // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object + "embeddings": entry.embeddings, + }) + }, + ); + } } - vectors.insert( - name, - if entry.implicit { - serde_json::json!(entry.embeddings) - } else { - serde_json::json!({ - "regenerate": entry.regenerate, - // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object - "embeddings": entry.embeddings, - }) - }, - ); } } diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs index 75b02868b..2ff96fd24 100644 --- a/crates/milli/src/update/new/document_change.rs +++ b/crates/milli/src/update/new/document_change.rs @@ -14,16 +14,11 @@ use crate::vector::EmbeddingConfigs; use crate::{DocumentId, Index, InternalError, Result}; pub enum DocumentChange<'doc> { - Deletion(Deletion<'doc>), + Deletion(DatabaseDocument<'doc>), Update(Update<'doc>), Insertion(Insertion<'doc>), } -pub struct Deletion<'doc> { - docid: DocumentId, - external_document_id: &'doc str, -} - pub struct Update<'doc> { docid: DocumentId, external_document_id: &'doc str, @@ -37,7 +32,7 @@ pub struct Insertion<'doc> { new: Versions<'doc>, } -pub struct SettingsChangeDocument<'doc> { +pub struct DatabaseDocument<'doc> { docid: DocumentId, external_document_id: &'doc str, } @@ -60,31 +55,6 @@ impl<'doc> DocumentChange<'doc> { } } -impl<'doc> Deletion<'doc> { - pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self { - Self { docid, external_document_id } - } - - pub fn docid(&self) -> DocumentId { - self.docid - } - - pub fn external_document_id(&self) -> &'doc str { - self.external_document_id - } - - pub fn current<'a, Mapper: FieldIdMapper>( - &self, - rtxn: &'a RoTxn, - index: &'a Index, - mapper: &'a Mapper, - ) -> Result> { - Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( - crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, - )?) - } -} - impl<'doc> Insertion<'doc> { pub fn create(docid: DocumentId, external_document_id: &'doc str, new: Versions<'doc>) -> Self { Insertion { docid, external_document_id, new } @@ -310,7 +280,7 @@ impl<'doc> Update<'doc> { } } -impl<'doc> SettingsChangeDocument<'doc> { +impl<'doc> DatabaseDocument<'doc> { pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self { Self { docid, external_document_id } } @@ -319,7 +289,7 @@ impl<'doc> SettingsChangeDocument<'doc> { self.docid } - pub fn external_docid(&self) -> &'doc str { + pub fn external_document_id(&self) -> &'doc str { self.external_document_id } diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs index 446c26069..3098e8572 100644 --- a/crates/milli/src/update/new/extract/documents.rs +++ b/crates/milli/src/update/new/extract/documents.rs @@ -8,12 +8,12 @@ use super::DelAddRoaringBitmap; use crate::constants::RESERVED_GEO_FIELD_NAME; use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender}; use crate::update::new::document::{write_to_obkv, Document as _}; -use crate::update::new::document_change::SettingsChangeDocument; +use crate::update::new::document_change::DatabaseDocument; use crate::update::new::indexer::document_changes::{ DocumentChangeContext, Extractor, IndexingContext, }; use crate::update::new::indexer::settings_changes::{ - settings_change_extract, SettingsChangeDocuments, SettingsChangeExtractor, + settings_change_extract, DatabaseDocuments, SettingsChangeExtractor, }; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::thread_local::{FullySend, ThreadLocal}; @@ -195,7 +195,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE fn process<'doc>( &self, - documents: impl Iterator>>, + documents: impl Iterator>>, context: &DocumentChangeContext, ) -> Result<()> { let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc); @@ -206,7 +206,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction) let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); - let external_docid = document.external_docid().to_owned(); + let external_docid = document.external_document_id().to_owned(); let content = document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?; let vector_content = document.current_vectors( @@ -238,25 +238,29 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE /// modifies them by adding or removing vector fields based on embedder actions, /// and then updates the database. #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")] -pub fn update_database_documents<'pl, 'extractor, SCD, MSP, SD>( - document_changes: &SCD, +pub fn update_database_documents<'indexer, 'extractor, MSP, SD>( + documents: &'indexer DatabaseDocuments<'indexer>, indexing_context: IndexingContext, - extractor_sender: ExtractorBbqueueSender, + extractor_sender: &ExtractorBbqueueSender, settings_delta: &SD, extractor_allocs: &'extractor mut ThreadLocal>, ) -> Result<()> where MSP: Fn() -> bool + Sync, SD: SettingsDelta, - SCD: SettingsChangeDocuments<'pl>, { + // skip if no embedder_actions + if settings_delta.embedder_actions().is_empty() { + return Ok(()); + } + let document_sender = extractor_sender.documents(); let document_extractor = SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions()); let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); settings_change_extract( - document_changes, + documents, &document_extractor, indexing_context, extractor_allocs, diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index 2abefb098..05c90d8f8 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -12,7 +12,7 @@ pub use documents::*; pub use faceted::*; pub use geo::*; pub use searchable::*; -pub use vectors::EmbeddingExtractor; +pub use vectors::{EmbeddingExtractor, SettingsChangeEmbeddingExtractor}; /// TODO move in permissive json pointer pub mod perm_json_p { diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 43647e786..79227fc1a 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -1,4 +1,5 @@ use std::cell::RefCell; +use std::collections::BTreeMap; use bumpalo::collections::Vec as BVec; use bumpalo::Bump; @@ -8,13 +9,16 @@ use super::cache::DelAddRoaringBitmap; use crate::error::FaultSource; use crate::prompt::Prompt; use crate::update::new::channel::EmbeddingSender; +use crate::update::new::document_change::DatabaseDocument; use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor}; +use crate::update::new::indexer::settings_changes::SettingsChangeExtractor; use crate::update::new::thread_local::MostlySend; use crate::update::new::vector_document::VectorDocument; use crate::update::new::DocumentChange; use crate::vector::error::{ EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump, }; +use crate::vector::settings::{EmbedderAction, ReindexAction}; use crate::vector::{Embedder, Embedding, EmbeddingConfigs}; use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError}; @@ -290,6 +294,203 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> { } } +pub struct SettingsChangeEmbeddingExtractor<'a, 'b> { + embedders: &'a EmbeddingConfigs, + old_embedders: &'a EmbeddingConfigs, + embedder_actions: &'a BTreeMap, + + sender: EmbeddingSender<'a, 'b>, + possible_embedding_mistakes: PossibleEmbeddingMistakes, + threads: &'a ThreadPoolNoAbort, +} + +impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> { + pub fn new( + embedders: &'a EmbeddingConfigs, + old_embedders: &'a EmbeddingConfigs, + embedder_actions: &'a BTreeMap, + sender: EmbeddingSender<'a, 'b>, + field_distribution: &'a FieldDistribution, + threads: &'a ThreadPoolNoAbort, + ) -> Self { + let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution); + Self { + embedders, + old_embedders, + embedder_actions, + sender, + threads, + possible_embedding_mistakes, + } + } +} + +impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbeddingExtractor<'_, '_> { + type Data = RefCell>; + + fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result { + Ok(RefCell::new(EmbeddingExtractorData(HashMap::new_in(extractor_alloc)))) + } + + fn process<'doc>( + &'doc self, + documents: impl Iterator>>, + context: &'doc DocumentChangeContext, + ) -> crate::Result<()> { + let embedders = self.embedders.inner_as_ref(); + let old_embedders = self.old_embedders.inner_as_ref(); + let unused_vectors_distribution = UnusedVectorsDistributionBump::new_in(&context.doc_alloc); + + let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc); + for (embedder_name, (embedder, prompt, _is_quantized)) in embedders { + // if the embedder is not in the embedder_actions, we don't need to reindex. + if let Some(reindex_action) = + self.embedder_actions.get(embedder_name).and_then(|action| action.reindex()) + { + let embedder_id = context + .index + .embedder_category_id + .get(&context.rtxn, embedder_name)? + .ok_or_else(|| InternalError::DatabaseMissingEntry { + db_name: "embedder_category_id", + key: None, + })?; + + all_chunks.push(( + Chunks::new( + embedder, + embedder_id, + embedder_name, + prompt, + context.data, + &self.possible_embedding_mistakes, + self.threads, + self.sender, + &context.doc_alloc, + ), + reindex_action, + )) + } + } + + for document in documents { + let document = document?; + + let current_vectors = document.current_vectors( + &context.rtxn, + context.index, + context.db_fields_ids_map, + &context.doc_alloc, + )?; + + for (chunks, reindex_action) in &mut all_chunks { + let embedder_name = chunks.embedder_name(); + let current_vectors = current_vectors.vectors_for_key(embedder_name)?; + + // if the vectors for this document have been already provided, we don't need to reindex. + let (is_new_embedder, must_regenerate) = + current_vectors.as_ref().map_or((true, true), |vectors| { + (!vectors.has_configured_embedder, vectors.regenerate) + }); + + match reindex_action { + ReindexAction::RegeneratePrompts => { + if !must_regenerate { + continue; + } + // we need to regenerate the prompts for the document + + // Get the old prompt and render the document with it + let Some((_, old_prompt, _)) = old_embedders.get(embedder_name) else { + unreachable!("ReindexAction::RegeneratePrompts implies that the embedder {embedder_name} is in the old_embedders") + }; + let old_rendered = old_prompt.render_document( + document.external_document_id(), + document.current( + &context.rtxn, + context.index, + context.db_fields_ids_map, + )?, + context.new_fields_ids_map, + &context.doc_alloc, + )?; + + // Get the new prompt and render the document with it + let new_prompt = chunks.prompt(); + let new_rendered = new_prompt.render_document( + document.external_document_id(), + document.current( + &context.rtxn, + context.index, + context.db_fields_ids_map, + )?, + context.new_fields_ids_map, + &context.doc_alloc, + )?; + + // Compare the rendered documents + // if they are different, regenerate the vectors + if new_rendered != old_rendered { + chunks.set_autogenerated( + document.docid(), + document.external_document_id(), + new_rendered, + &unused_vectors_distribution, + )?; + } + } + ReindexAction::FullReindex => { + let prompt = chunks.prompt(); + // if no inserted vectors, then regenerate: true + no embeddings => autogenerate + if must_regenerate { + let rendered = prompt.render_document( + document.external_document_id(), + document.current( + &context.rtxn, + context.index, + context.db_fields_ids_map, + )?, + context.new_fields_ids_map, + &context.doc_alloc, + )?; + chunks.set_autogenerated( + document.docid(), + document.external_document_id(), + rendered, + &unused_vectors_distribution, + )?; + } else if is_new_embedder { + if let Some(embeddings) = + current_vectors.and_then(|vectors| vectors.embeddings) + { + chunks.set_regenerate(document.docid(), false); + chunks.set_vectors( + document.external_document_id(), + document.docid(), + embeddings + .into_vec(&context.doc_alloc, embedder_name) + .map_err(|error| UserError::InvalidVectorsEmbedderConf { + document_id: document + .external_document_id() + .to_string(), + error: error.to_string(), + })?, + )?; + } + } + } + } + } + } + + for (chunk, _) in all_chunks { + chunk.drain(&unused_vectors_distribution)?; + } + + Ok(()) + } +} + // **Warning**: the destructor of this struct is not normally run, make sure that all its fields: // 1. don't have side effects tied to they destructors // 2. if allocated, are allocated inside of the bumpalo diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index c4a72a2a1..114ce0a69 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -7,7 +7,7 @@ use roaring::RoaringBitmap; use super::document_changes::{DocumentChangeContext, DocumentChanges}; use crate::documents::PrimaryKey; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{Deletion, DocumentChange}; +use crate::update::new::{DatabaseDocument, DocumentChange}; use crate::{DocumentId, Result}; #[derive(Default)] @@ -74,7 +74,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { let external_document_id = external_document_id.to_bump(&context.doc_alloc); - Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id)))) + Ok(Some(DocumentChange::Deletion(DatabaseDocument::create(*docid, external_document_id)))) } fn len(&self) -> usize { diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index ca433c043..70dc5f35c 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -19,7 +19,7 @@ use crate::progress::{AtomicPayloadStep, Progress}; use crate::update::new::document::Versions; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{Deletion, Insertion, Update}; +use crate::update::new::{DatabaseDocument, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; @@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> { if self.is_new { Ok(None) } else { - let deletion = Deletion::create(self.docid, external_doc); + let deletion = DatabaseDocument::create(self.docid, external_doc); Ok(Some(DocumentChange::Deletion(deletion))) } } diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index ce0503b83..cebe8edec 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -12,15 +12,20 @@ use super::super::steps::IndexingStep; use super::super::thread_local::{FullySend, ThreadLocal}; use super::super::FacetFieldIdsDelta; use super::document_changes::{extract, DocumentChanges, IndexingContext}; -use super::settings_changes::SettingsChangeDocuments; +use super::settings_changes::settings_change_extract; +use crate::documents::FieldIdMapper; +use crate::documents::PrimaryKey; use crate::index::IndexEmbeddingConfig; use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; +use crate::update::new::indexer::settings_changes::DatabaseDocuments; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::SettingsDelta; use crate::vector::EmbeddingConfigs; +use crate::Index; +use crate::InternalError; use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; #[allow(clippy::too_many_arguments)] @@ -314,8 +319,7 @@ where Result::Ok((facet_field_ids_delta, index_embeddings)) } -pub(super) fn extract_all_settings_changes<'pl, 'extractor, SCD, MSP, SD>( - document_changes: &SCD, +pub(super) fn extract_all_settings_changes<'extractor, MSP, SD>( indexing_context: IndexingContext, indexer_span: Span, extractor_sender: ExtractorBbqueueSender, @@ -324,31 +328,76 @@ pub(super) fn extract_all_settings_changes<'pl, 'extractor, SCD, MSP, SD>( finished_extraction: &AtomicBool, field_distribution: &mut BTreeMap, mut index_embeddings: Vec, - document_ids: &mut RoaringBitmap, modified_docids: &mut RoaringBitmap, ) -> Result> where - SCD: SettingsChangeDocuments<'pl>, MSP: Fn() -> bool + Sync, SD: SettingsDelta, { + // Create the list of document ids to extract + let rtxn = indexing_context.index.read_txn()?; + let all_document_ids = + indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::>(); + let primary_key = + primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?; + let documents = DatabaseDocuments::new(&all_document_ids, primary_key); + let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); let _entered = span.enter(); update_database_documents( - document_changes, + &documents, indexing_context, - extractor_sender, + &extractor_sender, settings_delta, extractor_allocs, )?; 'vectors: { - // TODO: extract embeddings for settings changes - // extract embeddings from new embedders - // remove embeddings for embedders that are no longer in the settings - todo!() + if settings_delta.embedder_actions().is_empty() { + break 'vectors; + } + + let embedding_sender = extractor_sender.embeddings(); + + // extract the remaining embedders + let extractor = SettingsChangeEmbeddingExtractor::new( + settings_delta.new_embedders(), + settings_delta.old_embedders(), + settings_delta.embedder_actions(), + embedding_sender, + field_distribution, + request_threads(), + ); + let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + { + let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors"); + let _entered = span.enter(); + + settings_change_extract( + &documents, + &extractor, + indexing_context, + extractor_allocs, + &datastore, + IndexingStep::ExtractingEmbeddings, + )?; + } + { + let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors"); + let _entered = span.enter(); + + for config in &mut index_embeddings { + 'data: for data in datastore.iter_mut() { + let data = &mut data.get_mut().0; + let Some(deladd) = data.remove(&config.name) else { + continue 'data; + }; + deladd.apply_to(&mut config.user_provided, modified_docids); + } + } + } } indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); @@ -357,6 +406,24 @@ where Result::Ok(index_embeddings) } +fn primary_key_from_db<'indexer, 'index>( + index: &'indexer Index, + rtxn: &'indexer heed::RoTxn<'index>, + fields: &'indexer impl FieldIdMapper, +) -> Result> { + let Some(primary_key) = index.primary_key(rtxn)? else { + return Err(InternalError::DatabaseMissingEntry { + db_name: crate::index::db_name::MAIN, + key: Some(crate::index::main_key::PRIMARY_KEY_KEY), + } + .into()); + }; + let Some(primary_key) = PrimaryKey::new(primary_key, fields) else { + unreachable!("Primary key must exist at this point"); + }; + Ok(primary_key) +} + fn request_threads() -> &'static ThreadPoolNoAbort { static REQUEST_THREADS: OnceLock = OnceLock::new(); diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index eb55de135..366990dd7 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::sync::atomic::AtomicBool; use std::sync::{Once, RwLock}; use std::thread::{self, Builder}; @@ -20,8 +21,10 @@ use super::thread_local::ThreadLocal; use crate::documents::PrimaryKey; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::progress::Progress; +use crate::update::settings::SettingsDelta; use crate::update::GrenadParameters; -use crate::vector::{ArroyWrapper, EmbeddingConfigs}; +use crate::vector::settings::{EmbedderAction, WriteBackToDocuments}; +use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; pub(crate) mod de; @@ -42,7 +45,7 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new(); /// /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. #[allow(clippy::too_many_arguments)] // clippy: 😝 -pub fn index<'pl, 'indexer, 'index, DC, MSP>( +pub(crate) fn index<'pl, 'indexer, 'index, DC, MSP>( wtxn: &mut RwTxn, index: &'index Index, pool: &ThreadPoolNoAbort, @@ -64,48 +67,8 @@ where let arroy_memory = grenad_parameters.max_memory; - // We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch - // is because we still use the old indexer for the settings and it is highly impacted by the - // max memory. So we keep the changes here and will remove these changes once we use the new - // indexer to also index settings. Related to #5125 and #5141. - let grenad_parameters = GrenadParameters { - max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100), - ..grenad_parameters - }; - - // 5% percent of the allocated memory for the extractors, or min 100MiB - // 5% percent of the allocated memory for the bbqueues, or min 50MiB - // - // Minimum capacity for bbqueues - let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB - let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2; - - let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or( - ( - GrenadParameters { - max_memory: Some(minimum_total_extractors_capacity), - ..grenad_parameters - }, - minimum_total_bbbuffer_capacity, - ), // 100 MiB by thread by default - |max_memory| { - let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity); - let new_grenad_parameters = GrenadParameters { - max_memory: Some(max_memory.max(minimum_total_extractors_capacity)), - ..grenad_parameters - }; - (new_grenad_parameters, total_bbbuffer_capacity) - }, - ); - - LOG_MEMORY_METRICS_ONCE.call_once(|| { - tracing::debug!( - "Indexation allocated memory metrics - \ - Total BBQueue size: {total_bbbuffer_capacity}, \ - Total extractor memory: {:?}", - grenad_parameters.max_memory, - ); - }); + let (grenad_parameters, total_bbbuffer_capacity) = + indexer_memory_settings(pool.current_num_threads(), grenad_parameters); let (extractor_sender, writer_receiver) = pool .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) @@ -179,10 +142,7 @@ where let dimensions = embedder.dimensions(); let writer = ArroyWrapper::new(vector_arroy, embedder_index, *was_quantized); - Ok(( - embedder_index, - (embedder_name.as_str(), embedder.as_ref(), writer, dimensions), - )) + Ok((embedder_index, (embedder.as_ref(), writer, dimensions))) }) .collect(); @@ -238,3 +198,234 @@ where Ok(congestion) } + +#[allow(clippy::too_many_arguments)] // clippy: 😝 +pub fn reindex<'pl, 'indexer, 'index, MSP, SD>( + wtxn: &mut RwTxn<'index>, + index: &'index Index, + pool: &ThreadPoolNoAbort, + grenad_parameters: GrenadParameters, + settings_delta: &'indexer SD, + must_stop_processing: &'indexer MSP, + progress: &'indexer Progress, +) -> Result +where + MSP: Fn() -> bool + Sync, + SD: SettingsDelta + Sync, +{ + delete_old_embedders(wtxn, index, settings_delta)?; + + let mut bbbuffers = Vec::new(); + let finished_extraction = AtomicBool::new(false); + + let arroy_memory = grenad_parameters.max_memory; + + let (grenad_parameters, total_bbbuffer_capacity) = + indexer_memory_settings(pool.current_num_threads(), grenad_parameters); + + let (extractor_sender, writer_receiver) = pool + .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) + .unwrap(); + + let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads()); + + let db_fields_ids_map = index.fields_ids_map(wtxn)?; + let new_fields_ids_map = settings_delta.new_fields_ids_map().clone(); + let new_fields_ids_map = RwLock::new(new_fields_ids_map); + let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads()); + let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads()); + + let indexing_context = IndexingContext { + index, + db_fields_ids_map: &db_fields_ids_map, + new_fields_ids_map: &new_fields_ids_map, + doc_allocs: &doc_allocs, + fields_ids_map_store: &fields_ids_map_store, + must_stop_processing, + progress, + grenad_parameters: &grenad_parameters, + }; + + let index_embeddings = index.embedding_configs(wtxn)?; + let mut field_distribution = index.field_distribution(wtxn)?; + let mut modified_docids = roaring::RoaringBitmap::new(); + + let congestion = thread::scope(|s| -> Result { + let indexer_span = tracing::Span::current(); + let finished_extraction = &finished_extraction; + // prevent moving the field_distribution and document_ids in the inner closure... + let field_distribution = &mut field_distribution; + let modified_docids = &mut modified_docids; + let extractor_handle = + Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { + pool.install(move || { + extract::extract_all_settings_changes( + indexing_context, + indexer_span, + extractor_sender, + settings_delta, + &mut extractor_allocs, + finished_extraction, + field_distribution, + index_embeddings, + modified_docids, + ) + }) + .unwrap() + })?; + + let new_embedders = settings_delta.new_embedders(); + let embedder_actions = settings_delta.embedder_actions(); + let mut arroy_writers = + arroy_writers_from_embedder_actions(wtxn, index, &embedder_actions, &new_embedders)?; + + let congestion = + write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?; + + indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors); + + let index_embeddings = extractor_handle.join().unwrap()?; + + indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase); + + pool.install(|| { + build_vectors( + index, + wtxn, + indexing_context.progress, + index_embeddings, + arroy_memory, + &mut arroy_writers, + &indexing_context.must_stop_processing, + ) + }) + .unwrap()?; + + indexing_context.progress.update_progress(IndexingStep::Finalizing); + + Ok(congestion) as Result<_> + })?; + + // required to into_inner the new_fields_ids_map + drop(fields_ids_map_store); + + let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap(); + let document_ids = index.documents_ids(wtxn)?; + update_index( + index, + wtxn, + new_fields_ids_map, + None, + settings_delta.new_embedders().clone(), + field_distribution, + document_ids, + )?; + + Ok(congestion) +} + +fn arroy_writers_from_embedder_actions<'indexer, 'index>( + wtxn: &mut RwTxn<'index>, + index: &'index Index, + embedder_actions: &'indexer BTreeMap, + embedders: &'indexer EmbeddingConfigs, +) -> Result> { + let vector_arroy = index.vector_arroy; + + embedders + .inner_as_ref() + .iter() + .filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) { + None => None, + Some(action) if action.write_back().is_some() => None, + Some(action) => { + let embedder_index = match index.embedder_category_id.get(wtxn, embedder_name) { + Ok(Some(embedder_index)) => embedder_index, + Ok(None) => { + return Some(Err(InternalError::DatabaseMissingEntry { + db_name: "embedder_category_id", + key: None, + } + .into())) + } + Err(e) => return Some(Err(e.into())), + }; + + let writer = + ArroyWrapper::new(vector_arroy, embedder_index, action.is_being_quantized); + let dimensions = embedder.dimensions(); + Some(Ok((embedder_index, (embedder.as_ref(), writer, dimensions)))) + } + }) + .collect() +} + +fn delete_old_embedders<'indexer, 'index, SD>( + wtxn: &mut RwTxn<'_>, + index: &'index Index, + settings_delta: &'indexer SD, +) -> Result<()> +where + SD: SettingsDelta, +{ + for (_name, action) in settings_delta.embedder_actions() { + if let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() { + let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized); + let dimensions = reader.dimensions(wtxn)?; + reader.clear(wtxn, dimensions)?; + } + } + + Ok(()) +} + +fn indexer_memory_settings( + current_num_threads: usize, + grenad_parameters: GrenadParameters, +) -> (GrenadParameters, usize) { + // We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch + // is because we still use the old indexer for the settings and it is highly impacted by the + // max memory. So we keep the changes here and will remove these changes once we use the new + // indexer to also index settings. Related to #5125 and #5141. + let grenad_parameters = GrenadParameters { + max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100), + ..grenad_parameters + }; + + // 5% percent of the allocated memory for the extractors, or min 100MiB + // 5% percent of the allocated memory for the bbqueues, or min 50MiB + // + // Minimum capacity for bbqueues + let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * current_num_threads; + // 50 MiB + let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2; + + let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or( + ( + GrenadParameters { + max_memory: Some(minimum_total_extractors_capacity), + ..grenad_parameters + }, + minimum_total_bbbuffer_capacity, + ), // 100 MiB by thread by default + |max_memory| { + let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity); + let new_grenad_parameters = GrenadParameters { + max_memory: Some(max_memory.max(minimum_total_extractors_capacity)), + ..grenad_parameters + }; + (new_grenad_parameters, total_bbbuffer_capacity) + }, + ); + + LOG_MEMORY_METRICS_ONCE.call_once(|| { + tracing::debug!( + "Indexation allocated memory metrics - \ + Total BBQueue size: {total_bbbuffer_capacity}, \ + Total extractor memory: {:?}", + grenad_parameters.max_memory, + ); + }); + + (grenad_parameters, total_bbbuffer_capacity) +} diff --git a/crates/milli/src/update/new/indexer/settings_changes.rs b/crates/milli/src/update/new/indexer/settings_changes.rs index 0cd9cc405..f92935399 100644 --- a/crates/milli/src/update/new/indexer/settings_changes.rs +++ b/crates/milli/src/update/new/indexer/settings_changes.rs @@ -1,23 +1,19 @@ -use std::cell::{Cell, RefCell}; use std::sync::atomic::Ordering; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use bumpalo::Bump; -use heed::{RoTxn, WithoutTls}; use rayon::iter::IndexedParallelIterator; +use rayon::slice::ParallelSlice; -use super::super::document_change::DocumentChange; use super::document_changes::IndexingContext; -use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; -use crate::progress::{AtomicDocumentStep, Progress}; -use crate::update::new::document_change::SettingsChangeDocument; +use crate::documents::PrimaryKey; +use crate::progress::AtomicDocumentStep; +use crate::update::new::document_change::DatabaseDocument; use crate::update::new::indexer::document_changes::DocumentChangeContext; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; -use crate::update::settings::SettingsDelta; -use crate::update::GrenadParameters; -use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; +use crate::{DocumentId, InternalError, Result}; /// An internal iterator (i.e. using `foreach`) of `DocumentChange`s pub trait SettingsChangeExtractor<'extractor>: Sync { @@ -27,46 +23,62 @@ pub trait SettingsChangeExtractor<'extractor>: Sync { fn process<'doc>( &'doc self, - changes: impl Iterator>>, + changes: impl Iterator>>, context: &'doc DocumentChangeContext, ) -> Result<()>; } +pub struct DatabaseDocuments<'indexer> { + documents: &'indexer [DocumentId], + primary_key: PrimaryKey<'indexer>, +} -pub trait SettingsChangeDocuments<'pl // lifetime of the underlying payload ->: Sync { - type Item: Send; - - fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator>; - - fn len(&self) -> usize; - - fn is_empty(&self) -> bool { - self.len() == 0 +impl<'indexer> DatabaseDocuments<'indexer> { + pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self { + Self { documents, primary_key } } - fn item_to_settings_change_document<'doc, // lifetime of a single `process` call - T: MostlySend>( + fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator { + self.documents.par_chunks(chunk_size) + } + + fn item_to_database_document< + 'doc, // lifetime of a single `process` call + T: MostlySend, + >( &'doc self, context: &'doc DocumentChangeContext, - item: &'doc Self::Item, - ) -> Result>> where 'pl: 'doc // the payload must survive the process calls - ; + docid: &'doc DocumentId, + ) -> Result>> { + let current = context.index.document(&context.rtxn, *docid)?; + + let external_document_id = self.primary_key.extract_docid_from_db( + current, + &context.db_fields_ids_map, + &context.doc_alloc, + )?; + + let external_document_id = external_document_id.to_bump(&context.doc_alloc); + + Ok(Some(DatabaseDocument::create(*docid, external_document_id))) + } + + fn len(&self) -> usize { + self.documents.len() + } } const CHUNK_SIZE: usize = 100; pub fn settings_change_extract< - 'pl, // covariant lifetime of the underlying payload 'extractor, // invariant lifetime of extractor_alloc 'fid, // invariant lifetime of fields ids map 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing 'data, // invariant on EX::Data lifetime of datastore 'index, // covariant lifetime of the index - EX, - SCD: SettingsChangeDocuments<'pl>, - MSP, + EX: SettingsChangeExtractor<'extractor>, + MSP: Fn() -> bool + Sync, >( - document_changes: &SCD, + documents: &'indexer DatabaseDocuments<'indexer>, extractor: &EX, IndexingContext { index, @@ -81,11 +93,7 @@ pub fn settings_change_extract< extractor_allocs: &'extractor mut ThreadLocal>, datastore: &'data ThreadLocal, step: IndexingStep, -) -> Result<()> -where - EX: SettingsChangeExtractor<'extractor>, - MSP: Fn() -> bool + Sync, -{ +) -> Result<()> { tracing::trace!("We are resetting the extractor allocators"); progress.update_progress(step); // Clean up and reuse the extractor allocs @@ -94,11 +102,11 @@ where extractor_alloc.0.reset(); } - let total_documents = document_changes.len() as u32; + let total_documents = documents.len() as u32; let (step, progress_step) = AtomicDocumentStep::new(total_documents); progress.update_progress(progress_step); - let pi = document_changes.iter(CHUNK_SIZE); + let pi = documents.iter(CHUNK_SIZE); pi.try_arc_for_each_try_init( || { DocumentChangeContext::new( @@ -121,9 +129,9 @@ where context.doc_alloc.reset(); let items = items.as_ref(); - let changes = items.iter().filter_map(|item| { - document_changes.item_to_settings_change_document(context, item).transpose() - }); + let changes = items + .iter() + .filter_map(|item| documents.item_to_database_document(context, item).transpose()); let res = extractor.process(changes, context).map_err(Arc::new); step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed); diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs index 3001648e6..694645d28 100644 --- a/crates/milli/src/update/new/indexer/update_by_function.rs +++ b/crates/milli/src/update/new/indexer/update_by_function.rs @@ -13,7 +13,7 @@ use crate::error::{FieldIdMapMissingEntry, InternalError}; use crate::update::new::document::Versions; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update}; +use crate::update::new::{DatabaseDocument, DocumentChange, KvReaderFieldId, Update}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; pub struct UpdateByFunction { @@ -128,10 +128,9 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { match scope.remove::("doc") { // If the "doc" variable has been set to (), we effectively delete the document. - Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create( - docid, - doc_alloc.alloc_str(&document_id), - )))), + Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion( + DatabaseDocument::create(docid, doc_alloc.alloc_str(&document_id)), + ))), None => unreachable!("missing doc variable from the Rhai scope"), Some(new_document) => match new_document.try_cast() { Some(new_rhai_document) => { diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index 5a600eeb3..0a3ca8196 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -21,7 +21,7 @@ pub fn write_to_db( finished_extraction: &AtomicBool, index: &Index, wtxn: &mut RwTxn<'_>, - arroy_writers: &HashMap, + arroy_writers: &HashMap, ) -> Result { // Used by by the ArroySetVector to copy the embedding into an // aligned memory area, required by arroy to accept a new vector. @@ -53,7 +53,7 @@ pub fn write_to_db( } ReceiverAction::LargeVectors(large_vectors) => { let LargeVectors { docid, embedder_id, .. } = large_vectors; - let (_, _, writer, dimensions) = + let (_, writer, dimensions) = arroy_writers.get(&embedder_id).expect("requested a missing embedder"); let mut embeddings = Embeddings::new(*dimensions); for embedding in large_vectors.read_embeddings(*dimensions) { @@ -105,7 +105,7 @@ pub fn build_vectors( progress: &Progress, index_embeddings: Vec, arroy_memory: Option, - arroy_writers: &mut HashMap, + arroy_writers: &mut HashMap, must_stop_processing: &MSP, ) -> Result<()> where @@ -117,7 +117,7 @@ where let seed = rand::random(); let mut rng = rand::rngs::StdRng::seed_from_u64(seed); - for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers { + for (_index, (_embedder, writer, dimensions)) in arroy_writers { let dimensions = *dimensions; writer.build_and_quantize( wtxn, @@ -166,7 +166,7 @@ pub fn write_from_bbqueue( writer_receiver: &mut WriterBbqueueReceiver<'_>, index: &Index, wtxn: &mut RwTxn<'_>, - arroy_writers: &HashMap, + arroy_writers: &HashMap, aligned_embedding: &mut Vec, ) -> crate::Result<()> { while let Some(frame_with_header) = writer_receiver.recv_frame() { @@ -207,7 +207,7 @@ pub fn write_from_bbqueue( } } EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => { - for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers { + for (_index, (_embedder, writer, dimensions)) in arroy_writers { let dimensions = *dimensions; writer.del_items(wtxn, dimensions, docid)?; } @@ -215,7 +215,7 @@ pub fn write_from_bbqueue( EntryHeader::ArroySetVectors(asvs) => { let ArroySetVectors { docid, embedder_id, .. } = asvs; let frame = frame_with_header.frame(); - let (_, _, writer, dimensions) = + let (_, writer, dimensions) = arroy_writers.get(&embedder_id).expect("requested a missing embedder"); let mut embeddings = Embeddings::new(*dimensions); let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding); diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs index 81ff93e54..e3adc5bde 100644 --- a/crates/milli/src/update/new/mod.rs +++ b/crates/milli/src/update/new/mod.rs @@ -1,4 +1,4 @@ -pub use document_change::{Deletion, DocumentChange, Insertion, Update}; +pub use document_change::{DatabaseDocument, DocumentChange, Insertion, Update}; pub use indexer::ChannelCongestion; pub use merger::{ merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta, diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index 5145ff4b2..1e9f641e8 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -31,13 +31,17 @@ use crate::progress::Progress; use crate::prompt::{default_max_bytes, default_template_text, PromptData}; use crate::proximity::ProximityPrecision; use crate::update::index_documents::IndexDocumentsMethod; +use crate::update::new::indexer::reindex; use crate::update::{IndexDocuments, UpdateIndexingStep}; use crate::vector::settings::{ EmbedderAction, EmbedderSource, EmbeddingSettings, NestingContext, ReindexAction, SubEmbeddingSettings, WriteBackToDocuments, }; use crate::vector::{Embedder, EmbeddingConfig, EmbeddingConfigs}; -use crate::{FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result}; +use crate::{ + ChannelCongestion, FieldId, FieldsIdsMap, FilterableAttributesRule, Index, + LocalizedAttributesRule, Result, ThreadPoolNoAbortBuilder, +}; #[derive(Debug, Clone, PartialEq, Eq, Copy)] pub enum Setting { @@ -1424,16 +1428,18 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { mut self, must_stop_processing: &'indexer MSP, progress: &'indexer Progress, - ) -> Result<()> + ) -> Result> where MSP: Fn() -> bool + Sync, { // force the old indexer if the environment says so if std::env::var_os("MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS").is_some() { - return self.execute( - |indexing_step| tracing::debug!("update: {:?}", indexing_step), - must_stop_processing, - ); + return self + .execute( + |indexing_step| tracing::debug!("update: {:?}", indexing_step), + must_stop_processing, + ) + .map(|_| None); } // only use the new indexer when only the embedder possibly changed @@ -1469,7 +1475,40 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { indexer_config: _, } = &self { + self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?; + + let old_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?; + + // Update index settings + let embedding_config_updates = self.update_embedding_configs()?; + + let mut new_inner_settings = + InnerIndexSettings::from_index(self.index, self.wtxn, None)?; + new_inner_settings.recompute_searchables(self.wtxn, self.index)?; + + let primary_key_id = self + .index + .primary_key(self.wtxn)? + .and_then(|name| new_inner_settings.fields_ids_map.id(name)); + let settings_update_only = true; + let inner_settings_diff = InnerIndexSettingsDiff::new( + old_inner_settings, + new_inner_settings, + primary_key_id, + embedding_config_updates, + settings_update_only, + ); + todo!() + // reindex( + // self.wtxn, + // self.index, + // &self.indexer_config.pool, + // self.indexer_config.grenad_parameters(), + // &inner_settings_diff, + // must_stop_processing, + // progress, + // ) // 1. First we want to update the database and compute the settings diff, we might reuse a bunch of existing functions here // 2. Pick which pipelines we need to run. // 3. Execute extraction pipelines @@ -1486,6 +1525,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { |indexing_step| tracing::debug!("update: {:?}", indexing_step), must_stop_processing, ) + .map(|_| None) } // create rtxn, populate FieldIdMapWithMetadata (old + new) @@ -2179,15 +2219,31 @@ fn deserialize_sub_embedder( } } +/// Implement this trait for the settings delta type. +/// This is used in the new settings update flow and will allow to easily replace the old settings delta type: `InnerIndexSettingsDiff`. pub trait SettingsDelta { - fn new_embedding_configs(&self) -> &EmbeddingConfigs; + fn new_embedders(&self) -> &EmbeddingConfigs; + fn old_embedders(&self) -> &EmbeddingConfigs; fn embedder_actions(&self) -> &BTreeMap; + fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata; } impl SettingsDelta for InnerIndexSettingsDiff { + fn new_embedders(&self) -> &EmbeddingConfigs { + &self.new.embedding_configs + } + + fn old_embedders(&self) -> &EmbeddingConfigs { + &self.old.embedding_configs + } + fn embedder_actions(&self) -> &BTreeMap { &self.embedding_config_updates } + + fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata { + &self.new.fields_ids_map + } } #[cfg(test)]