mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 05:26:27 +00:00 
			
		
		
		
	Implement Incremental document database stats computing
This commit is contained in:
		| @@ -142,7 +142,7 @@ impl IndexStats { | |||||||
|         Ok(IndexStats { |         Ok(IndexStats { | ||||||
|             number_of_embeddings: Some(arroy_stats.number_of_embeddings), |             number_of_embeddings: Some(arroy_stats.number_of_embeddings), | ||||||
|             number_of_embedded_documents: Some(arroy_stats.documents.len()), |             number_of_embedded_documents: Some(arroy_stats.documents.len()), | ||||||
|             documents_database_stats: index.documents_database_stats(rtxn)?, |             documents_database_stats: index.documents_stats(rtxn)?.unwrap_or_default(), | ||||||
|             database_size: index.on_disk_size()?, |             database_size: index.on_disk_size()?, | ||||||
|             used_database_size: index.used_size()?, |             used_database_size: index.used_size()?, | ||||||
|             primary_key: index.primary_key(rtxn)?.map(|s| s.to_string()), |             primary_key: index.primary_key(rtxn)?.map(|s| s.to_string()), | ||||||
|   | |||||||
| @@ -3,8 +3,6 @@ use heed::Database; | |||||||
| use heed::RoTxn; | use heed::RoTxn; | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
|  |  | ||||||
| use crate::Result; |  | ||||||
|  |  | ||||||
| #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] | #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] | ||||||
| #[serde(rename_all = "camelCase")] | #[serde(rename_all = "camelCase")] | ||||||
| /// The stats of a database. | /// The stats of a database. | ||||||
| @@ -15,14 +13,6 @@ pub struct DatabaseStats { | |||||||
|     total_key_size: u64, |     total_key_size: u64, | ||||||
|     /// The total size of the values in the database. |     /// The total size of the values in the database. | ||||||
|     total_value_size: u64, |     total_value_size: u64, | ||||||
|     /// The maximum size of a key in the database. |  | ||||||
|     max_key_size: u64, |  | ||||||
|     /// The maximum size of a value in the database. |  | ||||||
|     max_value_size: u64, |  | ||||||
|     /// The minimum size of a key in the database. |  | ||||||
|     min_key_size: u64, |  | ||||||
|     /// The minimum size of a value in the database. |  | ||||||
|     min_value_size: u64, |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl DatabaseStats { | impl DatabaseStats { | ||||||
| @@ -30,38 +20,60 @@ impl DatabaseStats { | |||||||
|     /// |     /// | ||||||
|     /// This function iterates over the whole database and computes the stats. |     /// This function iterates over the whole database and computes the stats. | ||||||
|     /// It is not efficient and should be cached somewhere. |     /// It is not efficient and should be cached somewhere. | ||||||
|     pub(crate) fn new(database: Database<Bytes, Bytes>, rtxn: &RoTxn<'_>) -> Result<Self> { |     pub(crate) fn new(database: Database<Bytes, Bytes>, rtxn: &RoTxn<'_>) -> heed::Result<Self> { | ||||||
|         let mut database_stats = Self { |         let mut database_stats = | ||||||
|             number_of_entries: 0, |             Self { number_of_entries: 0, total_key_size: 0, total_value_size: 0 }; | ||||||
|             total_key_size: 0, |  | ||||||
|             total_value_size: 0, |  | ||||||
|             max_key_size: 0, |  | ||||||
|             max_value_size: 0, |  | ||||||
|             min_key_size: u64::MAX, |  | ||||||
|             min_value_size: u64::MAX, |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         let mut iter = database.iter(rtxn)?; |         let mut iter = database.iter(rtxn)?; | ||||||
|         while let Some((key, value)) = iter.next().transpose()? { |         while let Some((key, value)) = iter.next().transpose()? { | ||||||
|             let key_size = key.len() as u64; |             let key_size = key.len() as u64; | ||||||
|             let value_size = value.len() as u64; |             let value_size = value.len() as u64; | ||||||
|             database_stats.number_of_entries += 1; |  | ||||||
|             database_stats.total_key_size += key_size; |             database_stats.total_key_size += key_size; | ||||||
|             database_stats.total_value_size += value_size; |             database_stats.total_value_size += value_size; | ||||||
|             database_stats.max_key_size = database_stats.max_key_size.max(key_size); |  | ||||||
|             database_stats.max_value_size = database_stats.max_value_size.max(value_size); |  | ||||||
|             database_stats.min_key_size = database_stats.min_key_size.min(key_size); |  | ||||||
|             database_stats.min_value_size = database_stats.min_value_size.min(value_size); |  | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         if database_stats.number_of_entries == 0 { |         database_stats.number_of_entries = database.len(rtxn)?; | ||||||
|             database_stats.min_key_size = 0; |  | ||||||
|             database_stats.min_value_size = 0; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         Ok(database_stats) |         Ok(database_stats) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Recomputes the stats of the database and returns the new stats. | ||||||
|  |     /// | ||||||
|  |     /// This function is used to update the stats of the database when some keys are modified. | ||||||
|  |     /// It is more efficient than the `new` function because it does not iterate over the whole database but only the modified keys comparing the before and after states. | ||||||
|  |     pub(crate) fn recompute<'a, I, K>( | ||||||
|  |         mut stats: Self, | ||||||
|  |         database: Database<Bytes, Bytes>, | ||||||
|  |         before_rtxn: &RoTxn<'_>, | ||||||
|  |         after_rtxn: &RoTxn<'_>, | ||||||
|  |         modified_keys: I, | ||||||
|  |     ) -> heed::Result<Self> | ||||||
|  |     where | ||||||
|  |         I: IntoIterator<Item = K>, | ||||||
|  |         K: AsRef<[u8]>, | ||||||
|  |     { | ||||||
|  |         for key in modified_keys { | ||||||
|  |             let key = key.as_ref(); | ||||||
|  |             if let Some(value) = database.get(after_rtxn, key)? { | ||||||
|  |                 let key_size = key.len() as u64; | ||||||
|  |                 let value_size = value.len() as u64; | ||||||
|  |                 stats.total_key_size = stats.total_key_size.saturating_add(key_size); | ||||||
|  |                 stats.total_value_size = stats.total_value_size.saturating_add(value_size); | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             if let Some(value) = database.get(before_rtxn, key)? { | ||||||
|  |                 let key_size = key.len() as u64; | ||||||
|  |                 let value_size = value.len() as u64; | ||||||
|  |                 stats.total_key_size = stats.total_key_size.saturating_sub(key_size); | ||||||
|  |                 stats.total_value_size = stats.total_value_size.saturating_sub(value_size); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         stats.number_of_entries = database.len(after_rtxn)?; | ||||||
|  |  | ||||||
|  |         Ok(stats) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn average_key_size(&self) -> u64 { |     pub fn average_key_size(&self) -> u64 { | ||||||
|         self.total_key_size.checked_div(self.number_of_entries).unwrap_or(0) |         self.total_key_size.checked_div(self.number_of_entries).unwrap_or(0) | ||||||
|     } |     } | ||||||
| @@ -81,20 +93,4 @@ impl DatabaseStats { | |||||||
|     pub fn total_value_size(&self) -> u64 { |     pub fn total_value_size(&self) -> u64 { | ||||||
|         self.total_value_size |         self.total_value_size | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn max_key_size(&self) -> u64 { |  | ||||||
|         self.max_key_size |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn max_value_size(&self) -> u64 { |  | ||||||
|         self.max_value_size |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn min_key_size(&self) -> u64 { |  | ||||||
|         self.min_key_size |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn min_value_size(&self) -> u64 { |  | ||||||
|         self.min_value_size |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|   | |||||||
| @@ -75,6 +75,7 @@ pub mod main_key { | |||||||
|     pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules"; |     pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules"; | ||||||
|     pub const FACET_SEARCH: &str = "facet_search"; |     pub const FACET_SEARCH: &str = "facet_search"; | ||||||
|     pub const PREFIX_SEARCH: &str = "prefix_search"; |     pub const PREFIX_SEARCH: &str = "prefix_search"; | ||||||
|  |     pub const DOCUMENTS_STATS: &str = "documents_stats"; | ||||||
| } | } | ||||||
|  |  | ||||||
| pub mod db_name { | pub mod db_name { | ||||||
| @@ -404,9 +405,58 @@ impl Index { | |||||||
|         Ok(count.unwrap_or_default()) |         Ok(count.unwrap_or_default()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Returns the stats of the database. |     /// Updates the stats of the documents database based on the previous stats and the modified docids. | ||||||
|     pub fn documents_database_stats(&self, rtxn: &RoTxn<'_>) -> Result<DatabaseStats> { |     pub fn update_documents_stats( | ||||||
|         DatabaseStats::new(self.documents.remap_types::<Bytes, Bytes>(), rtxn) |         &self, | ||||||
|  |         wtxn: &mut RwTxn<'_>, | ||||||
|  |         modified_docids: roaring::RoaringBitmap, | ||||||
|  |     ) -> Result<()> { | ||||||
|  |         let before_rtxn = self.read_txn()?; | ||||||
|  |         let document_stats = match self.documents_stats(&before_rtxn)? { | ||||||
|  |             Some(before_stats) => DatabaseStats::recompute( | ||||||
|  |                 before_stats, | ||||||
|  |                 self.documents.remap_types(), | ||||||
|  |                 &before_rtxn, | ||||||
|  |                 wtxn, | ||||||
|  |                 modified_docids.iter().map(|docid| docid.to_be_bytes()), | ||||||
|  |             )?, | ||||||
|  |             None => { | ||||||
|  |                 // This should never happen when there are already documents in the index, the documents stats should be present. | ||||||
|  |                 // If it happens, it means that the index was not properly initialized/upgraded. | ||||||
|  |                 debug_assert_eq!( | ||||||
|  |                     self.documents.len(&before_rtxn)?, | ||||||
|  |                     0, | ||||||
|  |                     "The documents stats should be present when there are documents in the index" | ||||||
|  |                 ); | ||||||
|  |                 tracing::warn!("No documents stats found, creating new ones"); | ||||||
|  |                 DatabaseStats::new(self.documents.remap_types(), &*wtxn)? | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         self.put_documents_stats(wtxn, document_stats)?; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Writes the stats of the documents database. | ||||||
|  |     pub fn put_documents_stats( | ||||||
|  |         &self, | ||||||
|  |         wtxn: &mut RwTxn<'_>, | ||||||
|  |         stats: DatabaseStats, | ||||||
|  |     ) -> heed::Result<()> { | ||||||
|  |         eprintln!("putting documents stats: {:?}", stats); | ||||||
|  |         self.main.remap_types::<Str, SerdeJson<DatabaseStats>>().put( | ||||||
|  |             wtxn, | ||||||
|  |             main_key::DOCUMENTS_STATS, | ||||||
|  |             &stats, | ||||||
|  |         ) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Returns the stats of the documents database. | ||||||
|  |     pub fn documents_stats(&self, rtxn: &RoTxn<'_>) -> heed::Result<Option<DatabaseStats>> { | ||||||
|  |         dbg!(self | ||||||
|  |             .main | ||||||
|  |             .remap_types::<Str, SerdeJson<DatabaseStats>>() | ||||||
|  |             .get(rtxn, main_key::DOCUMENTS_STATS)) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /* primary key */ |     /* primary key */ | ||||||
|   | |||||||
| @@ -307,6 +307,7 @@ where | |||||||
|         let current_span = tracing::Span::current(); |         let current_span = tracing::Span::current(); | ||||||
|  |  | ||||||
|         // Run extraction pipeline in parallel. |         // Run extraction pipeline in parallel. | ||||||
|  |         let mut modified_docids = RoaringBitmap::new(); | ||||||
|         pool.install(|| { |         pool.install(|| { | ||||||
|                 let settings_diff_cloned = settings_diff.clone(); |                 let settings_diff_cloned = settings_diff.clone(); | ||||||
|                 rayon::spawn(move || { |                 rayon::spawn(move || { | ||||||
| @@ -367,7 +368,7 @@ where | |||||||
|                         Err(status) => { |                         Err(status) => { | ||||||
|                             if let Some(typed_chunks) = chunk_accumulator.pop_longest() { |                             if let Some(typed_chunks) = chunk_accumulator.pop_longest() { | ||||||
|                                 let (docids, is_merged_database) = |                                 let (docids, is_merged_database) = | ||||||
|                                     write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks)?; |                                     write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks, &mut modified_docids)?; | ||||||
|                                 if !docids.is_empty() { |                                 if !docids.is_empty() { | ||||||
|                                     final_documents_ids |= docids; |                                     final_documents_ids |= docids; | ||||||
|                                     let documents_seen_count = final_documents_ids.len(); |                                     let documents_seen_count = final_documents_ids.len(); | ||||||
| @@ -467,6 +468,10 @@ where | |||||||
|                 Ok(()) |                 Ok(()) | ||||||
|             }).map_err(InternalError::from)??; |             }).map_err(InternalError::from)??; | ||||||
|  |  | ||||||
|  |         if !settings_diff.settings_update_only { | ||||||
|  |             // Update the stats of the documents database when there is a document update. | ||||||
|  |             self.index.update_documents_stats(self.wtxn, modified_docids)?; | ||||||
|  |         } | ||||||
|         // We write the field distribution into the main database |         // We write the field distribution into the main database | ||||||
|         self.index.put_field_distribution(self.wtxn, &field_distribution)?; |         self.index.put_field_distribution(self.wtxn, &field_distribution)?; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -129,6 +129,7 @@ pub(crate) fn write_typed_chunk_into_index( | |||||||
|     index: &Index, |     index: &Index, | ||||||
|     settings_diff: &InnerIndexSettingsDiff, |     settings_diff: &InnerIndexSettingsDiff, | ||||||
|     typed_chunks: Vec<TypedChunk>, |     typed_chunks: Vec<TypedChunk>, | ||||||
|  |     modified_docids: &mut RoaringBitmap, | ||||||
| ) -> Result<(RoaringBitmap, bool)> { | ) -> Result<(RoaringBitmap, bool)> { | ||||||
|     let mut is_merged_database = false; |     let mut is_merged_database = false; | ||||||
|     match typed_chunks[0] { |     match typed_chunks[0] { | ||||||
| @@ -214,6 +215,7 @@ pub(crate) fn write_typed_chunk_into_index( | |||||||
|                         kind: DocumentOperationKind::Create, |                         kind: DocumentOperationKind::Create, | ||||||
|                     }); |                     }); | ||||||
|                     docids.insert(docid); |                     docids.insert(docid); | ||||||
|  |                     modified_docids.insert(docid); | ||||||
|                 } else { |                 } else { | ||||||
|                     db.delete(wtxn, &docid)?; |                     db.delete(wtxn, &docid)?; | ||||||
|                     operations.push(DocumentOperation { |                     operations.push(DocumentOperation { | ||||||
| @@ -222,6 +224,7 @@ pub(crate) fn write_typed_chunk_into_index( | |||||||
|                         kind: DocumentOperationKind::Delete, |                         kind: DocumentOperationKind::Delete, | ||||||
|                     }); |                     }); | ||||||
|                     docids.remove(docid); |                     docids.remove(docid); | ||||||
|  |                     modified_docids.insert(docid); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|             let external_documents_docids = index.external_documents_ids(); |             let external_documents_docids = index.external_documents_ids(); | ||||||
|   | |||||||
| @@ -711,15 +711,17 @@ impl DelAddRoaringBitmap { | |||||||
|         DelAddRoaringBitmap { del, add } |         DelAddRoaringBitmap { del, add } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn apply_to(&self, documents_ids: &mut RoaringBitmap) { |     pub fn apply_to(&self, documents_ids: &mut RoaringBitmap, modified_docids: &mut RoaringBitmap) { | ||||||
|         let DelAddRoaringBitmap { del, add } = self; |         let DelAddRoaringBitmap { del, add } = self; | ||||||
|  |  | ||||||
|         if let Some(del) = del { |         if let Some(del) = del { | ||||||
|             *documents_ids -= del; |             *documents_ids -= del; | ||||||
|  |             *modified_docids |= del; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         if let Some(add) = add { |         if let Some(add) = add { | ||||||
|             *documents_ids |= add; |             *documents_ids |= add; | ||||||
|  |             *modified_docids |= add; | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -32,6 +32,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>( | |||||||
|     field_distribution: &mut BTreeMap<String, u64>, |     field_distribution: &mut BTreeMap<String, u64>, | ||||||
|     mut index_embeddings: Vec<IndexEmbeddingConfig>, |     mut index_embeddings: Vec<IndexEmbeddingConfig>, | ||||||
|     document_ids: &mut RoaringBitmap, |     document_ids: &mut RoaringBitmap, | ||||||
|  |     modified_docids: &mut RoaringBitmap, | ||||||
| ) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)> | ) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)> | ||||||
| where | where | ||||||
|     DC: DocumentChanges<'pl>, |     DC: DocumentChanges<'pl>, | ||||||
| @@ -70,7 +71,7 @@ where | |||||||
|                 // adding the delta should never cause a negative result, as we are removing fields that previously existed. |                 // adding the delta should never cause a negative result, as we are removing fields that previously existed. | ||||||
|                 *current = current.saturating_add_signed(delta); |                 *current = current.saturating_add_signed(delta); | ||||||
|             } |             } | ||||||
|             document_extractor_data.docids_delta.apply_to(document_ids); |             document_extractor_data.docids_delta.apply_to(document_ids, modified_docids); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         field_distribution.retain(|_, v| *v != 0); |         field_distribution.retain(|_, v| *v != 0); | ||||||
| @@ -256,7 +257,7 @@ where | |||||||
|                     let Some(deladd) = data.remove(&config.name) else { |                     let Some(deladd) = data.remove(&config.name) else { | ||||||
|                         continue 'data; |                         continue 'data; | ||||||
|                     }; |                     }; | ||||||
|                     deladd.apply_to(&mut config.user_provided); |                     deladd.apply_to(&mut config.user_provided, modified_docids); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|   | |||||||
| @@ -130,6 +130,7 @@ where | |||||||
|     let index_embeddings = index.embedding_configs(wtxn)?; |     let index_embeddings = index.embedding_configs(wtxn)?; | ||||||
|     let mut field_distribution = index.field_distribution(wtxn)?; |     let mut field_distribution = index.field_distribution(wtxn)?; | ||||||
|     let mut document_ids = index.documents_ids(wtxn)?; |     let mut document_ids = index.documents_ids(wtxn)?; | ||||||
|  |     let mut modified_docids = roaring::RoaringBitmap::new(); | ||||||
|  |  | ||||||
|     let congestion = thread::scope(|s| -> Result<ChannelCongestion> { |     let congestion = thread::scope(|s| -> Result<ChannelCongestion> { | ||||||
|         let indexer_span = tracing::Span::current(); |         let indexer_span = tracing::Span::current(); | ||||||
| @@ -138,6 +139,7 @@ where | |||||||
|         // prevent moving the field_distribution and document_ids in the inner closure... |         // prevent moving the field_distribution and document_ids in the inner closure... | ||||||
|         let field_distribution = &mut field_distribution; |         let field_distribution = &mut field_distribution; | ||||||
|         let document_ids = &mut document_ids; |         let document_ids = &mut document_ids; | ||||||
|  |         let modified_docids = &mut modified_docids; | ||||||
|         let extractor_handle = |         let extractor_handle = | ||||||
|             Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { |             Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { | ||||||
|                 pool.install(move || { |                 pool.install(move || { | ||||||
| @@ -152,6 +154,7 @@ where | |||||||
|                         field_distribution, |                         field_distribution, | ||||||
|                         index_embeddings, |                         index_embeddings, | ||||||
|                         document_ids, |                         document_ids, | ||||||
|  |                         modified_docids, | ||||||
|                     ) |                     ) | ||||||
|                 }) |                 }) | ||||||
|                 .unwrap() |                 .unwrap() | ||||||
| @@ -227,6 +230,7 @@ where | |||||||
|         embedders, |         embedders, | ||||||
|         field_distribution, |         field_distribution, | ||||||
|         document_ids, |         document_ids, | ||||||
|  |         modified_docids, | ||||||
|     )?; |     )?; | ||||||
|  |  | ||||||
|     Ok(congestion) |     Ok(congestion) | ||||||
|   | |||||||
| @@ -129,6 +129,7 @@ pub fn update_index( | |||||||
|     embedders: EmbeddingConfigs, |     embedders: EmbeddingConfigs, | ||||||
|     field_distribution: std::collections::BTreeMap<String, u64>, |     field_distribution: std::collections::BTreeMap<String, u64>, | ||||||
|     document_ids: roaring::RoaringBitmap, |     document_ids: roaring::RoaringBitmap, | ||||||
|  |     modified_docids: roaring::RoaringBitmap, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; |     index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; | ||||||
|     if let Some(new_primary_key) = new_primary_key { |     if let Some(new_primary_key) = new_primary_key { | ||||||
| @@ -140,6 +141,7 @@ pub fn update_index( | |||||||
|     index.put_field_distribution(wtxn, &field_distribution)?; |     index.put_field_distribution(wtxn, &field_distribution)?; | ||||||
|     index.put_documents_ids(wtxn, &document_ids)?; |     index.put_documents_ids(wtxn, &document_ids)?; | ||||||
|     index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; |     index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; | ||||||
|  |     index.update_documents_stats(wtxn, modified_docids)?; | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user