mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 21:46:27 +00:00 
			
		
		
		
	Add tracing to milli
This commit is contained in:
		
							
								
								
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -3840,6 +3840,7 @@ dependencies = [ | ||||
|  "time", | ||||
|  "tokenizers", | ||||
|  "tokio", | ||||
|  "tracing", | ||||
|  "uuid", | ||||
| ] | ||||
|  | ||||
|   | ||||
| @@ -91,6 +91,7 @@ tiktoken-rs = "0.5.8" | ||||
| liquid = "0.26.4" | ||||
| arroy = "0.2.0" | ||||
| rand = "0.8.5" | ||||
| tracing = "0.1.40" | ||||
|  | ||||
| [dev-dependencies] | ||||
| mimalloc = { version = "0.1.39", default-features = false } | ||||
| @@ -102,15 +103,7 @@ meili-snap = { path = "../meili-snap" } | ||||
| rand = { version = "0.8.5", features = ["small_rng"] } | ||||
|  | ||||
| [features] | ||||
| all-tokenizations = [ | ||||
|     "charabia/chinese", | ||||
|     "charabia/hebrew", | ||||
|     "charabia/japanese", | ||||
|     "charabia/thai", | ||||
|     "charabia/korean", | ||||
|     "charabia/greek", | ||||
|     "charabia/khmer", | ||||
| ] | ||||
| all-tokenizations = ["charabia/chinese", "charabia/hebrew", "charabia/japanese", "charabia/thai", "charabia/korean", "charabia/greek", "charabia/khmer"] | ||||
|  | ||||
| # Use POSIX semaphores instead of SysV semaphores in LMDB | ||||
| # For more information on this feature, see heed's Cargo.toml | ||||
|   | ||||
| @@ -25,6 +25,7 @@ impl<R: io::Read + io::Seek> DocumentsBatchReader<R> { | ||||
|     /// | ||||
|     /// It first retrieves the index, then moves to the first document. Use the `into_cursor` | ||||
|     /// method to iterator over the documents, from the first to the last. | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn from_reader(reader: R) -> Result<Self, Error> { | ||||
|         let reader = grenad::Reader::new(reader)?; | ||||
|         let mut cursor = reader.into_cursor()?; | ||||
|   | ||||
| @@ -14,6 +14,12 @@ impl<'t, 'i> ClearDocuments<'t, 'i> { | ||||
|         ClearDocuments { wtxn, index } | ||||
|     } | ||||
|  | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip(self), | ||||
|         target = "indexing::documents", | ||||
|         name = "clear_documents" | ||||
|     )] | ||||
|     pub fn execute(self) -> Result<u64> { | ||||
|         puffin::profile_function!(); | ||||
|  | ||||
|   | ||||
| @@ -22,6 +22,7 @@ use crate::{FieldId, Index, Result}; | ||||
| /// # Panics | ||||
| /// | ||||
| /// - if reader.is_empty(), this function may panic in some cases | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
| pub fn enrich_documents_batch<R: Read + Seek>( | ||||
|     rtxn: &heed::RoTxn, | ||||
|     index: &Index, | ||||
| @@ -143,6 +144,8 @@ pub fn enrich_documents_batch<R: Read + Seek>( | ||||
|  | ||||
| /// Retrieve the document id after validating it, returning a `UserError` | ||||
| /// if the id is invalid or can't be guessed. | ||||
| #[tracing::instrument(level = "trace", skip(uuid_buffer, documents_batch_index, document) | ||||
| target = "indexing::documents")] | ||||
| fn fetch_or_generate_document_id( | ||||
|     document: &obkv::KvReader<FieldId>, | ||||
|     documents_batch_index: &DocumentsBatchIndex, | ||||
|   | ||||
| @@ -21,7 +21,7 @@ pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, R | ||||
| /// | ||||
| /// Returns the generated internal documents ids and a grenad reader | ||||
| /// with the list of extracted words from the given chunk of documents. | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|     obkv_documents: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|   | ||||
| @@ -16,7 +16,7 @@ use crate::Result; | ||||
| /// | ||||
| /// Returns a grenad reader with the list of extracted facet numbers and | ||||
| /// documents ids from the given chunk of docid facet number positions. | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_facet_number_docids<R: io::Read + io::Seek>( | ||||
|     fid_docid_facet_number: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|   | ||||
| @@ -15,7 +15,7 @@ use crate::{FieldId, Result}; | ||||
| /// | ||||
| /// Returns a grenad reader with the list of extracted facet strings and | ||||
| /// documents ids from the given chunk of docid facet string positions. | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_facet_string_docids<R: io::Read + io::Seek>( | ||||
|     docid_fid_facet_string: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|   | ||||
| @@ -39,7 +39,7 @@ pub struct ExtractedFacetValues { | ||||
| /// Returns the generated grenad reader containing the docid the fid and the orginal value as key | ||||
| /// and the normalized value as value extracted from the given chunk of documents. | ||||
| /// We need the fid of the geofields to correctly parse them as numbers if they were sent as strings initially. | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>( | ||||
|     obkv_documents: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|   | ||||
| @@ -19,7 +19,7 @@ const MAX_COUNTED_WORDS: usize = 30; | ||||
| /// | ||||
| /// Returns a grenad reader with the list of extracted field id word counts | ||||
| /// and documents ids from the given chunk of docid word positions. | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|   | ||||
| @@ -13,7 +13,7 @@ use crate::{FieldId, InternalError, Result}; | ||||
| /// Extracts the geographical coordinates contained in each document under the `_geo` field. | ||||
| /// | ||||
| /// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude) | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_geo_points<R: io::Read + io::Seek>( | ||||
|     obkv_documents: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|   | ||||
| @@ -67,7 +67,7 @@ impl VectorStateDelta { | ||||
| /// Extracts the embedding vector contained in each document under the `_vectors` field. | ||||
| /// | ||||
| /// Returns the generated grenad reader containing the docid as key associated to the Vec<f32> | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|     obkv_documents: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|   | ||||
| @@ -23,7 +23,7 @@ use crate::{DocumentId, FieldId, Result}; | ||||
| /// | ||||
| /// The first returned reader is the one for normal word_docids, and the second one is for | ||||
| /// exact_word_docids | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_word_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
| @@ -135,6 +135,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>( | ||||
|     )) | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| fn words_into_sorter( | ||||
|     document_id: DocumentId, | ||||
|     fid: FieldId, | ||||
|   | ||||
| @@ -19,7 +19,7 @@ use crate::{DocumentId, Result}; | ||||
| /// | ||||
| /// Returns a grenad reader with the list of extracted word pairs proximities and | ||||
| /// documents ids from the given chunk of docid word positions. | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
| @@ -59,6 +59,10 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>( | ||||
|         if current_document_id.map_or(false, |id| id != document_id) { | ||||
|             puffin::profile_scope!("Document into sorter"); | ||||
|  | ||||
|             // FIXME: span inside of a hot loop might degrade performance and create big reports | ||||
|             let span = tracing::trace_span!(target: "indexing::details", "document_into_sorter"); | ||||
|             let _entered = span.enter(); | ||||
|  | ||||
|             document_word_positions_into_sorter( | ||||
|                 current_document_id.unwrap(), | ||||
|                 &del_word_pair_proximity, | ||||
| @@ -138,6 +142,10 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>( | ||||
|  | ||||
|     if let Some(document_id) = current_document_id { | ||||
|         puffin::profile_scope!("Final document into sorter"); | ||||
|         // FIXME: span inside of a hot loop might degrade performance and create big reports | ||||
|         let span = tracing::trace_span!(target: "indexing::details", "final_document_into_sorter"); | ||||
|         let _entered = span.enter(); | ||||
|  | ||||
|         document_word_positions_into_sorter( | ||||
|             document_id, | ||||
|             &del_word_pair_proximity, | ||||
| @@ -147,6 +155,10 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>( | ||||
|     } | ||||
|     { | ||||
|         puffin::profile_scope!("sorter_into_reader"); | ||||
|         // FIXME: span inside of a hot loop might degrade performance and create big reports | ||||
|         let span = tracing::trace_span!(target: "indexing::details", "sorter_into_reader"); | ||||
|         let _entered = span.enter(); | ||||
|  | ||||
|         let mut writer = create_writer( | ||||
|             indexer.chunk_compression_type, | ||||
|             indexer.chunk_compression_level, | ||||
|   | ||||
| @@ -18,7 +18,7 @@ use crate::{bucketed_position, DocumentId, Result}; | ||||
| /// | ||||
| /// Returns a grenad reader with the list of extracted words at positions and | ||||
| /// documents ids from the given chunk of docid word positions. | ||||
| #[logging_timer::time] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub fn extract_word_position_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
| @@ -94,6 +94,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>( | ||||
|     Ok(word_position_docids_reader) | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| fn words_position_into_sorter( | ||||
|     document_id: DocumentId, | ||||
|     key_buffer: &mut Vec<u8>, | ||||
|   | ||||
| @@ -41,6 +41,7 @@ use crate::{FieldId, FieldsIdsMap, Result}; | ||||
| /// Extract data for each databases from obkv documents in parallel. | ||||
| /// Send data in grenad file over provided Sender. | ||||
| #[allow(clippy::too_many_arguments)] | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| pub(crate) fn data_from_obkv_documents( | ||||
|     original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send, | ||||
|     flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send, | ||||
| @@ -257,12 +258,20 @@ fn spawn_extraction_task<FE, FS, M>( | ||||
|     M: MergeableReader + FromParallelIterator<M::Output> + Send + 'static, | ||||
|     M::Output: Send, | ||||
| { | ||||
|     let current_span = tracing::Span::current(); | ||||
|  | ||||
|     rayon::spawn(move || { | ||||
|         let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_multiple_chunks"); | ||||
|         let _entered = child_span.enter(); | ||||
|         puffin::profile_scope!("extract_multiple_chunks", name); | ||||
|         let chunks: Result<M> = | ||||
|             chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect(); | ||||
|         let current_span = tracing::Span::current(); | ||||
|  | ||||
|         rayon::spawn(move || match chunks { | ||||
|             Ok(chunks) => { | ||||
|                 let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "merge_multiple_chunks"); | ||||
|                 let _entered = child_span.enter(); | ||||
|                 debug!("merge {} database", name); | ||||
|                 puffin::profile_scope!("merge_multiple_chunks", name); | ||||
|                 let reader = chunks.merge(merge_fn, &indexer); | ||||
|   | ||||
| @@ -49,6 +49,7 @@ pub fn create_sorter( | ||||
|     builder.build() | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")] | ||||
| pub fn sorter_into_reader( | ||||
|     sorter: grenad::Sorter<MergeFn>, | ||||
|     indexer: GrenadParameters, | ||||
| @@ -240,6 +241,7 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>( | ||||
|  | ||||
| /// Write provided sorter in database using serialize_value function. | ||||
| /// merge_values function is used if an entry already exist in the database. | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::grenad")] | ||||
| pub fn write_sorter_into_database<K, V, FS, FM>( | ||||
|     sorter: Sorter<MergeFn>, | ||||
|     database: &heed::Database<K, V>, | ||||
|   | ||||
| @@ -134,6 +134,7 @@ where | ||||
|     /// return an error and not the `IndexDocuments` struct as it is invalid to use it afterward. | ||||
|     /// | ||||
|     /// Returns the number of documents added to the builder. | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn add_documents<R: Read + Seek>( | ||||
|         mut self, | ||||
|         reader: DocumentsBatchReader<R>, | ||||
| @@ -179,6 +180,7 @@ where | ||||
|     /// Remove a batch of documents from the current builder. | ||||
|     /// | ||||
|     /// Returns the number of documents deleted from the builder. | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn remove_documents( | ||||
|         mut self, | ||||
|         to_delete: Vec<String>, | ||||
| @@ -214,6 +216,7 @@ where | ||||
|     /// - No batching using the standards `remove_documents` and `add_documents` took place | ||||
|     /// | ||||
|     /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function. | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::details")] | ||||
|     pub fn remove_documents_from_db_no_batch( | ||||
|         mut self, | ||||
|         to_delete: &RoaringBitmap, | ||||
| @@ -237,7 +240,12 @@ where | ||||
|         Ok((self, deleted_documents)) | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("IndexDocuments::{}")] | ||||
|     #[tracing::instrument( | ||||
|         level = "trace" | ||||
|         skip_all, | ||||
|         target = "indexing::documents", | ||||
|         name = "index_documents" | ||||
|     )] | ||||
|     pub fn execute(mut self) -> Result<DocumentAdditionResult> { | ||||
|         puffin::profile_function!(); | ||||
|  | ||||
| @@ -273,7 +281,12 @@ where | ||||
|     } | ||||
|  | ||||
|     /// Returns the total number of documents in the index after the update. | ||||
|     #[logging_timer::time("IndexDocuments::{}")] | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip_all, | ||||
|         target = "profile::indexing::details", | ||||
|         name = "index_documents_raw" | ||||
|     )] | ||||
|     pub fn execute_raw(self, output: TransformOutput) -> Result<u64> | ||||
|     where | ||||
|         FP: Fn(UpdateIndexingStep) + Sync, | ||||
| @@ -374,8 +387,12 @@ where | ||||
|  | ||||
|         let cloned_embedder = self.embedders.clone(); | ||||
|  | ||||
|         let current_span = tracing::Span::current(); | ||||
|  | ||||
|         // Run extraction pipeline in parallel. | ||||
|         pool.install(|| { | ||||
|             let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_and_send_grenad_chunks"); | ||||
|             let _enter = child_span.enter(); | ||||
|             puffin::profile_scope!("extract_and_send_grenad_chunks"); | ||||
|             // split obkv file into several chunks | ||||
|             let original_chunk_iter = | ||||
| @@ -543,7 +560,12 @@ where | ||||
|         Ok(number_of_documents) | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("IndexDocuments::{}")] | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip_all, | ||||
|         target = "indexing::prefix", | ||||
|         name = "index_documents_prefix_databases" | ||||
|     )] | ||||
|     pub fn execute_prefix_databases( | ||||
|         self, | ||||
|         word_docids: Option<grenad::Reader<CursorClonableMmap>>, | ||||
| @@ -598,6 +620,8 @@ where | ||||
|         let del_prefix_fst_words; | ||||
|  | ||||
|         { | ||||
|             let span = tracing::trace_span!(target: "indexing::details", "compute_prefix_diffs"); | ||||
|             let _entered = span.enter(); | ||||
|             puffin::profile_scope!("compute_prefix_diffs"); | ||||
|  | ||||
|             current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; | ||||
| @@ -722,6 +746,12 @@ where | ||||
|  | ||||
| /// Run the word prefix docids update operation. | ||||
| #[allow(clippy::too_many_arguments)] | ||||
| #[tracing::instrument( | ||||
|     level = "trace", | ||||
|     skip_all, | ||||
|     target = "indexing::prefix", | ||||
|     name = "index_documents_word_prefix_docids" | ||||
| )] | ||||
| fn execute_word_prefix_docids( | ||||
|     txn: &mut heed::RwTxn, | ||||
|     reader: grenad::Reader<Cursor<ClonableMmap>>, | ||||
|   | ||||
| @@ -146,7 +146,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time] | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn read_documents<R, FP, FA>( | ||||
|         &mut self, | ||||
|         reader: EnrichedDocumentsBatchReader<R>, | ||||
| @@ -359,7 +359,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|     /// - If the document to remove was inserted by the `read_documents` method before but was NOT present in the db, | ||||
|     ///   it's added into the grenad to ensure we don't insert it + removed from the list of new documents ids. | ||||
|     /// - If the document to remove was not present in either the db or the transform we do nothing. | ||||
|     #[logging_timer::time] | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn remove_documents<FA>( | ||||
|         &mut self, | ||||
|         mut to_remove: Vec<String>, | ||||
| @@ -450,7 +450,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|     /// - No batching using the standards `remove_documents` and `add_documents` took place | ||||
|     /// | ||||
|     /// TODO: make it impossible to call `remove_documents` or `add_documents` on an instance that calls this function. | ||||
|     #[logging_timer::time] | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::details")] | ||||
|     pub fn remove_documents_from_db_no_batch<FA>( | ||||
|         &mut self, | ||||
|         to_remove: &RoaringBitmap, | ||||
| @@ -541,6 +541,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|  | ||||
|     // Flatten a document from the fields ids map contained in self and insert the new | ||||
|     // created fields. Returns `None` if the document doesn't need to be flattened. | ||||
|     #[tracing::instrument(level = "trace", skip(self, obkv), target = "indexing::transform")] | ||||
|     fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> { | ||||
|         if obkv | ||||
|             .iter() | ||||
| @@ -661,7 +662,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|     /// Generate the `TransformOutput` based on the given sorter that can be generated from any | ||||
|     /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document | ||||
|     /// id for the user side and the value must be an obkv where keys are valid fields ids. | ||||
|     #[logging_timer::time] | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::transform")] | ||||
|     pub(crate) fn output_from_sorter<F>( | ||||
|         self, | ||||
|         wtxn: &mut heed::RwTxn, | ||||
|   | ||||
| @@ -115,6 +115,7 @@ impl TypedChunk { | ||||
|  | ||||
| /// Write typed chunk in the corresponding LMDB database of the provided index. | ||||
| /// Return new documents seen. | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] | ||||
| pub(crate) fn write_typed_chunk_into_index( | ||||
|     typed_chunk: TypedChunk, | ||||
|     index: &Index, | ||||
| @@ -126,6 +127,8 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|     let mut is_merged_database = false; | ||||
|     match typed_chunk { | ||||
|         TypedChunk::Documents(obkv_documents_iter) => { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "documents"); | ||||
|             let _entered = span.enter(); | ||||
|             let mut operations: Vec<DocumentOperation> = Default::default(); | ||||
|  | ||||
|             let mut docids = index.documents_ids(wtxn)?; | ||||
| @@ -172,6 +175,9 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             index.put_documents_ids(wtxn, &docids)?; | ||||
|         } | ||||
|         TypedChunk::FieldIdWordCountDocids(fid_word_count_docids_iter) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db", "field_id_word_count_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             append_entries_into_database( | ||||
|                 fid_word_count_docids_iter, | ||||
|                 &index.field_id_word_count_docids, | ||||
| @@ -187,6 +193,8 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             exact_word_docids_reader, | ||||
|             word_fid_docids_reader, | ||||
|         } => { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "word_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?; | ||||
|             append_entries_into_database( | ||||
|                 word_docids_iter.clone(), | ||||
| @@ -230,6 +238,8 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::WordPositionDocids(word_position_docids_iter) => { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "word_position_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             append_entries_into_database( | ||||
|                 word_position_docids_iter, | ||||
|                 &index.word_position_docids, | ||||
| @@ -241,16 +251,25 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids_iter) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db","field_id_facet_number_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             let indexer = FacetsUpdate::new(index, FacetType::Number, facet_id_number_docids_iter); | ||||
|             indexer.execute(wtxn)?; | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::FieldIdFacetStringDocids(facet_id_string_docids_iter) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db", "field_id_facet_string_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             let indexer = FacetsUpdate::new(index, FacetType::String, facet_id_string_docids_iter); | ||||
|             indexer.execute(wtxn)?; | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db", "field_id_facet_exists_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             append_entries_into_database( | ||||
|                 facet_id_exists_docids, | ||||
|                 &index.facet_id_exists_docids, | ||||
| @@ -262,6 +281,9 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::FieldIdFacetIsNullDocids(facet_id_is_null_docids) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db", "field_id_facet_is_null_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             append_entries_into_database( | ||||
|                 facet_id_is_null_docids, | ||||
|                 &index.facet_id_is_null_docids, | ||||
| @@ -273,6 +295,8 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::FieldIdFacetIsEmptyDocids(facet_id_is_empty_docids) => { | ||||
|             let span = tracing::trace_span!(target: "profile::indexing::write_db", "field_id_facet_is_empty_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             append_entries_into_database( | ||||
|                 facet_id_is_empty_docids, | ||||
|                 &index.facet_id_is_empty_docids, | ||||
| @@ -284,6 +308,9 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db", "word_pair_proximity_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             append_entries_into_database( | ||||
|                 word_pair_proximity_docids_iter, | ||||
|                 &index.word_pair_proximity_docids, | ||||
| @@ -295,6 +322,9 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             is_merged_database = true; | ||||
|         } | ||||
|         TypedChunk::FieldIdDocidFacetNumbers(fid_docid_facet_number) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_numbers"); | ||||
|             let _entered = span.enter(); | ||||
|             let index_fid_docid_facet_numbers = | ||||
|                 index.field_id_docid_facet_f64s.remap_types::<Bytes, Bytes>(); | ||||
|             let mut cursor = fid_docid_facet_number.into_cursor()?; | ||||
| @@ -315,6 +345,9 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             } | ||||
|         } | ||||
|         TypedChunk::FieldIdDocidFacetStrings(fid_docid_facet_string) => { | ||||
|             let span = | ||||
|                 tracing::trace_span!(target: "indexing::write_db", "field_id_docid_facet_strings"); | ||||
|             let _entered = span.enter(); | ||||
|             let index_fid_docid_facet_strings = | ||||
|                 index.field_id_docid_facet_strings.remap_types::<Bytes, Bytes>(); | ||||
|             let mut cursor = fid_docid_facet_string.into_cursor()?; | ||||
| @@ -335,6 +368,8 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             } | ||||
|         } | ||||
|         TypedChunk::GeoPoints(geo_points) => { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "geo_points"); | ||||
|             let _entered = span.enter(); | ||||
|             let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default(); | ||||
|             let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?; | ||||
|  | ||||
| @@ -365,6 +400,8 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             expected_dimension, | ||||
|             embedder_name, | ||||
|         } => { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "vector_points"); | ||||
|             let _entered = span.enter(); | ||||
|             let embedder_index = index.embedder_category_id.get(wtxn, &embedder_name)?.ok_or( | ||||
|                 InternalError::DatabaseMissingEntry { db_name: "embedder_category_id", key: None }, | ||||
|             )?; | ||||
| @@ -483,6 +520,8 @@ pub(crate) fn write_typed_chunk_into_index( | ||||
|             log::debug!("Finished vector chunk for {}", embedder_name); | ||||
|         } | ||||
|         TypedChunk::ScriptLanguageDocids(sl_map) => { | ||||
|             let span = tracing::trace_span!(target: "indexing::write_db", "script_language_docids"); | ||||
|             let _entered = span.enter(); | ||||
|             for (key, (deletion, addition)) in sl_map { | ||||
|                 let mut db_key_exists = false; | ||||
|                 let final_value = match index.script_language_docids.get(wtxn, &key)? { | ||||
| @@ -536,6 +575,7 @@ fn merge_word_docids_reader_into_fst( | ||||
|  | ||||
| /// Write provided entries in database using serialize_value function. | ||||
| /// merge_values function is used if an entry already exist in the database. | ||||
| #[tracing::instrument(skip_all, target = "indexing::write_db")] | ||||
| fn write_entries_into_database<R, K, V, FS, FM>( | ||||
|     data: grenad::Reader<R>, | ||||
|     database: &heed::Database<K, V>, | ||||
| @@ -582,6 +622,7 @@ where | ||||
| /// merge_values function is used if an entry already exist in the database. | ||||
| /// All provided entries must be ordered. | ||||
| /// If the index is not empty, write_entries_into_database is called instead. | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")] | ||||
| fn append_entries_into_database<R, K, V, FS, FM>( | ||||
|     data: grenad::Reader<R>, | ||||
|     database: &heed::Database<K, V>, | ||||
|   | ||||
| @@ -372,6 +372,11 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|         self.embedder_settings = Setting::Reset; | ||||
|     } | ||||
|  | ||||
|     #[tracing::instrument( | ||||
|         level = "trace" | ||||
|         skip(self, progress_callback, should_abort, old_fields_ids_map), | ||||
|         target = "indexing::documents" | ||||
|     )] | ||||
|     fn reindex<FP, FA>( | ||||
|         &mut self, | ||||
|         progress_callback: &FP, | ||||
|   | ||||
| @@ -39,7 +39,12 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("WordPrefixDocids::{}")] | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip_all, | ||||
|         target = "indexing::prefix", | ||||
|         name = "word_prefix_docids" | ||||
|     )] | ||||
|     pub fn execute( | ||||
|         self, | ||||
|         mut new_word_docids_iter: grenad::ReaderCursor<CursorClonableMmap>, | ||||
|   | ||||
| @@ -44,7 +44,12 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("WordPrefixIntegerDocids::{}")] | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip_all, | ||||
|         target = "indexing::prefix", | ||||
|         name = "words_prefix_integer_docids" | ||||
|     )] | ||||
|     pub fn execute( | ||||
|         self, | ||||
|         new_word_integer_docids: grenad::Reader<CursorClonableMmap>, | ||||
|   | ||||
| @@ -38,7 +38,12 @@ impl<'t, 'i> WordsPrefixesFst<'t, 'i> { | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("WordsPrefixesFst::{}")] | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip_all, | ||||
|         target = "indexing::prefix", | ||||
|         name = "words_prefix_fst" | ||||
|     )] | ||||
|     pub fn execute(self) -> Result<()> { | ||||
|         puffin::profile_function!(); | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user