mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-21 05:41:01 +00:00
Compare commits
6 Commits
disable-ar
...
parallel-b
Author | SHA1 | Date | |
---|---|---|---|
21db4b6b29 | |||
3dcf681d46 | |||
55aa75dccb | |||
f06218d316 | |||
2a6297203c | |||
2ed4eed376 |
14
Cargo.lock
generated
14
Cargo.lock
generated
@ -2398,8 +2398,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
||||
[[package]]
|
||||
name = "heed"
|
||||
version = "0.20.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7d4f449bab7320c56003d37732a917e18798e2f1709d80263face2b4f9436ddb"
|
||||
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn-v0-20#01f3242791171c393dab153d2f6f2777867c89d0"
|
||||
dependencies = [
|
||||
"bitflags 2.6.0",
|
||||
"byteorder",
|
||||
@ -2416,14 +2415,12 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "heed-traits"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff"
|
||||
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn-v0-20#01f3242791171c393dab153d2f6f2777867c89d0"
|
||||
|
||||
[[package]]
|
||||
name = "heed-types"
|
||||
version = "0.20.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d3f528b053a6d700b2734eabcd0fd49cb8230647aa72958467527b0b7917114"
|
||||
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn-v0-20#01f3242791171c393dab153d2f6f2777867c89d0"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"byteorder",
|
||||
@ -3468,9 +3465,8 @@ checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"
|
||||
|
||||
[[package]]
|
||||
name = "lmdb-master-sys"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "472c3760e2a8d0f61f322fb36788021bb36d573c502b50fa3e2bcaac3ec326c9"
|
||||
version = "0.2.3"
|
||||
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn-v0-20#01f3242791171c393dab153d2f6f2777867c89d0"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"doxygen-rs",
|
||||
|
@ -43,3 +43,6 @@ opt-level = 3
|
||||
opt-level = 3
|
||||
[profile.dev.package.roaring]
|
||||
opt-level = 3
|
||||
|
||||
[patch.crates-io]
|
||||
heed = { git = "https://github.com/meilisearch/heed", branch = "allow-nested-rtxn-from-wtxn-v0-20" }
|
||||
|
@ -101,7 +101,13 @@ uell = "0.1.0"
|
||||
enum-iterator = "2.1.0"
|
||||
bbqueue = { git = "https://github.com/meilisearch/bbqueue" }
|
||||
flume = { version = "0.11.1", default-features = false }
|
||||
utoipa = { version = "5.3.1", features = ["non_strict_integers", "preserve_order", "uuid", "time", "openapi_extensions"] }
|
||||
utoipa = { version = "5.3.1", features = [
|
||||
"non_strict_integers",
|
||||
"preserve_order",
|
||||
"uuid",
|
||||
"time",
|
||||
"openapi_extensions",
|
||||
] }
|
||||
|
||||
[dev-dependencies]
|
||||
mimalloc = { version = "0.1.43", default-features = false }
|
||||
@ -113,9 +119,7 @@ meili-snap = { path = "../meili-snap" }
|
||||
rand = { version = "0.8.5", features = ["small_rng"] }
|
||||
|
||||
[features]
|
||||
all-tokenizations = [
|
||||
"charabia/default",
|
||||
]
|
||||
all-tokenizations = ["charabia/default"]
|
||||
|
||||
# Use POSIX semaphores instead of SysV semaphores in LMDB
|
||||
# For more information on this feature, see heed's Cargo.toml
|
||||
|
@ -199,12 +199,17 @@ where
|
||||
&indexing_context.must_stop_processing,
|
||||
)?;
|
||||
|
||||
post_processing::post_process(
|
||||
indexing_context,
|
||||
wtxn,
|
||||
global_fields_ids_map,
|
||||
facet_field_ids_delta,
|
||||
)?;
|
||||
pool.install(|| {
|
||||
// The post processing step is using rayon to process
|
||||
// stuff in parallel and therefore needs the thread pool.
|
||||
post_processing::post_process(
|
||||
indexing_context,
|
||||
wtxn,
|
||||
global_fields_ids_map,
|
||||
facet_field_ids_delta,
|
||||
)
|
||||
})
|
||||
.unwrap()?;
|
||||
|
||||
indexing_context.progress.update_progress(IndexingStep::Finalizing);
|
||||
|
||||
|
@ -0,0 +1,164 @@
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::{iter, mem};
|
||||
|
||||
use grenad::CompressionType;
|
||||
use heed::types::{Bytes, LazyDecode};
|
||||
use heed::{Database, RwTxn};
|
||||
use rayon::prelude::*;
|
||||
use roaring::MultiOps;
|
||||
use tempfile::tempfile;
|
||||
|
||||
use crate::facet::FacetType;
|
||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
|
||||
use crate::heed_codec::BytesRefCodec;
|
||||
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
|
||||
use crate::update::{create_writer, writer_into_reader};
|
||||
use crate::{CboRoaringBitmapCodec, FieldId, Index};
|
||||
|
||||
/// Generate the facet level based on the level 0.
|
||||
///
|
||||
/// The function will generate all the group levels from
|
||||
/// the group 1 to the level n until the number of group
|
||||
/// is smaller than the minimum required size.
|
||||
pub fn generate_facet_levels(
|
||||
index: &Index,
|
||||
wtxn: &mut RwTxn,
|
||||
field_id: FieldId,
|
||||
facet_type: FacetType,
|
||||
) -> crate::Result<()> {
|
||||
let db = match facet_type {
|
||||
FacetType::String => index
|
||||
.facet_id_string_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||
.lazily_decode_data(),
|
||||
FacetType::Number => index
|
||||
.facet_id_f64_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||
.lazily_decode_data(),
|
||||
};
|
||||
|
||||
clear_levels(db, wtxn, field_id)?;
|
||||
|
||||
let mut base_level = 0;
|
||||
// That's a do-while loop
|
||||
while {
|
||||
let mut level_size = 0;
|
||||
for reader in compute_level(index, wtxn, db, field_id, base_level)? {
|
||||
let mut cursor = reader.into_cursor()?;
|
||||
while let Some((left_bound, facet_group_value)) = cursor.move_on_next()? {
|
||||
level_size += 1;
|
||||
let level = base_level.checked_add(1).unwrap();
|
||||
let key = FacetGroupKey { field_id, level, left_bound };
|
||||
debug_assert!(
|
||||
db.get(wtxn, &key).transpose().is_none(),
|
||||
"entry must not be there and must have already been deleted: {key:?}"
|
||||
);
|
||||
db.remap_data_type::<Bytes>().put(wtxn, &key, facet_group_value)?;
|
||||
}
|
||||
}
|
||||
|
||||
base_level += 1;
|
||||
|
||||
// If the next level will have the minimum required groups, continue.
|
||||
(level_size / FACET_GROUP_SIZE as usize) >= FACET_MIN_LEVEL_SIZE as usize
|
||||
} {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compute the groups of facets from the provided base level
|
||||
/// and write the content into different grenad files.
|
||||
fn compute_level(
|
||||
index: &Index,
|
||||
wtxn: &heed::RwTxn,
|
||||
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
|
||||
field_id: FieldId,
|
||||
base_level: u8,
|
||||
) -> Result<Vec<grenad::Reader<BufReader<File>>>, crate::Error> {
|
||||
let thread_count = rayon::current_num_threads();
|
||||
let rtxns = iter::repeat_with(|| index.env.nested_read_txn(wtxn))
|
||||
.take(thread_count)
|
||||
.collect::<heed::Result<Vec<_>>>()?;
|
||||
|
||||
let range = {
|
||||
// Based on the first possible value for the base level up to
|
||||
// the first possible value for the next level *excluded*.
|
||||
let left = FacetGroupKey::<&[u8]> { field_id, level: base_level, left_bound: &[] };
|
||||
let right = FacetGroupKey::<&[u8]> {
|
||||
field_id,
|
||||
level: base_level.checked_add(1).unwrap(),
|
||||
left_bound: &[],
|
||||
};
|
||||
left..right
|
||||
};
|
||||
|
||||
rtxns
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
.map(|(thread_id, rtxn)| {
|
||||
let mut writer = tempfile().map(|f| create_writer(CompressionType::None, None, f))?;
|
||||
|
||||
let mut left_bound = None;
|
||||
let mut group_docids = Vec::new();
|
||||
let mut ser_buffer = Vec::new();
|
||||
for (i, result) in db.range(&rtxn, &range)?.enumerate() {
|
||||
let (key, lazy_value) = result?;
|
||||
|
||||
let start_of_group = i % FACET_GROUP_SIZE as usize == 0;
|
||||
let group_index = i / FACET_GROUP_SIZE as usize;
|
||||
let group_for_thread = group_index % thread_count == thread_id;
|
||||
|
||||
if group_for_thread {
|
||||
if start_of_group {
|
||||
if let Some(left_bound) = left_bound.take() {
|
||||
// We store the bitmaps in a Vec this way we can use
|
||||
// the MultiOps operations that tends to be more efficient
|
||||
// for unions. The Vec is empty after the operation.
|
||||
//
|
||||
// We also don't forget to store the group size corresponding
|
||||
// to the number of entries merged in this group.
|
||||
ser_buffer.clear();
|
||||
let group_len: u8 = group_docids.len().try_into().unwrap();
|
||||
ser_buffer.push(group_len);
|
||||
let group_docids = mem::take(&mut group_docids);
|
||||
let docids = group_docids.into_iter().union();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&docids, &mut ser_buffer);
|
||||
writer.insert(left_bound, &ser_buffer)?;
|
||||
}
|
||||
left_bound = Some(key.left_bound);
|
||||
}
|
||||
|
||||
// Lazily decode the bitmaps we are interested in.
|
||||
let value = lazy_value.decode().map_err(heed::Error::Decoding)?;
|
||||
group_docids.push(value.bitmap);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(left_bound) = left_bound.take() {
|
||||
ser_buffer.clear();
|
||||
// We don't forget to store the group size corresponding
|
||||
// to the number of entries merged in this group.
|
||||
let group_len: u8 = group_docids.len().try_into().unwrap();
|
||||
ser_buffer.push(group_len);
|
||||
let group_docids = group_docids.into_iter().union();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&group_docids, &mut ser_buffer);
|
||||
writer.insert(left_bound, &ser_buffer)?;
|
||||
}
|
||||
|
||||
writer_into_reader(writer).map_err(Into::into)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Clears all the levels and only keeps the level 0 of the specified field id.
|
||||
fn clear_levels(
|
||||
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
field_id: FieldId,
|
||||
) -> heed::Result<()> {
|
||||
let left = FacetGroupKey::<&[u8]> { field_id, level: 1, left_bound: &[] };
|
||||
let right = FacetGroupKey::<&[u8]> { field_id, level: u8::MAX, left_bound: &[] };
|
||||
let range = left..=right;
|
||||
db.delete_range(wtxn, &range).map(drop)
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use facet_bulk::generate_facet_levels;
|
||||
use heed::types::{Bytes, DecodeIgnore, Str};
|
||||
use heed::RwTxn;
|
||||
use itertools::{merge_join_by, EitherOrBoth};
|
||||
@ -22,6 +23,8 @@ use crate::update::new::FacetFieldIdsDelta;
|
||||
use crate::update::{FacetsUpdateBulk, GrenadParameters};
|
||||
use crate::{GlobalFieldsIdsMap, Index, Result};
|
||||
|
||||
mod facet_bulk;
|
||||
|
||||
pub(super) fn post_process<MSP>(
|
||||
indexing_context: IndexingContext<MSP>,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
@ -170,9 +173,8 @@ fn compute_facet_level_database(
|
||||
let _entered = span.enter();
|
||||
match delta {
|
||||
FacetFieldIdDelta::Bulk => {
|
||||
tracing::debug!(%fid, "bulk string facet processing");
|
||||
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
|
||||
.execute(wtxn)?
|
||||
tracing::debug!(%fid, "bulk string facet processing in parallel");
|
||||
generate_facet_levels(index, wtxn, fid, FacetType::String)?
|
||||
}
|
||||
FacetFieldIdDelta::Incremental(delta_data) => {
|
||||
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
|
@ -31,7 +31,7 @@
|
||||
"hackernews-modified-number-filters.ndjson": {
|
||||
"local_location": null,
|
||||
"remote_location": "https://milli-benchmarks.fra1.digitaloceanspaces.com/bench/datasets/hackernews/modification/01-modified-filters.ndjson",
|
||||
"sha256": "7272cbfd41110d32d7fe168424a0000f07589bfe40f664652b34f4f20aaf3802"
|
||||
"sha256": "b80c245ce1b1df80b9b38800f677f3bd11947ebc62716fb108269d50e796c35c"
|
||||
}
|
||||
},
|
||||
"precommands": [
|
||||
|
@ -31,7 +31,7 @@
|
||||
"hackernews-modified-string-filters.ndjson": {
|
||||
"local_location": null,
|
||||
"remote_location": "https://milli-benchmarks.fra1.digitaloceanspaces.com/bench/datasets/hackernews/modification/02-modified-filters.ndjson",
|
||||
"sha256": "b80c245ce1b1df80b9b38800f677f3bd11947ebc62716fb108269d50e796c35c"
|
||||
"sha256": "7272cbfd41110d32d7fe168424a0000f07589bfe40f664652b34f4f20aaf3802"
|
||||
}
|
||||
},
|
||||
"precommands": [
|
||||
|
Reference in New Issue
Block a user