From 229a12c8e6bbee30f70c126ae828097c90a3d38d Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 22 Sep 2025 16:49:33 +0200 Subject: [PATCH] Multithread word prefix position docids --- .../src/update/new/words_prefix_docids.rs | 178 ++++++++++++++++-- 1 file changed, 165 insertions(+), 13 deletions(-) diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs index 27c485d71..9518dab8e 100644 --- a/crates/milli/src/update/new/words_prefix_docids.rs +++ b/crates/milli/src/update/new/words_prefix_docids.rs @@ -1,11 +1,12 @@ use std::cell::RefCell; use std::collections::BTreeSet; use std::io::{BufReader, BufWriter, Read, Seek, Write}; +use std::iter; use hashbrown::HashMap; use heed::types::Bytes; use heed::{BytesDecode, Database, Error, RoTxn, RwTxn}; -use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; +use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator, ParallelIterator as _}; use roaring::MultiOps; use tempfile::spooled_tempfile; use thread_local::ThreadLocal; @@ -151,25 +152,35 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> { unsafe impl Sync for FrozenPrefixBitmaps<'_, '_> {} -struct WordPrefixIntegerDocids { +struct WordPrefixIntegerDocids<'i> { + index: &'i Index, database: Database, prefix_database: Database, max_memory_by_thread: Option, + read_uncommitted_in_parallel: bool, } -impl WordPrefixIntegerDocids { +impl<'i> WordPrefixIntegerDocids<'i> { fn new( + index: &'i Index, database: Database, prefix_database: Database, - grenad_parameters: &GrenadParameters, - ) -> WordPrefixIntegerDocids { + grenad_parameters: &'_ GrenadParameters, + ) -> WordPrefixIntegerDocids<'i> { WordPrefixIntegerDocids { + index, database, prefix_database, max_memory_by_thread: grenad_parameters.max_memory_by_thread(), + read_uncommitted_in_parallel: false, } } + /// Use an experimental LMDB feature to read uncommitted data in parallel. + fn read_uncommitted_in_parallel(&mut self, value: bool) { + self.read_uncommitted_in_parallel = value; + } + fn execute( self, wtxn: &mut heed::RwTxn, @@ -177,7 +188,144 @@ impl WordPrefixIntegerDocids { prefix_to_delete: &BTreeSet, ) -> Result<()> { delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; - self.recompute_modified_prefixes(wtxn, prefix_to_compute) + if self.read_uncommitted_in_parallel { + self.recompute_modified_prefixes_no_frozen(wtxn, prefix_to_compute) + } else { + self.recompute_modified_prefixes(wtxn, prefix_to_compute) + } + } + + /// Computes the same as `recompute_modified_prefixes`. + /// + /// ...but without aggregating the prefixes mmap pointers into a static HashMap + /// beforehand and rather use an experimental LMDB feature to read the subset + /// of prefixes in parallel from the uncommitted transaction. + #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] + fn recompute_modified_prefixes_no_frozen( + &self, + wtxn: &mut RwTxn, + prefixes: &BTreeSet, + ) -> Result<()> { + let thread_count = rayon::current_num_threads(); + let rtxns = iter::repeat_with(|| self.index.env.nested_read_txn(wtxn)) + .take(thread_count) + .collect::>>()?; + + let outputs = rtxns + .into_par_iter() + .enumerate() + .map(|(thread_id, rtxn)| { + // `indexes` represent offsets at which prefixes computations were stored in the `file`. + let mut indexes = Vec::new(); + let mut file = BufWriter::new(spooled_tempfile( + self.max_memory_by_thread.unwrap_or(usize::MAX), + )); + + let mut buffer = Vec::new(); + for (prefix_index, prefix) in prefixes.iter().enumerate() { + // Is prefix for another thread? + if prefix_index % thread_count != thread_id { + continue; + } + + let mut bitmaps_bytes = Vec::<&[u8]>::new(); + let mut prev_pos = None; + for result in self + .database + .remap_data_type::() + .prefix_iter(&rtxn, prefix.as_bytes())? + { + let (key, current_bitmap_bytes) = result?; + let (_word, pos) = + StrBEU16Codec::bytes_decode(key).map_err(Error::Decoding)?; + + if prev_pos.is_some_and(|p| p != pos) { + if bitmaps_bytes.is_empty() { + indexes.push(PrefixIntegerEntry { + prefix, + pos, + serialized_length: None, + }); + } else { + let output = bitmaps_bytes + .iter() + .map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes)) + .union()?; + buffer.clear(); + CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer); + indexes.push(PrefixIntegerEntry { + prefix, + pos, + serialized_length: Some(buffer.len()), + }); + file.write_all(&buffer)?; + bitmaps_bytes.clear(); + } + } + + bitmaps_bytes.push(current_bitmap_bytes); + prev_pos = Some(pos); + } + + if let Some(pos) = prev_pos { + if bitmaps_bytes.is_empty() { + indexes.push(PrefixIntegerEntry { + prefix, + pos, + serialized_length: None, + }); + } else { + let output = bitmaps_bytes + .iter() + .map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes)) + .union()?; + buffer.clear(); + CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer); + indexes.push(PrefixIntegerEntry { + prefix, + pos, + serialized_length: Some(buffer.len()), + }); + file.write_all(&buffer)?; + } + } + } + + Ok((indexes, file)) + }) + .collect::>>()?; + + // We iterate over all the collected and serialized bitmaps through + // the files and entries to eventually put them in the final database. + let mut key_buffer = Vec::new(); + let mut buffer = Vec::new(); + for (index, file) in outputs { + let mut file = file.into_inner().map_err(|e| e.into_error())?; + file.rewind()?; + let mut file = BufReader::new(file); + for PrefixIntegerEntry { prefix, pos, serialized_length } in index { + key_buffer.clear(); + key_buffer.extend_from_slice(prefix.as_bytes()); + key_buffer.push(0); + key_buffer.extend_from_slice(&pos.to_be_bytes()); + match serialized_length { + Some(serialized_length) => { + buffer.resize(serialized_length, 0); + file.read_exact(&mut buffer)?; + self.prefix_database.remap_data_type::().put( + wtxn, + &key_buffer, + &buffer, + )?; + } + None => { + self.prefix_database.delete(wtxn, &key_buffer)?; + } + } + } + } + + Ok(()) } #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] @@ -262,7 +410,7 @@ impl WordPrefixIntegerDocids { } } -/// Represents a prefix and the lenght the bitmap takes on disk. +/// Represents a prefix and the length the bitmap takes on disk. struct PrefixIntegerEntry<'a> { prefix: &'a str, pos: u16, @@ -362,12 +510,14 @@ pub fn compute_word_prefix_fid_docids( prefix_to_delete: &BTreeSet, grenad_parameters: &GrenadParameters, ) -> Result<()> { - WordPrefixIntegerDocids::new( + let mut builder = WordPrefixIntegerDocids::new( + index, index.word_fid_docids.remap_key_type(), index.word_prefix_fid_docids.remap_key_type(), grenad_parameters, - ) - .execute(wtxn, prefix_to_compute, prefix_to_delete) + ); + builder.read_uncommitted_in_parallel(true); + builder.execute(wtxn, prefix_to_compute, prefix_to_delete) } #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] @@ -378,10 +528,12 @@ pub fn compute_word_prefix_position_docids( prefix_to_delete: &BTreeSet, grenad_parameters: &GrenadParameters, ) -> Result<()> { - WordPrefixIntegerDocids::new( + let mut builder = WordPrefixIntegerDocids::new( + index, index.word_position_docids.remap_key_type(), index.word_prefix_position_docids.remap_key_type(), grenad_parameters, - ) - .execute(wtxn, prefix_to_compute, prefix_to_delete) + ); + builder.read_uncommitted_in_parallel(true); + builder.execute(wtxn, prefix_to_compute, prefix_to_delete) }