Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-10-30 23:46:28 +00:00

Commit: Fix all the benchmark compilation errors
		| @@ -1907,7 +1907,7 @@ pub(crate) mod tests { | ||||
|             { "id": 2, "name": "bob", "age": 20 }, | ||||
|             { "id": 2, "name": "bob", "age": 20 }, | ||||
|         ]); | ||||
|         indexer.add_documents(&payload); | ||||
|         indexer.add_documents(&payload).unwrap(); | ||||
|  | ||||
|         let indexer_alloc = Bump::new(); | ||||
|         let (document_changes, _operation_stats, primary_key) = indexer | ||||
|   | ||||
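The recurring fix in this commit: `DocumentOperation::add_documents` returns a `Result`, so call sites that previously ignored the return value must now handle it, and the tests and benchmarks unwrap it. A minimal sketch of the call-site pattern, with the `payload` setup assumed:

```rust
// Before: the returned Result was silently discarded.
// indexer.add_documents(&payload);

// After: tests and benchmarks unwrap; fallible callers would use `?` instead.
indexer.add_documents(&payload).unwrap();
```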
| @@ -493,7 +493,7 @@ mod tests { | ||||
|         } | ||||
|  | ||||
|         let documents = mmap_from_objects(documents); | ||||
|         index.add_documents(documents); | ||||
|         index.add_documents(documents).unwrap(); | ||||
|  | ||||
|         db_snap!(index, facet_id_f64_docids, "initial", @"c34f499261f3510d862fa0283bbe843a"); | ||||
|     } | ||||
|   | ||||
| @@ -5,11 +5,13 @@ mod transform; | ||||
| mod typed_chunk; | ||||
|  | ||||
| use std::collections::HashSet; | ||||
| use std::io::{Read, Seek}; | ||||
| use std::iter; | ||||
| use std::num::NonZeroU32; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use crossbeam_channel::{Receiver, Sender}; | ||||
| use enrich::enrich_documents_batch; | ||||
| use grenad::{Merger, MergerBuilder}; | ||||
| use hashbrown::HashMap; | ||||
| use heed::types::Str; | ||||
| @@ -24,7 +26,8 @@ use typed_chunk::{write_typed_chunk_into_index, ChunkAccumulator, TypedChunk}; | ||||
| pub use self::enrich::{extract_finite_float_from_value, DocumentId}; | ||||
| pub use self::helpers::*; | ||||
| pub use self::transform::{Transform, TransformOutput}; | ||||
| use crate::documents::obkv_to_object; | ||||
| use super::new::StdResult; | ||||
| use crate::documents::{obkv_to_object, DocumentsBatchReader}; | ||||
| use crate::error::{Error, InternalError}; | ||||
| use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; | ||||
| pub use crate::update::index_documents::helpers::CursorClonableMmap; | ||||
| @@ -32,7 +35,7 @@ use crate::update::{ | ||||
|     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, | ||||
| }; | ||||
| use crate::vector::{ArroyWrapper, EmbeddingConfigs}; | ||||
| use crate::{CboRoaringBitmapCodec, Index, Result}; | ||||
| use crate::{CboRoaringBitmapCodec, Index, Result, UserError}; | ||||
|  | ||||
| static MERGED_DATABASE_COUNT: usize = 7; | ||||
| static PREFIX_DATABASE_COUNT: usize = 4; | ||||
| @@ -122,11 +125,76 @@ where | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     /// Adds a batch of documents to the current builder. | ||||
|     /// | ||||
|     /// Since the documents are progressively added to the writer, a failure will only | ||||
|     /// return an error and not the `IndexDocuments` struct, as it is invalid to use it afterward. | ||||
|     /// | ||||
|     /// Returns the number of documents added to the builder. | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn add_documents<R: Read + Seek>( | ||||
|         mut self, | ||||
|         reader: DocumentsBatchReader<R>, | ||||
|     ) -> Result<(Self, StdResult<u64, UserError>)> { | ||||
|         // Early return when there is no document to add | ||||
|         if reader.is_empty() { | ||||
|             return Ok((self, Ok(0))); | ||||
|         } | ||||
|  | ||||
|         // We check for user errors in this validator and, if there is one, we can return | ||||
|         // the `IndexDocuments` struct as it is still valid to send more documents into it. | ||||
|         // However, if there is an internal error we throw it away! | ||||
|         let enriched_documents_reader = match enrich_documents_batch( | ||||
|             self.wtxn, | ||||
|             self.index, | ||||
|             self.config.autogenerate_docids, | ||||
|             reader, | ||||
|         )? { | ||||
|             Ok(reader) => reader, | ||||
|             Err(user_error) => return Ok((self, Err(user_error))), | ||||
|         }; | ||||
|  | ||||
|         let indexed_documents = | ||||
|             self.transform.as_mut().expect("Invalid document addition state").read_documents( | ||||
|                 enriched_documents_reader, | ||||
|                 self.wtxn, | ||||
|                 &self.progress, | ||||
|                 &self.should_abort, | ||||
|             )? as u64; | ||||
|  | ||||
|         self.added_documents += indexed_documents; | ||||
|  | ||||
|         Ok((self, Ok(indexed_documents))) | ||||
|     } | ||||
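The tuple return keeps the builder alive across user errors: internal errors consume it via `?`, while a `UserError` is handed back alongside a still-usable `IndexDocuments`. A hedged usage sketch; the batch source and the logging are assumptions, only `add_documents` and `execute` come from the code above:

```rust
// Sketch: feed several batches into one builder, then run the indexing.
let mut builder = builder;
for reader in batch_readers {
    let (next, user_result) = builder.add_documents(reader)?;
    builder = next;
    match user_result {
        Ok(count) => tracing::trace!("queued {count} documents"),
        // A user error does not invalidate the builder; skip only this batch.
        Err(user_error) => tracing::warn!("rejected batch: {user_error}"),
    }
}
let DocumentAdditionResult { indexed_documents, number_of_documents } = builder.execute()?;
```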
|  | ||||
|     pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { | ||||
|         self.embedders = embedders; | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip_all, | ||||
|         target = "indexing::documents", | ||||
|         name = "index_documents" | ||||
|     )] | ||||
|     pub fn execute(mut self) -> Result<DocumentAdditionResult> { | ||||
|         if self.added_documents == 0 && self.deleted_documents == 0 { | ||||
|             let number_of_documents = self.index.number_of_documents(self.wtxn)?; | ||||
|             return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents }); | ||||
|         } | ||||
|         let output = self | ||||
|             .transform | ||||
|             .take() | ||||
|             .expect("Invalid document addition state") | ||||
|             .output_from_sorter(self.wtxn, &self.progress)?; | ||||
|  | ||||
|         let indexed_documents = output.documents_count as u64; | ||||
|         let number_of_documents = self.execute_raw(output)?; | ||||
|  | ||||
|         Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) | ||||
|     } | ||||
|  | ||||
|     /// Returns the total number of documents in the index after the update. | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
| @@ -678,7 +746,7 @@ mod tests { | ||||
|     use maplit::hashset; | ||||
|  | ||||
|     use super::*; | ||||
|     use crate::documents::{documents_batch_reader_from_objects, mmap_from_objects}; | ||||
|     use crate::documents::mmap_from_objects; | ||||
|     use crate::index::tests::TempIndex; | ||||
|     use crate::index::IndexEmbeddingConfig; | ||||
|     use crate::search::TermsMatchingStrategy; | ||||
| @@ -2119,16 +2187,7 @@ mod tests { | ||||
|         index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2142,27 +2201,24 @@ mod tests { | ||||
|         let indexer_alloc = Bump::new(); | ||||
|         let embedders = EmbeddingConfigs::default(); | ||||
|         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); | ||||
|         indexer.add_documents(&documents); | ||||
|         indexer.add_documents(&documents).unwrap(); | ||||
|         indexer.delete_documents(&["2"]); | ||||
|         let (document_changes, _operation_stats, primary_key) = indexer | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
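The second half of the fix, repeated across every test below: `indexer::index` is now called directly instead of inside `pool.install`, so the per-test fallback `ThreadPoolNoAbort` construction is deleted. Schematically, with the arguments elided:

```rust
// Before (sketch): each test built a fallback pool and ran indexing inside it.
let pool = match &indexer_config.thread_pool {
    Some(pool) => pool,
    None => {
        local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
        &local_pool
    }
};
pool.install(|| indexer::index(/* ...same arguments... */)).unwrap();

// After: the call no longer needs an ambient pool installed at the call site.
indexer::index(/* ...same arguments... */).unwrap();
```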
| @@ -2178,16 +2234,7 @@ mod tests { | ||||
|         index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2213,21 +2260,18 @@ mod tests { | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2238,19 +2282,10 @@ mod tests { | ||||
|  | ||||
|     #[test] | ||||
|     fn add_document_and_in_another_transform_update_and_delete_documents() { | ||||
|         let mut index = TempIndex::new(); | ||||
|         let index = TempIndex::new(); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2263,27 +2298,24 @@ mod tests { | ||||
|         let indexer_alloc = Bump::new(); | ||||
|         let embedders = EmbeddingConfigs::default(); | ||||
|         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); | ||||
|         indexer.add_documents(&documents); | ||||
|         indexer.add_documents(&documents).unwrap(); | ||||
|  | ||||
|         let (document_changes, _operation_stats, primary_key) = indexer | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2296,16 +2328,7 @@ mod tests { | ||||
|         // A first batch of documents has been inserted | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2317,28 +2340,25 @@ mod tests { | ||||
|         let indexer_alloc = Bump::new(); | ||||
|         let embedders = EmbeddingConfigs::default(); | ||||
|         let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::UpdateDocuments); | ||||
|         indexer.add_documents(&documents); | ||||
|         indexer.add_documents(&documents).unwrap(); | ||||
|         indexer.delete_documents(&["1", "2"]); | ||||
|  | ||||
|         let (document_changes, _operation_stats, primary_key) = indexer | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2349,19 +2369,10 @@ mod tests { | ||||
|  | ||||
|     #[test] | ||||
|     fn delete_document_and_then_add_documents_in_the_same_transform() { | ||||
|         let mut index = TempIndex::new(); | ||||
|         let index = TempIndex::new(); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2375,27 +2386,24 @@ mod tests { | ||||
|             { "id": 2, "doggo": { "name": "jean", "age": 20 } }, | ||||
|             { "id": 3, "name": "bob", "age": 25 }, | ||||
|         ]); | ||||
|         indexer.add_documents(&documents); | ||||
|         indexer.add_documents(&documents).unwrap(); | ||||
|  | ||||
|         let (document_changes, _operation_stats, primary_key) = indexer | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2407,19 +2415,10 @@ mod tests { | ||||
|  | ||||
|     #[test] | ||||
|     fn delete_the_same_document_multiple_time() { | ||||
|         let mut index = TempIndex::new(); | ||||
|         let index = TempIndex::new(); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2435,7 +2434,7 @@ mod tests { | ||||
|             { "id": 2, "doggo": { "name": "jean", "age": 20 } }, | ||||
|             { "id": 3, "name": "bob", "age": 25 }, | ||||
|         ]); | ||||
|         indexer.add_documents(&documents); | ||||
|         indexer.add_documents(&documents).unwrap(); | ||||
|  | ||||
|         indexer.delete_documents(&["1", "2", "1", "2"]); | ||||
|  | ||||
| @@ -2443,21 +2442,18 @@ mod tests { | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2468,19 +2464,10 @@ mod tests { | ||||
|  | ||||
|     #[test] | ||||
|     fn add_document_and_in_another_transform_delete_the_document_then_add_it_again() { | ||||
|         let mut index = TempIndex::new(); | ||||
|         let index = TempIndex::new(); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2492,27 +2479,24 @@ mod tests { | ||||
|         let documents = documents!([ | ||||
|             { "id": 1, "doggo": "kevin" }, | ||||
|         ]); | ||||
|         indexer.add_documents(&documents); | ||||
|         indexer.add_documents(&documents).unwrap(); | ||||
|  | ||||
|         let (document_changes, _operation_stats, primary_key) = indexer | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2523,7 +2507,7 @@ mod tests { | ||||
|         // A first batch of documents has been inserted | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2538,27 +2522,24 @@ mod tests { | ||||
|             { "id": 1, "catto": "jorts" }, | ||||
|         ]); | ||||
|  | ||||
|         indexer.add_documents(&documents); | ||||
|         indexer.add_documents(&documents).unwrap(); | ||||
|  | ||||
|         let (document_changes, _operation_stats, primary_key) = indexer | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2711,17 +2692,7 @@ mod tests { | ||||
|         println!("--- ENTERING BATCH 1"); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         let local_pool; | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let pool = match &indexer_config.thread_pool { | ||||
|             Some(pool) => pool, | ||||
|             None => { | ||||
|                 local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                 &local_pool | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2742,21 +2713,18 @@ mod tests { | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2775,7 +2743,7 @@ mod tests { | ||||
|         println!("--- ENTERING BATCH 2"); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2795,21 +2763,18 @@ mod tests { | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2827,7 +2792,7 @@ mod tests { | ||||
|         println!("--- ENTERING BATCH 3"); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         let indexer_config = &index.indexer_config; | ||||
|         let rtxn = index.inner.read_txn().unwrap(); | ||||
|         let db_fields_ids_map = index.inner.fields_ids_map(&rtxn).unwrap(); | ||||
|         let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
| @@ -2845,21 +2810,18 @@ mod tests { | ||||
|             .into_changes(&indexer_alloc, &index.inner, &rtxn, None, &mut new_fields_ids_map) | ||||
|             .unwrap(); | ||||
|  | ||||
|         pool.install(|| { | ||||
|             indexer::index( | ||||
|                 &mut wtxn, | ||||
|                 &index.inner, | ||||
|                 indexer_config.grenad_parameters(), | ||||
|                 &db_fields_ids_map, | ||||
|                 new_fields_ids_map, | ||||
|                 primary_key, | ||||
|                 &document_changes, | ||||
|                 embedders, | ||||
|                 &|| false, | ||||
|                 &|_| (), | ||||
|             ) | ||||
|         }) | ||||
|         .unwrap() | ||||
|         indexer::index( | ||||
|             &mut wtxn, | ||||
|             &index.inner, | ||||
|             indexer_config.grenad_parameters(), | ||||
|             &db_fields_ids_map, | ||||
|             new_fields_ids_map, | ||||
|             primary_key, | ||||
|             &document_changes, | ||||
|             embedders, | ||||
|             &|| false, | ||||
|             &|_| (), | ||||
|         ) | ||||
|         .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2913,8 +2875,7 @@ mod tests { | ||||
|             .unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|         // delete those documents, ids are synchronous therefore 0, 1, and 2. | ||||
|         let mut wtxn = index.write_txn().unwrap(); // delete those documents, ids are synchronous therefore 0, 1, and 2. | ||||
|         index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1"), S("2")]).unwrap(); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
| @@ -2951,7 +2912,6 @@ mod tests { | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         // Delete not all of the documents but some of them. | ||||
|         index.delete_documents_using_wtxn(&mut wtxn, vec![S("0"), S("1")]).unwrap(); | ||||
|  | ||||
| @@ -3287,7 +3247,6 @@ mod tests { | ||||
|         let index = TempIndex::new(); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         index | ||||
|             .update_settings_using_wtxn(&mut wtxn, |settings| { | ||||
|                 settings.set_primary_key(S("docid")); | ||||
|   | ||||
| @@ -1,11 +1,14 @@ | ||||
| use std::borrow::Cow; | ||||
| use std::collections::btree_map::Entry as BEntry; | ||||
| use std::collections::hash_map::Entry as HEntry; | ||||
| use std::collections::{BTreeMap, HashMap, HashSet}; | ||||
| use std::fs::File; | ||||
| use std::io::{Read, Seek}; | ||||
|  | ||||
| use either::Either; | ||||
| use fxhash::FxHashMap; | ||||
| use itertools::Itertools; | ||||
| use obkv::{KvReader, KvWriter}; | ||||
| use obkv::{KvReader, KvReaderU16, KvWriter}; | ||||
| use roaring::RoaringBitmap; | ||||
| use serde_json::Value; | ||||
| use smartstring::SmartString; | ||||
| @@ -14,16 +17,17 @@ use super::helpers::{ | ||||
|     create_sorter, sorter_into_reader, EitherObkvMerge, ObkvsKeepLastAdditionMergeDeletions, | ||||
|     ObkvsMergeAdditionsAndDeletions, | ||||
| }; | ||||
| use super::{IndexDocumentsMethod, IndexerConfig, KeepFirst}; | ||||
| use crate::documents::DocumentsBatchIndex; | ||||
| use super::{create_writer, IndexDocumentsMethod, IndexerConfig, KeepFirst}; | ||||
| use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; | ||||
| use crate::error::{Error, InternalError, UserError}; | ||||
| use crate::index::{db_name, main_key}; | ||||
| use crate::update::del_add::{ | ||||
|     into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAddOperation, | ||||
|     into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAdd, DelAddOperation, | ||||
|     KvReaderDelAdd, | ||||
| }; | ||||
| use crate::update::index_documents::GrenadParameters; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::update::AvailableIds; | ||||
| use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; | ||||
| use crate::update::{AvailableIds, UpdateIndexingStep}; | ||||
| use crate::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors}; | ||||
| use crate::vector::settings::WriteBackToDocuments; | ||||
| use crate::vector::ArroyWrapper; | ||||
| @@ -48,8 +52,23 @@ pub struct TransformOutput { | ||||
| /// containing all those documents. | ||||
| pub struct Transform<'a, 'i> { | ||||
|     pub index: &'i Index, | ||||
|     fields_ids_map: FieldsIdsMap, | ||||
|  | ||||
|     indexer_settings: &'a IndexerConfig, | ||||
|     pub index_documents_method: IndexDocumentsMethod, | ||||
|     available_documents_ids: AvailableIds, | ||||
|  | ||||
|     // Both grenad sorters follow the same format: | ||||
|     // key | value | ||||
|     // u32 | 1 byte for the Operation byte, the rest is the obkv of the document stored | ||||
|     original_sorter: grenad::Sorter<EitherObkvMerge>, | ||||
|     flattened_sorter: grenad::Sorter<EitherObkvMerge>, | ||||
|  | ||||
|     replaced_documents_ids: RoaringBitmap, | ||||
|     new_documents_ids: RoaringBitmap, | ||||
|     // To increase the cache locality and decrease the heap usage we use compact smartstring. | ||||
|     new_external_documents_ids_builder: FxHashMap<SmartString<smartstring::Compact>, u64>, | ||||
|     documents_count: usize, | ||||
| } | ||||
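Both sorter fields use the same entry encoding, per the comment above: the key is the internal document id and the value prepends a single `Operation` discriminant byte to the obkv payload. A small sketch of building one entry; the sorter call is shown commented because the surrounding setup is assumed:

```rust
// key   = internal docid, big-endian u32 (the original sorter additionally
//         appends the external id to its key, as read_documents shows below)
// value = [Operation byte][obkv bytes]
let docid: u32 = 42;
let obkv_bytes: &[u8] = &[/* serialized obkv */];
let mut value = Vec::with_capacity(1 + obkv_bytes.len());
value.push(Operation::Addition as u8);
value.extend_from_slice(obkv_bytes);
// original_sorter.insert(docid.to_be_bytes(), &value)?;
```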
|  | ||||
| /// This enum is specific to the grenad sorter stored in the transform. | ||||
| @@ -60,6 +79,29 @@ pub enum Operation { | ||||
|     Deletion, | ||||
| } | ||||
|  | ||||
| /// Create a mapping between the field ids found in the document batch and the ones that were | ||||
| /// already present in the index. | ||||
| /// | ||||
| /// If new fields are present in the addition, they are added to the index field ids map. | ||||
| fn create_fields_mapping( | ||||
|     index_field_map: &mut FieldsIdsMap, | ||||
|     batch_field_map: &DocumentsBatchIndex, | ||||
| ) -> Result<HashMap<FieldId, FieldId>> { | ||||
|     batch_field_map | ||||
|         .iter() | ||||
|         // we sort by id here to ensure a deterministic mapping of the fields that preserves | ||||
|         // the original ordering. | ||||
|         .sorted_by_key(|(&id, _)| id) | ||||
|         .map(|(field, name)| match index_field_map.id(name) { | ||||
|             Some(id) => Ok((*field, id)), | ||||
|             None => index_field_map | ||||
|                 .insert(name) | ||||
|                 .ok_or(Error::UserError(UserError::AttributeLimitReached)) | ||||
|                 .map(|id| (*field, id)), | ||||
|         }) | ||||
|         .collect() | ||||
| } | ||||
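A worked illustration with made-up field names: if the index already maps `age → 0` and the batch numbered its fields `name → 0, age → 1`, the result sends batch id 1 to index id 0 and allocates a fresh index id for `name`; iterating batch ids through `sorted_by_key` above keeps the allocation deterministic. Roughly:

```rust
// Hypothetical shapes, for illustration only.
let mut index_field_map = FieldsIdsMap::new();
index_field_map.insert("age"); // the index already knows "age" as id 0

let mapping = create_fields_mapping(&mut index_field_map, &batch_field_map)?;
// mapping[&1] == 0        ("age": reuse the index's existing id)
// mapping[&0] == fresh id ("name": newly inserted into the index map)
```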
|  | ||||
| impl<'a, 'i> Transform<'a, 'i> { | ||||
|     pub fn new( | ||||
|         wtxn: &mut heed::RwTxn<'_>, | ||||
| @@ -100,7 +142,224 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|         ); | ||||
|         let documents_ids = index.documents_ids(wtxn)?; | ||||
|  | ||||
|         Ok(Transform { index, indexer_settings, index_documents_method }) | ||||
|         Ok(Transform { | ||||
|             index, | ||||
|             fields_ids_map: index.fields_ids_map(wtxn)?, | ||||
|             indexer_settings, | ||||
|             available_documents_ids: AvailableIds::new(&documents_ids), | ||||
|             original_sorter, | ||||
|             flattened_sorter, | ||||
|             index_documents_method, | ||||
|             replaced_documents_ids: RoaringBitmap::new(), | ||||
|             new_documents_ids: RoaringBitmap::new(), | ||||
|             new_external_documents_ids_builder: FxHashMap::default(), | ||||
|             documents_count: 0, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn read_documents<R, FP, FA>( | ||||
|         &mut self, | ||||
|         reader: EnrichedDocumentsBatchReader<R>, | ||||
|         wtxn: &mut heed::RwTxn<'_>, | ||||
|         progress_callback: FP, | ||||
|         should_abort: FA, | ||||
|     ) -> Result<usize> | ||||
|     where | ||||
|         R: Read + Seek, | ||||
|         FP: Fn(UpdateIndexingStep) + Sync, | ||||
|         FA: Fn() -> bool + Sync, | ||||
|     { | ||||
|         let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); | ||||
|         let external_documents_ids = self.index.external_documents_ids(); | ||||
|         let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; | ||||
|  | ||||
|         let primary_key = cursor.primary_key().to_string(); | ||||
|         let primary_key_id = | ||||
|             self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; | ||||
|  | ||||
|         let mut obkv_buffer = Vec::new(); | ||||
|         let mut document_sorter_value_buffer = Vec::new(); | ||||
|         let mut document_sorter_key_buffer = Vec::new(); | ||||
|         let mut documents_count = 0; | ||||
|         let mut docid_buffer: Vec<u8> = Vec::new(); | ||||
|         let mut field_buffer: Vec<(u16, Cow<'_, [u8]>)> = Vec::new(); | ||||
|         while let Some(enriched_document) = cursor.next_enriched_document()? { | ||||
|             let EnrichedDocument { document, document_id } = enriched_document; | ||||
|  | ||||
|             if should_abort() { | ||||
|                 return Err(Error::InternalError(InternalError::AbortedIndexation)); | ||||
|             } | ||||
|  | ||||
|             // drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer | ||||
|             // does not keep references from the cursor between loop iterations | ||||
|             let mut field_buffer_cache = drop_and_reuse(field_buffer); | ||||
|             if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { | ||||
|                 progress_callback(UpdateIndexingStep::RemapDocumentAddition { | ||||
|                     documents_seen: documents_count, | ||||
|                 }); | ||||
|             } | ||||
|  | ||||
|             // When the document id has been auto-generated by the `enrich_documents_batch` | ||||
|             // we must insert this document id into the remapped document. | ||||
|             let external_id = document_id.value(); | ||||
|             if document_id.is_generated() { | ||||
|                 serde_json::to_writer(&mut docid_buffer, external_id) | ||||
|                     .map_err(InternalError::SerdeJson)?; | ||||
|                 field_buffer_cache.push((primary_key_id, Cow::from(&docid_buffer))); | ||||
|             } | ||||
|  | ||||
|             for (k, v) in document.iter() { | ||||
|                 let mapped_id = | ||||
|                     *mapping.get(&k).ok_or(InternalError::FieldIdMappingMissingEntry { key: k })?; | ||||
|                 field_buffer_cache.push((mapped_id, Cow::from(v))); | ||||
|             } | ||||
|  | ||||
|             // Insertions in an obkv need to be done with the keys ordered. For now they are | ||||
|             // ordered according to the document addition key order, so we sort them according | ||||
|             // to the fields ids map key order. | ||||
|             field_buffer_cache.sort_unstable_by(|(f1, _), (f2, _)| f1.cmp(f2)); | ||||
|  | ||||
|             // Build the new obkv document. | ||||
|             let mut writer = KvWriter::new(&mut obkv_buffer); | ||||
|             for (k, v) in field_buffer_cache.iter() { | ||||
|                 writer.insert(*k, v)?; | ||||
|             } | ||||
|  | ||||
|             let mut original_docid = None; | ||||
|             let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) { | ||||
|                 HEntry::Occupied(entry) => *entry.get() as u32, | ||||
|                 HEntry::Vacant(entry) => { | ||||
|                     let docid = match external_documents_ids.get(wtxn, entry.key())? { | ||||
|                         Some(docid) => { | ||||
|                             // If it was already in the list of replaced documents it means it was deleted | ||||
|                             // by the remove_document method. We should start as if it never existed. | ||||
|                             if self.replaced_documents_ids.insert(docid) { | ||||
|                                 original_docid = Some(docid); | ||||
|                             } | ||||
|  | ||||
|                             docid | ||||
|                         } | ||||
|                         None => self | ||||
|                             .available_documents_ids | ||||
|                             .next() | ||||
|                             .ok_or(UserError::DocumentLimitReached)?, | ||||
|                     }; | ||||
|                     entry.insert(docid as u64); | ||||
|                     docid | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
|             let mut skip_insertion = false; | ||||
|             if let Some(original_docid) = original_docid { | ||||
|                 let original_key = original_docid; | ||||
|                 let base_obkv = self | ||||
|                     .index | ||||
|                     .documents | ||||
|                     .remap_data_type::<heed::types::Bytes>() | ||||
|                     .get(wtxn, &original_key)? | ||||
|                     .ok_or(InternalError::DatabaseMissingEntry { | ||||
|                         db_name: db_name::DOCUMENTS, | ||||
|                         key: None, | ||||
|                     })?; | ||||
|  | ||||
|                 // we check if the two documents are exactly equal. If that's the case we can skip this document entirely | ||||
|                 if base_obkv == obkv_buffer { | ||||
|                     // we're not replacing anything | ||||
|                     self.replaced_documents_ids.remove(original_docid); | ||||
|                     // and we need to put back the original id as it was before | ||||
|                     self.new_external_documents_ids_builder.remove(external_id); | ||||
|                     skip_insertion = true; | ||||
|                 } else { | ||||
|                     // we associate the base document with the new key, everything will get merged later. | ||||
|                     let deladd_operation = match self.index_documents_method { | ||||
|                         IndexDocumentsMethod::UpdateDocuments => { | ||||
|                             DelAddOperation::DeletionAndAddition | ||||
|                         } | ||||
|                         IndexDocumentsMethod::ReplaceDocuments => DelAddOperation::Deletion, | ||||
|                     }; | ||||
|                     document_sorter_key_buffer.clear(); | ||||
|                     document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); | ||||
|                     document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); | ||||
|                     document_sorter_value_buffer.clear(); | ||||
|                     document_sorter_value_buffer.push(Operation::Addition as u8); | ||||
|                     into_del_add_obkv( | ||||
|                         KvReaderU16::from_slice(base_obkv), | ||||
|                         deladd_operation, | ||||
|                         &mut document_sorter_value_buffer, | ||||
|                     )?; | ||||
|                     self.original_sorter | ||||
|                         .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; | ||||
|                     let base_obkv = KvReader::from_slice(base_obkv); | ||||
|                     if let Some(flattened_obkv) = | ||||
|                         Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)? | ||||
|                     { | ||||
|                         // we recreate our buffer with the flattened documents | ||||
|                         document_sorter_value_buffer.clear(); | ||||
|                         document_sorter_value_buffer.push(Operation::Addition as u8); | ||||
|                         into_del_add_obkv( | ||||
|                             KvReaderU16::from_slice(&flattened_obkv), | ||||
|                             deladd_operation, | ||||
|                             &mut document_sorter_value_buffer, | ||||
|                         )?; | ||||
|                     } | ||||
|                     self.flattened_sorter | ||||
|                         .insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             if !skip_insertion { | ||||
|                 self.new_documents_ids.insert(docid); | ||||
|  | ||||
|                 document_sorter_key_buffer.clear(); | ||||
|                 document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); | ||||
|                 document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); | ||||
|                 document_sorter_value_buffer.clear(); | ||||
|                 document_sorter_value_buffer.push(Operation::Addition as u8); | ||||
|                 into_del_add_obkv( | ||||
|                     KvReaderU16::from_slice(&obkv_buffer), | ||||
|                     DelAddOperation::Addition, | ||||
|                     &mut document_sorter_value_buffer, | ||||
|                 )?; | ||||
|                 // We use the extracted/generated user id as the key for this document. | ||||
|                 self.original_sorter | ||||
|                     .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; | ||||
|  | ||||
|                 let flattened_obkv = KvReader::from_slice(&obkv_buffer); | ||||
|                 if let Some(obkv) = | ||||
|                     Self::flatten_from_fields_ids_map(flattened_obkv, &mut self.fields_ids_map)? | ||||
|                 { | ||||
|                     document_sorter_value_buffer.clear(); | ||||
|                     document_sorter_value_buffer.push(Operation::Addition as u8); | ||||
|                     into_del_add_obkv( | ||||
|                         KvReaderU16::from_slice(&obkv), | ||||
|                         DelAddOperation::Addition, | ||||
|                         &mut document_sorter_value_buffer, | ||||
|                     )? | ||||
|                 } | ||||
|                 self.flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; | ||||
|             } | ||||
|             documents_count += 1; | ||||
|  | ||||
|             progress_callback(UpdateIndexingStep::RemapDocumentAddition { | ||||
|                 documents_seen: documents_count, | ||||
|             }); | ||||
|  | ||||
|             field_buffer = drop_and_reuse(field_buffer_cache); | ||||
|             docid_buffer.clear(); | ||||
|             obkv_buffer.clear(); | ||||
|         } | ||||
|  | ||||
|         progress_callback(UpdateIndexingStep::RemapDocumentAddition { | ||||
|             documents_seen: documents_count, | ||||
|         }); | ||||
|  | ||||
|         self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?; | ||||
|         self.index.put_primary_key(wtxn, &primary_key)?; | ||||
|         self.documents_count += documents_count; | ||||
|         // Now that we have a valid sorter that contains the user id and the obkv, we | ||||
|         // give it to the last transforming function which returns the TransformOutput. | ||||
|         Ok(documents_count) | ||||
|     } | ||||
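`drop_and_reuse` (used in the loop above but defined elsewhere in this module) is an allocation-reuse trick rather than indexing logic; a sketch of the usual shape, which may differ from milli's exact helper:

```rust
/// Clears the vector and re-collects it as a Vec of another element type,
/// letting the standard library's in-place collect reuse the allocation
/// when size and alignment match. The map closure never runs because the
/// vector is empty after clear().
fn drop_and_reuse<U, T>(mut v: Vec<U>) -> Vec<T> {
    v.clear();
    v.into_iter().map(|_| unreachable!()).collect()
}
```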
|  | ||||
|     // Flatten a document from the fields ids map contained in self and insert the new | ||||
| @@ -230,6 +489,167 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Generate the `TransformOutput` based on the given sorter that can be generated from any | ||||
|     /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document | ||||
|     /// id for the user side and the value must be an obkv where keys are valid fields ids. | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::transform")] | ||||
|     pub(crate) fn output_from_sorter<F>( | ||||
|         self, | ||||
|         wtxn: &mut heed::RwTxn<'_>, | ||||
|         progress_callback: F, | ||||
|     ) -> Result<TransformOutput> | ||||
|     where | ||||
|         F: Fn(UpdateIndexingStep) + Sync, | ||||
|     { | ||||
|         let primary_key = self | ||||
|             .index | ||||
|             .primary_key(wtxn)? | ||||
|             .ok_or(Error::InternalError(InternalError::DatabaseMissingEntry { | ||||
|                 db_name: db_name::MAIN, | ||||
|                 key: Some(main_key::PRIMARY_KEY_KEY), | ||||
|             }))? | ||||
|             .to_string(); | ||||
|  | ||||
|         // We create a final writer to write the new documents in order from the sorter. | ||||
|         let mut writer = create_writer( | ||||
|             self.indexer_settings.chunk_compression_type, | ||||
|             self.indexer_settings.chunk_compression_level, | ||||
|             tempfile::tempfile()?, | ||||
|         ); | ||||
|  | ||||
|         // To compute the field distribution we need to: | ||||
|         // 1. Remove all the deleted documents from the field distribution | ||||
|         // 2. Add all the new documents to the field distribution | ||||
|         let mut field_distribution = self.index.field_distribution(wtxn)?; | ||||
|  | ||||
|         // Here we are going to do the document count + field distribution + `write_into_stream_writer` | ||||
|         let mut iter = self.original_sorter.into_stream_merger_iter()?; | ||||
|         // used only for the callback | ||||
|         let mut documents_count = 0; | ||||
|  | ||||
|         while let Some((key, val)) = iter.next()? { | ||||
|             // skip first byte corresponding to the operation type (Deletion or Addition). | ||||
|             let val = &val[1..]; | ||||
|  | ||||
|             // send a callback to show at which step we are | ||||
|             documents_count += 1; | ||||
|             progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { | ||||
|                 documents_seen: documents_count, | ||||
|                 total_documents: self.documents_count, | ||||
|             }); | ||||
|  | ||||
|             for (key, value) in KvReader::from_slice(val) { | ||||
|                 let reader = KvReaderDelAdd::from_slice(value); | ||||
|                 match (reader.get(DelAdd::Deletion), reader.get(DelAdd::Addition)) { | ||||
|                     (None, None) => (), | ||||
|                     (None, Some(_)) => { | ||||
|                         // New field | ||||
|                         let name = self.fields_ids_map.name(key).ok_or( | ||||
|                             FieldIdMapMissingEntry::FieldId { | ||||
|                                 field_id: key, | ||||
|                                 process: "Computing field distribution in transform.", | ||||
|                             }, | ||||
|                         )?; | ||||
|                         *field_distribution.entry(name.to_string()).or_insert(0) += 1; | ||||
|                     } | ||||
|                     (Some(_), None) => { | ||||
|                         // Field removed | ||||
|                         let name = self.fields_ids_map.name(key).ok_or( | ||||
|                             FieldIdMapMissingEntry::FieldId { | ||||
|                                 field_id: key, | ||||
|                                 process: "Computing field distribution in transform.", | ||||
|                             }, | ||||
|                         )?; | ||||
|                         match field_distribution.entry(name.to_string()) { | ||||
|                             BEntry::Vacant(_) => { /* Bug? trying to remove a non-existing field */ | ||||
|                             } | ||||
|                             BEntry::Occupied(mut entry) => { | ||||
|                                 // attempt to remove one | ||||
|                                 match entry.get_mut().checked_sub(1) { | ||||
|                                     Some(0) => { | ||||
|                                         entry.remove(); | ||||
|                                     } | ||||
|                                     Some(new_val) => { | ||||
|                                         *entry.get_mut() = new_val; | ||||
|                                     } | ||||
|                                     None => { | ||||
|                                         unreachable!("Attempting to remove a field that wasn't in the field distribution") | ||||
|                                     } | ||||
|                                 } | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                     (Some(_), Some(_)) => { | ||||
|                         // Value change, no field distribution change | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             writer.insert(key, val)?; | ||||
|         } | ||||
|  | ||||
|         let mut original_documents = writer.into_inner()?; | ||||
|         // We then extract the file and reset the seek to be able to read it again. | ||||
|         original_documents.rewind()?; | ||||
|  | ||||
|         // We create a final writer to write the new documents in order from the sorter. | ||||
|         let mut writer = create_writer( | ||||
|             self.indexer_settings.chunk_compression_type, | ||||
|             self.indexer_settings.chunk_compression_level, | ||||
|             tempfile::tempfile()?, | ||||
|         ); | ||||
|  | ||||
|         // Once we have written all the documents into the final sorter, we write the flattened | ||||
|         // documents into this writer. | ||||
|         // We get rid of the `Operation` byte and skip the deleted documents as well. | ||||
|         let mut iter = self.flattened_sorter.into_stream_merger_iter()?; | ||||
|         while let Some((key, val)) = iter.next()? { | ||||
|             // skip first byte corresponding to the operation type (Deletion or Addition). | ||||
|             let val = &val[1..]; | ||||
|             writer.insert(key, val)?; | ||||
|         } | ||||
|         let mut flattened_documents = writer.into_inner()?; | ||||
|         flattened_documents.rewind()?; | ||||
|  | ||||
|         let mut new_external_documents_ids_builder: Vec<_> = | ||||
|             self.new_external_documents_ids_builder.into_iter().collect(); | ||||
|  | ||||
|         new_external_documents_ids_builder | ||||
|             .sort_unstable_by(|(left, _), (right, _)| left.cmp(right)); | ||||
|         let mut fst_new_external_documents_ids_builder = fst::MapBuilder::memory(); | ||||
|         new_external_documents_ids_builder.into_iter().try_for_each(|(key, value)| { | ||||
|             fst_new_external_documents_ids_builder.insert(key, value) | ||||
|         })?; | ||||
|  | ||||
|         let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?; | ||||
|         let fields_ids_map = self.fields_ids_map; | ||||
|         let primary_key_id = self.index.primary_key(wtxn)?.and_then(|name| fields_ids_map.id(name)); | ||||
|         let mut new_inner_settings = old_inner_settings.clone(); | ||||
|         new_inner_settings.fields_ids_map = fields_ids_map; | ||||
|  | ||||
|         let embedding_config_updates = Default::default(); | ||||
|         let settings_update_only = false; | ||||
|         let settings_diff = InnerIndexSettingsDiff::new( | ||||
|             old_inner_settings, | ||||
|             new_inner_settings, | ||||
|             primary_key_id, | ||||
|             embedding_config_updates, | ||||
|             settings_update_only, | ||||
|         ); | ||||
|  | ||||
|         Ok(TransformOutput { | ||||
|             primary_key, | ||||
|             settings_diff, | ||||
|             field_distribution, | ||||
|             documents_count: self.documents_count, | ||||
|             original_documents: Some( | ||||
|                 original_documents.into_inner().map_err(|err| err.into_error())?, | ||||
|             ), | ||||
|             flattened_documents: Some( | ||||
|                 flattened_documents.into_inner().map_err(|err| err.into_error())?, | ||||
|             ), | ||||
|         }) | ||||
|     } | ||||
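The field-distribution loop above is a three-way case analysis on each field's del/add pair; condensed, it behaves like this sketch, with the lookups and error plumbing stripped:

```rust
// del = field present in the old version, add = present in the new one.
match (deletion, addition) {
    // Newly added field: count one more document containing it.
    (None, Some(_)) => *field_distribution.entry(name.to_string()).or_insert(0) += 1,
    // Removed field: count one fewer, dropping the entry when it reaches zero.
    (Some(_), None) => {
        if let Some(count) = field_distribution.get_mut(name) {
            *count -= 1;
            if *count == 0 {
                field_distribution.remove(name);
            }
        }
    }
    // Unchanged presence (or absent on both sides): distribution is untouched.
    _ => {}
}
```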
|  | ||||
|     /// Rebind the field_ids of the provided document to their values | ||||
|     /// based on the field_ids_maps difference between the old and the new settings, | ||||
|     /// then fill the provided buffers with delta documents using KvWritterDelAdd. | ||||
|   | ||||
| @@ -1,17 +1,13 @@ | ||||
| use std::io::Cursor; | ||||
|  | ||||
| use big_s::S; | ||||
| use bumpalo::Bump; | ||||
| use heed::EnvOpenOptions; | ||||
| use maplit::hashset; | ||||
| use milli::documents::{mmap_from_objects, DocumentsBatchBuilder, DocumentsBatchReader}; | ||||
| use milli::documents::mmap_from_objects; | ||||
| use milli::update::new::indexer; | ||||
| use milli::update::{ | ||||
|     IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, | ||||
| }; | ||||
| use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; | ||||
| use milli::vector::EmbeddingConfigs; | ||||
| use milli::{FacetDistribution, Index, Object, OrderBy}; | ||||
| use serde_json::{from_value, json, Deserializer}; | ||||
| use serde_json::{from_value, json}; | ||||
|  | ||||
| #[test] | ||||
| fn test_facet_distribution_with_no_facet_values() { | ||||
|   | ||||
| @@ -1,16 +1,12 @@ | ||||
| use std::cmp::Reverse; | ||||
| use std::io::Cursor; | ||||
|  | ||||
| use big_s::S; | ||||
| use bumpalo::Bump; | ||||
| use heed::EnvOpenOptions; | ||||
| use itertools::Itertools; | ||||
| use maplit::hashset; | ||||
| use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; | ||||
| use milli::update::new::indexer; | ||||
| use milli::update::{ | ||||
|     IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, | ||||
| }; | ||||
| use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; | ||||
| use milli::vector::EmbeddingConfigs; | ||||
| use milli::{AscDesc, Criterion, Index, Member, Search, SearchResult, TermsMatchingStrategy}; | ||||
| use rand::Rng; | ||||
|   | ||||
| @@ -4,7 +4,7 @@ use bumpalo::Bump; | ||||
| use heed::EnvOpenOptions; | ||||
| use milli::documents::mmap_from_objects; | ||||
| use milli::update::new::indexer; | ||||
| use milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings}; | ||||
| use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; | ||||
| use milli::vector::EmbeddingConfigs; | ||||
| use milli::{Criterion, Index, Object, Search, TermsMatchingStrategy}; | ||||
| use serde_json::from_value; | ||||
|   | ||||