Compare commits

...

5 Commits

Author SHA1 Message Date
Kerollmops
18ba2e39c0 Multithread word prefix position docids 2025-09-23 11:02:57 +02:00
Kerollmops
4fc9c3cd22 Make clippy happy 2025-09-22 15:22:16 +02:00
Kerollmops
8ce3aa431e Fix the algorithm 2025-09-22 15:04:30 +02:00
Kerollmops
a9c8ad57db Multi-thread the facet bulk processing 2025-09-22 15:04:30 +02:00
Kerollmops
9146122047 Patch heed to create multiple nested RoTxns 2025-09-22 15:04:30 +02:00
6 changed files with 1374 additions and 893 deletions

1899
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -50,3 +50,6 @@ opt-level = 3
opt-level = 3
[profile.dev.package.roaring]
opt-level = 3
[patch.crates-io]
heed = { git = "https://github.com/meilisearch/heed", branch = "allow-nested-rtxn-from-wtxn" }

View File

@@ -180,12 +180,15 @@ where
})
.unwrap()?;
post_processing::post_process(
indexing_context,
wtxn,
global_fields_ids_map,
facet_field_ids_delta,
)?;
pool.install(|| {
post_processing::post_process(
indexing_context,
wtxn,
global_fields_ids_map,
facet_field_ids_delta,
)
})
.unwrap()?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);

View File

@@ -0,0 +1,164 @@
use std::fs::File;
use std::io::BufReader;
use std::{iter, mem};
use grenad::CompressionType;
use heed::types::{Bytes, LazyDecode};
use heed::{Database, RwTxn};
use rayon::prelude::*;
use roaring::MultiOps;
use tempfile::tempfile;
use crate::facet::FacetType;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::BytesRefCodec;
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
use crate::update::{create_writer, writer_into_reader};
use crate::{CboRoaringBitmapCodec, FieldId, Index};
/// Generate the facet level based on the level 0.
///
/// The function will generate all the group levels from
/// the group 1 to the level n until the number of group
/// is smaller than the minimum required size.
pub fn generate_facet_levels(
index: &Index,
wtxn: &mut RwTxn,
field_id: FieldId,
facet_type: FacetType,
) -> crate::Result<()> {
let db = match facet_type {
FacetType::String => index
.facet_id_string_docids
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
.lazily_decode_data(),
FacetType::Number => index
.facet_id_f64_docids
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
.lazily_decode_data(),
};
clear_levels(db, wtxn, field_id)?;
let mut base_level = 0;
// That's a do-while loop
while {
let mut level_size = 0;
for reader in compute_level(index, wtxn, db, field_id, base_level)? {
let mut cursor = reader.into_cursor()?;
while let Some((left_bound, facet_group_value)) = cursor.move_on_next()? {
level_size += 1;
let level = base_level.checked_add(1).unwrap();
let key = FacetGroupKey { field_id, level, left_bound };
debug_assert!(
db.get(wtxn, &key).transpose().is_none(),
"entry must not be there and must have already been deleted: {key:?}"
);
db.remap_data_type::<Bytes>().put(wtxn, &key, facet_group_value)?;
}
}
base_level += 1;
// If the next level will have the minimum required groups, continue.
(level_size / FACET_GROUP_SIZE as usize) >= FACET_MIN_LEVEL_SIZE as usize
} {}
Ok(())
}
/// Compute the groups of facets from the provided base level
/// and write the content into different grenad files.
fn compute_level(
index: &Index,
wtxn: &heed::RwTxn,
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
field_id: FieldId,
base_level: u8,
) -> Result<Vec<grenad::Reader<BufReader<File>>>, crate::Error> {
let thread_count = rayon::current_num_threads();
let rtxns = iter::repeat_with(|| index.env.nested_read_txn(wtxn))
.take(thread_count)
.collect::<heed::Result<Vec<_>>>()?;
let range = {
// Based on the first possible value for the base level up to
// the first possible value for the next level *excluded*.
let left = FacetGroupKey::<&[u8]> { field_id, level: base_level, left_bound: &[] };
let right = FacetGroupKey::<&[u8]> {
field_id,
level: base_level.checked_add(1).unwrap(),
left_bound: &[],
};
left..right
};
rtxns
.into_par_iter()
.enumerate()
.map(|(thread_id, rtxn)| {
let mut writer = tempfile().map(|f| create_writer(CompressionType::None, None, f))?;
let mut left_bound = None;
let mut group_docids = Vec::new();
let mut ser_buffer = Vec::new();
for (i, result) in db.range(&rtxn, &range)?.enumerate() {
let (key, lazy_value) = result?;
let start_of_group = i % FACET_GROUP_SIZE as usize == 0;
let group_index = i / FACET_GROUP_SIZE as usize;
let group_for_thread = group_index % thread_count == thread_id;
if group_for_thread {
if start_of_group {
if let Some(left_bound) = left_bound.take() {
// We store the bitmaps in a Vec this way we can use
// the MultiOps operations that tends to be more efficient
// for unions. The Vec is empty after the operation.
//
// We also don't forget to store the group size corresponding
// to the number of entries merged in this group.
ser_buffer.clear();
let group_len: u8 = group_docids.len().try_into().unwrap();
ser_buffer.push(group_len);
let group_docids = mem::take(&mut group_docids);
let docids = group_docids.into_iter().union();
CboRoaringBitmapCodec::serialize_into_vec(&docids, &mut ser_buffer);
writer.insert(left_bound, &ser_buffer)?;
}
left_bound = Some(key.left_bound);
}
// Lazily decode the bitmaps we are interested in.
let value = lazy_value.decode().map_err(heed::Error::Decoding)?;
group_docids.push(value.bitmap);
}
}
if let Some(left_bound) = left_bound.take() {
ser_buffer.clear();
// We don't forget to store the group size corresponding
// to the number of entries merged in this group.
let group_len: u8 = group_docids.len().try_into().unwrap();
ser_buffer.push(group_len);
let group_docids = group_docids.into_iter().union();
CboRoaringBitmapCodec::serialize_into_vec(&group_docids, &mut ser_buffer);
writer.insert(left_bound, &ser_buffer)?;
}
writer_into_reader(writer)
})
.collect()
}
/// Clears all the levels and only keeps the level 0 of the specified field id.
fn clear_levels(
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
wtxn: &mut RwTxn<'_>,
field_id: FieldId,
) -> heed::Result<()> {
let left = FacetGroupKey::<&[u8]> { field_id, level: 1, left_bound: &[] };
let right = FacetGroupKey::<&[u8]> { field_id, level: u8::MAX, left_bound: &[] };
let range = left..=right;
db.delete_range(wtxn, &range).map(drop)
}

