mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 13:36:27 +00:00 
			
		
		
		
	Merge #5092
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				Test suite / Tests on ${{ matrix.os }} (macos-13) (push) Waiting to run
				
					
					
				
			
		
			
				
	
				Test suite / Tests almost all features (push) Has been skipped
				
					
					
				
			
		
			
				
	
				Test suite / Test disabled tokenization (push) Has been skipped
				
					
					
				
			
		
			
				
	
				Test suite / Tests on ubuntu-20.04 (push) Failing after 12s
				
					
					
				
			
		
			
				
	
				Test suite / Run tests in debug (push) Failing after 11s
				
					
					
				
			
		
			
				
	
				Test suite / Tests on ${{ matrix.os }} (windows-2022) (push) Failing after 23s
				
					
					
				
			
		
			
				
	
				Test suite / Run Rustfmt (push) Successful in 1m41s
				
					
					
				
			
		
			
				
	
				Test suite / Run Clippy (push) Successful in 5m36s
				
					
					
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	Test suite / Tests on ${{ matrix.os }} (macos-13) (push) Waiting to run
				Test suite / Tests almost all features (push) Has been skipped
				Test suite / Test disabled tokenization (push) Has been skipped
				Test suite / Tests on ubuntu-20.04 (push) Failing after 12s
				Test suite / Run tests in debug (push) Failing after 11s
				Test suite / Tests on ${{ matrix.os }} (windows-2022) (push) Failing after 23s
				Test suite / Run Rustfmt (push) Successful in 1m41s
				Test suite / Run Clippy (push) Successful in 5m36s
				5092: Precise spans for new indexer r=dureuill a=dureuill - Separate extract and merge spans - Add span around commit Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
		| @@ -109,55 +109,71 @@ where | ||||
