From fb79c324304ea093ba6e79b04444b0b1907f9511 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Thu, 27 Jan 2022 11:00:18 +0100
Subject: [PATCH] Compute the new, common, and deleted prefix words fst once

---
 milli/src/update/index_documents/mod.rs    | 43 +++++++++++++++++--
 milli/src/update/word_prefix_docids.rs     | 37 +++++-----------
 .../word_prefix_pair_proximity_docids.rs   | 43 ++++++-------------
 .../update/words_prefix_position_docids.rs | 40 ++++++-----------
 4 files changed, 75 insertions(+), 88 deletions(-)

diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index ee80d8ada..a31d1875b 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -12,6 +12,7 @@ use crossbeam_channel::{Receiver, Sender};
 use log::debug;
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize};
+use slice_group_by::GroupBy;
 use typed_chunk::{write_typed_chunk_into_index, TypedChunk};
 
 pub use self::helpers::{
@@ -420,6 +421,27 @@ where
         }
         builder.execute()?;
 
+        let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
+
+        // We retrieve the common words between the previous and new prefix word fst.
+        let common_prefix_fst_words = fst_stream_into_vec(
+            previous_words_prefixes_fst.op().add(&current_prefix_fst).intersection(),
+        );
+        let common_prefix_fst_words: Vec<_> = common_prefix_fst_words
+            .as_slice()
+            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
+            .collect();
+
+        // We retrieve the newly added words between the previous and new prefix word fst.
+        let new_prefix_fst_words = fst_stream_into_vec(
+            current_prefix_fst.op().add(&previous_words_prefixes_fst).difference(),
+        );
+
+        // We compute the set of prefixes that are no more part of the prefix fst.
+        let del_prefix_fst_words = fst_stream_into_hashset(
+            previous_words_prefixes_fst.op().add(&current_prefix_fst).difference(),
+        );
+
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
             databases_seen,
@@ -432,7 +454,12 @@ where
         builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
         builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
         builder.max_memory = self.indexer_config.max_memory;
-        builder.execute(word_docids, &previous_words_prefixes_fst)?;
+        builder.execute(
+            word_docids,
+            &new_prefix_fst_words,
+            &common_prefix_fst_words,
+            &del_prefix_fst_words,
+        )?;
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@@ -446,7 +473,12 @@ where
         builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
         builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
         builder.max_memory = self.indexer_config.max_memory;
-        builder.execute(word_pair_proximity_docids, &previous_words_prefixes_fst)?;
+        builder.execute(
+            word_pair_proximity_docids,
+            &new_prefix_fst_words,
+            &common_prefix_fst_words,
+            &del_prefix_fst_words,
+        )?;
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
@@ -466,7 +498,12 @@ where
         if let Some(value) = self.config.words_positions_min_level_size {
             builder.min_level_size(value);
         }
-        builder.execute(word_position_docids, &previous_words_prefixes_fst)?;
+        builder.execute(
+            word_position_docids,
+            &new_prefix_fst_words,
+            &common_prefix_fst_words,
+            &del_prefix_fst_words,
+        )?;
 
         databases_seen += 1;
         (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs
index cf50a5b8a..624037f8f 100644
--- a/milli/src/update/word_prefix_docids.rs
+++ b/milli/src/update/word_prefix_docids.rs
@@ -1,13 +1,10 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
-use fst::Streamer;
 use grenad::{CompressionType, MergerBuilder};
 use heed::types::ByteSlice;
-use slice_group_by::GroupBy;
 
 use crate::update::index_documents::{
-    create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_roaring_bitmaps,
-    sorter_into_lmdb_database, CursorClonableMmap, MergeFn,
+    create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap, MergeFn,
 };
 use crate::{Index, Result};
 
@@ -36,24 +33,13 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
     }
 
     #[logging_timer::time("WordPrefixDocids::{}")]
-    pub fn execute<A: AsRef<[u8]>>(
+    pub fn execute(
         self,
         new_word_docids: Vec<grenad::Reader<CursorClonableMmap>>,
-        old_prefix_fst: &fst::Set<A>,
+        new_prefix_fst_words: &[String],
+        common_prefix_fst_words: &[&[String]],
+        del_prefix_fst_words: &HashSet<Vec<u8>>,
     ) -> Result<()> {
-        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
-
-        // We retrieve the common words between the previous and new prefix word fst.
-        let common_prefix_fst_keys =
-            fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection());
-        let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys
-            .as_slice()
-            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
-            .collect();
-
-        // We compute the set of prefixes that are no more part of the prefix fst.
-        let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference());
-
         // It is forbidden to keep a mutable reference into the database
         // and write into it at the same time, therefore we write into another file.
         let mut prefix_docids_sorter = create_sorter(
@@ -75,7 +61,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
                 Some(prefixes) if word.starts_with(&prefixes[0].as_bytes()) => Some(prefixes),
                 _otherwise => {
                     write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
-                    common_prefix_fst_keys
+                    common_prefix_fst_words
                         .iter()
                         .find(|prefixes| word.starts_with(&prefixes[0].as_bytes()))
                 }
@@ -99,21 +85,18 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
 
         // We fetch the docids associated to the newly added word prefix fst only.
         let db = self.index.word_docids.remap_data_type::<ByteSlice>();
-        let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference();
-        while let Some(bytes) = new_prefixes_stream.next() {
-            let prefix = std::str::from_utf8(bytes)?;
+        for prefix in new_prefix_fst_words {
+            let prefix = std::str::from_utf8(prefix.as_bytes())?;
             for result in db.prefix_iter(self.wtxn, prefix)? {
                 let (_word, data) = result?;
                 prefix_docids_sorter.insert(prefix, data)?;
             }
         }
 
-        drop(new_prefixes_stream);
-
         // We remove all the entries that are no more required in this word prefix docids database.
         let mut iter = self.index.word_prefix_docids.iter_mut(self.wtxn)?.lazily_decode_data();
         while let Some((prefix, _)) = iter.next().transpose()? {
-            if suppr_pw.contains(prefix.as_bytes()) {
+            if del_prefix_fst_words.contains(prefix.as_bytes()) {
                 unsafe { iter.del_current()? };
             }
         }
diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs
index 5b025e4fc..530c2867e 100644
--- a/milli/src/update/word_prefix_pair_proximity_docids.rs
+++ b/milli/src/update/word_prefix_pair_proximity_docids.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use grenad::{CompressionType, MergerBuilder};
 use heed::types::ByteSlice;
@@ -7,8 +7,8 @@ use log::debug;
 use slice_group_by::GroupBy;
 
 use crate::update::index_documents::{
-    create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps,
-    sorter_into_lmdb_database, CursorClonableMmap, MergeFn,
+    create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap,
+    MergeFn,
 };
 use crate::{Index, Result, StrStrU8Codec};
 
@@ -62,40 +62,24 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
     }
 
     #[logging_timer::time("WordPrefixPairProximityDocids::{}")]
-    pub fn execute<A: AsRef<[u8]>>(
+    pub fn execute(
         self,
         new_word_pair_proximity_docids: Vec<grenad::Reader<CursorClonableMmap>>,
-        old_prefix_fst: &fst::Set<A>,
+        new_prefix_fst_words: &[String],
+        common_prefix_fst_words: &[&[String]],
+        del_prefix_fst_words: &HashSet<Vec<u8>>,
    ) -> Result<()> {
         debug!("Computing and writing the word prefix pair proximity docids into LMDB on disk...");
 
+        let new_prefix_fst_words: Vec<_> =
+            new_prefix_fst_words.linear_group_by_key(|x| x.chars().nth(0).unwrap()).collect();
+
         // We retrieve and merge the created word pair proximities docids entries
         // for the newly added documents.
         let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
         wppd_merger.extend(new_word_pair_proximity_docids);
         let mut wppd_iter = wppd_merger.build().into_merger_iter()?;
 
-        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
-
-        // We retrieve the common words between the previous and new prefix word fst.
-        let common_prefix_fst_keys =
-            fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection());
-        let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys
-            .as_slice()
-            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
-            .collect();
-
-        // We retrieve the newly added words between the previous and new prefix word fst.
-        let new_prefix_fst_keys =
-            fst_stream_into_vec(prefix_fst.op().add(old_prefix_fst).difference());
-        let new_prefix_fst_keys: Vec<_> = new_prefix_fst_keys
-            .as_slice()
-            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
-            .collect();
-
-        // We compute the set of prefixes that are no more part of the prefix fst.
-        let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference());
-
         let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
             merge_cbo_roaring_bitmaps,
             self.chunk_compression_type,
@@ -120,7 +104,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
                     &mut current_prefixes,
                     &mut prefixes_cache,
                     &mut word_prefix_pair_proximity_docids_sorter,
-                    &common_prefix_fst_keys,
+                    common_prefix_fst_words,
                     self.max_prefix_length,
                     w1,
                     w2,
@@ -152,7 +136,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
                     &mut current_prefixes,
                     &mut prefixes_cache,
                     &mut word_prefix_pair_proximity_docids_sorter,
-                    &new_prefix_fst_keys,
+                    &new_prefix_fst_words,
                     self.max_prefix_length,
                     w1,
                     w2,
@@ -166,7 +150,6 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
             &mut word_prefix_pair_proximity_docids_sorter,
         )?;
 
-        drop(prefix_fst);
         drop(db_iter);
 
         // All of the word prefix pairs in the database that have a w2
@@ -177,7 +160,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
             .remap_data_type::<ByteSlice>()
             .iter_mut(self.wtxn)?;
         while let Some(((_, w2, _), _)) = iter.next().transpose()? {
-            if suppr_pw.contains(w2.as_bytes()) {
+            if del_prefix_fst_words.contains(w2.as_bytes()) {
                 // Delete this entry as the w2 prefix is no more in the words prefix fst.
                 unsafe { iter.del_current()? };
             }
diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs
index 178684cf0..c992d01ec 100644
--- a/milli/src/update/words_prefix_position_docids.rs
+++ b/milli/src/update/words_prefix_position_docids.rs
@@ -1,20 +1,18 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::num::NonZeroU32;
 use std::{cmp, str};
 
-use fst::Streamer;
 use grenad::{CompressionType, MergerBuilder};
 use heed::types::ByteSlice;
 use heed::{BytesDecode, BytesEncode};
 use log::debug;
-use slice_group_by::GroupBy;
 
 use crate::error::SerializationError;
 use crate::heed_codec::StrBEU32Codec;
 use crate::index::main_key::WORDS_PREFIXES_FST_KEY;
 use crate::update::index_documents::{
-    create_sorter, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps,
-    sorter_into_lmdb_database, CursorClonableMmap, MergeFn,
+    create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, CursorClonableMmap,
+    MergeFn,
 };
 use crate::{Index, Result};
 
@@ -57,26 +55,15 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
     }
 
     #[logging_timer::time("WordPrefixPositionDocids::{}")]
-    pub fn execute<A: AsRef<[u8]>>(
+    pub fn execute(
         self,
         new_word_position_docids: Vec<grenad::Reader<CursorClonableMmap>>,
-        old_prefix_fst: &fst::Set<A>,
+        new_prefix_fst_words: &[String],
+        common_prefix_fst_words: &[&[String]],
+        del_prefix_fst_words: &HashSet<Vec<u8>>,
     ) -> Result<()> {
         debug!("Computing and writing the word levels positions docids into LMDB on disk...");
 
-        let prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
-
-        // We retrieve the common words between the previous and new prefix word fst.
-        let common_prefix_fst_keys =
-            fst_stream_into_vec(old_prefix_fst.op().add(&prefix_fst).intersection());
-        let common_prefix_fst_keys: Vec<_> = common_prefix_fst_keys
-            .as_slice()
-            .linear_group_by_key(|x| x.chars().nth(0).unwrap())
-            .collect();
-
-        // We compute the set of prefixes that are no more part of the prefix fst.
-        let suppr_pw = fst_stream_into_hashset(old_prefix_fst.op().add(&prefix_fst).difference());
-
         let mut prefix_position_docids_sorter = create_sorter(
             merge_cbo_roaring_bitmaps,
             self.chunk_compression_type,
@@ -104,7 +91,7 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
                         &mut prefixes_cache,
                         &mut prefix_position_docids_sorter,
                     )?;
-                    common_prefix_fst_keys.iter().find(|prefixes| word.starts_with(&prefixes[0]))
+                    common_prefix_fst_words.iter().find(|prefixes| word.starts_with(&prefixes[0]))
                 }
             };
 
@@ -129,16 +116,15 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
 
         // We fetch the docids associated to the newly added word prefix fst only.
         let db = self.index.word_position_docids.remap_data_type::<ByteSlice>();
-        let mut new_prefixes_stream = prefix_fst.op().add(old_prefix_fst).difference();
-        while let Some(prefix_bytes) = new_prefixes_stream.next() {
-            let prefix = str::from_utf8(prefix_bytes).map_err(|_| {
+        for prefix_bytes in new_prefix_fst_words {
+            let prefix = str::from_utf8(prefix_bytes.as_bytes()).map_err(|_| {
                 SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) }
             })?;
 
             // iter over all lines of the DB where the key is prefixed by the current prefix.
             let iter = db
                 .remap_key_type::<ByteSlice>()
-                .prefix_iter(self.wtxn, prefix_bytes)?
+                .prefix_iter(self.wtxn, prefix_bytes.as_bytes())?
                 .remap_key_type::<StrBEU32Codec>();
             for result in iter {
                 let ((word, pos), data) = result?;
@@ -150,14 +136,12 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
             }
         }
 
-        drop(new_prefixes_stream);
-
         // We remove all the entries that are no more required in this word prefix position
        // docids database.
         let mut iter =
             self.index.word_prefix_position_docids.iter_mut(self.wtxn)?.lazily_decode_data();
         while let Some(((prefix, _), _)) = iter.next().transpose()? {
-            if suppr_pw.contains(prefix.as_bytes()) {
+            if del_prefix_fst_words.contains(prefix.as_bytes()) {
                 unsafe { iter.del_current()? };
             }
         }
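
Note (not part of the patch): the change hinges on deriving three word sets from the previous and current prefix fsts exactly once, in index_documents/mod.rs, and threading them through the three execute methods. The sketch below is a standalone illustration of that intersection/difference pattern using only the fst crate; the sample prefixes are made up, and the collection loops inline what milli's fst_stream_into_vec/fst_stream_into_hashset helpers do over the resulting streams.

use std::collections::HashSet;

use fst::{IntoStreamer, Set, Streamer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The words prefix fst before and after a documents update.
    // `Set::from_iter` requires lexicographically sorted input.
    let previous = Set::from_iter(["ba", "be", "ca"])?;
    let current = Set::from_iter(["be", "ca", "do"])?;

    // Common prefixes: present in both fsts, so their existing database
    // entries only have to be extended with the new documents.
    let mut stream = previous.op().add(&current).intersection().into_stream();
    let mut common_prefix_fst_words = Vec::new();
    while let Some(bytes) = stream.next() {
        common_prefix_fst_words.push(String::from_utf8(bytes.to_vec())?);
    }
    assert_eq!(common_prefix_fst_words, ["be", "ca"]);

    // New prefixes: only in the current fst, so their docids have to be
    // computed from the word databases from scratch.
    let mut stream = current.op().add(&previous).difference().into_stream();
    let mut new_prefix_fst_words = Vec::new();
    while let Some(bytes) = stream.next() {
        new_prefix_fst_words.push(String::from_utf8(bytes.to_vec())?);
    }
    assert_eq!(new_prefix_fst_words, ["do"]);

    // Deleted prefixes: only in the previous fst, so every entry keyed by
    // them has to be removed from the prefix databases.
    let mut stream = previous.op().add(&current).difference().into_stream();
    let mut del_prefix_fst_words: HashSet<Vec<u8>> = HashSet::new();
    while let Some(bytes) = stream.next() {
        del_prefix_fst_words.insert(bytes.to_vec());
    }
    assert!(del_prefix_fst_words.contains(b"ba".as_slice()));

    Ok(())
}

Computing the three sets once and passing them by reference to WordPrefixDocids::execute, WordPrefixPairProximityDocids::execute, and WordPrefixPositionDocids::execute is what lets each of those methods drop its own copy of these fst set operations, as the diffs above show.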