Multithread word prefix position docids

This commit is contained in:
Kerollmops
2025-09-22 16:49:33 +02:00
committed by Clément Renault
parent 4fc9c3cd22
commit 74f57db577
2 changed files with 1201 additions and 882 deletions

1905
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,11 +1,12 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::io::{BufReader, BufWriter, Read, Seek, Write}; use std::io::{BufReader, BufWriter, Read, Seek, Write};
use std::iter;
use hashbrown::HashMap; use hashbrown::HashMap;
use heed::types::Bytes; use heed::types::Bytes;
use heed::{BytesDecode, Database, Error, RoTxn, RwTxn}; use heed::{BytesDecode, Database, Error, RoTxn, RwTxn};
use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator, ParallelIterator as _};
use roaring::MultiOps; use roaring::MultiOps;
use tempfile::spooled_tempfile; use tempfile::spooled_tempfile;
use thread_local::ThreadLocal; use thread_local::ThreadLocal;
@@ -151,25 +152,35 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> {
unsafe impl Sync for FrozenPrefixBitmaps<'_, '_> {} unsafe impl Sync for FrozenPrefixBitmaps<'_, '_> {}
struct WordPrefixIntegerDocids { struct WordPrefixIntegerDocids<'i> {
index: &'i Index,
database: Database<Bytes, CboRoaringBitmapCodec>, database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>, prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
max_memory_by_thread: Option<usize>, max_memory_by_thread: Option<usize>,
read_uncommitted_in_parallel: bool,
} }
impl WordPrefixIntegerDocids { impl<'i> WordPrefixIntegerDocids<'i> {
fn new( fn new(
index: &'i Index,
database: Database<Bytes, CboRoaringBitmapCodec>, database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>, prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
grenad_parameters: &GrenadParameters, grenad_parameters: &'_ GrenadParameters,
) -> WordPrefixIntegerDocids { ) -> WordPrefixIntegerDocids<'i> {
WordPrefixIntegerDocids { WordPrefixIntegerDocids {
index,
database, database,
prefix_database, prefix_database,
max_memory_by_thread: grenad_parameters.max_memory_by_thread(), max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
read_uncommitted_in_parallel: false,
} }
} }
/// Toggles the parallel, uncommitted-read strategy (disabled by `new`).
///
/// When `value` is `true`, `execute` recomputes the prefixes through
/// `recompute_modified_prefixes_no_frozen`, which uses an experimental LMDB
/// feature to read uncommitted data in parallel. When `false`, `execute`
/// goes through `recompute_modified_prefixes` instead.
fn read_uncommitted_in_parallel(&mut self, value: bool) {
    self.read_uncommitted_in_parallel = value;
}
fn execute( fn execute(
self, self,
wtxn: &mut heed::RwTxn, wtxn: &mut heed::RwTxn,
@@ -177,8 +188,145 @@ impl WordPrefixIntegerDocids {
prefix_to_delete: &BTreeSet<Prefix>, prefix_to_delete: &BTreeSet<Prefix>,
) -> Result<()> { ) -> Result<()> {
delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?; delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
if self.read_uncommitted_in_parallel {
self.recompute_modified_prefixes_no_frozen(wtxn, prefix_to_compute)
} else {
self.recompute_modified_prefixes(wtxn, prefix_to_compute) self.recompute_modified_prefixes(wtxn, prefix_to_compute)
} }
}
/// Computes the same as `recompute_modified_prefixes`.
///
/// ...but without aggregating the prefixes mmap pointers into a static HashMap
/// beforehand and rather use an experimental LMDB feature to read the subset
/// of prefixes in parallel from the uncommitted transaction.
///
/// One nested read transaction is opened per rayon thread. Each thread handles
/// the prefixes whose index in `prefixes` is congruent to its thread id modulo
/// the thread count, unions the bitmaps found for every position of a prefix,
/// and spools the serialized unions into its own temporary file. Once every
/// thread is done, the files are read back sequentially and the entries are
/// written into `prefix_database` through `wtxn`.
///
/// # Errors
///
/// Returns an error if a nested read transaction cannot be opened, if a key
/// cannot be decoded, if a bitmap cannot be deserialized, or if reading or
/// writing the temporary files or the database fails.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn recompute_modified_prefixes_no_frozen(
    &self,
    wtxn: &mut RwTxn,
    prefixes: &BTreeSet<Prefix>,
) -> Result<()> {
    let thread_count = rayon::current_num_threads();
    let rtxns = iter::repeat_with(|| self.index.env.nested_read_txn(wtxn))
        .take(thread_count)
        .collect::<heed::Result<Vec<_>>>()?;

    let outputs = rtxns
        .into_par_iter()
        .enumerate()
        .map(|(thread_id, rtxn)| {
            // `indexes` represent offsets at which prefixes computations were stored in the `file`.
            let mut indexes = Vec::new();
            let mut file = BufWriter::new(spooled_tempfile(
                self.max_memory_by_thread.unwrap_or(usize::MAX),
            ));
            let mut buffer = Vec::new();

            for (prefix_index, prefix) in prefixes.iter().enumerate() {
                // Is prefix for another thread?
                if prefix_index % thread_count != thread_id {
                    continue;
                }

                // Group the raw bitmaps by position. A prefix matches several
                // words and every word carries its own run of positions, so the
                // same position shows up more than once during the iteration:
                // grouping only consecutive keys would emit several entries for
                // the same (prefix, pos) key, the last one overwriting the others.
                let mut bitmaps_bytes_by_pos =
                    std::collections::BTreeMap::<u16, Vec<&[u8]>>::new();
                for result in self
                    .database
                    .remap_data_type::<Bytes>()
                    .prefix_iter(&rtxn, prefix.as_bytes())?
                {
                    let (key, current_bitmap_bytes) = result?;
                    let (_word, pos) =
                        StrBEU16Codec::bytes_decode(key).map_err(Error::Decoding)?;
                    bitmaps_bytes_by_pos.entry(pos).or_default().push(current_bitmap_bytes);
                }

                // Every group holds at least one bitmap (a group only exists
                // because a key was seen for it), so each position yields
                // exactly one serialized union, labeled with its own position.
                for (pos, bitmaps_bytes) in bitmaps_bytes_by_pos {
                    let output = bitmaps_bytes
                        .iter()
                        .map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes))
                        .union()?;
                    buffer.clear();
                    CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer);
                    indexes.push(PrefixIntegerEntry {
                        prefix,
                        pos,
                        serialized_length: Some(buffer.len()),
                    });
                    file.write_all(&buffer)?;
                }
            }

