mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 04:56:28 +00:00 
			
		
		
		
	Introduce the UpdateBuilder type along with some update operations
This commit is contained in:
		| @@ -340,7 +340,8 @@ where F: Fn(u32, u32) + Sync + Send, | |||||||
|     //      to first delete the replaced documents for example. |     //      to first delete the replaced documents for example. | ||||||
|     let mut wtxn = env.write_txn()?; |     let mut wtxn = env.write_txn()?; | ||||||
|  |  | ||||||
|     let contains_documents = index.documents_ids(&wtxn)?.map_or(false, |docids| !docids.is_empty()); |     let mut documents_ids = index.documents_ids(&wtxn)?; | ||||||
|  |     let contains_documents = !documents_ids.is_empty(); | ||||||
|     let write_method = if contains_documents { |     let write_method = if contains_documents { | ||||||
|         WriteMethod::GetMergePut |         WriteMethod::GetMergePut | ||||||
|     } else { |     } else { | ||||||
| @@ -354,13 +355,8 @@ where F: Fn(u32, u32) + Sync + Send, | |||||||
|     index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?; |     index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?; | ||||||
|  |  | ||||||
|     // We merge the new documents ids with the existing ones. |     // We merge the new documents ids with the existing ones. | ||||||
|     match index.documents_ids(&wtxn)? { |     documents_ids.union_with(&new_documents_ids); | ||||||
|         Some(mut documents_ids) => { |     index.put_documents_ids(&mut wtxn, &documents_ids)?; | ||||||
|             documents_ids.union_with(&new_documents_ids); |  | ||||||
|             index.put_documents_ids(&mut wtxn, &documents_ids)?; |  | ||||||
|         }, |  | ||||||
|         None => index.put_documents_ids(&mut wtxn, &new_documents_ids)?, |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     debug!("Writing the docid word positions into LMDB on disk..."); |     debug!("Writing the docid word positions into LMDB on disk..."); | ||||||
|     merge_into_lmdb_database( |     merge_into_lmdb_database( | ||||||
|   | |||||||
| @@ -6,11 +6,11 @@ mod indexing; | |||||||
| mod mdfs; | mod mdfs; | ||||||
| mod query_tokens; | mod query_tokens; | ||||||
| mod search; | mod search; | ||||||
| mod update_store; |  | ||||||
| pub mod heed_codec; | pub mod heed_codec; | ||||||
| pub mod proximity; | pub mod proximity; | ||||||
| pub mod subcommand; | pub mod subcommand; | ||||||
| pub mod tokenizer; | pub mod tokenizer; | ||||||
|  | pub mod update; | ||||||
|  |  | ||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use std::hash::BuildHasherDefault; | use std::hash::BuildHasherDefault; | ||||||
| @@ -21,11 +21,11 @@ pub use self::criterion::{Criterion, default_criteria}; | |||||||
| pub use self::fields_ids_map::FieldsIdsMap; | pub use self::fields_ids_map::FieldsIdsMap; | ||||||
| pub use self::index::Index; | pub use self::index::Index; | ||||||
| pub use self::search::{Search, SearchResult}; | pub use self::search::{Search, SearchResult}; | ||||||
| pub use self::update_store::UpdateStore; |  | ||||||
| pub use self::heed_codec::{ | pub use self::heed_codec::{ | ||||||
|     RoaringBitmapCodec, BEU32StrCodec, StrStrU8Codec, |     RoaringBitmapCodec, BEU32StrCodec, StrStrU8Codec, | ||||||
|     ObkvCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec, |     ObkvCodec, BoRoaringBitmapCodec, CboRoaringBitmapCodec, | ||||||
| }; | }; | ||||||
|  | pub use self::update::UpdateStore; | ||||||
|  |  | ||||||
| pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>; | pub type FastMap4<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher32>>; | ||||||
| pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>; | pub type FastMap8<K, V> = HashMap<K, V, BuildHasherDefault<FxHasher64>>; | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  | use std::borrow::Cow; | ||||||
| use std::collections::{HashMap, HashSet}; | use std::collections::{HashMap, HashSet}; | ||||||
|  |  | ||||||
| use fst::{IntoStreamer, Streamer}; | use fst::{IntoStreamer, Streamer}; | ||||||
| @@ -81,7 +82,7 @@ impl<'a> Search<'a> { | |||||||
|     /// the associated documents ids. |     /// the associated documents ids. | ||||||
|     fn fetch_words_docids( |     fn fetch_words_docids( | ||||||
|         &self, |         &self, | ||||||
|         fst: &fst::Set<&[u8]>, |         fst: &fst::Set<Cow<[u8]>>, | ||||||
|         dfas: Vec<(String, bool, DFA)>, |         dfas: Vec<(String, bool, DFA)>, | ||||||
|     ) -> anyhow::Result<Vec<(HashMap<String, (u8, RoaringBitmap)>, RoaringBitmap)>> |     ) -> anyhow::Result<Vec<(HashMap<String, (u8, RoaringBitmap)>, RoaringBitmap)>> | ||||||
|     { |     { | ||||||
| @@ -135,20 +136,14 @@ impl<'a> Search<'a> { | |||||||
|     pub fn execute(&self) -> anyhow::Result<SearchResult> { |     pub fn execute(&self) -> anyhow::Result<SearchResult> { | ||||||
|         let limit = self.limit; |         let limit = self.limit; | ||||||
|  |  | ||||||
|         let fst = match self.index.fst(self.rtxn)? { |         let fst = self.index.words_fst(self.rtxn)?; | ||||||
|             Some(fst) => fst, |  | ||||||
|             None => return Ok(Default::default()), |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         // Construct the DFAs related to the query words. |         // Construct the DFAs related to the query words. | ||||||
|         let dfas = match self.query.as_deref().map(Self::generate_query_dfas) { |         let dfas = match self.query.as_deref().map(Self::generate_query_dfas) { | ||||||
|             Some(dfas) if !dfas.is_empty() => dfas, |             Some(dfas) if !dfas.is_empty() => dfas, | ||||||
|             _ => { |             _ => { | ||||||
|                 // If the query is not set or results in no DFAs we return a placeholder. |                 // If the query is not set or results in no DFAs we return a placeholder. | ||||||
|                 let documents_ids = match self.index.documents_ids(self.rtxn)? { |                 let documents_ids = self.index.documents_ids(self.rtxn)?.iter().take(limit).collect(); | ||||||
|                     Some(docids) => docids.iter().take(limit).collect(), |  | ||||||
|                     None => Vec::new(), |  | ||||||
|                 }; |  | ||||||
|                 return Ok(SearchResult { documents_ids, ..Default::default() }) |                 return Ok(SearchResult { documents_ids, ..Default::default() }) | ||||||
|             }, |             }, | ||||||
|         }; |         }; | ||||||
|   | |||||||
| @@ -199,10 +199,10 @@ fn biggest_value_sizes(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyho | |||||||
|     let mut heap = BinaryHeap::with_capacity(limit + 1); |     let mut heap = BinaryHeap::with_capacity(limit + 1); | ||||||
|  |  | ||||||
|     if limit > 0 { |     if limit > 0 { | ||||||
|         if let Some(fst) = index.fst(rtxn)? { |         let words_fst = index.words_fst(rtxn)?; | ||||||
|             heap.push(Reverse((fst.as_fst().as_bytes().len(), format!("words-fst"), main_name))); |  | ||||||
|             if heap.len() > limit { heap.pop(); } |         heap.push(Reverse((words_fst.as_fst().as_bytes().len(), format!("words-fst"), main_name))); | ||||||
|         } |         if heap.len() > limit { heap.pop(); } | ||||||
|  |  | ||||||
|         if let Some(documents) = index.main.get::<_, Str, ByteSlice>(rtxn, "documents")? { |         if let Some(documents) = index.main.get::<_, Str, ByteSlice>(rtxn, "documents")? { | ||||||
|             heap.push(Reverse((documents.len(), format!("documents"), main_name))); |             heap.push(Reverse((documents.len(), format!("documents"), main_name))); | ||||||
| @@ -265,13 +265,8 @@ fn export_words_fst(index: &Index, rtxn: &heed::RoTxn, output: PathBuf) -> anyho | |||||||
|     let mut output = File::create(&output) |     let mut output = File::create(&output) | ||||||
|         .with_context(|| format!("failed to create {} file", output.display()))?; |         .with_context(|| format!("failed to create {} file", output.display()))?; | ||||||
|  |  | ||||||
|     match index.fst(rtxn)? { |     let words_fst = index.words_fst(rtxn)?; | ||||||
|         Some(fst) =>  output.write_all(fst.as_fst().as_bytes())?, |     output.write_all(words_fst.as_fst().as_bytes())?; | ||||||
|         None => { |  | ||||||
|             let fst = fst::Set::default(); |  | ||||||
|             output.write_all(fst.as_fst().as_bytes())?; |  | ||||||
|         }, |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -62,7 +62,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { | |||||||
|         let result = index.search(&rtxn).query(query).execute().unwrap(); |         let result = index.search(&rtxn).query(query).execute().unwrap(); | ||||||
|  |  | ||||||
|         let mut stdout = io::stdout(); |         let mut stdout = io::stdout(); | ||||||
|         let fields_ids_map = index.fields_ids_map(&rtxn)?.unwrap_or_default(); |         let fields_ids_map = index.fields_ids_map(&rtxn)?; | ||||||
|         let documents = index.documents(&rtxn, result.documents_ids.iter().cloned())?; |         let documents = index.documents(&rtxn, result.documents_ids.iter().cloned())?; | ||||||
|  |  | ||||||
|         for (_id, record) in documents { |         for (_id, record) in documents { | ||||||
|   | |||||||
| @@ -1,4 +1,3 @@ | |||||||
| use std::borrow::Cow; |  | ||||||
| use std::collections::HashSet; | use std::collections::HashSet; | ||||||
| use std::fs::{File, create_dir_all}; | use std::fs::{File, create_dir_all}; | ||||||
| use std::{mem, io}; | use std::{mem, io}; | ||||||
| @@ -156,13 +155,10 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { | |||||||
|                 UpdateMeta::DocumentsAddition => { |                 UpdateMeta::DocumentsAddition => { | ||||||
|                     // We must use the write transaction of the update here. |                     // We must use the write transaction of the update here. | ||||||
|                     let rtxn = env_cloned.read_txn()?; |                     let rtxn = env_cloned.read_txn()?; | ||||||
|                     let fields_ids_map = index_cloned.fields_ids_map(&rtxn)?.unwrap_or_default(); |                     let fields_ids_map = index_cloned.fields_ids_map(&rtxn)?; | ||||||
|                     let documents_ids = index_cloned.documents_ids(&rtxn)?.unwrap_or_default(); |                     let documents_ids = index_cloned.documents_ids(&rtxn)?; | ||||||
|                     let available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); |                     let available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); | ||||||
|                     let users_ids_documents_ids = match index_cloned.users_ids_documents_ids(&rtxn).unwrap() { |                     let users_ids_documents_ids = index_cloned.users_ids_documents_ids(&rtxn).unwrap(); | ||||||
|                         Some(map) => map.map_data(Cow::Borrowed).unwrap(), |  | ||||||
|                         None => fst::Map::default().map_data(Cow::Owned).unwrap(), |  | ||||||
|                     }; |  | ||||||
|  |  | ||||||
|                     let transform = Transform { |                     let transform = Transform { | ||||||
|                         fields_ids_map, |                         fields_ids_map, | ||||||
| @@ -395,7 +391,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { | |||||||
|             let SearchResult { found_words, documents_ids } = search.execute().unwrap(); |             let SearchResult { found_words, documents_ids } = search.execute().unwrap(); | ||||||
|  |  | ||||||
|             let mut documents = Vec::new(); |             let mut documents = Vec::new(); | ||||||
|             let fields_ids_map = index.fields_ids_map(&rtxn).unwrap().unwrap_or_default(); |             let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); | ||||||
|  |  | ||||||
|             for (_id, record) in index.documents(&rtxn, documents_ids).unwrap() { |             for (_id, record) in index.documents(&rtxn, documents_ids).unwrap() { | ||||||
|                 let mut record = record.iter() |                 let mut record = record.iter() | ||||||
|   | |||||||
							
								
								
									
										5
									
								
								src/update/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								src/update/mod.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,5 @@ | |||||||
|  | mod update_builder; | ||||||
|  | mod update_store; | ||||||
|  |  | ||||||
|  | pub use self::update_builder::UpdateBuilder; | ||||||
|  | pub use self::update_store::UpdateStore; | ||||||
							
								
								
									
										356
									
								
								src/update/update_builder.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										356
									
								
								src/update/update_builder.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,356 @@ | |||||||
|  | use std::borrow::Cow; | ||||||
|  | use std::convert::TryFrom; | ||||||
|  |  | ||||||
|  | use fst::{IntoStreamer, Streamer}; | ||||||
|  | use grenad::CompressionType; | ||||||
|  | use itertools::Itertools; | ||||||
|  | use roaring::RoaringBitmap; | ||||||
|  |  | ||||||
|  | use crate::{Index, BEU32}; | ||||||
|  |  | ||||||
|  | /// Builder that collects the tuning parameters of an update and hands out | ||||||
|  | /// the update operations ([`ClearDocuments`], [`DeleteDocuments`], [`IndexDocuments`]). | ||||||
|  | /// | ||||||
|  | /// Every field has a setter of the same name. None of the operations created | ||||||
|  | /// by this builder receives these fields yet: they are only stored. | ||||||
|  | pub struct UpdateBuilder { | ||||||
|  |     log_every_n: usize, | ||||||
|  |     max_nb_chunks: Option<usize>, | ||||||
|  |     max_memory: usize, | ||||||
|  |     linked_hash_map_size: usize, | ||||||
|  |     chunk_compression_type: CompressionType, | ||||||
|  |     chunk_compression_level: Option<u32>, | ||||||
|  |     chunk_fusing_shrink_size: u64, | ||||||
|  |     enable_chunk_fusing: bool, | ||||||
|  |     indexing_jobs: Option<usize>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl UpdateBuilder { | ||||||
|  |     /// Creates a new builder. | ||||||
|  |     /// | ||||||
|  |     /// # Panics | ||||||
|  |     /// | ||||||
|  |     /// Not implemented yet: calling this function always panics through `todo!()`. | ||||||
|  |     pub fn new() -> UpdateBuilder { | ||||||
|  |         todo!() | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `log_every_n` and returns the builder to allow chaining. | ||||||
|  |     pub fn log_every_n(&mut self, log_every_n: usize) -> &mut Self { | ||||||
|  |         self.log_every_n = log_every_n; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `max_nb_chunks` (as `Some`) and returns the builder to allow chaining. | ||||||
|  |     pub fn max_nb_chunks(&mut self, max_nb_chunks: usize) -> &mut Self { | ||||||
|  |         self.max_nb_chunks = Some(max_nb_chunks); | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `max_memory` and returns the builder to allow chaining. | ||||||
|  |     pub fn max_memory(&mut self, max_memory: usize) -> &mut Self { | ||||||
|  |         self.max_memory = max_memory; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `linked_hash_map_size` and returns the builder to allow chaining. | ||||||
|  |     pub fn linked_hash_map_size(&mut self, linked_hash_map_size: usize) -> &mut Self { | ||||||
|  |         self.linked_hash_map_size = linked_hash_map_size; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `chunk_compression_type` and returns the builder to allow chaining. | ||||||
|  |     pub fn chunk_compression_type(&mut self, chunk_compression_type: CompressionType) -> &mut Self { | ||||||
|  |         self.chunk_compression_type = chunk_compression_type; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `chunk_compression_level` (as `Some`) and returns the builder to allow chaining. | ||||||
|  |     pub fn chunk_compression_level(&mut self, chunk_compression_level: u32) -> &mut Self { | ||||||
|  |         self.chunk_compression_level = Some(chunk_compression_level); | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `chunk_fusing_shrink_size` and returns the builder to allow chaining. | ||||||
|  |     pub fn chunk_fusing_shrink_size(&mut self, chunk_fusing_shrink_size: u64) -> &mut Self { | ||||||
|  |         self.chunk_fusing_shrink_size = chunk_fusing_shrink_size; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `enable_chunk_fusing` and returns the builder to allow chaining. | ||||||
|  |     pub fn enable_chunk_fusing(&mut self, enable_chunk_fusing: bool) -> &mut Self { | ||||||
|  |         self.enable_chunk_fusing = enable_chunk_fusing; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores `indexing_jobs` (as `Some`) and returns the builder to allow chaining. | ||||||
|  |     pub fn indexing_jobs(&mut self, indexing_jobs: usize) -> &mut Self { | ||||||
|  |         self.indexing_jobs = Some(indexing_jobs); | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Consumes the builder and creates a [`ClearDocuments`] operation | ||||||
|  |     /// bound to the given write transaction and index. | ||||||
|  |     pub fn clear_documents<'t, 'u, 'i>( | ||||||
|  |         self, | ||||||
|  |         wtxn: &'t mut heed::RwTxn<'u>, | ||||||
|  |         index: &'i Index, | ||||||
|  |     ) -> ClearDocuments<'t, 'u, 'i> | ||||||
|  |     { | ||||||
|  |         ClearDocuments::new(wtxn, index) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Consumes the builder and creates a [`DeleteDocuments`] operation | ||||||
|  |     /// bound to the given write transaction and index. | ||||||
|  |     /// | ||||||
|  |     /// # Errors | ||||||
|  |     /// | ||||||
|  |     /// Forwards any error returned by `DeleteDocuments::new`. | ||||||
|  |     pub fn delete_documents<'t, 'u, 'i>( | ||||||
|  |         self, | ||||||
|  |         wtxn: &'t mut heed::RwTxn<'u>, | ||||||
|  |         index: &'i Index, | ||||||
|  |     ) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>> | ||||||
|  |     { | ||||||
|  |         DeleteDocuments::new(wtxn, index) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Consumes the builder and creates an [`IndexDocuments`] operation | ||||||
|  |     /// bound to the given write transaction and index. | ||||||
|  |     pub fn index_documents<'t, 'u, 'i>( | ||||||
|  |         self, | ||||||
|  |         wtxn: &'t mut heed::RwTxn<'u>, | ||||||
|  |         index: &'i Index, | ||||||
|  |     ) -> IndexDocuments<'t, 'u, 'i> | ||||||
|  |     { | ||||||
|  |         IndexDocuments::new(wtxn, index) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Update operation that removes every document and every derived entry | ||||||
|  | /// from the index, inside the write transaction it was created with. | ||||||
|  | pub struct ClearDocuments<'t, 'u, 'i> { | ||||||
|  |     wtxn: &'t mut heed::RwTxn<'u>, | ||||||
|  |     index: &'i Index, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { | ||||||
|  |     /// Binds the operation to a write transaction and an index; nothing is written yet. | ||||||
|  |     fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> ClearDocuments<'t, 'u, 'i> { | ||||||
|  |         ClearDocuments { wtxn, index } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Empties the index and returns the number of documents ids that were | ||||||
|  |     /// registered before the clearing. | ||||||
|  |     /// | ||||||
|  |     /// # Errors | ||||||
|  |     /// | ||||||
|  |     /// Returns the first error raised by any of the database reads or writes. | ||||||
|  |     pub fn execute(self) -> anyhow::Result<usize> { | ||||||
|  |         // Every field of `Index` is named in this pattern (there is no `..`), | ||||||
|  |         // so adding a field to `Index` makes this stop compiling; presumably | ||||||
|  |         // this is intended to force a review of what must be cleared. | ||||||
|  |         let Index { | ||||||
|  |             main: _main, | ||||||
|  |             word_docids, | ||||||
|  |             docid_word_positions, | ||||||
|  |             word_pair_proximity_docids, | ||||||
|  |             documents, | ||||||
|  |         } = self.index; | ||||||
|  |  | ||||||
|  |         // We clear the word fst. | ||||||
|  |         self.index.put_words_fst(self.wtxn, &fst::Set::default())?; | ||||||
|  |  | ||||||
|  |         // We clear the users ids documents ids. | ||||||
|  |         self.index.put_users_ids_documents_ids(self.wtxn, &fst::Map::default())?; | ||||||
|  |  | ||||||
|  |         // We retrieve the documents ids. | ||||||
|  |         // This must happen before they are overwritten below: the count is the return value. | ||||||
|  |         let documents_ids = self.index.documents_ids(self.wtxn)?; | ||||||
|  |  | ||||||
|  |         // We clear the internal documents ids. | ||||||
|  |         self.index.put_documents_ids(self.wtxn, &RoaringBitmap::default())?; | ||||||
|  |  | ||||||
|  |         // We clear the word docids. | ||||||
|  |         word_docids.clear(self.wtxn)?; | ||||||
|  |  | ||||||
|  |         // We clear the docid word positions. | ||||||
|  |         docid_word_positions.clear(self.wtxn)?; | ||||||
|  |  | ||||||
|  |         // We clear the word pair proximity docids. | ||||||
|  |         word_pair_proximity_docids.clear(self.wtxn)?; | ||||||
|  |  | ||||||
|  |         // We clear the documents themselves. | ||||||
|  |         documents.clear(self.wtxn)?; | ||||||
|  |  | ||||||
|  |         Ok(documents_ids.len() as usize) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Update operation that removes a selection of documents from the index. | ||||||
|  | /// | ||||||
|  | /// Documents are selected by internal id or by user id, and are only | ||||||
|  | /// removed once [`DeleteDocuments::execute`] is called. | ||||||
|  | pub struct DeleteDocuments<'t, 'u, 'i> { | ||||||
|  |     wtxn: &'t mut heed::RwTxn<'u>, | ||||||
|  |     index: &'i Index, | ||||||
|  |     /// Owned copy of the users ids to documents ids map, taken when the operation is created. | ||||||
|  |     users_ids_documents_ids: fst::Map<Vec<u8>>, | ||||||
|  |     /// Internal ids of the documents selected for deletion. | ||||||
|  |     documents_ids: RoaringBitmap, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | ||||||
|  |     /// Creates the operation with an empty selection, copying the current | ||||||
|  |     /// users ids to documents ids map out of the database. | ||||||
|  |     /// | ||||||
|  |     /// # Errors | ||||||
|  |     /// | ||||||
|  |     /// Fails when the map cannot be read from the index or cannot be converted | ||||||
|  |     /// into an owned FST. | ||||||
|  |     fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>> { | ||||||
|  |         let users_ids_documents_ids = index | ||||||
|  |             .users_ids_documents_ids(wtxn)? | ||||||
|  |             .map_data(Cow::into_owned)?; | ||||||
|  |  | ||||||
|  |         Ok(DeleteDocuments { | ||||||
|  |             wtxn, | ||||||
|  |             index, | ||||||
|  |             users_ids_documents_ids, | ||||||
|  |             documents_ids: RoaringBitmap::new(), | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Selects the document with the given internal id for deletion. | ||||||
|  |     pub fn delete_document(&mut self, docid: u32) { | ||||||
|  |         self.documents_ids.insert(docid); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Selects all the documents whose internal ids are in `docids` for deletion. | ||||||
|  |     pub fn delete_documents(&mut self, docids: &RoaringBitmap) { | ||||||
|  |         self.documents_ids.union_with(docids); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Selects the document registered under `user_id` for deletion. | ||||||
|  |     /// | ||||||
|  |     /// Returns the internal id of that document, or `None` when the user id | ||||||
|  |     /// is not present in the map captured at creation time. | ||||||
|  |     /// | ||||||
|  |     /// # Panics | ||||||
|  |     /// | ||||||
|  |     /// Panics if the value stored for `user_id` does not fit in a `u32`. | ||||||
|  |     pub fn delete_user_id(&mut self, user_id: &str) -> Option<u32> { | ||||||
|  |         let docid = self.users_ids_documents_ids.get(user_id).map(|id| u32::try_from(id).unwrap())?; | ||||||
|  |         self.delete_document(docid); | ||||||
|  |         Some(docid) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Removes the selected documents from every database of the index and | ||||||
|  |     /// returns the number of documents that were actually deleted. | ||||||
|  |     /// | ||||||
|  |     /// Selected ids that are not stored in the index are ignored. | ||||||
|  |     /// | ||||||
|  |     /// # Errors | ||||||
|  |     /// | ||||||
|  |     /// Returns the first error raised by a database access or by an FST construction. | ||||||
|  |     /// | ||||||
|  |     /// # Panics | ||||||
|  |     /// | ||||||
|  |     /// Panics if the fields ids map has no `"id"` field, or if the `"id"` | ||||||
|  |     /// value of a deleted document is not a JSON string. | ||||||
|  |     pub fn execute(self) -> anyhow::Result<usize> { | ||||||
|  |         // We retrieve the documents ids that are currently in the database. | ||||||
|  |         let mut documents_ids = self.index.documents_ids(self.wtxn)?; | ||||||
|  |  | ||||||
|  |         // We can and must stop removing documents in a database that is empty. | ||||||
|  |         if documents_ids.is_empty() { | ||||||
|  |             return Ok(0); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // We keep a copy of all the stored ids to compute the ids that survive the deletion. | ||||||
|  |         let mut remaining_documents_ids = documents_ids.clone(); | ||||||
|  |  | ||||||
|  |         // From now on `documents_ids` only contains the ids to delete | ||||||
|  |         // that are really present in the database. | ||||||
|  |         documents_ids.intersect_with(&self.documents_ids); | ||||||
|  |  | ||||||
|  |         // Nothing selected is stored in the database: there is nothing to write. | ||||||
|  |         if documents_ids.is_empty() { | ||||||
|  |             return Ok(0); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // We write back the ids that remain, NOT the ids that we delete. | ||||||
|  |         remaining_documents_ids.difference_with(&documents_ids); | ||||||
|  |         self.index.put_documents_ids(self.wtxn, &remaining_documents_ids)?; | ||||||
|  |  | ||||||
|  |         let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; | ||||||
|  |         let id_field = fields_ids_map.id("id").expect(r#"the field "id" to be present"#); | ||||||
|  |  | ||||||
|  |         let Index { | ||||||
|  |             main: _main, | ||||||
|  |             word_docids, | ||||||
|  |             docid_word_positions, | ||||||
|  |             word_pair_proximity_docids, | ||||||
|  |             documents, | ||||||
|  |         } = self.index; | ||||||
|  |  | ||||||
|  |         // Retrieve the words and the users ids contained in the documents. | ||||||
|  |         // TODO we must use a smallword instead of a string. | ||||||
|  |         let mut words = Vec::new(); | ||||||
|  |         let mut users_ids = Vec::new(); | ||||||
|  |         for docid in &documents_ids { | ||||||
|  |             // We create an iterator to be able to get the content and delete the document | ||||||
|  |             // content itself. It's faster to acquire a cursor to get and delete, | ||||||
|  |             // as we avoid traversing the LMDB B-Tree two times but only once. | ||||||
|  |             let key = BEU32::new(docid); | ||||||
|  |             let mut iter = documents.range_mut(self.wtxn, &(key..=key))?; | ||||||
|  |             if let Some((_key, obkv)) = iter.next().transpose()? { | ||||||
|  |                 if let Some(content) = obkv.get(id_field) { | ||||||
|  |                     let user_id: String = serde_json::from_slice(content).unwrap(); | ||||||
|  |                     users_ids.push(user_id); | ||||||
|  |                 } | ||||||
|  |                 iter.del_current()?; | ||||||
|  |             } | ||||||
|  |             drop(iter); | ||||||
|  |  | ||||||
|  |             // We iterate through the words positions of the document id, | ||||||
|  |             // retrieve the word and delete the positions. | ||||||
|  |             let mut iter = docid_word_positions.prefix_iter_mut(self.wtxn, &(docid, ""))?; | ||||||
|  |             while let Some(result) = iter.next() { | ||||||
|  |                 let ((_docid, word), _positions) = result?; | ||||||
|  |                 // This boolean will indicate if we must remove this word from the words FST. | ||||||
|  |                 words.push((String::from(word), false)); | ||||||
|  |                 iter.del_current()?; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // We create the FST map of the users ids that we must delete. | ||||||
|  |         users_ids.sort_unstable(); | ||||||
|  |         let users_ids_to_delete = fst::Set::from_iter(users_ids)?; | ||||||
|  |         let users_ids_to_delete = fst::Map::from(users_ids_to_delete.into_fst()); | ||||||
|  |  | ||||||
|  |         let new_users_ids_documents_ids = { | ||||||
|  |             // We acquire the current users ids documents ids map and create | ||||||
|  |             // a difference operation between the current and to-delete users ids. | ||||||
|  |             let users_ids_documents_ids = self.index.users_ids_documents_ids(self.wtxn)?; | ||||||
|  |             let difference = users_ids_documents_ids.op().add(&users_ids_to_delete).difference(); | ||||||
|  |  | ||||||
|  |             // We stream the users ids that remain once the to-delete users ids are removed. | ||||||
|  |             let mut iter = difference.into_stream(); | ||||||
|  |             let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory(); | ||||||
|  |             while let Some((userid, docids)) = iter.next() { | ||||||
|  |                 new_users_ids_documents_ids_builder.insert(userid, docids[0].value)?; | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             // We create an FST map from the above builder. | ||||||
|  |             new_users_ids_documents_ids_builder.into_map() | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         // We write the new users ids into the main database. | ||||||
|  |         self.index.put_users_ids_documents_ids(self.wtxn, &new_users_ids_documents_ids)?; | ||||||
|  |  | ||||||
|  |         // Maybe we can improve the get performance of the words | ||||||
|  |         // if we sort the words first, keeping the LMDB pages in cache. | ||||||
|  |         words.sort_unstable(); | ||||||
|  |  | ||||||
|  |         // We iterate over the words and delete the documents ids | ||||||
|  |         // from the word docids database. | ||||||
|  |         for (word, must_remove) in &mut words { | ||||||
|  |             // We create an iterator to be able to get the content and delete the word docids. | ||||||
|  |             // It's faster to acquire a cursor to get and delete or put, as we avoid traversing | ||||||
|  |             // the LMDB B-Tree two times but only once. | ||||||
|  |             let mut iter = word_docids.prefix_iter_mut(self.wtxn, &word)?; | ||||||
|  |             if let Some((key, mut docids)) = iter.next().transpose()? { | ||||||
|  |                 if key == word { | ||||||
|  |                     docids.difference_with(&documents_ids); | ||||||
|  |                     if docids.is_empty() { | ||||||
|  |                         iter.del_current()?; | ||||||
|  |                         *must_remove = true; | ||||||
|  |                     } else { | ||||||
|  |                         iter.put_current(key, &docids)?; | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // We construct an FST set that contains the words to delete from the words FST. | ||||||
|  |         let words_to_delete = words.iter().filter_map(|(w, d)| if *d { Some(w) } else { None }); | ||||||
|  |         let words_to_delete = fst::Set::from_iter(words_to_delete)?; | ||||||
|  |  | ||||||
|  |         let new_words_fst = { | ||||||
|  |             // We retrieve the current words FST from the database. | ||||||
|  |             let words_fst = self.index.words_fst(self.wtxn)?; | ||||||
|  |             let difference = words_fst.op().add(&words_to_delete).difference(); | ||||||
|  |  | ||||||
|  |             // We stream the words that remain once the to-delete words are removed. | ||||||
|  |             let mut new_words_fst_builder = fst::SetBuilder::memory(); | ||||||
|  |             new_words_fst_builder.extend_stream(difference.into_stream())?; | ||||||
|  |  | ||||||
|  |             // We create a words FST set from the above builder. | ||||||
|  |             new_words_fst_builder.into_set() | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         // We write the new words FST into the main database. | ||||||
|  |         self.index.put_words_fst(self.wtxn, &new_words_fst)?; | ||||||
|  |  | ||||||
|  |         // We delete the documents ids that are under the pairs of words we found. | ||||||
|  |         // TODO We can maybe improve this by using the `compute_words_pair_proximities` | ||||||
|  |         //      function instead of iterating over all the possible word pairs. | ||||||
|  |         for ((w1, _), (w2, _)) in words.iter().cartesian_product(&words) { | ||||||
|  |             let start = &(w1.as_str(), w2.as_str(), 0); | ||||||
|  |             let end = &(w1.as_str(), w2.as_str(), 7); | ||||||
|  |             let mut iter = word_pair_proximity_docids.range_mut(self.wtxn, &(start..=end))?; | ||||||
|  |             while let Some(result) = iter.next() { | ||||||
|  |                 let ((w1, w2, prox), mut docids) = result?; | ||||||
|  |                 docids.difference_with(&documents_ids); | ||||||
|  |                 if docids.is_empty() { | ||||||
|  |                     iter.del_current()?; | ||||||
|  |                 } else { | ||||||
|  |                     iter.put_current(&(w1, w2, prox), &docids)?; | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // `documents_ids` holds exactly the ids that were stored and deleted. | ||||||
|  |         Ok(documents_ids.len() as usize) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// How a new version of a document is combined with the previous version | ||||||
|  | /// of that document when indexing. | ||||||
|  | pub enum IndexDocumentsMethod { | ||||||
|  |     /// Replace the previous document with the new one, | ||||||
|  |     /// removing all the already known attributes. | ||||||
|  |     ReplaceDocuments, | ||||||
|  |  | ||||||
|  |     /// Merge the previous version of the document with the new version, | ||||||
|  |     /// replacing old attributes values with the new ones and add the new attributes. | ||||||
|  |     UpdateDocuments, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Update operation meant to index documents, bound to a write transaction | ||||||
|  | /// and an index, and configured with an [`IndexDocumentsMethod`]. | ||||||
|  | pub struct IndexDocuments<'t, 'u, 'i> { | ||||||
|  |     wtxn: &'t mut heed::RwTxn<'u>, | ||||||
|  |     index: &'i Index, | ||||||
|  |     update_method: IndexDocumentsMethod, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { | ||||||
|  |     /// Creates the operation; the method defaults to `IndexDocumentsMethod::ReplaceDocuments`. | ||||||
|  |     fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i> { | ||||||
|  |         IndexDocuments { wtxn, index, update_method: IndexDocumentsMethod::ReplaceDocuments } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Stores the indexing method to use and returns the operation to allow chaining. | ||||||
|  |     pub fn index_documents_method(&mut self, method: IndexDocumentsMethod) -> &mut Self { | ||||||
|  |         self.update_method = method; | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Runs the indexing operation. | ||||||
|  |     /// | ||||||
|  |     /// # Panics | ||||||
|  |     /// | ||||||
|  |     /// Not implemented yet: calling this function always panics through `todo!()`. | ||||||
|  |     pub fn execute(self) -> anyhow::Result<()> { | ||||||
|  |         todo!() | ||||||
|  |     } | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user