Mirror of https://github.com/meilisearch/meilisearch.git
Merge #5168

5168: Refactor indexer r=ManyTheFish a=dureuill

# Pull Request

Split the indexer mod into multiple submodules. This restores the ability of rustfmt to format the file 🎉

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
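For orientation, this is the submodule layout the PR leaves behind in `crates/milli/src/update/new/indexer/mod.rs`. The `mod` declarations are taken verbatim from the diff below; the annotations in comments are editorial, based on the items this diff moves into each file:

```rust
pub(crate) mod de;
pub mod document_changes;  // extract(), DocumentChanges, IndexingContext
mod document_deletion;
mod document_operation;
mod extract;               // new: extract_all(), the whole extraction phase
mod guess_primary_key;     // new: retrieve_or_guess_primary_key()
mod partial_dump;
mod post_processing;       // new: post_process(), facet/word-FST/prefix post-processing
mod update_by_function;
mod write;                 // new: build_vectors(), update_index(), write_to_db()
```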
		| @@ -28,7 +28,7 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; | ||||
| pub struct FacetedExtractorData<'a, 'b> { | ||||
|     attributes_to_extract: &'a [&'a str], | ||||
|     sender: &'a FieldIdDocidFacetSender<'a, 'b>, | ||||
|     grenad_parameters: GrenadParameters, | ||||
|     grenad_parameters: &'a GrenadParameters, | ||||
|     buckets: usize, | ||||
| } | ||||
|  | ||||
| @@ -374,7 +374,6 @@ fn truncate_str(s: &str) -> &str { | ||||
| impl FacetedDocidsExtractor { | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] | ||||
|     pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( | ||||
|         grenad_parameters: GrenadParameters, | ||||
|         document_changes: &DC, | ||||
|         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, | ||||
|         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, | ||||
| @@ -398,7 +397,7 @@ impl FacetedDocidsExtractor { | ||||
|  | ||||
|             let extractor = FacetedExtractorData { | ||||
|                 attributes_to_extract: &attributes_to_extract, | ||||
|                 grenad_parameters, | ||||
|                 grenad_parameters: indexing_context.grenad_parameters, | ||||
|                 buckets: rayon::current_num_threads(), | ||||
|                 sender, | ||||
|             }; | ||||
|   | ||||
| @@ -18,12 +18,10 @@ pub use vectors::EmbeddingExtractor; | ||||
| use super::indexer::document_changes::{DocumentChanges, IndexingContext}; | ||||
| use super::steps::IndexingStep; | ||||
| use super::thread_local::{FullySend, ThreadLocal}; | ||||
| use crate::update::GrenadParameters; | ||||
| use crate::Result; | ||||
|  | ||||
| pub trait DocidsExtractor { | ||||
|     fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( | ||||
|         grenad_parameters: GrenadParameters, | ||||
|         document_changes: &DC, | ||||
|         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, | ||||
|         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, | ||||
|   | ||||
| @@ -208,7 +208,7 @@ impl<'extractor> WordDocidsCaches<'extractor> { | ||||
|  | ||||
| pub struct WordDocidsExtractorData<'a> { | ||||
|     tokenizer: &'a DocumentTokenizer<'a>, | ||||
|     grenad_parameters: GrenadParameters, | ||||
|     grenad_parameters: &'a GrenadParameters, | ||||
|     buckets: usize, | ||||
| } | ||||
|  | ||||
| @@ -240,7 +240,6 @@ pub struct WordDocidsExtractors; | ||||
|  | ||||
| impl WordDocidsExtractors { | ||||
|     pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( | ||||
|         grenad_parameters: GrenadParameters, | ||||
|         document_changes: &DC, | ||||
|         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, | ||||
|         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, | ||||
| @@ -288,7 +287,7 @@ impl WordDocidsExtractors { | ||||
|  | ||||
|             let extractor = WordDocidsExtractorData { | ||||
|                 tokenizer: &document_tokenizer, | ||||
|                 grenad_parameters, | ||||
|                 grenad_parameters: indexing_context.grenad_parameters, | ||||
|                 buckets: rayon::current_num_threads(), | ||||
|             }; | ||||
|  | ||||
|   | ||||
| @@ -24,7 +24,7 @@ use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE}; | ||||
|  | ||||
| pub struct SearchableExtractorData<'a, EX: SearchableExtractor> { | ||||
|     tokenizer: &'a DocumentTokenizer<'a>, | ||||
|     grenad_parameters: GrenadParameters, | ||||
|     grenad_parameters: &'a GrenadParameters, | ||||
|     buckets: usize, | ||||
|     _ex: PhantomData<EX>, | ||||
| } | ||||
| @@ -57,7 +57,6 @@ impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> | ||||
|  | ||||
| pub trait SearchableExtractor: Sized + Sync { | ||||
|     fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( | ||||
|         grenad_parameters: GrenadParameters, | ||||
|         document_changes: &DC, | ||||
|         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, | ||||
|         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, | ||||
| @@ -96,7 +95,7 @@ pub trait SearchableExtractor: Sized + Sync { | ||||
|  | ||||
|         let extractor_data: SearchableExtractorData<Self> = SearchableExtractorData { | ||||
|             tokenizer: &document_tokenizer, | ||||
|             grenad_parameters, | ||||
|             grenad_parameters: indexing_context.grenad_parameters, | ||||
|             buckets: rayon::current_num_threads(), | ||||
|             _ex: PhantomData, | ||||
|         }; | ||||
| @@ -134,7 +133,6 @@ pub trait SearchableExtractor: Sized + Sync { | ||||
|  | ||||
| impl<T: SearchableExtractor> DocidsExtractor for T { | ||||
|     fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( | ||||
|         grenad_parameters: GrenadParameters, | ||||
|         document_changes: &DC, | ||||
|         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, | ||||
|         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, | ||||
| @@ -143,12 +141,6 @@ impl<T: SearchableExtractor> DocidsExtractor for T { | ||||
|     where | ||||
|         MSP: Fn() -> bool + Sync, | ||||
|     { | ||||
|         Self::run_extraction( | ||||
|             grenad_parameters, | ||||
|             document_changes, | ||||
|             indexing_context, | ||||
|             extractor_allocs, | ||||
|             step, | ||||
|         ) | ||||
|         Self::run_extraction(document_changes, indexing_context, extractor_allocs, step) | ||||
|     } | ||||
| } | ||||
|   | ||||
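The change repeated across the faceted, word-docids, and searchable extractors above follows a single pattern: `grenad_parameters` is no longer threaded through every `run_extraction` signature; it moves onto the shared `IndexingContext`, and the per-extractor data structs hold a borrow (`&'a GrenadParameters`) instead of an owned copy. A minimal, self-contained sketch of that pattern, using stand-in types rather than the real milli API:

```rust
// Stand-in for milli's GrenadParameters; the real struct has more fields.
#[derive(Clone, Copy, Debug, Default)]
struct GrenadParameters {
    max_memory: Option<usize>,
}

// Before: each extractor entry point took the parameter by value.
fn run_extraction_before(grenad_parameters: GrenadParameters) {
    println!("before: {grenad_parameters:?}");
}

// After: the parameter lives on the context that is already passed around,
// and callers borrow it from there.
struct IndexingContext<'indexer> {
    grenad_parameters: &'indexer GrenadParameters,
}

fn run_extraction_after(indexing_context: &IndexingContext<'_>) {
    println!("after: {:?}", indexing_context.grenad_parameters);
}

fn main() {
    let grenad_parameters = GrenadParameters::default();
    run_extraction_before(grenad_parameters);
    let indexing_context = IndexingContext { grenad_parameters: &grenad_parameters };
    run_extraction_after(&indexing_context);
}
```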
| @@ -12,6 +12,7 @@ use crate::progress::{AtomicDocumentStep, Progress}; | ||||
| use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; | ||||
| use crate::update::new::steps::IndexingStep; | ||||
| use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; | ||||
| use crate::update::GrenadParameters; | ||||
| use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; | ||||
|  | ||||
| pub struct DocumentChangeContext< | ||||
| @@ -145,6 +146,7 @@ pub struct IndexingContext< | ||||
|     pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>, | ||||
|     pub must_stop_processing: &'indexer MSP, | ||||
|     pub progress: &'indexer Progress, | ||||
|     pub grenad_parameters: &'indexer GrenadParameters, | ||||
| } | ||||
|  | ||||
| impl< | ||||
| @@ -207,6 +209,7 @@ pub fn extract< | ||||
|         fields_ids_map_store, | ||||
|         must_stop_processing, | ||||
|         progress, | ||||
|         grenad_parameters: _, | ||||
|     }: IndexingContext<'fid, 'indexer, 'index, MSP>, | ||||
|     extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, | ||||
|     datastore: &'data ThreadLocal<EX::Data>, | ||||
|   | ||||
| @@ -166,6 +166,7 @@ mod test { | ||||
|             fields_ids_map_store: &fields_ids_map_store, | ||||
|             must_stop_processing: &(|| false), | ||||
|             progress: &Progress::default(), | ||||
|             grenad_parameters: &Default::default(), | ||||
|         }; | ||||
|  | ||||
|         for _ in 0..3 { | ||||
|   | ||||
| @@ -13,7 +13,7 @@ use serde_json::Deserializer; | ||||
|  | ||||
| use super::super::document_change::DocumentChange; | ||||
| use super::document_changes::{DocumentChangeContext, DocumentChanges}; | ||||
| use super::retrieve_or_guess_primary_key; | ||||
| use super::guess_primary_key::retrieve_or_guess_primary_key; | ||||
| use crate::documents::PrimaryKey; | ||||
| use crate::progress::{AtomicPayloadStep, Progress}; | ||||
| use crate::update::new::document::Versions; | ||||
|   | ||||
crates/milli/src/update/new/indexer/extract.rs (new file, 309 lines)
							| @@ -0,0 +1,309 @@ | ||||
| use std::collections::BTreeMap; | ||||
| use std::sync::atomic::AtomicBool; | ||||
| use std::sync::OnceLock; | ||||
|  | ||||
| use bumpalo::Bump; | ||||
| use roaring::RoaringBitmap; | ||||
| use tracing::Span; | ||||
|  | ||||
| use super::super::channel::*; | ||||
| use super::super::extract::*; | ||||
| use super::super::steps::IndexingStep; | ||||
| use super::super::thread_local::{FullySend, ThreadLocal}; | ||||
| use super::super::FacetFieldIdsDelta; | ||||
| use super::document_changes::{extract, DocumentChanges, IndexingContext}; | ||||
| use crate::index::IndexEmbeddingConfig; | ||||
| use crate::proximity::ProximityPrecision; | ||||
| use crate::update::new::extract::EmbeddingExtractor; | ||||
| use crate::update::new::merger::merge_and_send_rtree; | ||||
| use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; | ||||
| use crate::vector::EmbeddingConfigs; | ||||
| use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; | ||||
|  | ||||
| #[allow(clippy::too_many_arguments)] | ||||
| pub(super) fn extract_all<'pl, 'extractor, DC, MSP>( | ||||
|     document_changes: &DC, | ||||
|     indexing_context: IndexingContext<MSP>, | ||||
|     indexer_span: Span, | ||||
|     extractor_sender: ExtractorBbqueueSender, | ||||
|     embedders: &EmbeddingConfigs, | ||||
|     extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>, | ||||
|     finished_extraction: &AtomicBool, | ||||
|     field_distribution: &mut BTreeMap<String, u64>, | ||||
|     mut index_embeddings: Vec<IndexEmbeddingConfig>, | ||||
|     document_ids: &mut RoaringBitmap, | ||||
| ) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)> | ||||
| where | ||||
|     DC: DocumentChanges<'pl>, | ||||
|     MSP: Fn() -> bool + Sync, | ||||
| { | ||||
|     let span = | ||||
|         tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); | ||||
|     let _entered = span.enter(); | ||||
|  | ||||
|     let index = indexing_context.index; | ||||
|     let rtxn = index.read_txn()?; | ||||
|  | ||||
|     // document but we need to create a function that collects and compresses documents. | ||||
|     let document_sender = extractor_sender.documents(); | ||||
|     let document_extractor = DocumentsExtractor::new(document_sender, embedders); | ||||
|     let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|     { | ||||
|         let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); | ||||
|         let _entered = span.enter(); | ||||
|         extract( | ||||
|             document_changes, | ||||
|             &document_extractor, | ||||
|             indexing_context, | ||||
|             extractor_allocs, | ||||
|             &datastore, | ||||
|             IndexingStep::ExtractingDocuments, | ||||
|         )?; | ||||
|     } | ||||
|     { | ||||
|         let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); | ||||
|         let _entered = span.enter(); | ||||
|         for document_extractor_data in datastore { | ||||
|             let document_extractor_data = document_extractor_data.0.into_inner(); | ||||
|             for (field, delta) in document_extractor_data.field_distribution_delta { | ||||
|                 let current = field_distribution.entry(field).or_default(); | ||||
|                 // adding the delta should never cause a negative result, as we are removing fields that previously existed. | ||||
|                 *current = current.saturating_add_signed(delta); | ||||
|             } | ||||
|             document_extractor_data.docids_delta.apply_to(document_ids); | ||||
|         } | ||||
|  | ||||
|         field_distribution.retain(|_, v| *v != 0); | ||||
|     } | ||||
|  | ||||
|     let facet_field_ids_delta; | ||||
|  | ||||
|     { | ||||
|         let caches = { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             FacetedDocidsExtractor::run_extraction( | ||||
|                 document_changes, | ||||
|                 indexing_context, | ||||
|                 extractor_allocs, | ||||
|                 &extractor_sender.field_id_docid_facet_sender(), | ||||
|                 IndexingStep::ExtractingFacets, | ||||
|             )? | ||||
|         }; | ||||
|  | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             facet_field_ids_delta = merge_and_send_facet_docids( | ||||
|                 caches, | ||||
|                 FacetDatabases::new(index), | ||||
|                 index, | ||||
|                 extractor_sender.facet_docids(), | ||||
|             )?; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     { | ||||
|         let WordDocidsCaches { | ||||
|             word_docids, | ||||
|             word_fid_docids, | ||||
|             exact_word_docids, | ||||
|             word_position_docids, | ||||
|             fid_word_count_docids, | ||||
|         } = { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             WordDocidsExtractors::run_extraction( | ||||
|                 document_changes, | ||||
|                 indexing_context, | ||||
|                 extractor_allocs, | ||||
|                 IndexingStep::ExtractingWords, | ||||
|             )? | ||||
|         }; | ||||
|  | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             merge_and_send_docids( | ||||
|                 word_docids, | ||||
|                 index.word_docids.remap_types(), | ||||
|                 index, | ||||
|                 extractor_sender.docids::<WordDocids>(), | ||||
|                 &indexing_context.must_stop_processing, | ||||
|             )?; | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             merge_and_send_docids( | ||||
|                 word_fid_docids, | ||||
|                 index.word_fid_docids.remap_types(), | ||||
|                 index, | ||||
|                 extractor_sender.docids::<WordFidDocids>(), | ||||
|                 &indexing_context.must_stop_processing, | ||||
|             )?; | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             merge_and_send_docids( | ||||
|                 exact_word_docids, | ||||
|                 index.exact_word_docids.remap_types(), | ||||
|                 index, | ||||
|                 extractor_sender.docids::<ExactWordDocids>(), | ||||
|                 &indexing_context.must_stop_processing, | ||||
|             )?; | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             merge_and_send_docids( | ||||
|                 word_position_docids, | ||||
|                 index.word_position_docids.remap_types(), | ||||
|                 index, | ||||
|                 extractor_sender.docids::<WordPositionDocids>(), | ||||
|                 &indexing_context.must_stop_processing, | ||||
|             )?; | ||||
|         } | ||||
|  | ||||
|         { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             merge_and_send_docids( | ||||
|                 fid_word_count_docids, | ||||
|                 index.field_id_word_count_docids.remap_types(), | ||||
|                 index, | ||||
|                 extractor_sender.docids::<FidWordCountDocids>(), | ||||
|                 &indexing_context.must_stop_processing, | ||||
|             )?; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // run the proximity extraction only if the precision is by word | ||||
|     // this works only if the settings didn't change during this transaction. | ||||
|     let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); | ||||
|     if proximity_precision == ProximityPrecision::ByWord { | ||||
|         let caches = { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction( | ||||
|                 document_changes, | ||||
|                 indexing_context, | ||||
|                 extractor_allocs, | ||||
|                 IndexingStep::ExtractingWordProximity, | ||||
|             )? | ||||
|         }; | ||||
|  | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             merge_and_send_docids( | ||||
|                 caches, | ||||
|                 index.word_pair_proximity_docids.remap_types(), | ||||
|                 index, | ||||
|                 extractor_sender.docids::<WordPairProximityDocids>(), | ||||
|                 &indexing_context.must_stop_processing, | ||||
|             )?; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     'vectors: { | ||||
|         if index_embeddings.is_empty() { | ||||
|             break 'vectors; | ||||
|         } | ||||
|  | ||||
|         let embedding_sender = extractor_sender.embeddings(); | ||||
|         let extractor = EmbeddingExtractor::new( | ||||
|             embedders, | ||||
|             embedding_sender, | ||||
|             field_distribution, | ||||
|             request_threads(), | ||||
|         ); | ||||
|         let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             extract( | ||||
|                 document_changes, | ||||
|                 &extractor, | ||||
|                 indexing_context, | ||||
|                 extractor_allocs, | ||||
|                 &datastore, | ||||
|                 IndexingStep::ExtractingEmbeddings, | ||||
|             )?; | ||||
|         } | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             for config in &mut index_embeddings { | ||||
|                 'data: for data in datastore.iter_mut() { | ||||
|                     let data = &mut data.get_mut().0; | ||||
|                     let Some(deladd) = data.remove(&config.name) else { | ||||
|                         continue 'data; | ||||
|                     }; | ||||
|                     deladd.apply_to(&mut config.user_provided); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     'geo: { | ||||
|         let Some(extractor) = GeoExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)? | ||||
|         else { | ||||
|             break 'geo; | ||||
|         }; | ||||
|         let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|  | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             extract( | ||||
|                 document_changes, | ||||
|                 &extractor, | ||||
|                 indexing_context, | ||||
|                 extractor_allocs, | ||||
|                 &datastore, | ||||
|                 IndexingStep::WritingGeoPoints, | ||||
|             )?; | ||||
|         } | ||||
|  | ||||
|         merge_and_send_rtree( | ||||
|             datastore, | ||||
|             &rtxn, | ||||
|             index, | ||||
|             extractor_sender.geo(), | ||||
|             &indexing_context.must_stop_processing, | ||||
|         )?; | ||||
|     } | ||||
|     indexing_context.progress.update_progress(IndexingStep::WritingToDatabase); | ||||
|     finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); | ||||
|  | ||||
|     Result::Ok((facet_field_ids_delta, index_embeddings)) | ||||
| } | ||||
|  | ||||
| fn request_threads() -> &'static ThreadPoolNoAbort { | ||||
|     static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new(); | ||||
|  | ||||
|     REQUEST_THREADS.get_or_init(|| { | ||||
|         ThreadPoolNoAbortBuilder::new() | ||||
|             .num_threads(crate::vector::REQUEST_PARALLELISM) | ||||
|             .thread_name(|index| format!("embedding-request-{index}")) | ||||
|             .build() | ||||
|             .unwrap() | ||||
|     }) | ||||
| } | ||||
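`request_threads()` above lazily builds one process-wide `ThreadPoolNoAbort` the first time embedding extraction needs it. Below is a dependency-free sketch of the same `OnceLock` idiom; a plain `String` stands in for the real pool so the example runs as-is:

```rust
use std::sync::OnceLock;

// A global resource initialized at most once, on first use.
fn request_threads() -> &'static String {
    static REQUEST_THREADS: OnceLock<String> = OnceLock::new();

    REQUEST_THREADS.get_or_init(|| {
        // This closure runs at most once, even under concurrent first calls.
        String::from("embedding-request thread pool (stand-in)")
    })
}

fn main() {
    // Every call returns the same &'static value; initialization happened once.
    assert!(std::ptr::eq(request_threads(), request_threads()));
    println!("{}", request_threads());
}
```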
crates/milli/src/update/new/indexer/guess_primary_key.rs (new file, 85 lines)
							| @@ -0,0 +1,85 @@ | ||||
| use bumparaw_collections::RawMap; | ||||
| use heed::RoTxn; | ||||
| use rustc_hash::FxBuildHasher; | ||||
|  | ||||
| use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; | ||||
| use crate::update::new::StdResult; | ||||
| use crate::{FieldsIdsMap, Index, Result, UserError}; | ||||
|  | ||||
| /// Returns the primary key that has already been set for this index or the | ||||
| /// one we will guess by searching for a key whose lowercased name ends with "id", | ||||
| /// and whether the primary key changed | ||||
| pub fn retrieve_or_guess_primary_key<'a>( | ||||
|     rtxn: &'a RoTxn<'a>, | ||||
|     index: &Index, | ||||
|     new_fields_ids_map: &mut FieldsIdsMap, | ||||
|     primary_key_from_op: Option<&'a str>, | ||||
|     first_document: Option<RawMap<'a, FxBuildHasher>>, | ||||
| ) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> { | ||||
|     // make sure that we have a declared primary key, either fetching it from the index or attempting to guess it. | ||||
|  | ||||
|     // do we have an existing declared primary key? | ||||
|     let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? { | ||||
|         // did we request a primary key in the operation? | ||||
|         match primary_key_from_op { | ||||
|             // we did, and it is different from the DB one | ||||
|             Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => { | ||||
|                 return Ok(Err(UserError::PrimaryKeyCannotBeChanged( | ||||
|                     primary_key_from_db.to_string(), | ||||
|                 ))); | ||||
|             } | ||||
|             _ => (primary_key_from_db, false), | ||||
|         } | ||||
|     } else { | ||||
|         // no primary key in the DB => let's set one | ||||
|         // did we request a primary key in the operation? | ||||
|         let primary_key = if let Some(primary_key_from_op) = primary_key_from_op { | ||||
|             // set primary key from operation | ||||
|             primary_key_from_op | ||||
|         } else { | ||||
|             // guess primary key | ||||
|             let first_document = match first_document { | ||||
|                 Some(document) => document, | ||||
|                 // previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found | ||||
|                 None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), | ||||
|             }; | ||||
|  | ||||
|             let guesses: Result<Vec<&str>> = first_document | ||||
|                 .keys() | ||||
|                 .filter_map(|name| { | ||||
|                     let Some(_) = new_fields_ids_map.insert(name) else { | ||||
|                         return Some(Err(UserError::AttributeLimitReached.into())); | ||||
|                     }; | ||||
|                     name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name)) | ||||
|                 }) | ||||
|                 .collect(); | ||||
|  | ||||
|             let mut guesses = guesses?; | ||||
|  | ||||
|             // sort the keys in lexicographical order, so that fields are always in the same order. | ||||
|             guesses.sort_unstable(); | ||||
|  | ||||
|             match guesses.as_slice() { | ||||
|                 [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), | ||||
|                 [name] => { | ||||
|                     tracing::info!("Primary key was not specified in index. Inferred to '{name}'"); | ||||
|                     *name | ||||
|                 } | ||||
|                 multiple => { | ||||
|                     return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound { | ||||
|                         candidates: multiple | ||||
|                             .iter() | ||||
|                             .map(|candidate| candidate.to_string()) | ||||
|                             .collect(), | ||||
|                     })) | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|         (primary_key, true) | ||||
|     }; | ||||
|  | ||||
|     match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) { | ||||
|         Ok(primary_key) => Ok(Ok((primary_key, has_changed))), | ||||
|         Err(err) => Ok(Err(err)), | ||||
|     } | ||||
| } | ||||
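A self-contained sketch of the guessing rule implemented above: `DEFAULT_PRIMARY_KEY` is `"id"`, a top-level key is a candidate when its lowercased name ends with it, and exactly one candidate must remain. The `String` errors here are simplified stand-ins for `UserError::NoPrimaryKeyCandidateFound` and `UserError::MultiplePrimaryKeyCandidatesFound`:

```rust
const DEFAULT_PRIMARY_KEY: &str = "id";

fn guess_primary_key<'a>(keys: impl Iterator<Item = &'a str>) -> Result<&'a str, String> {
    let mut guesses: Vec<&str> =
        keys.filter(|name| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY)).collect();
    // Sort so the outcome does not depend on document key order.
    guesses.sort_unstable();
    match guesses.as_slice() {
        [] => Err("no primary key candidate found".into()),
        [name] => Ok(*name),
        multiple => Err(format!("multiple candidates: {multiple:?}")),
    }
}

fn main() {
    assert_eq!(guess_primary_key(["title", "uid"].into_iter()), Ok("uid"));
    assert!(guess_primary_key(["id", "product_id"].into_iter()).is_err());
    assert!(guess_primary_key(["title"].into_iter()).is_err());
}
```

crates/milli/src/update/new/indexer/mod.rs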
| @@ -1,59 +1,37 @@ | ||||
| use std::cmp::Ordering; | ||||
| use std::sync::atomic::AtomicBool; | ||||
| use std::sync::{OnceLock, RwLock}; | ||||
| use std::sync::RwLock; | ||||
| use std::thread::{self, Builder}; | ||||
|  | ||||
| use big_s::S; | ||||
| use bumparaw_collections::RawMap; | ||||
| use document_changes::{extract, DocumentChanges, IndexingContext}; | ||||
| use document_changes::{DocumentChanges, IndexingContext}; | ||||
| pub use document_deletion::DocumentDeletion; | ||||
| pub use document_operation::{DocumentOperation, PayloadStats}; | ||||
| use hashbrown::HashMap; | ||||
| use heed::types::{Bytes, DecodeIgnore, Str}; | ||||
| use heed::{RoTxn, RwTxn}; | ||||
| use itertools::{merge_join_by, EitherOrBoth}; | ||||
| use heed::RwTxn; | ||||
| pub use partial_dump::PartialDump; | ||||
| use rand::SeedableRng as _; | ||||
| use rustc_hash::FxBuildHasher; | ||||
| use time::OffsetDateTime; | ||||
| pub use update_by_function::UpdateByFunction; | ||||
| use write::{build_vectors, update_index, write_to_db}; | ||||
|  | ||||
| use super::channel::*; | ||||
| use super::extract::*; | ||||
| use super::facet_search_builder::FacetSearchBuilder; | ||||
| use super::merger::FacetFieldIdsDelta; | ||||
| use super::steps::IndexingStep; | ||||
| use super::thread_local::ThreadLocal; | ||||
| use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; | ||||
| use super::words_prefix_docids::{ | ||||
|     compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, | ||||
| }; | ||||
| use super::StdResult; | ||||
| use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; | ||||
| use crate::facet::FacetType; | ||||
| use crate::documents::PrimaryKey; | ||||
| use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; | ||||
| use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; | ||||
| use crate::progress::Progress; | ||||
| use crate::proximity::ProximityPrecision; | ||||
| use crate::update::del_add::DelAdd; | ||||
| use crate::update::new::extract::EmbeddingExtractor; | ||||
| use crate::update::new::merger::merge_and_send_rtree; | ||||
| use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; | ||||
| use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; | ||||
| use crate::update::settings::InnerIndexSettings; | ||||
| use crate::update::{FacetsUpdateBulk, GrenadParameters}; | ||||
| use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings}; | ||||
| use crate::{ | ||||
|     Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort, | ||||
|     ThreadPoolNoAbortBuilder, UserError, | ||||
| }; | ||||
| use crate::update::GrenadParameters; | ||||
| use crate::vector::{ArroyWrapper, EmbeddingConfigs}; | ||||
| use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; | ||||
|  | ||||
| pub(crate) mod de; | ||||
| pub mod document_changes; | ||||
| mod document_deletion; | ||||
| mod document_operation; | ||||
| mod extract; | ||||
| mod guess_primary_key; | ||||
| mod partial_dump; | ||||
| mod post_processing; | ||||
| mod update_by_function; | ||||
| mod write; | ||||
|  | ||||
| /// This is the main function of this crate. | ||||
| /// | ||||
| @@ -107,7 +85,7 @@ where | ||||
|         }, | ||||
|     ); | ||||
|  | ||||
|     let (extractor_sender, mut writer_receiver) = pool | ||||
|     let (extractor_sender, writer_receiver) = pool | ||||
|         .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) | ||||
|         .unwrap(); | ||||
|  | ||||
| @@ -126,9 +104,10 @@ where | ||||
|         fields_ids_map_store: &fields_ids_map_store, | ||||
|         must_stop_processing, | ||||
|         progress, | ||||
|         grenad_parameters: &grenad_parameters, | ||||
|     }; | ||||
|  | ||||
|     let mut index_embeddings = index.embedding_configs(wtxn)?; | ||||
|     let index_embeddings = index.embedding_configs(wtxn)?; | ||||
|     let mut field_distribution = index.field_distribution(wtxn)?; | ||||
|     let mut document_ids = index.documents_ids(wtxn)?; | ||||
|  | ||||
| @@ -139,261 +118,28 @@ where | ||||
|         // prevent moving the field_distribution and document_ids in the inner closure... | ||||
|         let field_distribution = &mut field_distribution; | ||||
|         let document_ids = &mut document_ids; | ||||
|         let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { | ||||
|             pool.install(move || { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); | ||||
|                 let _entered = span.enter(); | ||||
|  | ||||
|                 let rtxn = index.read_txn()?; | ||||
|  | ||||
|                 // document but we need to create a function that collects and compresses documents. | ||||
|                 let document_sender = extractor_sender.documents(); | ||||
|                 let document_extractor = DocumentsExtractor::new(document_sender, embedders); | ||||
|                 let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); | ||||
|                     let _entered = span.enter(); | ||||
|                     extract( | ||||
|         let extractor_handle = | ||||
|             Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { | ||||
|                 pool.install(move || { | ||||
|                     extract::extract_all( | ||||
|                         document_changes, | ||||
|                         &document_extractor, | ||||
|                         indexing_context, | ||||
|                         indexer_span, | ||||
|                         extractor_sender, | ||||
|                         embedders, | ||||
|                         &mut extractor_allocs, | ||||
|                         &datastore, | ||||
|                         IndexingStep::ExtractingDocuments, | ||||
|                     )?; | ||||
|                 } | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); | ||||
|                     let _entered = span.enter(); | ||||
|                     for document_extractor_data in datastore { | ||||
|                         let document_extractor_data = document_extractor_data.0.into_inner(); | ||||
|                         for (field, delta) in document_extractor_data.field_distribution_delta { | ||||
|                             let current = field_distribution.entry(field).or_default(); | ||||
|                             // adding the delta should never cause a negative result, as we are removing fields that previously existed. | ||||
|                             *current = current.saturating_add_signed(delta); | ||||
|                         } | ||||
|                         document_extractor_data.docids_delta.apply_to(document_ids); | ||||
|                     } | ||||
|  | ||||
|                     field_distribution.retain(|_, v| *v != 0); | ||||
|                 } | ||||
|  | ||||
|                 let facet_field_ids_delta; | ||||
|  | ||||
|                 { | ||||
|                     let caches = { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         FacetedDocidsExtractor::run_extraction( | ||||
|                                 grenad_parameters, | ||||
|                                 document_changes, | ||||
|                                 indexing_context, | ||||
|                                 &mut extractor_allocs, | ||||
|                                 &extractor_sender.field_id_docid_facet_sender(), | ||||
|                                 IndexingStep::ExtractingFacets | ||||
|                             )? | ||||
|                     }; | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         facet_field_ids_delta = merge_and_send_facet_docids( | ||||
|                             caches, | ||||
|                             FacetDatabases::new(index), | ||||
|                             index, | ||||
|                             extractor_sender.facet_docids(), | ||||
|                         )?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 { | ||||
|                     let WordDocidsCaches { | ||||
|                         word_docids, | ||||
|                         word_fid_docids, | ||||
|                         exact_word_docids, | ||||
|                         word_position_docids, | ||||
|                         fid_word_count_docids, | ||||
|                     } = { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         WordDocidsExtractors::run_extraction( | ||||
|                             grenad_parameters, | ||||
|                             document_changes, | ||||
|                             indexing_context, | ||||
|                             &mut extractor_allocs, | ||||
|                             IndexingStep::ExtractingWords | ||||
|                         )? | ||||
|                     }; | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|                         merge_and_send_docids( | ||||
|                             word_docids, | ||||
|                             index.word_docids.remap_types(), | ||||
|                             index, | ||||
|                             extractor_sender.docids::<WordDocids>(), | ||||
|                             &indexing_context.must_stop_processing, | ||||
|                         )?; | ||||
|                     } | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|                         merge_and_send_docids( | ||||
|                             word_fid_docids, | ||||
|                             index.word_fid_docids.remap_types(), | ||||
|                             index, | ||||
|                             extractor_sender.docids::<WordFidDocids>(), | ||||
|                             &indexing_context.must_stop_processing, | ||||
|                         )?; | ||||
|                     } | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|                         merge_and_send_docids( | ||||
|                             exact_word_docids, | ||||
|                             index.exact_word_docids.remap_types(), | ||||
|                             index, | ||||
|                             extractor_sender.docids::<ExactWordDocids>(), | ||||
|                             &indexing_context.must_stop_processing, | ||||
|                         )?; | ||||
|                     } | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|                         merge_and_send_docids( | ||||
|                             word_position_docids, | ||||
|                             index.word_position_docids.remap_types(), | ||||
|                             index, | ||||
|                             extractor_sender.docids::<WordPositionDocids>(), | ||||
|                             &indexing_context.must_stop_processing, | ||||
|                         )?; | ||||
|                     } | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|                         merge_and_send_docids( | ||||
|                             fid_word_count_docids, | ||||
|                             index.field_id_word_count_docids.remap_types(), | ||||
|                             index, | ||||
|                             extractor_sender.docids::<FidWordCountDocids>(), | ||||
|                             &indexing_context.must_stop_processing, | ||||
|                         )?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 // run the proximity extraction only if the precision is by word | ||||
|                 // this works only if the settings didn't change during this transaction. | ||||
|                 let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); | ||||
|                 if proximity_precision == ProximityPrecision::ByWord { | ||||
|                     let caches = { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction( | ||||
|                             grenad_parameters, | ||||
|                             document_changes, | ||||
|                             indexing_context, | ||||
|                             &mut extractor_allocs, | ||||
|                             IndexingStep::ExtractingWordProximity, | ||||
|                         )? | ||||
|                     }; | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         merge_and_send_docids( | ||||
|                             caches, | ||||
|                             index.word_pair_proximity_docids.remap_types(), | ||||
|                             index, | ||||
|                             extractor_sender.docids::<WordPairProximityDocids>(), | ||||
|                             &indexing_context.must_stop_processing, | ||||
|                         )?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 'vectors: { | ||||
|                     if index_embeddings.is_empty() { | ||||
|                         break 'vectors; | ||||
|                     } | ||||
|  | ||||
|                     let embedding_sender = extractor_sender.embeddings(); | ||||
|                     let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads()); | ||||
|                     let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         extract( | ||||
|                             document_changes, | ||||
|                             &extractor, | ||||
|                             indexing_context, | ||||
|                             &mut extractor_allocs, | ||||
|                             &datastore, | ||||
|                             IndexingStep::ExtractingEmbeddings, | ||||
|                         )?; | ||||
|                     } | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         for config in &mut index_embeddings { | ||||
|                             'data: for data in datastore.iter_mut() { | ||||
|                                 let data = &mut data.get_mut().0; | ||||
|                                 let Some(deladd) = data.remove(&config.name) else { continue 'data; }; | ||||
|                                 deladd.apply_to(&mut config.user_provided); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 'geo: { | ||||
|                     let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else { | ||||
|                         break 'geo; | ||||
|                     }; | ||||
|                     let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|  | ||||
|                     { | ||||
|                         let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         extract( | ||||
|                             document_changes, | ||||
|                             &extractor, | ||||
|                             indexing_context, | ||||
|                             &mut extractor_allocs, | ||||
|                             &datastore, | ||||
|                             IndexingStep::WritingGeoPoints | ||||
|                         )?; | ||||
|                     } | ||||
|  | ||||
|                     merge_and_send_rtree( | ||||
|                         datastore, | ||||
|                         &rtxn, | ||||
|                         index, | ||||
|                         extractor_sender.geo(), | ||||
|                         &indexing_context.must_stop_processing, | ||||
|                     )?; | ||||
|                 } | ||||
|                 indexing_context.progress.update_progress(IndexingStep::WritingToDatabase); | ||||
|                 finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); | ||||
|  | ||||
|                 Result::Ok((facet_field_ids_delta, index_embeddings)) | ||||
|             }).unwrap() | ||||
|         })?; | ||||
|                         finished_extraction, | ||||
|                         field_distribution, | ||||
|                         index_embeddings, | ||||
|                         document_ids, | ||||
|                     ) | ||||
|                 }) | ||||
|                 .unwrap() | ||||
|             })?; | ||||
|  | ||||
|         let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); | ||||
|  | ||||
|         let vector_arroy = index.vector_arroy; | ||||
|         let indexer_span = tracing::Span::current(); | ||||
|         let arroy_writers: Result<HashMap<_, _>> = embedders | ||||
|             .inner_as_ref() | ||||
|             .iter() | ||||
| @@ -415,114 +161,30 @@ where | ||||
|             }) | ||||
|             .collect(); | ||||
|  | ||||
|         // Used by the ArroySetVector to copy the embedding into an | ||||
|         // aligned memory area, required by arroy to accept a new vector. | ||||
|         let mut aligned_embedding = Vec::new(); | ||||
|         let mut arroy_writers = arroy_writers?; | ||||
|  | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "all"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "post_merge"); | ||||
|             let mut _entered_post_merge = None; | ||||
|  | ||||
|             while let Some(action) = writer_receiver.recv_action() { | ||||
|                 if _entered_post_merge.is_none() | ||||
|                     && finished_extraction.load(std::sync::atomic::Ordering::Relaxed) | ||||
|                 { | ||||
|                     _entered_post_merge = Some(span.enter()); | ||||
|                 } | ||||
|  | ||||
|                 match action { | ||||
|                     ReceiverAction::WakeUp => (), | ||||
|                     ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => { | ||||
|                         let database_name = database.database_name(); | ||||
|                         let database = database.database(index); | ||||
|                         if let Err(error) = database.put(wtxn, &key, &value) { | ||||
|                             return Err(Error::InternalError(InternalError::StorePut { | ||||
|                                 database_name, | ||||
|                                 key: bstr::BString::from(&key[..]), | ||||
|                                 value_length: value.len(), | ||||
|                                 error, | ||||
|                             })); | ||||
|                         } | ||||
|                     } | ||||
|                     ReceiverAction::LargeVectors(large_vectors) => { | ||||
|                         let LargeVectors { docid, embedder_id, .. } = large_vectors; | ||||
|                         let (_, _, writer, dimensions) = | ||||
|                             arroy_writers.get(&embedder_id).expect("requested a missing embedder"); | ||||
|                         let mut embeddings = Embeddings::new(*dimensions); | ||||
|                         for embedding in large_vectors.read_embeddings(*dimensions) { | ||||
|                             embeddings.push(embedding.to_vec()).unwrap(); | ||||
|                         } | ||||
|                         writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                         writer.add_items(wtxn, docid, &embeddings)?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 // Every time there is a message in the channel we search | ||||
|                 // for new entries in the BBQueue buffers. | ||||
|                 write_from_bbqueue( | ||||
|                     &mut writer_receiver, | ||||
|                     index, | ||||
|                     wtxn, | ||||
|                     &arroy_writers, | ||||
|                     &mut aligned_embedding, | ||||
|                 )?; | ||||
|             } | ||||
|  | ||||
|             // Once the extractor/writer channel is closed | ||||
|             // we must process the remaining BBQueue messages. | ||||
|             write_from_bbqueue( | ||||
|                 &mut writer_receiver, | ||||
|                 index, | ||||
|                 wtxn, | ||||
|                 &arroy_writers, | ||||
|                 &mut aligned_embedding, | ||||
|             )?; | ||||
|         } | ||||
|         write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?; | ||||
|  | ||||
|         indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors); | ||||
|  | ||||
|         let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?; | ||||
|  | ||||
|         'vectors: { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); | ||||
|             let _entered = span.enter(); | ||||
|         indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase); | ||||
|  | ||||
|             if index_embeddings.is_empty() { | ||||
|                 break 'vectors; | ||||
|             } | ||||
|         build_vectors( | ||||
|             index, | ||||
|             wtxn, | ||||
|             index_embeddings, | ||||
|             &mut arroy_writers, | ||||
|             &indexing_context.must_stop_processing, | ||||
|         )?; | ||||
|  | ||||
|             indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase); | ||||
|             let mut rng = rand::rngs::StdRng::seed_from_u64(42); | ||||
|             for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers { | ||||
|                 let dimensions = *dimensions; | ||||
|                 writer.build_and_quantize( | ||||
|                     wtxn, | ||||
|                     &mut rng, | ||||
|                     dimensions, | ||||
|                     false, | ||||
|                     &indexing_context.must_stop_processing, | ||||
|                 )?; | ||||
|             } | ||||
|  | ||||
|             index.put_embedding_configs(wtxn, index_embeddings)?; | ||||
|         } | ||||
|  | ||||
|         indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); | ||||
|         if index.facet_search(wtxn)? { | ||||
|             compute_facet_search_database(index, wtxn, global_fields_ids_map)?; | ||||
|         } | ||||
|  | ||||
|         compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; | ||||
|  | ||||
|         indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); | ||||
|         if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { | ||||
|             compute_prefix_database(index, wtxn, prefix_delta, grenad_parameters)?; | ||||
|         } | ||||
|         post_processing::post_process( | ||||
|             indexing_context, | ||||
|             wtxn, | ||||
|             global_fields_ids_map, | ||||
|             facet_field_ids_delta, | ||||
|         )?; | ||||
|  | ||||
|         indexing_context.progress.update_progress(IndexingStep::Finalizing); | ||||
|  | ||||
| @@ -533,321 +195,15 @@ where | ||||
|     drop(fields_ids_map_store); | ||||
|  | ||||
|     let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap(); | ||||
|     index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; | ||||
|  | ||||
|     if let Some(new_primary_key) = new_primary_key { | ||||
|         index.put_primary_key(wtxn, new_primary_key.name())?; | ||||
|     } | ||||
|  | ||||
|     // used to update the localized and weighted maps while sharing the update code with the settings pipeline. | ||||
|     let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?; | ||||
|     inner_index_settings.recompute_facets(wtxn, index)?; | ||||
|     inner_index_settings.recompute_searchables(wtxn, index)?; | ||||
|     index.put_field_distribution(wtxn, &field_distribution)?; | ||||
|     index.put_documents_ids(wtxn, &document_ids)?; | ||||
|     index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; | ||||
|     update_index( | ||||
|         index, | ||||
|         wtxn, | ||||
|         new_fields_ids_map, | ||||
|         new_primary_key, | ||||
|         embedders, | ||||
|         field_distribution, | ||||
|         document_ids, | ||||
|     )?; | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// A function dedicated to managing all the available BBQueue frames. | ||||
| /// | ||||
| /// It reads all the available frames, performs the corresponding database | ||||
| /// operations, and stops when no frames are available. | ||||
| fn write_from_bbqueue( | ||||
|     writer_receiver: &mut WriterBbqueueReceiver<'_>, | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn<'_>, | ||||
|     arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>, | ||||
|     aligned_embedding: &mut Vec<f32>, | ||||
| ) -> crate::Result<()> { | ||||
|     while let Some(frame_with_header) = writer_receiver.recv_frame() { | ||||
|         match frame_with_header.header() { | ||||
|             EntryHeader::DbOperation(operation) => { | ||||
|                 let database_name = operation.database.database_name(); | ||||
|                 let database = operation.database.database(index); | ||||
|                 let frame = frame_with_header.frame(); | ||||
|                 match operation.key_value(frame) { | ||||
|                     (key, Some(value)) => { | ||||
|                         if let Err(error) = database.put(wtxn, key, value) { | ||||
|                             return Err(Error::InternalError(InternalError::StorePut { | ||||
|                                 database_name, | ||||
|                                 key: key.into(), | ||||
|                                 value_length: value.len(), | ||||
|                                 error, | ||||
|                             })); | ||||
|                         } | ||||
|                     } | ||||
|                     (key, None) => match database.delete(wtxn, key) { | ||||
|                         Ok(false) => { | ||||
|                             unreachable!("We tried to delete an unknown key: {key:?}") | ||||
|                         } | ||||
|                         Ok(_) => (), | ||||
|                         Err(error) => { | ||||
|                             return Err(Error::InternalError(InternalError::StoreDeletion { | ||||
|                                 database_name, | ||||
|                                 key: key.into(), | ||||
|                                 error, | ||||
|                             })); | ||||
|                         } | ||||
|                     }, | ||||
|                 } | ||||
|             } | ||||
|             EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => { | ||||
|                 for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers { | ||||
|                     let dimensions = *dimensions; | ||||
|                     writer.del_items(wtxn, dimensions, docid)?; | ||||
|                 } | ||||
|             } | ||||
|             EntryHeader::ArroySetVectors(asvs) => { | ||||
|                 let ArroySetVectors { docid, embedder_id, .. } = asvs; | ||||
|                 let frame = frame_with_header.frame(); | ||||
|                 let (_, _, writer, dimensions) = | ||||
|                     arroy_writers.get(&embedder_id).expect("requested a missing embedder"); | ||||
|                 let mut embeddings = Embeddings::new(*dimensions); | ||||
|                 let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding); | ||||
|                 embeddings.append(all_embeddings.to_vec()).unwrap(); | ||||
|                 writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                 writer.add_items(wtxn, docid, &embeddings)?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] | ||||
| fn compute_prefix_database( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn, | ||||
|     prefix_delta: PrefixDelta, | ||||
|     grenad_parameters: GrenadParameters, | ||||
| ) -> Result<()> { | ||||
|     let PrefixDelta { modified, deleted } = prefix_delta; | ||||
|     // Compute word prefix docids | ||||
|     compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; | ||||
|     // Compute exact word prefix docids | ||||
|     compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; | ||||
|     // Compute word prefix fid docids | ||||
|     compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; | ||||
|     // Compute word prefix position docids | ||||
|     compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters) | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing")] | ||||
| fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> { | ||||
|     let rtxn = index.read_txn()?; | ||||
|     let words_fst = index.words_fst(&rtxn)?; | ||||
|     let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; | ||||
|     let prefix_settings = index.prefix_settings(&rtxn)?; | ||||
|     word_fst_builder.with_prefix_settings(prefix_settings); | ||||
|  | ||||
|     let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<Bytes>(); | ||||
|     let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<Bytes>(); | ||||
|     for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) { | ||||
|         (Ok((l, _)), Ok((r, _))) => l.cmp(r), | ||||
|         (Err(_), _) | (_, Err(_)) => Ordering::Equal, | ||||
|     }) { | ||||
|         match eob { | ||||
|             EitherOrBoth::Both(lhs, rhs) => { | ||||
|                 let (word, lhs_bytes) = lhs?; | ||||
|                 let (_, rhs_bytes) = rhs?; | ||||
|                 if lhs_bytes != rhs_bytes { | ||||
|                     word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; | ||||
|                 } | ||||
|             } | ||||
|             EitherOrBoth::Left(result) => { | ||||
|                 let (word, _) = result?; | ||||
|                 word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?; | ||||
|             } | ||||
|             EitherOrBoth::Right(result) => { | ||||
|                 let (word, _) = result?; | ||||
|                 word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; | ||||
|     index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; | ||||
|     if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { | ||||
|         index.main.remap_types::<Str, Bytes>().put( | ||||
|             wtxn, | ||||
|             WORDS_PREFIXES_FST_KEY, | ||||
|             &prefixes_fst_mmap, | ||||
|         )?; | ||||
|         Ok(Some(prefix_delta)) | ||||
|     } else { | ||||
|         Ok(None) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")] | ||||
| fn compute_facet_search_database( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn, | ||||
|     global_fields_ids_map: GlobalFieldsIdsMap, | ||||
| ) -> Result<()> { | ||||
|     let rtxn = index.read_txn()?; | ||||
|     let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?; | ||||
|     let mut facet_search_builder = FacetSearchBuilder::new( | ||||
|         global_fields_ids_map, | ||||
|         localized_attributes_rules.unwrap_or_default(), | ||||
|     ); | ||||
|  | ||||
|     let previous_facet_id_string_docids = index | ||||
|         .facet_id_string_docids | ||||
|         .iter(&rtxn)? | ||||
|         .remap_data_type::<DecodeIgnore>() | ||||
|         .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); | ||||
|     let current_facet_id_string_docids = index | ||||
|         .facet_id_string_docids | ||||
|         .iter(wtxn)? | ||||
|         .remap_data_type::<DecodeIgnore>() | ||||
|         .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); | ||||
|     for eob in merge_join_by( | ||||
|         previous_facet_id_string_docids, | ||||
|         current_facet_id_string_docids, | ||||
|         |lhs, rhs| match (lhs, rhs) { | ||||
|             (Ok((l, _)), Ok((r, _))) => l.cmp(r), | ||||
|             (Err(_), _) | (_, Err(_)) => Ordering::Equal, | ||||
|         }, | ||||
|     ) { | ||||
|         match eob { | ||||
|             EitherOrBoth::Both(lhs, rhs) => { | ||||
|                 let (_, _) = lhs?; | ||||
|                 let (_, _) = rhs?; | ||||
|             } | ||||
|             EitherOrBoth::Left(result) => { | ||||
|                 let (key, _) = result?; | ||||
|                 facet_search_builder.register_from_key(DelAdd::Deletion, key)?; | ||||
|             } | ||||
|             EitherOrBoth::Right(result) => { | ||||
|                 let (key, _) = result?; | ||||
|                 facet_search_builder.register_from_key(DelAdd::Addition, key)?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     facet_search_builder.merge_and_write(index, wtxn, &rtxn) | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] | ||||
| fn compute_facet_level_database( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn, | ||||
|     facet_field_ids_delta: FacetFieldIdsDelta, | ||||
| ) -> Result<()> { | ||||
|     if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { | ||||
|         let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); | ||||
|         let _entered = span.enter(); | ||||
|         FacetsUpdateBulk::new_not_updating_level_0( | ||||
|             index, | ||||
|             modified_facet_string_ids, | ||||
|             FacetType::String, | ||||
|         ) | ||||
|         .execute(wtxn)?; | ||||
|     } | ||||
|     if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { | ||||
|         let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); | ||||
|         let _entered = span.enter(); | ||||
|         FacetsUpdateBulk::new_not_updating_level_0( | ||||
|             index, | ||||
|             modified_facet_number_ids, | ||||
|             FacetType::Number, | ||||
|         ) | ||||
|         .execute(wtxn)?; | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// Returns the primary key that has already been set for this index, or the | ||||
| /// one we will guess by searching for the first key that ends with "id", | ||||
| /// and whether the primary key changed. | ||||
| /// TODO move this elsewhere | ||||
| pub fn retrieve_or_guess_primary_key<'a>( | ||||
|     rtxn: &'a RoTxn<'a>, | ||||
|     index: &Index, | ||||
|     new_fields_ids_map: &mut FieldsIdsMap, | ||||
|     primary_key_from_op: Option<&'a str>, | ||||
|     first_document: Option<RawMap<'a, FxBuildHasher>>, | ||||
| ) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> { | ||||
|     // make sure that we have a declared primary key, either fetching it from the index or attempting to guess it. | ||||
|  | ||||
|     // do we have an existing declared primary key? | ||||
|     let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? { | ||||
|         // did we request a primary key in the operation? | ||||
|         match primary_key_from_op { | ||||
|             // we did, and it is different from the DB one | ||||
|             Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => { | ||||
|                 return Ok(Err(UserError::PrimaryKeyCannotBeChanged( | ||||
|                     primary_key_from_db.to_string(), | ||||
|                 ))); | ||||
|             } | ||||
|             _ => (primary_key_from_db, false), | ||||
|         } | ||||
|     } else { | ||||
|         // no primary key in the DB => let's set one | ||||
|         // did we request a primary key in the operation? | ||||
|         let primary_key = if let Some(primary_key_from_op) = primary_key_from_op { | ||||
|             // set primary key from operation | ||||
|             primary_key_from_op | ||||
|         } else { | ||||
|             // guess primary key | ||||
|             let first_document = match first_document { | ||||
|                 Some(document) => document, | ||||
|                 // previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found | ||||
|                 None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), | ||||
|             }; | ||||
|  | ||||
|             let guesses: Result<Vec<&str>> = first_document | ||||
|                 .keys() | ||||
|                 .filter_map(|name| { | ||||
|                     let Some(_) = new_fields_ids_map.insert(name) else { | ||||
|                         return Some(Err(UserError::AttributeLimitReached.into())); | ||||
|                     }; | ||||
|                     name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name)) | ||||
|                 }) | ||||
|                 .collect(); | ||||
|  | ||||
|             let mut guesses = guesses?; | ||||
|  | ||||
|             // sort the keys in lexicographical order, so that fields are always in the same order. | ||||
|             guesses.sort_unstable(); | ||||
|  | ||||
|             match guesses.as_slice() { | ||||
|                 [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), | ||||
|                 [name] => { | ||||
|                     tracing::info!("Primary key was not specified in index. Inferred to '{name}'"); | ||||
|                     *name | ||||
|                 } | ||||
|                 multiple => { | ||||
|                     return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound { | ||||
|                         candidates: multiple | ||||
|                             .iter() | ||||
|                             .map(|candidate| candidate.to_string()) | ||||
|                             .collect(), | ||||
|                     })) | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|         (primary_key, true) | ||||
|     }; | ||||
|  | ||||
|     match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) { | ||||
|         Ok(primary_key) => Ok(Ok((primary_key, has_changed))), | ||||
|         Err(err) => Ok(Err(err)), | ||||
|     } | ||||
| } | ||||
|  | ||||
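| /// Lazily initialized thread pool used to send the embedding requests. | ||||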
| fn request_threads() -> &'static ThreadPoolNoAbort { | ||||
|     static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new(); | ||||
|  | ||||
|     REQUEST_THREADS.get_or_init(|| { | ||||
|         ThreadPoolNoAbortBuilder::new() | ||||
|             .num_threads(crate::vector::REQUEST_PARALLELISM) | ||||
|             .thread_name(|index| format!("embedding-request-{index}")) | ||||
|             .build() | ||||
|             .unwrap() | ||||
|     }) | ||||
| } | ||||
|   | ||||
							
								
								
									
187 crates/milli/src/update/new/indexer/post_processing.rs (Normal file)
							| @@ -0,0 +1,187 @@ | ||||
| use std::cmp::Ordering; | ||||
|  | ||||
| use heed::types::{Bytes, DecodeIgnore, Str}; | ||||
| use heed::RwTxn; | ||||
| use itertools::{merge_join_by, EitherOrBoth}; | ||||
|  | ||||
| use super::document_changes::IndexingContext; | ||||
| use crate::facet::FacetType; | ||||
| use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; | ||||
| use crate::update::del_add::DelAdd; | ||||
| use crate::update::new::facet_search_builder::FacetSearchBuilder; | ||||
| use crate::update::new::steps::IndexingStep; | ||||
| use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; | ||||
| use crate::update::new::words_prefix_docids::{ | ||||
|     compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids, | ||||
|     compute_word_prefix_position_docids, | ||||
| }; | ||||
| use crate::update::new::FacetFieldIdsDelta; | ||||
| use crate::update::{FacetsUpdateBulk, GrenadParameters}; | ||||
| use crate::{GlobalFieldsIdsMap, Index, Result}; | ||||
|  | ||||
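| /// Runs the database post-processing steps once extraction is done: | ||||
| /// rebuilds the facet search and facet level databases, then the words | ||||
| /// FST and, when the prefixes changed, the prefix databases. | ||||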
| pub(super) fn post_process<MSP>( | ||||
|     indexing_context: IndexingContext<MSP>, | ||||
|     wtxn: &mut RwTxn<'_>, | ||||
|     global_fields_ids_map: GlobalFieldsIdsMap<'_>, | ||||
|     facet_field_ids_delta: FacetFieldIdsDelta, | ||||
| ) -> Result<()> | ||||
| where | ||||
|     MSP: Fn() -> bool + Sync, | ||||
| { | ||||
|     let index = indexing_context.index; | ||||
|     indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); | ||||
|     if index.facet_search(wtxn)? { | ||||
|         compute_facet_search_database(index, wtxn, global_fields_ids_map)?; | ||||
|     } | ||||
|     compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; | ||||
|     indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); | ||||
|     if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { | ||||
|         compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?; | ||||
|     }; | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
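| /// Recomputes the word prefix docids, exact word prefix docids, word prefix | ||||
| /// fid docids and word prefix position docids databases from the modified | ||||
| /// and deleted prefixes. | ||||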
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] | ||||
| fn compute_prefix_database( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn, | ||||
|     prefix_delta: PrefixDelta, | ||||
|     grenad_parameters: &GrenadParameters, | ||||
| ) -> Result<()> { | ||||
|     let PrefixDelta { modified, deleted } = prefix_delta; | ||||
|     // Compute word prefix docids | ||||
|     compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; | ||||
|     // Compute exact word prefix docids | ||||
|     compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; | ||||
|     // Compute word prefix fid docids | ||||
|     compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; | ||||
|     // Compute word prefix position docids | ||||
|     compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters) | ||||
| } | ||||
|  | ||||
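| /// Rebuilds the words FST by diffing the previous and current word docids, | ||||
| /// and returns the prefix delta to apply to the prefix databases, if any. | ||||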
| #[tracing::instrument(level = "trace", skip_all, target = "indexing")] | ||||
| fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> { | ||||
|     let rtxn = index.read_txn()?; | ||||
|     let words_fst = index.words_fst(&rtxn)?; | ||||
|     let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; | ||||
|     let prefix_settings = index.prefix_settings(&rtxn)?; | ||||
|     word_fst_builder.with_prefix_settings(prefix_settings); | ||||
|  | ||||
|     let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<Bytes>(); | ||||
|     let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<Bytes>(); | ||||
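|     // Errors compare as equal so that they are yielded by the merge join | ||||
|     // and propagated by the `?` operators below. | ||||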
|     for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) { | ||||
|         (Ok((l, _)), Ok((r, _))) => l.cmp(r), | ||||
|         (Err(_), _) | (_, Err(_)) => Ordering::Equal, | ||||
|     }) { | ||||
|         match eob { | ||||
|             EitherOrBoth::Both(lhs, rhs) => { | ||||
|                 let (word, lhs_bytes) = lhs?; | ||||
|                 let (_, rhs_bytes) = rhs?; | ||||
|                 if lhs_bytes != rhs_bytes { | ||||
|                     word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; | ||||
|                 } | ||||
|             } | ||||
|             EitherOrBoth::Left(result) => { | ||||
|                 let (word, _) = result?; | ||||
|                 word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?; | ||||
|             } | ||||
|             EitherOrBoth::Right(result) => { | ||||
|                 let (word, _) = result?; | ||||
|                 word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; | ||||
|     index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; | ||||
|     if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { | ||||
|         index.main.remap_types::<Str, Bytes>().put( | ||||
|             wtxn, | ||||
|             WORDS_PREFIXES_FST_KEY, | ||||
|             &prefixes_fst_mmap, | ||||
|         )?; | ||||
|         Ok(Some(prefix_delta)) | ||||
|     } else { | ||||
|         Ok(None) | ||||
|     } | ||||
| } | ||||
|  | ||||
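| /// Diffs the level-0 facet string keys between the previous and current | ||||
| /// states of the index and registers the additions and deletions in the | ||||
| /// facet search database. | ||||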
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")] | ||||
| fn compute_facet_search_database( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn, | ||||
|     global_fields_ids_map: GlobalFieldsIdsMap, | ||||
| ) -> Result<()> { | ||||
|     let rtxn = index.read_txn()?; | ||||
|     let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?; | ||||
|     let mut facet_search_builder = FacetSearchBuilder::new( | ||||
|         global_fields_ids_map, | ||||
|         localized_attributes_rules.unwrap_or_default(), | ||||
|     ); | ||||
|  | ||||
|     let previous_facet_id_string_docids = index | ||||
|         .facet_id_string_docids | ||||
|         .iter(&rtxn)? | ||||
|         .remap_data_type::<DecodeIgnore>() | ||||
|         .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); | ||||
|     let current_facet_id_string_docids = index | ||||
|         .facet_id_string_docids | ||||
|         .iter(wtxn)? | ||||
|         .remap_data_type::<DecodeIgnore>() | ||||
|         .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); | ||||
|     for eob in merge_join_by( | ||||
|         previous_facet_id_string_docids, | ||||
|         current_facet_id_string_docids, | ||||
|         |lhs, rhs| match (lhs, rhs) { | ||||
|             (Ok((l, _)), Ok((r, _))) => l.cmp(r), | ||||
|             (Err(_), _) | (_, Err(_)) => Ordering::Equal, | ||||
|         }, | ||||
|     ) { | ||||
|         match eob { | ||||
|             EitherOrBoth::Both(lhs, rhs) => { | ||||
|                 let (_, _) = lhs?; | ||||
|                 let (_, _) = rhs?; | ||||
|             } | ||||
|             EitherOrBoth::Left(result) => { | ||||
|                 let (key, _) = result?; | ||||
|                 facet_search_builder.register_from_key(DelAdd::Deletion, key)?; | ||||
|             } | ||||
|             EitherOrBoth::Right(result) => { | ||||
|                 let (key, _) = result?; | ||||
|                 facet_search_builder.register_from_key(DelAdd::Addition, key)?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     facet_search_builder.merge_and_write(index, wtxn, &rtxn) | ||||
| } | ||||
|  | ||||
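| /// Recomputes the upper facet levels of the string and number facet ids | ||||
| /// whose level 0 was modified. | ||||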
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] | ||||
| fn compute_facet_level_database( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn, | ||||
|     facet_field_ids_delta: FacetFieldIdsDelta, | ||||
| ) -> Result<()> { | ||||
|     if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { | ||||
|         let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); | ||||
|         let _entered = span.enter(); | ||||
|         FacetsUpdateBulk::new_not_updating_level_0( | ||||
|             index, | ||||
|             modified_facet_string_ids, | ||||
|             FacetType::String, | ||||
|         ) | ||||
|         .execute(wtxn)?; | ||||
|     } | ||||
|     if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { | ||||
|         let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); | ||||
|         let _entered = span.enter(); | ||||
|         FacetsUpdateBulk::new_not_updating_level_0( | ||||
|             index, | ||||
|             modified_facet_number_ids, | ||||
|             FacetType::Number, | ||||
|         ) | ||||
|         .execute(wtxn)?; | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
							
								
								
									
189 crates/milli/src/update/new/indexer/write.rs (Normal file)
							| @@ -0,0 +1,189 @@ | ||||
| use std::sync::atomic::AtomicBool; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
| use heed::RwTxn; | ||||
| use rand::SeedableRng as _; | ||||
| use time::OffsetDateTime; | ||||
|  | ||||
| use super::super::channel::*; | ||||
| use crate::documents::PrimaryKey; | ||||
| use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; | ||||
| use crate::index::IndexEmbeddingConfig; | ||||
| use crate::update::settings::InnerIndexSettings; | ||||
| use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings}; | ||||
| use crate::{Error, Index, InternalError, Result}; | ||||
|  | ||||
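| /// Drains the writer channel, applying each received entry to the index, | ||||
| /// and flushes the remaining BBQueue frames once the channel is closed. | ||||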
| pub(super) fn write_to_db( | ||||
|     mut writer_receiver: WriterBbqueueReceiver<'_>, | ||||
|     finished_extraction: &AtomicBool, | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn<'_>, | ||||
|     arroy_writers: &HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>, | ||||
| ) -> Result<()> { | ||||
|     // Used by the ArroySetVector to copy the embedding into an | ||||
|     // aligned memory area, required by arroy to accept a new vector. | ||||
|     let mut aligned_embedding = Vec::new(); | ||||
|     let span = tracing::trace_span!(target: "indexing::write_db", "all"); | ||||
|     let _entered = span.enter(); | ||||
|     let span = tracing::trace_span!(target: "indexing::write_db", "post_merge"); | ||||
|     let mut _entered_post_merge = None; | ||||
|     while let Some(action) = writer_receiver.recv_action() { | ||||
|         if _entered_post_merge.is_none() | ||||
|             && finished_extraction.load(std::sync::atomic::Ordering::Relaxed) | ||||
|         { | ||||
|             _entered_post_merge = Some(span.enter()); | ||||
|         } | ||||
|  | ||||
|         match action { | ||||
|             ReceiverAction::WakeUp => (), | ||||
|             ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => { | ||||
|                 let database_name = database.database_name(); | ||||
|                 let database = database.database(index); | ||||
|                 if let Err(error) = database.put(wtxn, &key, &value) { | ||||
|                     return Err(Error::InternalError(InternalError::StorePut { | ||||
|                         database_name, | ||||
|                         key: bstr::BString::from(&key[..]), | ||||
|                         value_length: value.len(), | ||||
|                         error, | ||||
|                     })); | ||||
|                 } | ||||
|             } | ||||
|             ReceiverAction::LargeVectors(large_vectors) => { | ||||
|                 let LargeVectors { docid, embedder_id, .. } = large_vectors; | ||||
|                 let (_, _, writer, dimensions) = | ||||
|                     arroy_writers.get(&embedder_id).expect("requested a missing embedder"); | ||||
|                 let mut embeddings = Embeddings::new(*dimensions); | ||||
|                 for embedding in large_vectors.read_embeddings(*dimensions) { | ||||
|                     embeddings.push(embedding.to_vec()).unwrap(); | ||||
|                 } | ||||
|                 writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                 writer.add_items(wtxn, docid, &embeddings)?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // Every time there is a message in the channel, we search | ||||
|         // for new entries in the BBQueue buffers. | ||||
|         write_from_bbqueue( | ||||
|             &mut writer_receiver, | ||||
|             index, | ||||
|             wtxn, | ||||
|             arroy_writers, | ||||
|             &mut aligned_embedding, | ||||
|         )?; | ||||
|     } | ||||
|     write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?; | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
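| /// Builds the arroy trees for every embedder writer, then persists the | ||||
| /// embedding configs in the index. | ||||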
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::vectors")] | ||||
| pub(super) fn build_vectors<MSP>( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn<'_>, | ||||
|     index_embeddings: Vec<IndexEmbeddingConfig>, | ||||
|     arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>, | ||||
|     must_stop_processing: &MSP, | ||||
| ) -> Result<()> | ||||
| where | ||||
|     MSP: Fn() -> bool + Sync + Send, | ||||
| { | ||||
|     if index_embeddings.is_empty() { | ||||
|         return Ok(()); | ||||
|     } | ||||
|  | ||||
|     let mut rng = rand::rngs::StdRng::seed_from_u64(42); | ||||
|     for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers { | ||||
|         let dimensions = *dimensions; | ||||
|         writer.build_and_quantize(wtxn, &mut rng, dimensions, false, must_stop_processing)?; | ||||
|     } | ||||
|  | ||||
|     index.put_embedding_configs(wtxn, index_embeddings)?; | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
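| /// Writes the index metadata resulting from an indexing operation: the | ||||
| /// fields ids map, the primary key when it changed, the facet and | ||||
| /// searchable settings, the field distribution, the documents ids and | ||||
| /// the updated-at timestamp. | ||||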
| pub(super) fn update_index( | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn<'_>, | ||||
|     new_fields_ids_map: FieldIdMapWithMetadata, | ||||
|     new_primary_key: Option<PrimaryKey<'_>>, | ||||
|     embedders: EmbeddingConfigs, | ||||
|     field_distribution: std::collections::BTreeMap<String, u64>, | ||||
|     document_ids: roaring::RoaringBitmap, | ||||
| ) -> Result<()> { | ||||
|     index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; | ||||
|     if let Some(new_primary_key) = new_primary_key { | ||||
|         index.put_primary_key(wtxn, new_primary_key.name())?; | ||||
|     } | ||||
|     let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?; | ||||
|     inner_index_settings.recompute_facets(wtxn, index)?; | ||||
|     inner_index_settings.recompute_searchables(wtxn, index)?; | ||||
|     index.put_field_distribution(wtxn, &field_distribution)?; | ||||
|     index.put_documents_ids(wtxn, &document_ids)?; | ||||
|     index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// A function dedicated to managing all the available BBQueue frames. | ||||
| /// | ||||
| /// It reads all the available frames, performs the corresponding database | ||||
| /// operations and stops when no frames are available. | ||||
| pub fn write_from_bbqueue( | ||||
|     writer_receiver: &mut WriterBbqueueReceiver<'_>, | ||||
|     index: &Index, | ||||
|     wtxn: &mut RwTxn<'_>, | ||||
|     arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>, | ||||
|     aligned_embedding: &mut Vec<f32>, | ||||
| ) -> crate::Result<()> { | ||||
|     while let Some(frame_with_header) = writer_receiver.recv_frame() { | ||||
|         match frame_with_header.header() { | ||||
|             EntryHeader::DbOperation(operation) => { | ||||
|                 let database_name = operation.database.database_name(); | ||||
|                 let database = operation.database.database(index); | ||||
|                 let frame = frame_with_header.frame(); | ||||
|                 match operation.key_value(frame) { | ||||
|                     (key, Some(value)) => { | ||||
|                         if let Err(error) = database.put(wtxn, key, value) { | ||||
|                             return Err(Error::InternalError(InternalError::StorePut { | ||||
|                                 database_name, | ||||
|                                 key: key.into(), | ||||
|                                 value_length: value.len(), | ||||
|                                 error, | ||||
|                             })); | ||||
|                         } | ||||
|                     } | ||||
|                     (key, None) => match database.delete(wtxn, key) { | ||||
|                         Ok(false) => { | ||||
|                             unreachable!("We tried to delete an unknown key: {key:?}") | ||||
|                         } | ||||
|                         Ok(_) => (), | ||||
|                         Err(error) => { | ||||
|                             return Err(Error::InternalError(InternalError::StoreDeletion { | ||||
|                                 database_name, | ||||
|                                 key: key.into(), | ||||
|                                 error, | ||||
|                             })); | ||||
|                         } | ||||
|                     }, | ||||
|                 } | ||||
|             } | ||||
|             EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => { | ||||
|                 for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers { | ||||
|                     let dimensions = *dimensions; | ||||
|                     writer.del_items(wtxn, dimensions, docid)?; | ||||
|                 } | ||||
|             } | ||||
|             EntryHeader::ArroySetVectors(asvs) => { | ||||
|                 let ArroySetVectors { docid, embedder_id, .. } = asvs; | ||||
|                 let frame = frame_with_header.frame(); | ||||
|                 let (_, _, writer, dimensions) = | ||||
|                     arroy_writers.get(&embedder_id).expect("requested a missing embedder"); | ||||
|                 let mut embeddings = Embeddings::new(*dimensions); | ||||
|                 let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding); | ||||
|                 embeddings.append(all_embeddings.to_vec()).unwrap(); | ||||
|                 writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                 writer.add_items(wtxn, docid, &embeddings)?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
| @@ -25,7 +25,7 @@ impl WordPrefixDocids { | ||||
|     fn new( | ||||
|         database: Database<Bytes, CboRoaringBitmapCodec>, | ||||
|         prefix_database: Database<Bytes, CboRoaringBitmapCodec>, | ||||
|         grenad_parameters: GrenadParameters, | ||||
|         grenad_parameters: &GrenadParameters, | ||||
|     ) -> WordPrefixDocids { | ||||
|         WordPrefixDocids { | ||||
|             database, | ||||
| @@ -161,7 +161,7 @@ impl WordPrefixIntegerDocids { | ||||
|     fn new( | ||||
|         database: Database<Bytes, CboRoaringBitmapCodec>, | ||||
|         prefix_database: Database<Bytes, CboRoaringBitmapCodec>, | ||||
|         grenad_parameters: GrenadParameters, | ||||
|         grenad_parameters: &GrenadParameters, | ||||
|     ) -> WordPrefixIntegerDocids { | ||||
|         WordPrefixIntegerDocids { | ||||
|             database, | ||||
| @@ -311,7 +311,7 @@ pub fn compute_word_prefix_docids( | ||||
|     index: &Index, | ||||
|     prefix_to_compute: &BTreeSet<Prefix>, | ||||
|     prefix_to_delete: &BTreeSet<Prefix>, | ||||
|     grenad_parameters: GrenadParameters, | ||||
|     grenad_parameters: &GrenadParameters, | ||||
| ) -> Result<()> { | ||||
|     WordPrefixDocids::new( | ||||
|         index.word_docids.remap_key_type(), | ||||
| @@ -327,7 +327,7 @@ pub fn compute_exact_word_prefix_docids( | ||||
|     index: &Index, | ||||
|     prefix_to_compute: &BTreeSet<Prefix>, | ||||
|     prefix_to_delete: &BTreeSet<Prefix>, | ||||
|     grenad_parameters: GrenadParameters, | ||||
|     grenad_parameters: &GrenadParameters, | ||||
| ) -> Result<()> { | ||||
|     WordPrefixDocids::new( | ||||
|         index.exact_word_docids.remap_key_type(), | ||||
| @@ -343,7 +343,7 @@ pub fn compute_word_prefix_fid_docids( | ||||
|     index: &Index, | ||||
|     prefix_to_compute: &BTreeSet<Prefix>, | ||||
|     prefix_to_delete: &BTreeSet<Prefix>, | ||||
|     grenad_parameters: GrenadParameters, | ||||
|     grenad_parameters: &GrenadParameters, | ||||
| ) -> Result<()> { | ||||
|     WordPrefixIntegerDocids::new( | ||||
|         index.word_fid_docids.remap_key_type(), | ||||
| @@ -359,7 +359,7 @@ pub fn compute_word_prefix_position_docids( | ||||
|     index: &Index, | ||||
|     prefix_to_compute: &BTreeSet<Prefix>, | ||||
|     prefix_to_delete: &BTreeSet<Prefix>, | ||||
|     grenad_parameters: GrenadParameters, | ||||
|     grenad_parameters: &GrenadParameters, | ||||
| ) -> Result<()> { | ||||
|     WordPrefixIntegerDocids::new( | ||||
|         index.word_position_docids.remap_key_type(), | ||||
|   | ||||