mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 13:36:27 +00:00 
			
		
		
		
	Compute the new, common and, deleted prefix words fst once
This commit is contained in:
		| @@ -12,6 +12,7 @@ use crossbeam_channel::{Receiver, Sender}; | ||||
| use log::debug; | ||||
| use roaring::RoaringBitmap; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use slice_group_by::GroupBy; | ||||
| use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; | ||||
|  | ||||
| pub use self::helpers::{ | ||||
| @@ -420,6 +421,27 @@ where | ||||
|         } | ||||
|         builder.execute()?; | ||||
|  | ||||
|         let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; | ||||
|  | ||||
|         // We retrieve the common words between the previous and new prefix word fst. | ||||
|         let common_prefix_fst_words = fst_stream_into_vec( | ||||
|             previous_words_prefixes_fst.op().add(¤t_prefix_fst).intersection(), | ||||
|         ); | ||||
|         let common_prefix_fst_words: Vec<_> = common_prefix_fst_words | ||||
|             .as_slice() | ||||
|             .linear_group_by_key(|x| x.chars().nth(0).unwrap()) | ||||
|             .collect(); | ||||
|  | ||||
|         // We retrieve the newly added words between the previous and new prefix word fst. | ||||
|         let new_prefix_fst_words = fst_stream_into_vec( | ||||
|             current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(), | ||||
|         ); | ||||
|  | ||||
|         // We compute the set of prefixes that are no more part of the prefix fst. | ||||
|         let del_prefix_fst_words = fst_stream_into_hashset( | ||||
|             previous_words_prefixes_fst.op().add(¤t_prefix_fst).difference(), | ||||
|         ); | ||||
|  | ||||
|         databases_seen += 1; | ||||
|         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { | ||||
|             databases_seen, | ||||
| @@ -432,7 +454,12 @@ where | ||||
|         builder.chunk_compression_level = self.indexer_config.chunk_compression_level; | ||||
|         builder.max_nb_chunks = self.indexer_config.max_nb_chunks; | ||||
|         builder.max_memory = self.indexer_config.max_memory; | ||||
|         builder.execute(word_docids, &previous_words_prefixes_fst)?; | ||||
|         builder.execute( | ||||
|             word_docids, | ||||
|             &new_prefix_fst_words, | ||||
|             &common_prefix_fst_words, | ||||
|             &del_prefix_fst_words, | ||||
|         )?; | ||||
|  | ||||
|         databases_seen += 1; | ||||
|         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { | ||||
| @@ -446,7 +473,12 @@ where | ||||
|         builder.chunk_compression_level = self.indexer_config.chunk_compression_level; | ||||
|         builder.max_nb_chunks = self.indexer_config.max_nb_chunks; | ||||
|         builder.max_memory = self.indexer_config.max_memory; | ||||
|         builder.execute(word_pair_proximity_docids, &previous_words_prefixes_fst)?; | ||||
|         builder.execute( | ||||
|             word_pair_proximity_docids, | ||||
|             &new_prefix_fst_words, | ||||
|             &common_prefix_fst_words, | ||||
|             &del_prefix_fst_words, | ||||
|         )?; | ||||
|  | ||||
|         databases_seen += 1; | ||||
|         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { | ||||
| @@ -466,7 +498,12 @@ where | ||||
|         if let Some(value) = self.config.words_positions_min_level_size { | ||||
|             builder.min_level_size(value); | ||||
|         } | ||||
|         builder.execute(word_position_docids, &previous_words_prefixes_fst)?; | ||||
|         builder.execute( | ||||
|             word_position_docids, | ||||
|             &new_prefix_fst_words, | ||||
|             &common_prefix_fst_words, | ||||
|             &del_prefix_fst_words, | ||||
|         )?; | ||||
|  | ||||
|         databases_seen += 1; | ||||
|         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { | ||||
|   | ||||
| @@ -1,13 +1,10 @@ | ||||
| use std::collections::HashMap; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
|  | ||||
| use fst::Streamer; | ||||
| use grenad::{CompressionType, MergerBuilder}; | ||||
| use heed::types::ByteSlice; | ||||
| use slice_group_by::GroupBy; | ||||
|  | ||||
| use crate::update::index_documents::{ | ||||
|     create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_roaring_bitmaps, | ||||
|     sorter_into_lmdb_database, CursorClonableMmap, MergeFn, | ||||
|     create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, MergeFn, | ||||
| }; | ||||
| use crate::{Index, Result}; | ||||
|  | ||||
| @@ -36,24 +33,13 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("WordPrefixDocids::{}")] | ||||
|     pub fn execute<A: AsRef<[u8]>>( | ||||
|     pub fn execute( | ||||
|         self, | ||||
|         new_word_docids: Vec<grenad::Reader<CursorClonableMmap>>, | ||||
|         old_prefix_fst: &fst::Set<A>, | ||||
|         new_prefix_fst_words: &[String], | ||||
|         common_prefix_fst_words: &[&[String]], | ||||
|         del_prefix_fst_words: &HashSet<Vec<u8>>, | ||||
|     ) -> Result<()> { | ||||
|         let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; | ||||
|  | ||||
|         // We retrieve the common words between the previous and new prefix word fst. | ||||
|         let common_prefix_fst_keys = | ||||
|             fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); | ||||
|         let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys | ||||
|             .as_slice() | ||||
|             .linear_group_by_key(|x| x.chars().nth(0).unwrap()) | ||||
|             .collect(); | ||||
|  | ||||
|         // We compute the set of prefixes that are no more part of the prefix fst. | ||||
|         let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); | ||||
|  | ||||
|         // It is forbidden to keep a mutable reference into the database | ||||
|         // and write into it at the same time, therefore we write into another file. | ||||
|         let mut prefix_docids_sorter = create_sorter( | ||||
| @@ -75,7 +61,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { | ||||
|                 Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes), | ||||
|                 _otherwise => { | ||||
|                     write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?; | ||||
|                     common_prefix_fst_keys | ||||
|                     common_prefix_fst_words | ||||
|                         .iter() | ||||
|                         .find(|prefixes| word.starts_with(&prefixes[0].as_bytes())) | ||||
|                 } | ||||
| @@ -99,21 +85,18 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> { | ||||
|  | ||||
|         // We fetch the docids associated to the newly added word prefix fst only. | ||||
|         let db = self.index.word_docids.remap_data_type::<ByteSlice>(); | ||||
|         let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference(); | ||||
|         while let Some(bytes) = new_prefixes_stream.next() { | ||||
|             let prefix = std::str::from_utf8(bytes)?; | ||||
|         for prefix in new_prefix_fst_words { | ||||
|             let prefix = std::str::from_utf8(prefix.as_bytes())?; | ||||
|             for result in db.prefix_iter(self.wtxn, prefix)? { | ||||
|                 let (_word, data) = result?; | ||||
|                 prefix_docids_sorter.insert(prefix, data)?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         drop(new_prefixes_stream); | ||||
|  | ||||
|         // We remove all the entries that are no more required in this word prefix docids database. | ||||
|         let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data(); | ||||
|         while let Some((prefix, _)) = iter.next().transpose()? { | ||||
|             if suppr_pw.contains(prefix.as_bytes()) { | ||||
|             if del_prefix_fst_words.contains(prefix.as_bytes()) { | ||||
|                 unsafe { iter.del_current()? }; | ||||
|             } | ||||
|         } | ||||
|   | ||||
| @@ -1,4 +1,4 @@ | ||||
| use std::collections::HashMap; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
|  | ||||
| use grenad::{CompressionType, MergerBuilder}; | ||||
| use heed::types::ByteSlice; | ||||
| @@ -7,8 +7,8 @@ use log::debug; | ||||
| use slice_group_by::GroupBy; | ||||
|  | ||||
| use crate::update::index_documents::{ | ||||
|     create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, | ||||
|     sorter_into_lmdb_database, CursorClonableMmap, MergeFn, | ||||
|     create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, | ||||
|     MergeFn, | ||||
| }; | ||||
| use crate::{Index, Result, StrStrU8Codec}; | ||||
|  | ||||
| @@ -62,40 +62,24 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("WordPrefixPairProximityDocids::{}")] | ||||
|     pub fn execute<A: AsRef<[u8]>>( | ||||
|     pub fn execute( | ||||
|         self, | ||||
|         new_word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>, | ||||
|         old_prefix_fst: &fst::Set<A>, | ||||
|         new_prefix_fst_words: &[String], | ||||
|         common_prefix_fst_words: &[&[String]], | ||||
|         del_prefix_fst_words: &HashSet<Vec<u8>>, | ||||
|     ) -> Result<()> { | ||||
|         debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk..."); | ||||
|  | ||||
|         let new_prefix_fst_words: Vec<_> = | ||||
|             new_prefix_fst_words.linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect(); | ||||
|  | ||||
|         // We retrieve and merge the created word pair proximities docids entries | ||||
|         // for the newly added documents. | ||||
|         let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps); | ||||
|         wppd_merger.extend(new_word_pair_proximity_docids); | ||||
|         let mut wppd_iter = wppd_merger.build().into_merger_iter()?; | ||||
|  | ||||
|         let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; | ||||
|  | ||||
|         // We retrieve the common words between the previous and new prefix word fst. | ||||
|         let common_prefix_fst_keys = | ||||
|             fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); | ||||
|         let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys | ||||
|             .as_slice() | ||||
|             .linear_group_by_key(|x| x.chars().nth(0).unwrap()) | ||||
|             .collect(); | ||||
|  | ||||
|         // We retrieve the newly added words between the previous and new prefix word fst. | ||||
|         let new_prefix_fst_keys = | ||||
|             fst_stream_into_vec(prefix_fst.op().add(old_prefix_fst).difference()); | ||||
|         let new_prefix_fst_keys: Vec<_> = new_prefix_fst_keys | ||||
|             .as_slice() | ||||
|             .linear_group_by_key(|x| x.chars().nth(0).unwrap()) | ||||
|             .collect(); | ||||
|  | ||||
|         // We compute the set of prefixes that are no more part of the prefix fst. | ||||
|         let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); | ||||
|  | ||||
|         let mut word_prefix_pair_proximity_docids_sorter = create_sorter( | ||||
|             merge_cbo_roaring_bitmaps, | ||||
|             self.chunk_compression_type, | ||||
| @@ -120,7 +104,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { | ||||
|                 &mut current_prefixes, | ||||
|                 &mut prefixes_cache, | ||||
|                 &mut word_prefix_pair_proximity_docids_sorter, | ||||
|                 &common_prefix_fst_keys, | ||||
|                 common_prefix_fst_words, | ||||
|                 self.max_prefix_length, | ||||
|                 w1, | ||||
|                 w2, | ||||
| @@ -152,7 +136,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { | ||||
|                 &mut current_prefixes, | ||||
|                 &mut prefixes_cache, | ||||
|                 &mut word_prefix_pair_proximity_docids_sorter, | ||||
|                 &new_prefix_fst_keys, | ||||
|                 &new_prefix_fst_words, | ||||
|                 self.max_prefix_length, | ||||
|                 w1, | ||||
|                 w2, | ||||
| @@ -166,7 +150,6 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { | ||||
|             &mut word_prefix_pair_proximity_docids_sorter, | ||||
|         )?; | ||||
|  | ||||
|         drop(prefix_fst); | ||||
|         drop(db_iter); | ||||
|  | ||||
|         // All of the word prefix pairs in the database that have a w2 | ||||
| @@ -177,7 +160,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { | ||||
|             .remap_data_type::<ByteSlice>() | ||||
|             .iter_mut(self.wtxn)?; | ||||
|         while let Some(((_, w2, _), _)) = iter.next().transpose()? { | ||||
|             if suppr_pw.contains(w2.as_bytes()) { | ||||
|             if del_prefix_fst_words.contains(w2.as_bytes()) { | ||||
|                 // Delete this entry as the w2 prefix is no more in the words prefix fst. | ||||
|                 unsafe { iter.del_current()? }; | ||||
|             } | ||||
|   | ||||
| @@ -1,20 +1,18 @@ | ||||
| use std::collections::HashMap; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::num::NonZeroU32; | ||||
| use std::{cmp, str}; | ||||
|  | ||||
| use fst::Streamer; | ||||
| use grenad::{CompressionType, MergerBuilder}; | ||||
| use heed::types::ByteSlice; | ||||
| use heed::{BytesDecode, BytesEncode}; | ||||
| use log::debug; | ||||
| use slice_group_by::GroupBy; | ||||
|  | ||||
| use crate::error::SerializationError; | ||||
| use crate::heed_codec::StrBEU32Codec; | ||||
| use crate::index::main_key::WORDS_PREFIXES_FST_KEY; | ||||
| use crate::update::index_documents::{ | ||||
|     create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, | ||||
|     sorter_into_lmdb_database, CursorClonableMmap, MergeFn, | ||||
|     create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, | ||||
|     MergeFn, | ||||
| }; | ||||
| use crate::{Index, Result}; | ||||
|  | ||||
| @@ -57,26 +55,15 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { | ||||
|     } | ||||
|  | ||||
|     #[logging_timer::time("WordPrefixPositionDocids::{}")] | ||||
|     pub fn execute<A: AsRef<[u8]>>( | ||||
|     pub fn execute( | ||||
|         self, | ||||
|         new_word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>, | ||||
|         old_prefix_fst: &fst::Set<A>, | ||||
|         new_prefix_fst_words: &[String], | ||||
|         common_prefix_fst_words: &[&[String]], | ||||
|         del_prefix_fst_words: &HashSet<Vec<u8>>, | ||||
|     ) -> Result<()> { | ||||
|         debug!("Computing and writing the word levels positions docids into LMDB on disk..."); | ||||
|  | ||||
|         let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; | ||||
|  | ||||
|         // We retrieve the common words between the previous and new prefix word fst. | ||||
|         let common_prefix_fst_keys = | ||||
|             fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection()); | ||||
|         let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys | ||||
|             .as_slice() | ||||
|             .linear_group_by_key(|x| x.chars().nth(0).unwrap()) | ||||
|             .collect(); | ||||
|  | ||||
|         // We compute the set of prefixes that are no more part of the prefix fst. | ||||
|         let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference()); | ||||
|  | ||||
|         let mut prefix_position_docids_sorter = create_sorter( | ||||
|             merge_cbo_roaring_bitmaps, | ||||
|             self.chunk_compression_type, | ||||
| @@ -104,7 +91,7 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { | ||||
|                         &mut prefixes_cache, | ||||
|                         &mut prefix_position_docids_sorter, | ||||
|                     )?; | ||||
|                     common_prefix_fst_keys.iter().find(|prefixes| word.starts_with(&prefixes[0])) | ||||
|                     common_prefix_fst_words.iter().find(|prefixes| word.starts_with(&prefixes[0])) | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
| @@ -129,16 +116,15 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { | ||||
|  | ||||
|         // We fetch the docids associated to the newly added word prefix fst only. | ||||
|         let db = self.index.word_position_docids.remap_data_type::<ByteSlice>(); | ||||
|         let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference(); | ||||
|         while let Some(prefix_bytes) = new_prefixes_stream.next() { | ||||
|             let prefix = str::from_utf8(prefix_bytes).map_err(|_| { | ||||
|         for prefix_bytes in new_prefix_fst_words { | ||||
|             let prefix = str::from_utf8(prefix_bytes.as_bytes()).map_err(|_| { | ||||
|                 SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) } | ||||
|             })?; | ||||
|  | ||||
|             // iter over all lines of the DB where the key is prefixed by the current prefix. | ||||
|             let iter = db | ||||
|                 .remap_key_type::<ByteSlice>() | ||||
|                 .prefix_iter(self.wtxn, prefix_bytes)? | ||||
|                 .prefix_iter(self.wtxn, prefix_bytes.as_bytes())? | ||||
|                 .remap_key_type::<StrBEU32Codec>(); | ||||
|             for result in iter { | ||||
|                 let ((word, pos), data) = result?; | ||||
| @@ -150,14 +136,12 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         drop(new_prefixes_stream); | ||||
|  | ||||
|         // We remove all the entries that are no more required in this word prefix position | ||||
|         // docids database. | ||||
|         let mut iter = | ||||
|             self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data(); | ||||
|         while let Some(((prefix, _), _)) = iter.next().transpose()? { | ||||
|             if suppr_pw.contains(prefix.as_bytes()) { | ||||
|             if del_prefix_fst_words.contains(prefix.as_bytes()) { | ||||
|                 unsafe { iter.del_current()? }; | ||||
|             } | ||||
|         } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user