            Ok((indexes, file))
        })
        .collect::<Result<Vec<_>>>()?;

    // We iterate over all the collected and serialized bitmaps through
    // the files and entries to eventually put them in the final database.
    let mut key_buffer = Vec::new();
    let mut buffer = Vec::new();
    for (index, file) in outputs {
        // Flush the writer and rewind so the entries can be read back in
        // the exact order they were written.
        let mut file = file.into_inner().map_err(|e| e.into_error())?;
        file.rewind()?;
        let mut file = BufReader::new(file);
        for PrefixIntegerEntry { prefix, pos, serialized_length } in index {
            // Key layout: prefix bytes, a NUL separator, then the big-endian position.
            key_buffer.clear();
            key_buffer.extend_from_slice(prefix.as_bytes());
            key_buffer.push(0);
            key_buffer.extend_from_slice(&pos.to_be_bytes());
            match serialized_length {
                Some(serialized_length) => {
                    buffer.resize(serialized_length, 0);
                    file.read_exact(&mut buffer)?;
                    self.prefix_database.remap_data_type::<Bytes>().put(
                        wtxn,
                        &key_buffer,
                        &buffer,
                    )?;
                }
                None => {
                    self.prefix_database.delete(wtxn, &key_buffer)?;
                }
            }
        }
    }

    Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn recompute_modified_prefixes( fn recompute_modified_prefixes(
@@ -262,7 +410,7 @@ impl WordPrefixIntegerDocids {
} }
} }
/// Represents a prefix and the lenght the bitmap takes on disk. /// Represents a prefix and the length the bitmap takes on disk.
struct PrefixIntegerEntry<'a> { struct PrefixIntegerEntry<'a> {
prefix: &'a str, prefix: &'a str,
pos: u16, pos: u16,
@@ -362,12 +510,14 @@ pub fn compute_word_prefix_fid_docids(
prefix_to_delete: &BTreeSet<Prefix>, prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: &GrenadParameters, grenad_parameters: &GrenadParameters,
) -> Result<()> { ) -> Result<()> {
WordPrefixIntegerDocids::new( let mut builder = WordPrefixIntegerDocids::new(
index,
index.word_fid_docids.remap_key_type(), index.word_fid_docids.remap_key_type(),
index.word_prefix_fid_docids.remap_key_type(), index.word_prefix_fid_docids.remap_key_type(),
grenad_parameters, grenad_parameters,
) );
.execute(wtxn, prefix_to_compute, prefix_to_delete) builder.read_uncommitted_in_parallel(true);
builder.execute(wtxn, prefix_to_compute, prefix_to_delete)
} }
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -378,10 +528,12 @@ pub fn compute_word_prefix_position_docids(
prefix_to_delete: &BTreeSet<Prefix>, prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: &GrenadParameters, grenad_parameters: &GrenadParameters,
) -> Result<()> { ) -> Result<()> {
WordPrefixIntegerDocids::new( let mut builder = WordPrefixIntegerDocids::new(
index,
index.word_position_docids.remap_key_type(), index.word_position_docids.remap_key_type(),
index.word_prefix_position_docids.remap_key_type(), index.word_prefix_position_docids.remap_key_type(),
grenad_parameters, grenad_parameters,
) );
.execute(wtxn, prefix_to_compute, prefix_to_delete) builder.read_uncommitted_in_parallel(true);
builder.execute(wtxn, prefix_to_compute, prefix_to_delete)
} }