|  | ||||
|             let rtxn = index.read_txn()?; | ||||
|  | ||||
|  | ||||
|             // document but we need to create a function that collects and compresses documents. | ||||
|             let document_sender = extractor_sender.documents(); | ||||
|             let document_extractor = DocumentsExtractor::new(&document_sender, embedders); | ||||
|             let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|  | ||||
|             extract(document_changes, | ||||
|                 &document_extractor, | ||||
|                 indexing_context, | ||||
|                 &mut extractor_allocs, | ||||
|                 &datastore, | ||||
|                 Step::ExtractingDocuments, | ||||
|             )?; | ||||
|  | ||||
|             for document_extractor_data in datastore { | ||||
|                 let document_extractor_data = document_extractor_data.0.into_inner(); | ||||
|                 for (field, delta) in document_extractor_data.field_distribution_delta { | ||||
|                     let current = field_distribution.entry(field).or_default(); | ||||
|                     // adding the delta should never cause a negative result, as we are removing fields that previously existed. | ||||
|                     *current = current.saturating_add_signed(delta); | ||||
|                 } | ||||
|                 document_extractor_data.docids_delta.apply_to(document_ids); | ||||
|             { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); | ||||
|                 let _entered = span.enter(); | ||||
|                 extract(document_changes, | ||||
|                     &document_extractor, | ||||
|                     indexing_context, | ||||
|                     &mut extractor_allocs, | ||||
|                     &datastore, | ||||
|                     Step::ExtractingDocuments, | ||||
|                 )?; | ||||
|             } | ||||
|             { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); | ||||
|                 let _entered = span.enter(); | ||||
|                 for document_extractor_data in datastore { | ||||
|                     let document_extractor_data = document_extractor_data.0.into_inner(); | ||||
|                     for (field, delta) in document_extractor_data.field_distribution_delta { | ||||
|                         let current = field_distribution.entry(field).or_default(); | ||||
|                         // adding the delta should never cause a negative result, as we are removing fields that previously existed. | ||||
|                         *current = current.saturating_add_signed(delta); | ||||
|                     } | ||||
|                     document_extractor_data.docids_delta.apply_to(document_ids); | ||||
|                 } | ||||
|  | ||||
|             field_distribution.retain(|_, v| *v != 0); | ||||
|                 field_distribution.retain(|_, v| *v != 0); | ||||
|             } | ||||
|  | ||||
|             let facet_field_ids_delta; | ||||
|  | ||||
|             { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); | ||||
|                 let _entered = span.enter(); | ||||
|                 let caches = { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                 facet_field_ids_delta = merge_and_send_facet_docids( | ||||
|                     FacetedDocidsExtractor::run_extraction( | ||||
|                         grenad_parameters, | ||||
|                         document_changes, | ||||
|                         indexing_context, | ||||
|                         &mut extractor_allocs, | ||||
|                         &extractor_sender.field_id_docid_facet_sender(), | ||||
|                         Step::ExtractingFacets | ||||
|                     )?, | ||||
|                     FacetDatabases::new(index), | ||||
|                     index, | ||||
|                     extractor_sender.facet_docids(), | ||||
|                 )?; | ||||
|                             grenad_parameters, | ||||
|                             document_changes, | ||||
|                             indexing_context, | ||||
|                             &mut extractor_allocs, | ||||
|                             &extractor_sender.field_id_docid_facet_sender(), | ||||
|                             Step::ExtractingFacets | ||||
|                         )? | ||||
|                 }; | ||||
|  | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                     facet_field_ids_delta = merge_and_send_facet_docids( | ||||
|                         caches, | ||||
|                         FacetDatabases::new(index), | ||||
|                         index, | ||||
|                         extractor_sender.facet_docids(), | ||||
|                     )?; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); | ||||
|                 let _entered = span.enter(); | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
|                 let WordDocidsCaches { | ||||
| @@ -166,15 +182,19 @@ where | ||||
|                     exact_word_docids, | ||||
|                     word_position_docids, | ||||
|                     fid_word_count_docids, | ||||
|                 } = WordDocidsExtractors::run_extraction( | ||||
|                     grenad_parameters, | ||||
|                     document_changes, | ||||
|                     indexing_context, | ||||
|                     &mut extractor_allocs, | ||||
|                     Step::ExtractingWords | ||||
|                 )?; | ||||
|                 } = { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                     WordDocidsExtractors::run_extraction( | ||||
|                         grenad_parameters, | ||||
|                         document_changes, | ||||
|                         indexing_context, | ||||
|                         &mut extractor_allocs, | ||||
|                         Step::ExtractingWords | ||||
|                     )? | ||||
|                 }; | ||||
|  | ||||
|                 // TODO Word Docids Merger | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); | ||||
|                     let _entered = span.enter(); | ||||
| @@ -187,7 +207,6 @@ where | ||||
|                     )?; | ||||
|                 } | ||||
|  | ||||
|                 // Word Fid Docids Merging | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); | ||||
|                     let _entered = span.enter(); | ||||
| @@ -200,7 +219,6 @@ where | ||||
|                     )?; | ||||
|                 } | ||||
|  | ||||
|                 // Exact Word Docids Merging | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); | ||||
|                     let _entered = span.enter(); | ||||
| @@ -213,7 +231,6 @@ where | ||||
|                     )?; | ||||
|                 } | ||||
|  | ||||
|                 // Word Position Docids Merging | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); | ||||
|                     let _entered = span.enter(); | ||||
| @@ -226,7 +243,6 @@ where | ||||
|                     )?; | ||||
|                 } | ||||
|  | ||||
|                 // Fid Word Count Docids Merging | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); | ||||
|                     let _entered = span.enter(); | ||||
| @@ -244,30 +260,34 @@ where | ||||
|             // this works only if the settings didn't change during this transaction. | ||||
|             let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); | ||||
|             if proximity_precision == ProximityPrecision::ByWord { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); | ||||
|                 let _entered = span.enter(); | ||||
|                 let caches = { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                     <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction( | ||||
|                         grenad_parameters, | ||||
|                         document_changes, | ||||
|                         indexing_context, | ||||
|                         &mut extractor_allocs, | ||||
|                         Step::ExtractingWordProximity, | ||||
|                     )? | ||||
|                 }; | ||||
|  | ||||
|                 let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction( | ||||
|                     grenad_parameters, | ||||
|                     document_changes, | ||||
|                     indexing_context, | ||||
|                     &mut extractor_allocs, | ||||
|                     Step::ExtractingWordProximity, | ||||
|                 )?; | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                 merge_and_send_docids( | ||||
|                     caches, | ||||
|                     index.word_pair_proximity_docids.remap_types(), | ||||
|                     index, | ||||
|                     extractor_sender.docids::<WordPairProximityDocids>(), | ||||
|                     &indexing_context.must_stop_processing, | ||||
|                 )?; | ||||
|                     merge_and_send_docids( | ||||
|                         caches, | ||||
|                         index.word_pair_proximity_docids.remap_types(), | ||||
|                         index, | ||||
|                         extractor_sender.docids::<WordPairProximityDocids>(), | ||||
|                         &indexing_context.must_stop_processing, | ||||
|                     )?; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             'vectors: { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); | ||||
|                 let _entered = span.enter(); | ||||
|  | ||||
|                 let mut index_embeddings = index.embedding_configs(&rtxn)?; | ||||
|                 if index_embeddings.is_empty() { | ||||
| @@ -277,13 +297,22 @@ where | ||||
|                 let embedding_sender = extractor_sender.embeddings(); | ||||
|                 let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); | ||||
|                 let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|                 extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?; | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                 for config in &mut index_embeddings { | ||||
|                     'data: for data in datastore.iter_mut() { | ||||
|                         let data = &mut data.get_mut().0; | ||||
|                         let Some(deladd) = data.remove(&config.name) else { continue 'data; }; | ||||
|                         deladd.apply_to(&mut config.user_provided); | ||||
|                     extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?; | ||||
|                 } | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                     for config in &mut index_embeddings { | ||||
|                         'data: for data in datastore.iter_mut() { | ||||
|                             let data = &mut data.get_mut().0; | ||||
|                             let Some(deladd) = data.remove(&config.name) else { continue 'data; }; | ||||
|                             deladd.apply_to(&mut config.user_provided); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
| @@ -291,21 +320,24 @@ where | ||||
|             } | ||||
|  | ||||
|             'geo: { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); | ||||
|                 let _entered = span.enter(); | ||||
|  | ||||
|                 let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else { | ||||
|                     break 'geo; | ||||
|                 }; | ||||
|                 let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); | ||||
|                 extract( | ||||
|                     document_changes, | ||||
|                     &extractor, | ||||
|                     indexing_context, | ||||
|                     &mut extractor_allocs, | ||||
|                     &datastore, | ||||
|                     Step::WritingGeoPoints | ||||
|                 )?; | ||||
|  | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); | ||||
|                     let _entered = span.enter(); | ||||
|  | ||||
|                     extract( | ||||
|                         document_changes, | ||||
|                         &extractor, | ||||
|                         indexing_context, | ||||
|                         &mut extractor_allocs, | ||||
|                         &datastore, | ||||
|                         Step::WritingGeoPoints | ||||
|                     )?; | ||||
|                 } | ||||
|  | ||||
|                 merge_and_send_rtree( | ||||
|                     datastore, | ||||
| @@ -316,11 +348,7 @@ where | ||||
|                 )?; | ||||
|             } | ||||
|  | ||||
|             { | ||||
|                 let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); | ||||
|                 let _entered = span.enter(); | ||||
|                 (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase)); | ||||
|             } | ||||
|             (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase)); | ||||
|  | ||||
|             Result::Ok(facet_field_ids_delta) | ||||
|         })?; | ||||
| @@ -352,90 +380,103 @@ where | ||||
|             .collect(); | ||||
|  | ||||
|         let mut arroy_writers = arroy_writers?; | ||||
|         for operation in writer_receiver { | ||||
|             match operation { | ||||
|                 WriterOperation::DbOperation(db_operation) => { | ||||
|                     let database = db_operation.database(index); | ||||
|                     let database_name = db_operation.database_name(); | ||||
|                     match db_operation.entry() { | ||||
|                         EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) { | ||||
|                             Ok(false) => unreachable!("We tried to delete an unknown key"), | ||||
|                             Ok(_) => (), | ||||
|                             Err(error) => { | ||||
|                                 return Err(Error::InternalError(InternalError::StoreDeletion { | ||||
|                                     database_name, | ||||
|                                     key: e.entry().to_owned(), | ||||
|                                     error, | ||||
|                                 })); | ||||
|                             } | ||||
|                         }, | ||||
|                         EntryOperation::Write(e) => { | ||||
|                             if let Err(error) = database.put(wtxn, e.key(), e.value()) { | ||||
|                                 return Err(Error::InternalError(InternalError::StorePut { | ||||
|                                     database_name, | ||||
|                                     key: e.key().to_owned(), | ||||
|                                     value_length: e.value().len(), | ||||
|                                     error, | ||||
|                                 })); | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "all"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             for operation in writer_receiver { | ||||
|                 match operation { | ||||
|                     WriterOperation::DbOperation(db_operation) => { | ||||
|                         let database = db_operation.database(index); | ||||
|                         let database_name = db_operation.database_name(); | ||||
|                         match db_operation.entry() { | ||||
|                             EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) { | ||||
|                                 Ok(false) => unreachable!("We tried to delete an unknown key"), | ||||
|                                 Ok(_) => (), | ||||
|                                 Err(error) => { | ||||
|                                     return Err(Error::InternalError( | ||||
|                                         InternalError::StoreDeletion { | ||||
|                                             database_name, | ||||
|                                             key: e.entry().to_owned(), | ||||
|                                             error, | ||||
|                                         }, | ||||
|                                     )); | ||||
|                                 } | ||||
|                             }, | ||||
|                             EntryOperation::Write(e) => { | ||||
|                                 if let Err(error) = database.put(wtxn, e.key(), e.value()) { | ||||
|                                     return Err(Error::InternalError(InternalError::StorePut { | ||||
|                                         database_name, | ||||
|                                         key: e.key().to_owned(), | ||||
|                                         value_length: e.value().len(), | ||||
|                                         error, | ||||
|                                     })); | ||||
|                                 } | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                     WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation { | ||||
|                         ArroyOperation::DeleteVectors { docid } => { | ||||
|                             for ( | ||||
|                                 _embedder_index, | ||||
|                                 (_embedder_name, _embedder, writer, dimensions), | ||||
|                             ) in &mut arroy_writers | ||||
|                             { | ||||
|                                 let dimensions = *dimensions; | ||||
|                                 writer.del_items(wtxn, dimensions, docid)?; | ||||
|                             } | ||||
|                         } | ||||
|                         ArroyOperation::SetVectors { | ||||
|                             docid, | ||||
|                             embedder_id, | ||||
|                             embeddings: raw_embeddings, | ||||
|                         } => { | ||||
|                             let (_, _, writer, dimensions) = arroy_writers | ||||
|                                 .get(&embedder_id) | ||||
|                                 .expect("requested a missing embedder"); | ||||
|  | ||||
|                             let mut embeddings = Embeddings::new(*dimensions); | ||||
|                             for embedding in raw_embeddings { | ||||
|                                 embeddings.append(embedding).unwrap(); | ||||
|                             } | ||||
|  | ||||
|                             writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                             writer.add_items(wtxn, docid, &embeddings)?; | ||||
|                         } | ||||
|                         ArroyOperation::SetVector { docid, embedder_id, embedding } => { | ||||
|                             let (_, _, writer, dimensions) = arroy_writers | ||||
|                                 .get(&embedder_id) | ||||
|                                 .expect("requested a missing embedder"); | ||||
|                             writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                             writer.add_item(wtxn, docid, &embedding)?; | ||||
|                         } | ||||
|                         ArroyOperation::Finish { configs } => { | ||||
|                             let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); | ||||
|                             let _entered = span.enter(); | ||||
|  | ||||
|                             (indexing_context.send_progress)(Progress::from_step( | ||||
|                                 Step::WritingEmbeddingsToDatabase, | ||||
|                             )); | ||||
|  | ||||
|                             for ( | ||||
|                                 _embedder_index, | ||||
|                                 (_embedder_name, _embedder, writer, dimensions), | ||||
|                             ) in &mut arroy_writers | ||||
|                             { | ||||
|                                 let dimensions = *dimensions; | ||||
|                                 writer.build_and_quantize( | ||||
|                                     wtxn, | ||||
|                                     &mut rng, | ||||
|                                     dimensions, | ||||
|                                     false, | ||||
|                                     &indexing_context.must_stop_processing, | ||||
|                                 )?; | ||||
|                             } | ||||
|  | ||||
|                             index.put_embedding_configs(wtxn, configs)?; | ||||
|                         } | ||||
|                     }, | ||||
|                 } | ||||
|                 WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation { | ||||
|                     ArroyOperation::DeleteVectors { docid } => { | ||||
|                         for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in | ||||
|                             &mut arroy_writers | ||||
|                         { | ||||
|                             let dimensions = *dimensions; | ||||
|                             writer.del_items(wtxn, dimensions, docid)?; | ||||
|                         } | ||||
|                     } | ||||
|                     ArroyOperation::SetVectors { | ||||
|                         docid, | ||||
|                         embedder_id, | ||||
|                         embeddings: raw_embeddings, | ||||
|                     } => { | ||||
|                         let (_, _, writer, dimensions) = | ||||
|                             arroy_writers.get(&embedder_id).expect("requested a missing embedder"); | ||||
|                         // TODO: switch to Embeddings | ||||
|                         let mut embeddings = Embeddings::new(*dimensions); | ||||
|                         for embedding in raw_embeddings { | ||||
|                             embeddings.append(embedding).unwrap(); | ||||
|                         } | ||||
|  | ||||
|                         writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                         writer.add_items(wtxn, docid, &embeddings)?; | ||||
|                     } | ||||
|                     ArroyOperation::SetVector { docid, embedder_id, embedding } => { | ||||
|                         let (_, _, writer, dimensions) = | ||||
|                             arroy_writers.get(&embedder_id).expect("requested a missing embedder"); | ||||
|                         writer.del_items(wtxn, *dimensions, docid)?; | ||||
|                         writer.add_item(wtxn, docid, &embedding)?; | ||||
|                     } | ||||
|                     ArroyOperation::Finish { configs } => { | ||||
|                         let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); | ||||
|                         let _entered = span.enter(); | ||||
|  | ||||
|                         (indexing_context.send_progress)(Progress::from_step( | ||||
|                             Step::WritingEmbeddingsToDatabase, | ||||
|                         )); | ||||
|  | ||||
|                         for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in | ||||
|                             &mut arroy_writers | ||||
|                         { | ||||
|                             let dimensions = *dimensions; | ||||
|                             writer.build_and_quantize( | ||||
|                                 wtxn, | ||||
|                                 &mut rng, | ||||
|                                 dimensions, | ||||
|                                 false, | ||||
|                                 &indexing_context.must_stop_processing, | ||||
|                             )?; | ||||
|                         } | ||||
|  | ||||
|                         index.put_embedding_configs(wtxn, configs)?; | ||||
|                     } | ||||
|                 }, | ||||
|             } | ||||
|         } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user