	Rework the WordsPrefixPositionDocids update to compute a subset of the database
Committed by: Kerollmops
Parent: dbba5fd461
Commit: e9c02173cf
@@ -285,6 +285,7 @@ where
         let index_is_empty = index_documents_ids.len() == 0;
         let mut final_documents_ids = RoaringBitmap::new();
         let mut word_pair_proximity_docids = Vec::new();
+        let mut word_position_docids = Vec::new();
         let mut word_docids = Vec::new();
 
         let mut databases_seen = 0;
@@ -321,6 +322,19 @@ where
                     let chunk = grenad::Reader::new(file)?;
                     TypedChunk::WordPairProximityDocids(chunk)
                 }
+                TypedChunk::WordPositionDocids(chunk) => {
+                    // We extract and mmap our chunk file to be able to get it for next processes.
+                    let mut file = chunk.into_inner();
+                    let mmap = unsafe { memmap2::Mmap::map(&file)? };
+                    let cursor_mmap = CursorClonableMmap::new(ClonableMmap::from(mmap));
+                    let chunk = grenad::Reader::new(cursor_mmap)?;
+                    word_position_docids.push(chunk);
+
+                    // We reconstruct our typed-chunk back.
+                    file.rewind()?;
+                    let chunk = grenad::Reader::new(file)?;
+                    TypedChunk::WordPositionDocids(chunk)
+                }
                 otherwise => otherwise,
             };
 
@@ -359,7 +373,11 @@ where
         let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids;
         self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
 
-        self.execute_prefix_databases(word_docids, word_pair_proximity_docids)?;
+        self.execute_prefix_databases(
+            word_docids,
+            word_pair_proximity_docids,
+            word_position_docids,
+        )?;
 
         Ok(all_documents_ids.len())
     }
@@ -369,6 +387,7 @@ where
         self,
         word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
         word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
     ) -> Result<()>
     where
         F: Fn(UpdateIndexingStep) + Sync,
@@ -453,7 +472,7 @@ where
         if let Some(value) = self.config.words_positions_min_level_size {
            builder.min_level_size(value);
        }
-        builder.execute()?;
+        builder.execute(word_position_docids, &previous_words_prefixes_fst)?;
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
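The TypedChunk::WordPositionDocids arm above reads the same finished chunk twice: once through a memory map that is kept around for the later prefix-database pass, and once through the rewound file handle that is wrapped back into a TypedChunk. CursorClonableMmap and ClonableMmap are milli-internal wrappers; the sketch below only shows the underlying mmap-and-rewind pattern, assuming the memmap2 and tempfile crates, with share_chunk as a hypothetical helper rather than anything from the crate itself.

use std::fs::File;
use std::io::{Seek, SeekFrom, Write};

use memmap2::Mmap;

/// Maps a finished chunk file so its bytes can be handed to a second
/// consumer, then rewinds the original handle so it can be read again
/// from the start. Hypothetical helper, not milli's actual API.
fn share_chunk(mut file: File) -> std::io::Result<(Mmap, File)> {
    // Safety: the file must not be truncated or written to while mapped.
    let mmap = unsafe { Mmap::map(&file)? };
    file.seek(SeekFrom::Start(0))?;
    Ok((mmap, file))
}

fn main() -> std::io::Result<()> {
    let mut file = tempfile::tempfile()?;
    file.write_all(b"serialized chunk bytes")?;
    let (mmap, _rewound) = share_chunk(file)?;
    // The mapping derefs to `&[u8]`, which is what makes it cheap to share.
    assert_eq!(&mmap[..], b"serialized chunk bytes");
    Ok(())
}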
@@ -1,17 +1,20 @@
+use std::collections::HashMap;
 use std::num::NonZeroU32;
 use std::{cmp, str};
 
 use fst::Streamer;
-use grenad::CompressionType;
+use grenad::{CompressionType, MergerBuilder};
 use heed::types::ByteSlice;
-use heed::BytesEncode;
+use heed::{BytesDecode, BytesEncode};
 use log::debug;
+use slice_group_by::GroupBy;
 
 use crate::error::SerializationError;
 use crate::heed_codec::StrBEU32Codec;
 use crate::index::main_key::WORDS_PREFIXES_FST_KEY;
 use crate::update::index_documents::{
-    create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, WriteMethod,
+    create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps,
+    sorter_into_lmdb_database, CursorClonableMmap, MergeFn, WriteMethod,
 };
 use crate::{Index, Result};
 
@@ -54,12 +57,27 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
     }
 
     #[logging_timer::time("WordPrefixPositionDocids::{}")]
-    pub fn execute(self) -> Result<()> {
+    pub fn execute<A: AsRef<[u8]>>(
+        self,
+        new_word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
+        old_prefix_fst: &fst::Set<A>,
+    ) -> Result<()> {
         debug!("Computing and writing the word levels positions docids into LMDB on disk...");
 
         self.index.word_prefix_position_docids.clear(self.wtxn)?;
+        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
 
-        let mut word_prefix_positions_docids_sorter = create_sorter(
+        // We retrieve the common words between the previous and new prefix word fst.
+        let common_prefix_fst_keys =
+            fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection());
+        let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys
+            .as_slice()
+            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
+            .collect();
+
+        // We compute the set of prefixes that are no more part of the prefix fst.
+        let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference());
+
+        let mut prefix_position_docids_sorter = create_sorter(
             merge_cbo_roaring_bitmaps,
             self.chunk_compression_type,
             self.chunk_compression_level,
@@ -67,39 +85,107 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
             self.max_memory,
         );
 
-        // We insert the word prefix position and
-        // corresponds to the word-prefix position where the prefixes appears
-        // in the prefix FST previously constructed.
-        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
+        let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
+        word_position_docids_merger.extend(new_word_position_docids);
+        let mut word_position_docids_iter =
+            word_position_docids_merger.build().into_merger_iter()?;
+
+        // We fetch all the new common prefixes between the previous and new prefix fst.
+        let mut buffer = Vec::new();
+        let mut current_prefixes: Option<&&[String]> = None;
+        let mut prefixes_cache = HashMap::new();
+        while let Some((key, data)) = word_position_docids_iter.next()? {
+            let (word, pos) = StrBEU32Codec::bytes_decode(key).ok_or(heed::Error::Decoding)?;
+
+            current_prefixes = match current_prefixes.take() {
+                Some(prefixes) if word.starts_with(&prefixes[0]) => Some(prefixes),
+                _otherwise => {
+                    write_prefixes_in_sorter(
+                        &mut prefixes_cache,
+                        &mut prefix_position_docids_sorter,
+                    )?;
+                    common_prefix_fst_keys.iter().find(|prefixes| word.starts_with(&prefixes[0]))
+                }
+            };
+
+            if let Some(prefixes) = current_prefixes {
+                for prefix in prefixes.iter() {
+                    if word.starts_with(prefix) {
+                        buffer.clear();
+                        buffer.extend_from_slice(prefix.as_bytes());
+                        buffer.extend_from_slice(&pos.to_be_bytes());
+                        match prefixes_cache.get_mut(&buffer) {
+                            Some(value) => value.push(data.to_owned()),
+                            None => {
+                                prefixes_cache.insert(buffer.clone(), vec![data.to_owned()]);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_position_docids_sorter)?;
 
+        // We fetch the docids associated to the newly added word prefix fst only.
         let db = self.index.word_position_docids.remap_data_type::<ByteSlice>();
-        // iter over all prefixes in the prefix fst.
-        let mut word_stream = prefix_fst.stream();
-        while let Some(prefix_bytes) = word_stream.next() {
+        let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference();
+        while let Some(prefix_bytes) = new_prefixes_stream.next() {
             let prefix = str::from_utf8(prefix_bytes).map_err(|_| {
                 SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) }
             })?;
 
             // iter over all lines of the DB where the key is prefixed by the current prefix.
-            let mut iter = db
+            let iter = db
                 .remap_key_type::<ByteSlice>()
-                .prefix_iter(self.wtxn, &prefix_bytes)?
+                .prefix_iter(self.wtxn, prefix_bytes)?
                 .remap_key_type::<StrBEU32Codec>();
-            while let Some(((_word, pos), data)) = iter.next().transpose()? {
-                let key = (prefix, pos);
-                let bytes = StrBEU32Codec::bytes_encode(&key).unwrap();
-                word_prefix_positions_docids_sorter.insert(bytes, data)?;
+            for result in iter {
+                let ((word, pos), data) = result?;
+                if word.starts_with(prefix) {
+                    let key = (prefix, pos);
+                    let bytes = StrBEU32Codec::bytes_encode(&key).unwrap();
+                    prefix_position_docids_sorter.insert(bytes, data)?;
+                }
             }
         }
 
+        drop(new_prefixes_stream);
+
+        // We remove all the entries that are no more required in this word prefix position
+        // docids database.
+        let mut iter =
+            self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data();
+        while let Some(((prefix, _), _)) = iter.next().transpose()? {
+            if suppr_pw.contains(prefix.as_bytes()) {
+                unsafe { iter.del_current()? };
+            }
+        }
+
+        drop(iter);
+
         // We finally write all the word prefix position docids into the LMDB database.
         sorter_into_lmdb_database(
             self.wtxn,
             *self.index.word_prefix_position_docids.as_polymorph(),
-            word_prefix_positions_docids_sorter,
+            prefix_position_docids_sorter,
             merge_cbo_roaring_bitmaps,
-            WriteMethod::Append,
+            WriteMethod::GetMergePut,
         )?;
 
         Ok(())
     }
 }
+
+fn write_prefixes_in_sorter(
+    prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
+    sorter: &mut grenad::Sorter<MergeFn>,
+) -> Result<()> {
+    for (key, data_slices) in prefixes.drain() {
+        for data in data_slices {
+            sorter.insert(&key, data)?;
+        }
+    }
+
+    Ok(())
+}
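The incremental strategy above hinges on three fst set operations between the previous and the new words-prefixes fsts: the intersection yields the common prefixes, whose entries only need to be extended from the newly indexed word position docids; the old-minus-new difference yields the prefixes that disappeared, whose entries are deleted; and the new-minus-old difference yields the brand-new prefixes, whose docids are fetched from the word_position_docids database itself. A self-contained sketch of those operations, using toy prefixes in place of the real fsts:

use fst::{Set, Streamer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Keys must be provided in lexicographic order to build an fst::Set.
    let old_prefixes = Set::from_iter(["al", "be", "ca"])?;
    let new_prefixes = Set::from_iter(["al", "ca", "de"])?;

    // Intersection: prefixes present in both fsts ("al", "ca").
    let mut common = old_prefixes.op().add(&new_prefixes).intersection();
    while let Some(key) = common.next() {
        println!("common: {}", std::str::from_utf8(key)?);
    }

    // Difference is taken with respect to the first stream, so old minus
    // new yields the prefixes that disappeared ("be").
    let mut removed = old_prefixes.op().add(&new_prefixes).difference();
    while let Some(key) = removed.next() {
        println!("removed: {}", std::str::from_utf8(key)?);
    }

    // And new minus old yields the brand-new prefixes ("de").
    let mut added = new_prefixes.op().add(&old_prefixes).difference();
    while let Some(key) = added.next() {
        println!("added: {}", std::str::from_utf8(key)?);
    }

    Ok(())
}

Judging from their use in the diff, the milli helpers fst_stream_into_vec and fst_stream_into_hashset simply collect such streams into a Vec<String> and a HashSet<Vec<u8>> respectively.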