Mirror of https://github.com/meilisearch/meilisearch.git
	Rework the WordPrefixDocids update to compute a subset of the database
committed by Kerollmops
parent 5404bc02dd
commit 28692f65be
			| @@ -285,6 +285,7 @@ where | ||||
|         let index_is_empty = index_documents_ids.len() == 0; | ||||
|         let mut final_documents_ids = RoaringBitmap::new(); | ||||
|         let mut word_pair_proximity_docids = Vec::new(); | ||||
|         let mut word_docids = Vec::new(); | ||||
|  | ||||
|         let mut databases_seen = 0; | ||||
|         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { | ||||
| @@ -294,6 +295,19 @@ where | ||||
|  | ||||
|         for result in lmdb_writer_rx { | ||||
|             let typed_chunk = match result? { | ||||
|                 TypedChunk::WordDocids(chunk) => { | ||||
|                     // We extract and mmap our chunk file to be able to get it for next processes. | ||||
|                     let mut file = chunk.into_inner(); | ||||
|                     let mmap = unsafe { memmap2::Mmap::map(&file)? }; | ||||
|                     let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap)); | ||||
|                     let chunk = grenad::Reader::new(cursor_mmap)?; | ||||
|                     word_docids.push(chunk); | ||||
|  | ||||
|                     // We reconstruct our typed-chunk back. | ||||
|                     file.rewind()?; | ||||
|                     let chunk = grenad::Reader::new(file)?; | ||||
|                     TypedChunk::WordDocids(chunk) | ||||
|                 } | ||||
|                 TypedChunk::WordPairProximityDocids(chunk) => { | ||||
|                     // We extract and mmap our chunk file to be able to get it for next processes. | ||||
|                     let mut file = chunk.into_inner(); | ||||
| @@ -345,7 +359,7 @@ where | ||||
|         let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; | ||||
|         self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; | ||||
|  | ||||
|         self.execute_prefix_databases(word_pair_proximity_docids)?; | ||||
|         self.execute_prefix_databases(word_docids, word_pair_proximity_docids)?; | ||||
|  | ||||
|         Ok(all_documents_ids.len()) | ||||
|     } | ||||
| @@ -353,6 +367,7 @@ where | ||||
|     #[logging_timer::time("IndexDocuments::{}")] | ||||
|     pub fn execute_prefix_databases( | ||||
|         self, | ||||
|         word_docids: Vec<grenad::Reader<CursorClonableMmap>>, | ||||
|         word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>, | ||||
|     ) -> Result<()> | ||||
|     where | ||||
| @@ -404,7 +419,7 @@ where | ||||
|         builder.chunk_compression_level = self.indexer_config.chunk_compression_level; | ||||
|         builder.max_nb_chunks = self.indexer_config.max_nb_chunks; | ||||
|         builder.max_memory = self.indexer_config.max_memory; | ||||
|         builder.execute()?; | ||||
|         builder.execute(word_docids, &previous_words_prefixes_fst)?; | ||||
|  | ||||
|         databases_seen += 1; | ||||
|         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { | ||||
|   | ||||
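The TypedChunk::WordDocids arm above reads each chunk twice: the file is mmapped so its raw bytes can be kept around for the later prefix update, then rewound so the same chunk can still be merged into the word_docids database as before. A minimal sketch of that double-read trick, assuming only std and memmap2 (split_chunk is an illustrative helper, not milli's API; milli wraps the mmap in CursorClonableMmap before building a second grenad::Reader from it):

    use std::fs::File;
    use std::io::{Seek, SeekFrom};

    /// Map the chunk file so its bytes can be handed to the prefix update,
    /// then rewind the handle so the chunk can still be streamed into LMDB.
    fn split_chunk(mut file: File) -> std::io::Result<(memmap2::Mmap, File)> {
        // Safety: the chunk file is written once and never mutated while mapped.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };
        file.seek(SeekFrom::Start(0))?; // same effect as `file.rewind()?`
        Ok((mmap, file))
    }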
| @@ -1,11 +1,12 @@ | ||||
| use std::str; | ||||
| use std::collections::HashMap; | ||||
|  | ||||
| use fst::Streamer; | ||||
| use grenad::CompressionType; | ||||
| use heed::types::ByteSlice; | ||||
| use fst::IntoStreamer; | ||||
| use grenad::{CompressionType, MergerBuilder}; | ||||
| use slice_group_by::GroupBy; | ||||
|  | ||||
| use crate::update::index_documents::{ | ||||
|     create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod, | ||||
|     create_sorter, fst_stream_into_hashset, merge_roaring_bitmaps, sorter_into_lmdb_database, | ||||
|     CursorClonableMmap, MergeFn, WriteMethod, | ||||
| }; | ||||
| use crate::{Index, Result}; | ||||
|  | ||||
| @@ -34,11 +35,18 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("WordPrefixDocids::{}")] | ||||
|     pub fn execute(self) -> Result<()> { | ||||
|         // Clear the word prefix docids database. | ||||
|         self.index.word_prefix_docids.clear(self.wtxn)?; | ||||
|  | ||||
|     pub fn execute<A: AsRef<[u8]>>( | ||||
|         self, | ||||
|         new_word_docids: Vec<grenad::Reader<CursorClonableMmap>>, | ||||
|         old_prefix_fst: &fst::Set<A>, | ||||
|     ) -> Result<()> { | ||||
|         let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; | ||||
|         let prefix_fst_keys = prefix_fst.into_stream().into_strs()?; | ||||
|         let prefix_fst_keys: Vec<_> = | ||||
|             prefix_fst_keys.as_slice().linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); | ||||
|  | ||||
|         // We compute the set of prefixes that are no more part of the prefix fst. | ||||
|         let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); | ||||
|  | ||||
|         // It is forbidden to keep a mutable reference into the database | ||||
|         // and write into it at the same time, therefore we write into another file. | ||||
| @@ -50,18 +58,46 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { | ||||
|             self.max_memory, | ||||
|         ); | ||||
|  | ||||
|         // We iterate over all the prefixes and retrieve the corresponding docids. | ||||
|         let mut prefix_stream = prefix_fst.stream(); | ||||
|         while let Some(bytes) = prefix_stream.next() { | ||||
|             let prefix = str::from_utf8(bytes)?; | ||||
|             let db = self.index.word_docids.remap_data_type::<ByteSlice>(); | ||||
|             for result in db.prefix_iter(self.wtxn, prefix)? { | ||||
|                 let (_word, data) = result?; | ||||
|                 prefix_docids_sorter.insert(prefix, data)?; | ||||
|         let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps); | ||||
|         word_docids_merger.extend(new_word_docids); | ||||
|         let mut word_docids_iter = word_docids_merger.build().into_merger_iter()?; | ||||
|  | ||||
|         let mut current_prefixes: Option<&&[String]> = None; | ||||
|         let mut prefixes_cache = HashMap::new(); | ||||
|         while let Some((word, data)) = word_docids_iter.next()? { | ||||
|             current_prefixes = match current_prefixes.take() { | ||||
|                 Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes), | ||||
|                 _otherwise => { | ||||
|                     write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; | ||||
|                     prefix_fst_keys | ||||
|                         .iter() | ||||
|                         .find(|prefixes| word.starts_with(&prefixes[0].as_bytes())) | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
|             if let Some(prefixes) = current_prefixes { | ||||
|                 for prefix in prefixes.iter() { | ||||
|                     if word.starts_with(prefix.as_bytes()) { | ||||
|                         match prefixes_cache.get_mut(prefix.as_bytes()) { | ||||
|                             Some(value) => value.push(data.to_owned()), | ||||
|                             None => { | ||||
|                                 prefixes_cache.insert(prefix.clone().into(), vec![data.to_owned()]); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         drop(prefix_fst); | ||||
|         // We remove all the entries that are no more required in this word prefix docids database. | ||||
|         let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data(); | ||||
|         while let Some((prefix, _)) = iter.next().transpose()? { | ||||
|             if suppr_pw.contains(prefix.as_bytes()) { | ||||
|                 unsafe { iter.del_current()? }; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         drop(iter); | ||||
|  | ||||
|         // We finally write the word prefix docids into the LMDB database. | ||||
|         sorter_into_lmdb_database( | ||||
| @@ -69,9 +105,22 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { | ||||
|             *self.index.word_prefix_docids.as_polymorph(), | ||||
|             prefix_docids_sorter, | ||||
|             merge_roaring_bitmaps, | ||||
|             WriteMethod::Append, | ||||
|             WriteMethod::GetMergePut, | ||||
|         )?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn write_prefixes_in_sorter( | ||||
|     prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>, | ||||
|     sorter: &mut grenad::Sorter<MergeFn>, | ||||
| ) -> Result<()> { | ||||
|     for (key, data_slices) in prefixes.drain() { | ||||
|         for data in data_slices { | ||||
|             sorter.insert(&key, data)?; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
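The rewritten loop above only touches the word docids produced by this indexing run: the chunks are merged with merge_roaring_bitmaps, the resulting stream is walked in sorted order, and prefix_fst_keys is pre-grouped by first character (e.g. ["a", "ab", "ba", "be", "c"] becomes [["a", "ab"], ["ba", "be"], ["c"]]) so each word is only tested against prefixes sharing its first byte. Matching docids are buffered per prefix and flushed into the sorter by write_prefixes_in_sorter whenever the stream leaves the current group. A simplified, self-contained sketch of that grouping, assuming plain slices instead of grenad readers and a flush callback standing in for write_prefixes_in_sorter:

    use std::collections::HashMap;

    /// `entries` must be sorted by word and `groups` must hold the prefixes
    /// grouped by first character, as produced by `linear_group_by_key` above.
    fn accumulate_prefix_docids(
        entries: &[(&str, Vec<u8>)],
        groups: &[Vec<String>],
        mut flush: impl FnMut(&mut HashMap<Vec<u8>, Vec<Vec<u8>>>),
    ) {
        let mut cache: HashMap<Vec<u8>, Vec<Vec<u8>>> = HashMap::new();
        let mut current: Option<&Vec<String>> = None;
        for (word, data) in entries {
            // Re-scan `groups` only when the word leaves the current
            // first-character group, flushing first to keep memory bounded.
            current = match current.take() {
                Some(group) if word.starts_with(group[0].chars().next().unwrap()) => Some(group),
                _ => {
                    flush(&mut cache);
                    groups.iter().find(|group| word.starts_with(group[0].chars().next().unwrap()))
                }
            };
            if let Some(group) = current {
                for prefix in group {
                    if word.starts_with(prefix.as_str()) {
                        cache.entry(prefix.as_bytes().to_vec()).or_default().push(data.clone());
                    }
                }
            }
        }
        // Flush whatever remains for the last group.
        flush(&mut cache);
    }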
|   | ||||
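Because the word_prefix_docids database is no longer cleared up front, prefixes that dropped out of the prefix fst have to be deleted explicitly: the old and new fsts are compared, every key that only exists in the old one is removed from the database, and the sorter is finally merged in with WriteMethod::GetMergePut instead of Append, since surviving prefixes may already hold docids. A small sketch of the fst set difference behind fst_stream_into_hashset (removed_prefixes is a made-up helper name):

    use std::collections::HashSet;
    use fst::{Set, Streamer};

    /// Keys present in `old` but absent from `new`, i.e. the prefixes whose
    /// `word_prefix_docids` entries must now be deleted.
    fn removed_prefixes<A: AsRef<[u8]>>(old: &Set<A>, new: &Set<A>) -> HashSet<Vec<u8>> {
        let mut stream = old.op().add(new).difference();
        let mut removed = HashSet::new();
        while let Some(key) = stream.next() {
            removed.insert(key.to_vec());
        }
        removed
    }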