View File

@@ -1,5 +1,6 @@
use std::cmp::Ordering;
use facet_bulk::generate_facet_levels;
use heed::types::{Bytes, DecodeIgnore, Str};
use heed::RwTxn;
use itertools::{merge_join_by, EitherOrBoth};
@@ -23,6 +24,8 @@ use crate::update::new::FacetFieldIdsDelta;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::{GlobalFieldsIdsMap, Index, Result};
mod facet_bulk;
pub(super) fn post_process<MSP>(
indexing_context: IndexingContext<MSP>,
wtxn: &mut RwTxn<'_>,
@@ -239,9 +242,8 @@ fn compute_facet_level_database(
match delta {
FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::StringsBulk);
tracing::debug!(%fid, "bulk string facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
.execute(wtxn)?
tracing::debug!(%fid, "bulk string facet processing in parallel");
generate_facet_levels(index, wtxn, fid, FacetType::String)?
}
FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::StringsIncremental);

View File

@@ -1,11 +1,12 @@
use std::cell::RefCell;
use std::collections::BTreeSet;
use std::io::{BufReader, BufWriter, Read, Seek, Write};
use std::iter;
use hashbrown::HashMap;
use heed::types::Bytes;
use heed::{BytesDecode, Database, Error, RoTxn, RwTxn};
use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator, ParallelIterator as _};
use roaring::MultiOps;
use tempfile::spooled_tempfile;
use thread_local::ThreadLocal;
@@ -151,25 +152,35 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> {
unsafe impl Sync for FrozenPrefixBitmaps<'_, '_> {}
struct WordPrefixIntegerDocids {
struct WordPrefixIntegerDocids<'i> {
index: &'i Index,
database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
max_memory_by_thread: Option<usize>,
read_uncommitted_in_parallel: bool,
}
impl WordPrefixIntegerDocids {
impl<'i> WordPrefixIntegerDocids<'i> {
fn new(
index: &'i Index,
database: Database<Bytes, CboRoaringBitmapCodec>,
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
grenad_parameters: &GrenadParameters,
) -> WordPrefixIntegerDocids {
grenad_parameters: &'_ GrenadParameters,
) -> WordPrefixIntegerDocids<'i> {
WordPrefixIntegerDocids {
index,
database,
prefix_database,
max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
read_uncommitted_in_parallel: false,
}
}
/// Use an experimental LMDB feature to read uncommitted data in parallel.
fn read_uncommitted_in_parallel(&mut self, value: bool) {
self.read_uncommitted_in_parallel = value;
}
fn execute(
self,
wtxn: &mut heed::RwTxn,
@@ -177,7 +188,144 @@ impl WordPrefixIntegerDocids {
prefix_to_delete: &BTreeSet<Prefix>,
) -> Result<()> {
delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
if self.read_uncommitted_in_parallel {
self.recompute_modified_prefixes_no_frozen(wtxn, prefix_to_compute)
} else {
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
}
}
/// Computes the same as `recompute_modified_prefixes`.
///
/// ...but without aggregating the prefixes mmap pointers into a static HashMap
/// beforehand and rather use an experimental LMDB feature to read the subset
/// of prefixes in parallel from the uncommitted transaction.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn recompute_modified_prefixes_no_frozen(
&self,
wtxn: &mut RwTxn,
prefixes: &BTreeSet<Prefix>,
) -> Result<()> {
let thread_count = rayon::current_num_threads();
let rtxns = iter::repeat_with(|| self.index.env.nested_read_txn(wtxn))
.take(thread_count)
.collect::<heed::Result<Vec<_>>>()?;
let outputs = rtxns
.into_par_iter()
.enumerate()
.map(|(thread_id, rtxn)| {
// `indexes` represent offsets at which prefixes computations were stored in the `file`.
let mut indexes = Vec::new();
let mut file = BufWriter::new(spooled_tempfile(
self.max_memory_by_thread.unwrap_or(usize::MAX),
));
let mut buffer = Vec::new();
for (prefix_index, prefix) in prefixes.iter().enumerate() {
// Is prefix for another thread?
if prefix_index % thread_count != thread_id {
continue;
}
let mut bitmaps_bytes = Vec::<&[u8]>::new();
let mut prev_pos = None;
for result in self
.database
.remap_data_type::<Bytes>()
.prefix_iter(&rtxn, prefix.as_bytes())?
{
let (key, current_bitmap_bytes) = result?;
let (_word, pos) =
StrBEU16Codec::bytes_decode(key).map_err(Error::Decoding)?;
if prev_pos.is_some_and(|p| p != pos) {
if bitmaps_bytes.is_empty() {
indexes.push(PrefixIntegerEntry {
prefix,
pos,
serialized_length: None,
});
} else {
let output = bitmaps_bytes
.iter()
.map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes))
.union()?;
buffer.clear();
CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer);
indexes.push(PrefixIntegerEntry {
prefix,
pos,
serialized_length: Some(buffer.len()),
});
file.write_all(&buffer)?;
bitmaps_bytes.clear();
}
}
bitmaps_bytes.push(current_bitmap_bytes);
prev_pos = Some(pos);
}
if let Some(pos) = prev_pos {
if bitmaps_bytes.is_empty() {
indexes.push(PrefixIntegerEntry {
prefix,
pos,
serialized_length: None,
});
} else {
let output = bitmaps_bytes
.iter()
.map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes))
.union()?;
buffer.clear();
CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer);
indexes.push(PrefixIntegerEntry {
prefix,
pos,
serialized_length: Some(buffer.len()),
});
file.write_all(&buffer)?;
}
}
}
Ok((indexes, file))
})
.collect::<Result<Vec<_>>>()?;
// We iterate over all the collected and serialized bitmaps through
// the files and entries to eventually put them in the final database.
let mut key_buffer = Vec::new();
let mut buffer = Vec::new();
for (index, file) in outputs {
let mut file = file.into_inner().map_err(|e| e.into_error())?;
file.rewind()?;
let mut file = BufReader::new(file);
for PrefixIntegerEntry { prefix, pos, serialized_length } in index {
key_buffer.clear();
key_buffer.extend_from_slice(prefix.as_bytes());
key_buffer.push(0);
key_buffer.extend_from_slice(&pos.to_be_bytes());
match serialized_length {
Some(serialized_length) => {
buffer.resize(serialized_length, 0);
file.read_exact(&mut buffer)?;
self.prefix_database.remap_data_type::<Bytes>().put(
wtxn,
&key_buffer,
&buffer,
)?;
}
None => {
self.prefix_database.delete(wtxn, &key_buffer)?;
}
}
}
}
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -262,7 +410,7 @@ impl WordPrefixIntegerDocids {
}
}
/// Represents a prefix and the lenght the bitmap takes on disk.
/// Represents a prefix and the length the bitmap takes on disk.
struct PrefixIntegerEntry<'a> {
prefix: &'a str,
pos: u16,
@@ -362,12 +510,14 @@ pub fn compute_word_prefix_fid_docids(
prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: &GrenadParameters,
) -> Result<()> {
WordPrefixIntegerDocids::new(
let mut builder = WordPrefixIntegerDocids::new(
index,
index.word_fid_docids.remap_key_type(),
index.word_prefix_fid_docids.remap_key_type(),
grenad_parameters,
)
.execute(wtxn, prefix_to_compute, prefix_to_delete)
);
builder.read_uncommitted_in_parallel(true);
builder.execute(wtxn, prefix_to_compute, prefix_to_delete)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
@@ -378,10 +528,12 @@ pub fn compute_word_prefix_position_docids(
prefix_to_delete: &BTreeSet<Prefix>,
grenad_parameters: &GrenadParameters,
) -> Result<()> {
WordPrefixIntegerDocids::new(
let mut builder = WordPrefixIntegerDocids::new(
index,
index.word_position_docids.remap_key_type(),
index.word_prefix_position_docids.remap_key_type(),
grenad_parameters,
)
.execute(wtxn, prefix_to_compute, prefix_to_delete)
);
builder.read_uncommitted_in_parallel(true);
builder.execute(wtxn, prefix_to_compute, prefix_to_delete)
}