Merge pull request #5959 from meilisearch/parallelize-word-prefix-docids

Parallelize the word prefix docids
This commit is contained in:
Clément Renault
2025-11-05 10:10:07 +00:00
committed by GitHub

View File

@@ -4,7 +4,7 @@ use std::io::{BufReader, BufWriter, Read, Seek, Write};
use std::iter;
use hashbrown::HashMap;
use heed::types::{Bytes, DecodeIgnore};
use heed::types::{Bytes, DecodeIgnore, Str};
use heed::{BytesDecode, Database, Error, RoTxn, RwTxn};
use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator, ParallelIterator as _};
use roaring::MultiOps;
@@ -16,22 +16,29 @@ use crate::heed_codec::StrBEU16Codec;
use crate::update::GrenadParameters;
use crate::{CboRoaringBitmapCodec, Index, Prefix, Result};
struct WordPrefixDocids {
struct WordPrefixDocids<'i> {
index: &'i Index,
database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
max_memory_by_thread: Option<usize>,
/// Do not use an experimental LMDB feature to read uncommitted data in parallel.
no_experimental_post_processing: bool,
}
impl WordPrefixDocids {
impl<'i> WordPrefixDocids<'i> {
fn new(
index: &'i Index,
database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
grenad_parameters: &GrenadParameters,
) -> WordPrefixDocids {
) -> WordPrefixDocids<'i> {
WordPrefixDocids {
index,
database,
prefix_database,
max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
no_experimental_post_processing: grenad_parameters
.experimental_no_edition_2024_for_prefix_post_processing,
}
}
@@ -42,7 +49,77 @@ impl WordPrefixDocids {
prefix_to_delete: &BTreeSet<Prefix>,
) -> Result<()> {
delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
if self.no_experimental_post_processing {
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
} else {
self.recompute_modified_prefixes_no_frozen(wtxn, prefix_to_compute)
}
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn recompute_modified_prefixes_no_frozen(
&self,
wtxn: &mut RwTxn,
prefix_to_compute: &BTreeSet<Prefix>,
) -> Result<()> {
let thread_count = rayon::current_num_threads();
let rtxns = iter::repeat_with(|| self.index.env.nested_read_txn(wtxn))
.take(thread_count)
.collect::<heed::Result<Vec<_>>>()?;
let outputs = rtxns
.into_par_iter()
.enumerate()
.map(|(thread_id, rtxn)| {
// `indexes` represent offsets at which prefixes computations were stored in the `file`.
let mut indexes = Vec::new();
let mut file = BufWriter::new(spooled_tempfile(
self.max_memory_by_thread.unwrap_or(usize::MAX),
));
let mut buffer = Vec::new();
for (prefix_index, prefix) in prefix_to_compute.iter().enumerate() {
// Is prefix for another thread?
if prefix_index % thread_count != thread_id {
continue;
}
let output = self
.database
.prefix_iter(&rtxn, prefix.as_bytes())?
.remap_types::<Str, CboRoaringBitmapCodec>()
.map(|result| result.map(|(_word, bitmap)| bitmap))
.union()?;
buffer.clear();
CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer);
indexes.push(PrefixEntry { prefix, serialized_length: buffer.len() });
file.write_all(&buffer)?;
}
Ok((indexes, file))
})
.collect::<Result<Vec<_>>>()?;
// We iterate over all the collected and serialized bitmaps through
// the files and entries to eventually put them in the final database.
let mut buffer = Vec::new();
for (index, file) in outputs {
let mut file = file.into_inner().map_err(|e| e.into_error())?;
file.rewind()?;
let mut file = BufReader::new(file);
for PrefixEntry { prefix, serialized_length } in index {
buffer.resize(serialized_length, 0);
file.read_exact(&mut buffer)?;
self.prefix_database.remap_data_type::<Bytes>().put(
wtxn,
prefix.as_bytes(),
&buffer,
)?;
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -463,6 +540,7 @@ pub fn compute_word_prefix_docids(
grenad_parameters: &GrenadParameters,
) -> Result<()> {
WordPrefixDocids::new(
index,
index.word_docids.remap_key_type(),
index.word_prefix_docids.remap_key_type(),
grenad_parameters,
@@ -479,6 +557,7 @@ pub fn compute_exact_word_prefix_docids(
grenad_parameters: &GrenadParameters,
) -> Result<()> {
WordPrefixDocids::new(
index,
index.exact_word_docids.remap_key_type(),
index.exact_word_prefix_docids.remap_key_type(),
grenad_parameters,