mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-10 05:36:35 +00:00
Merge pull request #5307 from meilisearch/parallel-bulk-facets
Parallelize bulk facets & word prefix fid/position docids
This commit is contained in:
21
Cargo.lock
generated
21
Cargo.lock
generated
@@ -453,9 +453,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "arroy"
|
||||
version = "0.6.3"
|
||||
version = "0.6.4-nested-rtxns"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8578a72223dfa13dfd9fc144d15260d134361789ebdea9b16e85a511edc73c7d"
|
||||
checksum = "fb8b6b34d9d83e3b837cb7f72a439dbd2293b102393c084af5e5b097212e1532"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
@@ -1075,9 +1075,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cellulite"
|
||||
version = "0.3.0"
|
||||
version = "0.3.1-nested-rtxns"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71a41aa2cd021bc3f23d97cc1e645848ca8c279fc757d1570ba7fe7ddc021290"
|
||||
checksum = "db298d57a80b9284327800b394ee3921307c2fdda87c6d37202f5cf400478981"
|
||||
dependencies = [
|
||||
"crossbeam",
|
||||
"geo",
|
||||
@@ -2758,9 +2758,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hannoy"
|
||||
version = "0.0.8"
|
||||
version = "0.0.9-nested-rtxns"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0dba13a271c49a119a97862ebf0a74131d879832868400d9fcd937b790058fdd"
|
||||
checksum = "cc5a945b92b063e677d658cfcc7cb6dec2502fe44631f017084938f14d6ce30e"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
@@ -2838,9 +2838,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
||||
|
||||
[[package]]
|
||||
name = "heed"
|
||||
version = "0.22.0"
|
||||
version = "0.22.1-nested-rtxns"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a56c94661ddfb51aa9cdfbf102cfcc340aa69267f95ebccc4af08d7c530d393"
|
||||
checksum = "0ff115ba5712b1f1fc7617b195f5c2f139e29c397ff79da040cd19db75ccc240"
|
||||
dependencies = [
|
||||
"bitflags 2.9.4",
|
||||
"byteorder",
|
||||
@@ -2850,7 +2850,6 @@ dependencies = [
|
||||
"lmdb-master-sys",
|
||||
"once_cell",
|
||||
"page_size",
|
||||
"serde",
|
||||
"synchronoise",
|
||||
"url",
|
||||
]
|
||||
@@ -3889,9 +3888,9 @@ checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
|
||||
|
||||
[[package]]
|
||||
name = "lmdb-master-sys"
|
||||
version = "0.2.5"
|
||||
version = "0.2.6-nested-rtxns"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "864808e0b19fb6dd3b70ba94ee671b82fce17554cf80aeb0a155c65bb08027df"
|
||||
checksum = "f4ff85130e3c994b36877045fbbb138d521dea7197bfc19dc3d5d95101a8e20a"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"doxygen-rs",
|
||||
|
@@ -205,6 +205,8 @@ struct Infos {
|
||||
experimental_no_snapshot_compaction: bool,
|
||||
experimental_no_edition_2024_for_dumps: bool,
|
||||
experimental_no_edition_2024_for_settings: bool,
|
||||
experimental_no_edition_2024_for_prefix_post_processing: bool,
|
||||
experimental_no_edition_2024_for_facet_post_processing: bool,
|
||||
experimental_vector_store_setting: bool,
|
||||
gpu_enabled: bool,
|
||||
db_path: bool,
|
||||
@@ -296,6 +298,8 @@ impl Infos {
|
||||
skip_index_budget: _,
|
||||
experimental_no_edition_2024_for_settings,
|
||||
experimental_no_edition_2024_for_dumps,
|
||||
experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing,
|
||||
} = indexer_options;
|
||||
|
||||
let RuntimeTogglableFeatures {
|
||||
@@ -365,6 +369,8 @@ impl Infos {
|
||||
ssl_resumption,
|
||||
ssl_tickets,
|
||||
experimental_no_edition_2024_for_settings,
|
||||
experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -55,6 +55,10 @@ const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LO
|
||||
const MEILI_EXPERIMENTAL_CONTAINS_FILTER: &str = "MEILI_EXPERIMENTAL_CONTAINS_FILTER";
|
||||
const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS: &str =
|
||||
"MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS";
|
||||
const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_FACET_POST_PROCESSING: &str =
|
||||
"MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_FACET_POST_PROCESSING";
|
||||
const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_PREFIX_POST_PROCESSING: &str =
|
||||
"MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_PREFIX_POST_PROCESSING";
|
||||
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
|
||||
const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE";
|
||||
const MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER: &str = "MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER";
|
||||
@@ -772,6 +776,22 @@ pub struct IndexerOpts {
|
||||
#[clap(long, env = MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS)]
|
||||
#[serde(default)]
|
||||
pub experimental_no_edition_2024_for_dumps: bool,
|
||||
|
||||
/// Experimental no edition 2024 to compute prefixes. For more information,
|
||||
/// see: <https://github.com/orgs/meilisearch/discussions/862>
|
||||
///
|
||||
/// Enables the experimental no edition 2024 to compute prefixes.
|
||||
#[clap(long, env = MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_PREFIX_POST_PROCESSING)]
|
||||
#[serde(default)]
|
||||
pub experimental_no_edition_2024_for_prefix_post_processing: bool,
|
||||
|
||||
/// Experimental no edition 2024 to compute facets. For more information,
|
||||
/// see: <https://github.com/orgs/meilisearch/discussions/862>
|
||||
///
|
||||
/// Enables the experimental no edition 2024 to compute facets.
|
||||
#[clap(long, env = MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_FACET_POST_PROCESSING)]
|
||||
#[serde(default)]
|
||||
pub experimental_no_edition_2024_for_facet_post_processing: bool,
|
||||
}
|
||||
|
||||
impl IndexerOpts {
|
||||
@@ -783,6 +803,8 @@ impl IndexerOpts {
|
||||
skip_index_budget: _,
|
||||
experimental_no_edition_2024_for_settings,
|
||||
experimental_no_edition_2024_for_dumps,
|
||||
experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing,
|
||||
} = self;
|
||||
if let Some(max_indexing_memory) = max_indexing_memory.0 {
|
||||
export_to_env_if_not_present(
|
||||
@@ -808,6 +830,18 @@ impl IndexerOpts {
|
||||
experimental_no_edition_2024_for_dumps.to_string(),
|
||||
);
|
||||
}
|
||||
if experimental_no_edition_2024_for_prefix_post_processing {
|
||||
export_to_env_if_not_present(
|
||||
MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_PREFIX_POST_PROCESSING,
|
||||
experimental_no_edition_2024_for_prefix_post_processing.to_string(),
|
||||
);
|
||||
}
|
||||
if experimental_no_edition_2024_for_facet_post_processing {
|
||||
export_to_env_if_not_present(
|
||||
MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_FACET_POST_PROCESSING,
|
||||
experimental_no_edition_2024_for_facet_post_processing.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -833,6 +867,10 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||
chunk_compression_level: Default::default(),
|
||||
documents_chunk_size: Default::default(),
|
||||
max_nb_chunks: Default::default(),
|
||||
experimental_no_edition_2024_for_prefix_post_processing: other
|
||||
.experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing: other
|
||||
.experimental_no_edition_2024_for_facet_post_processing,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@@ -490,6 +490,8 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
|
||||
max_indexing_threads: MaxThreads::from_str("2").unwrap(),
|
||||
experimental_no_edition_2024_for_settings: false,
|
||||
experimental_no_edition_2024_for_dumps: false,
|
||||
experimental_no_edition_2024_for_prefix_post_processing: false,
|
||||
experimental_no_edition_2024_for_facet_post_processing: false,
|
||||
},
|
||||
experimental_enable_metrics: false,
|
||||
..Parser::parse_from(None as Option<&str>)
|
||||
|
@@ -19,7 +19,7 @@ bstr = "1.12.0"
|
||||
bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] }
|
||||
byteorder = "1.5.0"
|
||||
charabia = { version = "0.9.7", default-features = false }
|
||||
cellulite = "0.3.0"
|
||||
cellulite = "0.3.1-nested-rtxns"
|
||||
concat-arrays = "0.1.2"
|
||||
convert_case = "0.8.0"
|
||||
crossbeam-channel = "0.5.15"
|
||||
@@ -34,7 +34,7 @@ grenad = { version = "0.5.0", default-features = false, features = [
|
||||
"rayon",
|
||||
"tempfile",
|
||||
] }
|
||||
heed = { version = "0.22.0", default-features = false, features = [
|
||||
heed = { version = "0.22.1-nested-rtxns", default-features = false, features = [
|
||||
"serde-json",
|
||||
"serde-bincode",
|
||||
] }
|
||||
@@ -89,8 +89,8 @@ rhai = { version = "1.22.2", features = [
|
||||
"no_time",
|
||||
"sync",
|
||||
] }
|
||||
arroy = "0.6.3"
|
||||
hannoy = { version = "0.0.8", features = ["arroy"] }
|
||||
arroy = "0.6.4-nested-rtxns"
|
||||
hannoy = { version = "0.0.9-nested-rtxns", features = ["arroy"] }
|
||||
rand = "0.8.5"
|
||||
tracing = "0.1.41"
|
||||
ureq = { version = "2.12.1", features = ["json"] }
|
||||
|
@@ -101,6 +101,8 @@ pub struct GrenadParameters {
|
||||
pub chunk_compression_level: Option<u32>,
|
||||
pub max_memory: Option<usize>,
|
||||
pub max_nb_chunks: Option<usize>,
|
||||
pub experimental_no_edition_2024_for_prefix_post_processing: bool,
|
||||
pub experimental_no_edition_2024_for_facet_post_processing: bool,
|
||||
}
|
||||
|
||||
impl Default for GrenadParameters {
|
||||
@@ -110,6 +112,8 @@ impl Default for GrenadParameters {
|
||||
chunk_compression_level: None,
|
||||
max_memory: None,
|
||||
max_nb_chunks: None,
|
||||
experimental_no_edition_2024_for_prefix_post_processing: false,
|
||||
experimental_no_edition_2024_for_facet_post_processing: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -254,6 +254,12 @@ where
|
||||
chunk_compression_level: self.indexer_config.chunk_compression_level,
|
||||
max_memory: self.indexer_config.max_memory,
|
||||
max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen.
|
||||
experimental_no_edition_2024_for_prefix_post_processing: self
|
||||
.indexer_config
|
||||
.experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing: self
|
||||
.indexer_config
|
||||
.experimental_no_edition_2024_for_facet_post_processing,
|
||||
};
|
||||
let documents_chunk_size = match self.indexer_config.documents_chunk_size {
|
||||
Some(chunk_size) => chunk_size,
|
||||
|
@@ -983,6 +983,12 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
chunk_compression_level: self.indexer_settings.chunk_compression_level,
|
||||
max_memory: self.indexer_settings.max_memory,
|
||||
max_nb_chunks: self.indexer_settings.max_nb_chunks, // default value, may be chosen.
|
||||
experimental_no_edition_2024_for_prefix_post_processing: self
|
||||
.indexer_settings
|
||||
.experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing: self
|
||||
.indexer_settings
|
||||
.experimental_no_edition_2024_for_facet_post_processing,
|
||||
};
|
||||
|
||||
// Once we have written all the documents, we merge everything into a Reader.
|
||||
|
@@ -18,6 +18,8 @@ pub struct IndexerConfig {
|
||||
pub skip_index_budget: bool,
|
||||
pub experimental_no_edition_2024_for_settings: bool,
|
||||
pub experimental_no_edition_2024_for_dumps: bool,
|
||||
pub experimental_no_edition_2024_for_prefix_post_processing: bool,
|
||||
pub experimental_no_edition_2024_for_facet_post_processing: bool,
|
||||
}
|
||||
|
||||
impl IndexerConfig {
|
||||
@@ -27,6 +29,10 @@ impl IndexerConfig {
|
||||
chunk_compression_level: self.chunk_compression_level,
|
||||
max_memory: self.max_memory,
|
||||
max_nb_chunks: self.max_nb_chunks,
|
||||
experimental_no_edition_2024_for_prefix_post_processing: self
|
||||
.experimental_no_edition_2024_for_prefix_post_processing,
|
||||
experimental_no_edition_2024_for_facet_post_processing: self
|
||||
.experimental_no_edition_2024_for_facet_post_processing,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -68,6 +74,8 @@ impl Default for IndexerConfig {
|
||||
skip_index_budget: false,
|
||||
experimental_no_edition_2024_for_settings: false,
|
||||
experimental_no_edition_2024_for_dumps: false,
|
||||
experimental_no_edition_2024_for_prefix_post_processing: false,
|
||||
experimental_no_edition_2024_for_facet_post_processing: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -180,12 +180,15 @@ where
|
||||
})
|
||||
.unwrap()?;
|
||||
|
||||
post_processing::post_process(
|
||||
indexing_context,
|
||||
wtxn,
|
||||
global_fields_ids_map,
|
||||
facet_field_ids_delta,
|
||||
)?;
|
||||
pool.install(|| {
|
||||
post_processing::post_process(
|
||||
indexing_context,
|
||||
wtxn,
|
||||
global_fields_ids_map,
|
||||
facet_field_ids_delta,
|
||||
)
|
||||
})
|
||||
.unwrap()?;
|
||||
|
||||
indexing_context.progress.update_progress(IndexingStep::BuildingGeoJson);
|
||||
index.cellulite.build(
|
||||
|
@@ -0,0 +1,164 @@
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::{iter, mem};
|
||||
|
||||
use grenad::CompressionType;
|
||||
use heed::types::{Bytes, LazyDecode};
|
||||
use heed::{Database, RwTxn};
|
||||
use rayon::prelude::*;
|
||||
use roaring::MultiOps;
|
||||
use tempfile::tempfile;
|
||||
|
||||
use crate::facet::FacetType;
|
||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
|
||||
use crate::heed_codec::BytesRefCodec;
|
||||
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
|
||||
use crate::update::{create_writer, writer_into_reader};
|
||||
use crate::{CboRoaringBitmapCodec, FieldId, Index};
|
||||
|
||||
/// Generate the facet level based on the level 0.
|
||||
///
|
||||
/// The function will generate all the group levels from
|
||||
/// the group 1 to the level n until the number of group
|
||||
/// is smaller than the minimum required size.
|
||||
pub fn generate_facet_levels(
|
||||
index: &Index,
|
||||
wtxn: &mut RwTxn,
|
||||
field_id: FieldId,
|
||||
facet_type: FacetType,
|
||||
) -> crate::Result<()> {
|
||||
let db = match facet_type {
|
||||
FacetType::String => index
|
||||
.facet_id_string_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||
.lazily_decode_data(),
|
||||
FacetType::Number => index
|
||||
.facet_id_f64_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||
.lazily_decode_data(),
|
||||
};
|
||||
|
||||
clear_levels(db, wtxn, field_id)?;
|
||||
|
||||
let mut base_level: u8 = 0;
|
||||
// That's a do-while loop
|
||||
while {
|
||||
let mut level_size = 0;
|
||||
let level = base_level.checked_add(1).unwrap();
|
||||
for reader in compute_level(index, wtxn, db, field_id, base_level)? {
|
||||
let mut cursor = reader.into_cursor()?;
|
||||
while let Some((left_bound, facet_group_value)) = cursor.move_on_next()? {
|
||||
level_size += 1;
|
||||
let key = FacetGroupKey { field_id, level, left_bound };
|
||||
debug_assert!(
|
||||
db.get(wtxn, &key).transpose().is_none(),
|
||||
"entry must not be there and must have already been deleted: {key:?}"
|
||||
);
|
||||
db.remap_data_type::<Bytes>().put(wtxn, &key, facet_group_value)?;
|
||||
}
|
||||
}
|
||||
|
||||
base_level = level;
|
||||
|
||||
// If the next level will have the minimum required groups, continue.
|
||||
(level_size / FACET_GROUP_SIZE as usize) >= FACET_MIN_LEVEL_SIZE as usize
|
||||
} {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compute the groups of facets from the provided base level
|
||||
/// and write the content into different grenad files.
|
||||
fn compute_level(
|
||||
index: &Index,
|
||||
wtxn: &heed::RwTxn,
|
||||
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
|
||||
field_id: FieldId,
|
||||
base_level: u8,
|
||||
) -> Result<Vec<grenad::Reader<BufReader<File>>>, crate::Error> {
|
||||
let thread_count = rayon::current_num_threads();
|
||||
let rtxns = iter::repeat_with(|| index.env.nested_read_txn(wtxn))
|
||||
.take(thread_count)
|
||||
.collect::<heed::Result<Vec<_>>>()?;
|
||||
|
||||
let range = {
|
||||
// Based on the first possible value for the base level up to
|
||||
// the first possible value for the next level *excluded*.
|
||||
let left = FacetGroupKey::<&[u8]> { field_id, level: base_level, left_bound: &[] };
|
||||
let right = FacetGroupKey::<&[u8]> {
|
||||
field_id,
|
||||
level: base_level.checked_add(1).unwrap(),
|
||||
left_bound: &[],
|
||||
};
|
||||
left..right
|
||||
};
|
||||
|
||||
rtxns
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
.map(|(thread_id, rtxn)| {
|
||||
let mut writer = tempfile().map(|f| create_writer(CompressionType::None, None, f))?;
|
||||
|
||||
let mut left_bound = None;
|
||||
let mut group_docids = Vec::new();
|
||||
let mut ser_buffer = Vec::new();
|
||||
for (i, result) in db.range(&rtxn, &range)?.enumerate() {
|
||||
let (key, lazy_value) = result?;
|
||||
|
||||
let start_of_group = i % FACET_GROUP_SIZE as usize == 0;
|
||||
let group_index = i / FACET_GROUP_SIZE as usize;
|
||||
let group_for_thread = group_index % thread_count == thread_id;
|
||||
|
||||
if group_for_thread {
|
||||
if start_of_group {
|
||||
if let Some(left_bound) = left_bound.take() {
|
||||
// We store the bitmaps in a Vec this way we can use
|
||||
// the MultiOps operations that tends to be more efficient
|
||||
// for unions. The Vec is empty after the operation.
|
||||
//
|
||||
// We also don't forget to store the group size corresponding
|
||||
// to the number of entries merged in this group.
|
||||
ser_buffer.clear();
|
||||
let group_len: u8 = group_docids.len().try_into().unwrap();
|
||||
ser_buffer.push(group_len);
|
||||
let group_docids = mem::take(&mut group_docids);
|
||||
let docids = group_docids.into_iter().union();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&docids, &mut ser_buffer);
|
||||
writer.insert(left_bound, &ser_buffer)?;
|
||||
}
|
||||
left_bound = Some(key.left_bound);
|
||||
}
|
||||
|
||||
// Lazily decode the bitmaps we are interested in.
|
||||
let value = lazy_value.decode().map_err(heed::Error::Decoding)?;
|
||||
group_docids.push(value.bitmap);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(left_bound) = left_bound.take() {
|
||||
ser_buffer.clear();
|
||||
// We don't forget to store the group size corresponding
|
||||
// to the number of entries merged in this group.
|
||||
let group_len: u8 = group_docids.len().try_into().unwrap();
|
||||
ser_buffer.push(group_len);
|
||||
let group_docids = group_docids.into_iter().union();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&group_docids, &mut ser_buffer);
|
||||
writer.insert(left_bound, &ser_buffer)?;
|
||||
}
|
||||
|
||||
writer_into_reader(writer)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Clears all the levels and only keeps the level 0 of the specified field id.
|
||||
fn clear_levels(
|
||||
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
field_id: FieldId,
|
||||
) -> heed::Result<()> {
|
||||
let left = FacetGroupKey::<&[u8]> { field_id, level: 1, left_bound: &[] };
|
||||
let right = FacetGroupKey::<&[u8]> { field_id, level: u8::MAX, left_bound: &[] };
|
||||
let range = left..=right;
|
||||
db.delete_range(wtxn, &range).map(drop)
|
||||
}
|
@@ -1,5 +1,6 @@
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use facet_bulk::generate_facet_levels;
|
||||
use heed::types::{Bytes, DecodeIgnore, Str};
|
||||
use heed::RwTxn;
|
||||
use itertools::{merge_join_by, EitherOrBoth};
|
||||
@@ -23,6 +24,8 @@ use crate::update::new::FacetFieldIdsDelta;
|
||||
use crate::update::{FacetsUpdateBulk, GrenadParameters};
|
||||
use crate::{GlobalFieldsIdsMap, Index, Result};
|
||||
|
||||
mod facet_bulk;
|
||||
|
||||
pub(super) fn post_process<MSP>(
|
||||
indexing_context: IndexingContext<MSP>,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
@@ -39,6 +42,7 @@ where
|
||||
wtxn,
|
||||
facet_field_ids_delta,
|
||||
&mut global_fields_ids_map,
|
||||
indexing_context.grenad_parameters,
|
||||
indexing_context.progress,
|
||||
)?;
|
||||
compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?;
|
||||
@@ -216,6 +220,7 @@ fn compute_facet_level_database(
|
||||
wtxn: &mut RwTxn,
|
||||
mut facet_field_ids_delta: FacetFieldIdsDelta,
|
||||
global_fields_ids_map: &mut GlobalFieldsIdsMap,
|
||||
grenad_parameters: &GrenadParameters,
|
||||
progress: &Progress,
|
||||
) -> Result<()> {
|
||||
let rtxn = index.read_txn()?;
|
||||
@@ -239,9 +244,14 @@ fn compute_facet_level_database(
|
||||
match delta {
|
||||
FacetFieldIdDelta::Bulk => {
|
||||
progress.update_progress(PostProcessingFacets::StringsBulk);
|
||||
tracing::debug!(%fid, "bulk string facet processing");
|
||||
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
|
||||
.execute(wtxn)?
|
||||
if grenad_parameters.experimental_no_edition_2024_for_facet_post_processing {
|
||||
tracing::debug!(%fid, "bulk string facet processing");
|
||||
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
|
||||
.execute(wtxn)?
|
||||
} else {
|
||||
tracing::debug!(%fid, "bulk string facet processing in parallel");
|
||||
generate_facet_levels(index, wtxn, fid, FacetType::String)?
|
||||
}
|
||||
}
|
||||
FacetFieldIdDelta::Incremental(delta_data) => {
|
||||
progress.update_progress(PostProcessingFacets::StringsIncremental);
|
@@ -1,11 +1,12 @@
|
||||
use std::cell::RefCell;
|
||||
use std::collections::BTreeSet;
|
||||
use std::io::{BufReader, BufWriter, Read, Seek, Write};
|
||||
use std::iter;
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use heed::types::Bytes;
|
||||
use heed::types::{Bytes, DecodeIgnore};
|
||||
use heed::{BytesDecode, Database, Error, RoTxn, RwTxn};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
|
||||
use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator, ParallelIterator as _};
|
||||
use roaring::MultiOps;
|
||||
use tempfile::spooled_tempfile;
|
||||
use thread_local::ThreadLocal;
|
||||
@@ -151,22 +152,29 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> {
|
||||
|
||||
unsafe impl Sync for FrozenPrefixBitmaps<'_, '_> {}
|
||||
|
||||
struct WordPrefixIntegerDocids {
|
||||
struct WordPrefixIntegerDocids<'i> {
|
||||
index: &'i Index,
|
||||
database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
max_memory_by_thread: Option<usize>,
|
||||
/// Do not use an experimental LMDB feature to read uncommitted data in parallel.
|
||||
no_experimental_post_processing: bool,
|
||||
}
|
||||
|
||||
impl WordPrefixIntegerDocids {
|
||||
impl<'i> WordPrefixIntegerDocids<'i> {
|
||||
fn new(
|
||||
index: &'i Index,
|
||||
database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
grenad_parameters: &GrenadParameters,
|
||||
) -> WordPrefixIntegerDocids {
|
||||
grenad_parameters: &'_ GrenadParameters,
|
||||
) -> WordPrefixIntegerDocids<'i> {
|
||||
WordPrefixIntegerDocids {
|
||||
index,
|
||||
database,
|
||||
prefix_database,
|
||||
max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
|
||||
no_experimental_post_processing: grenad_parameters
|
||||
.experimental_no_edition_2024_for_prefix_post_processing,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -177,7 +185,131 @@ impl WordPrefixIntegerDocids {
|
||||
prefix_to_delete: &BTreeSet<Prefix>,
|
||||
) -> Result<()> {
|
||||
delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
|
||||
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
|
||||
if self.no_experimental_post_processing {
|
||||
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
|
||||
} else {
|
||||
self.recompute_modified_prefixes_no_frozen(wtxn, prefix_to_compute)
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes the same as `recompute_modified_prefixes`.
|
||||
///
|
||||
/// ...but without aggregating the prefixes mmap pointers into a static HashMap
|
||||
/// beforehand and rather use an experimental LMDB feature to read the subset
|
||||
/// of prefixes in parallel from the uncommitted transaction.
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||
fn recompute_modified_prefixes_no_frozen(
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
prefixes: &BTreeSet<Prefix>,
|
||||
) -> Result<()> {
|
||||
let thread_count = rayon::current_num_threads();
|
||||
let rtxns = iter::repeat_with(|| self.index.env.nested_read_txn(wtxn))
|
||||
.take(thread_count)
|
||||
.collect::<heed::Result<Vec<_>>>()?;
|
||||
|
||||
let outputs = rtxns
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
.map(|(thread_id, rtxn)| {
|
||||
// `indexes` represent offsets at which prefixes computations were stored in the `file`.
|
||||
let mut indexes = Vec::new();
|
||||
let mut file = BufWriter::new(spooled_tempfile(
|
||||
self.max_memory_by_thread.unwrap_or(usize::MAX),
|
||||
));
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
for (prefix_index, prefix) in prefixes.iter().enumerate() {
|
||||
// Is prefix for another thread?
|
||||
if prefix_index % thread_count != thread_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut bitmap_bytes_at_positions = HashMap::new();
|
||||
for result in self
|
||||
.database
|
||||
.prefix_iter(&rtxn, prefix.as_bytes())?
|
||||
.remap_types::<StrBEU16Codec, Bytes>()
|
||||
{
|
||||
let ((_word, pos), bitmap_bytes) = result?;
|
||||
bitmap_bytes_at_positions
|
||||
.entry(pos)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(bitmap_bytes);
|
||||
}
|
||||
|
||||
// We track positions with no corresponding bitmap bytes,
|
||||
// these means that the prefix no longer exists in the database
|
||||
// and must, therefore, be removed from the index.
|
||||
for result in self
|
||||
.prefix_database
|
||||
.prefix_iter(&rtxn, prefix.as_bytes())?
|
||||
.remap_types::<StrBEU16Codec, DecodeIgnore>()
|
||||
{
|
||||
let ((_word, pos), ()) = result?;
|
||||
// They are represented by an empty set of bitmaps.
|
||||
bitmap_bytes_at_positions.entry(pos).or_insert_with(Vec::new);
|
||||
}
|
||||
|
||||
for (pos, bitmaps_bytes) in bitmap_bytes_at_positions {
|
||||
if bitmaps_bytes.is_empty() {
|
||||
indexes.push(PrefixIntegerEntry {
|
||||
prefix,
|
||||
pos,
|
||||
serialized_length: None,
|
||||
});
|
||||
} else {
|
||||
let output = bitmaps_bytes
|
||||
.into_iter()
|
||||
.map(CboRoaringBitmapCodec::deserialize_from)
|
||||
.union()?;
|
||||
buffer.clear();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer);
|
||||
indexes.push(PrefixIntegerEntry {
|
||||
prefix,
|
||||
pos,
|
||||
serialized_length: Some(buffer.len()),
|
||||
});
|
||||
file.write_all(&buffer)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((indexes, file))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
// We iterate over all the collected and serialized bitmaps through
|
||||
// the files and entries to eventually put them in the final database.
|
||||
let mut key_buffer = Vec::new();
|
||||
let mut buffer = Vec::new();
|
||||
for (index, file) in outputs {
|
||||
let mut file = file.into_inner().map_err(|e| e.into_error())?;
|
||||
file.rewind()?;
|
||||
let mut file = BufReader::new(file);
|
||||
for PrefixIntegerEntry { prefix, pos, serialized_length } in index {
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(prefix.as_bytes());
|
||||
key_buffer.push(0);
|
||||
key_buffer.extend_from_slice(&pos.to_be_bytes());
|
||||
match serialized_length {
|
||||
Some(serialized_length) => {
|
||||
buffer.resize(serialized_length, 0);
|
||||
file.read_exact(&mut buffer)?;
|
||||
self.prefix_database.remap_data_type::<Bytes>().put(
|
||||
wtxn,
|
||||
&key_buffer,
|
||||
&buffer,
|
||||
)?;
|
||||
}
|
||||
None => {
|
||||
self.prefix_database.delete(wtxn, &key_buffer)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||
@@ -262,7 +394,7 @@ impl WordPrefixIntegerDocids {
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a prefix and the lenght the bitmap takes on disk.
|
||||
/// Represents a prefix and the length the bitmap takes on disk.
|
||||
struct PrefixIntegerEntry<'a> {
|
||||
prefix: &'a str,
|
||||
pos: u16,
|
||||
@@ -363,6 +495,7 @@ pub fn compute_word_prefix_fid_docids(
|
||||
grenad_parameters: &GrenadParameters,
|
||||
) -> Result<()> {
|
||||
WordPrefixIntegerDocids::new(
|
||||
index,
|
||||
index.word_fid_docids.remap_key_type(),
|
||||
index.word_prefix_fid_docids.remap_key_type(),
|
||||
grenad_parameters,
|
||||
@@ -379,6 +512,7 @@ pub fn compute_word_prefix_position_docids(
|
||||
grenad_parameters: &GrenadParameters,
|
||||
) -> Result<()> {
|
||||
WordPrefixIntegerDocids::new(
|
||||
index,
|
||||
index.word_position_docids.remap_key_type(),
|
||||
index.word_prefix_position_docids.remap_key_type(),
|
||||
grenad_parameters,
|
||||
|
Reference in New Issue
Block a user