Integrate facet level bulk update

Only the facet bulk update has been added so far, the incremental must be completely rewritten

Factorize facet merging

Fix facet level extraction
This commit is contained in:
ManyTheFish
2024-10-01 16:13:08 +02:00
parent 774ed28539
commit 14261f8f04
5 changed files with 181 additions and 67 deletions

View File

@@ -120,6 +120,7 @@ pub struct WriterOperation {
entry: EntryOperation,
}
#[derive(Debug)]
pub enum Database {
Documents,
ExternalDocumentsIds,
@@ -158,6 +159,18 @@ impl Database {
}
}
impl From<FacetKind> for Database {
fn from(value: FacetKind) -> Self {
match value {
FacetKind::Number => Database::FacetIdF64NumberDocids,
FacetKind::String => Database::FacetIdStringDocids,
FacetKind::Null => Database::FacetIdIsNullDocids,
FacetKind::Empty => Database::FacetIdIsEmptyDocids,
FacetKind::Exists => Database::FacetIdExistsDocids,
}
}
}
impl WriterOperation {
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
self.database.database(index)
@@ -395,8 +408,18 @@ pub struct FacetDocidsSender<'a> {
impl DocidsSender for FacetDocidsSender<'_> {
fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
let (database, key) = self.extract_database(key);
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
let (facet_kind, key) = FacetKind::extract_from_key(key);
let database = Database::from(facet_kind);
// let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
let entry = match facet_kind {
// skip level group size
FacetKind::String | FacetKind::Number => {
// add facet group size
let value = [&[1], value].concat();
EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &value))
}
_ => EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)),
};
match self.sender.send(WriterOperation { database, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
@@ -404,7 +427,8 @@ impl DocidsSender for FacetDocidsSender<'_> {
}
fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
let (database, key) = self.extract_database(key);
let (facet_kind, key) = FacetKind::extract_from_key(key);
let database = Database::from(facet_kind);
let entry = EntryOperation::Delete(KeyEntry::from_key(key));
match self.sender.send(WriterOperation { database, entry }) {
Ok(()) => Ok(()),
@@ -413,19 +437,6 @@ impl DocidsSender for FacetDocidsSender<'_> {
}
}
impl FacetDocidsSender<'_> {
fn extract_database<'a>(&self, key: &'a [u8]) -> (Database, &'a [u8]) {
let database = match FacetKind::from(key[0]) {
FacetKind::Number => Database::FacetIdF64NumberDocids,
FacetKind::String => Database::FacetIdStringDocids,
FacetKind::Null => Database::FacetIdIsNullDocids,
FacetKind::Empty => Database::FacetIdIsEmptyDocids,
FacetKind::Exists => Database::FacetIdExistsDocids,
};
(database, &key[1..])
}
}
pub struct DocumentsSender<'a>(&'a MergerSender);
impl DocumentsSender<'_> {

View File

@@ -129,7 +129,7 @@ impl FacetedDocidsExtractor {
buffer.clear();
buffer.push(FacetKind::Number as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(1); // level 0
buffer.push(0); // level 0
buffer.extend_from_slice(&ordered);
buffer.extend_from_slice(&n.to_be_bytes());
@@ -145,7 +145,7 @@ impl FacetedDocidsExtractor {
buffer.clear();
buffer.push(FacetKind::String as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.push(1); // level 0
buffer.push(0); // level 0
buffer.extend_from_slice(truncated.as_bytes());
cache_fn(cached_sorter, &*buffer, docid).map_err(Into::into)
}

View File

@@ -4,6 +4,7 @@ mod facet_document;
pub use extract_facets::FacetedDocidsExtractor;
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum FacetKind {
Number = 0,
String = 1,
@@ -24,3 +25,10 @@ impl From<u8> for FacetKind {
}
}
}
impl FacetKind {
pub fn extract_from_key<'k>(key: &'k [u8]) -> (FacetKind, &'k [u8]) {
debug_assert!(key.len() > 3);
(FacetKind::from(key[0]), &key[1..])
}
}

View File

@@ -13,18 +13,19 @@ pub use update_by_function::UpdateByFunction;
use super::channel::*;
use super::document_change::{Deletion, DocumentChange, Insertion, Update};
use super::extract::*;
use super::merger::merge_grenad_entries;
use super::merger::{merge_grenad_entries, FacetFieldIdsDelta};
use super::word_fst_builder::PrefixDelta;
use super::words_prefix_docids::{
compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::{StdResult, TopLevelMap};
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::update::new::channel::ExtractorSender;
use crate::update::settings::InnerIndexSettings;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt;
use crate::update::GrenadParameters;
use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
use crate::update::{FacetsUpdateBulk, GrenadParameters};
mod document_deletion;
mod document_operation;
@@ -71,11 +72,11 @@ where
let global_fields_ids_map_clone = global_fields_ids_map.clone();
thread::scope(|s| {
let indexer_span = tracing::Span::current();
// TODO manage the errors correctly
let current_span = tracing::Span::current();
let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.in_place_scope(|_s| {
let span = tracing::trace_span!(target: "indexing::documents", parent: &current_span, "extract");
let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter();
let document_changes = document_changes.into_par_iter();
@@ -179,11 +180,11 @@ where
})
})?;
let indexer_span = tracing::Span::current();
// TODO manage the errors correctly
let current_span = tracing::Span::current();
let merger_thread = Builder::new().name(S("indexer-merger")).spawn_scoped(s, move || {
let span =
tracing::trace_span!(target: "indexing::documents", parent: &current_span, "merge");
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "merge");
let _entered = span.enter();
let rtxn = index.read_txn().unwrap();
merge_grenad_entries(
@@ -211,17 +212,12 @@ where
handle.join().unwrap()?;
let merger_result = merger_thread.join().unwrap()?;
if let Some(prefix_delta) = merger_result.prefix_delta {
let span = tracing::trace_span!(target: "indexing", "prefix");
let _entered = span.enter();
if let Some(facet_field_ids_delta) = merger_result.facet_field_ids_delta {
compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
}
let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix fid docids
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix position docids
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)?;
if let Some(prefix_delta) = merger_result.prefix_delta {
compute_prefix_database(index, wtxn, prefix_delta)?;
}
Ok(()) as Result<_>
@@ -238,6 +234,51 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn compute_prefix_database(
index: &Index,
wtxn: &mut RwTxn,
prefix_delta: PrefixDelta,
) -> Result<()> {
let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
compute_word_prefix_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix fid docids
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted)?;
// Compute word prefix position docids
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
fn compute_facet_level_database(
index: &Index,
wtxn: &mut RwTxn,
facet_field_ids_delta: FacetFieldIdsDelta,
) -> Result<()> {
if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0(
index,
modified_facet_string_ids,
FacetType::String,
)
.execute(wtxn)?;
}
if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0(
index,
modified_facet_number_ids,
FacetType::Number,
)
.execute(wtxn)?;
}
Ok(())
}
/// TODO: GrenadParameters::default() should be removed in favor a passed parameter
/// TODO: manage the errors correctly
/// TODO: we must have a single trait that also gives the extractor type

View File

@@ -16,7 +16,9 @@ use crate::update::del_add::DelAdd;
use crate::update::new::channel::MergerOperation;
use crate::update::new::word_fst_builder::WordFstBuilder;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::{CboRoaringBitmapCodec, Error, GeoPoint, GlobalFieldsIdsMap, Index, Prefix, Result};
use crate::{
CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, Prefix, Result,
};
/// TODO We must return some infos/stats
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents", name = "merge")]
@@ -188,13 +190,17 @@ pub fn merge_grenad_entries(
let span =
tracing::trace_span!(target: "indexing::documents::merge", "facet_docids");
let _entered = span.enter();
let mut facet_field_ids_delta = FacetFieldIdsDelta::new();
merge_and_send_facet_docids(
merger,
FacetDatabases::new(index),
rtxn,
&mut buffer,
sender.facet_docids(),
&mut facet_field_ids_delta,
)?;
merger_result.facet_field_ids_delta = Some(facet_field_ids_delta);
}
}
}
@@ -218,6 +224,8 @@ pub fn merge_grenad_entries(
pub struct MergerResult {
/// The delta of the prefixes
pub prefix_delta: Option<PrefixDelta>,
/// The field ids that have been modified
pub facet_field_ids_delta: Option<FacetFieldIdsDelta>,
}
pub struct GeoExtractor {
@@ -308,20 +316,23 @@ fn merge_and_send_facet_docids(
rtxn: &RoTxn<'_>,
buffer: &mut Vec<u8>,
docids_sender: impl DocidsSender,
facet_field_ids_delta: &mut FacetFieldIdsDelta,
) -> Result<()> {
let mut merger_iter = merger.into_stream_merger_iter().unwrap();
while let Some((key, deladd)) = merger_iter.next().unwrap() {
let current = database.get(rtxn, key)?;
let current = database.get_cbo_roaring_bytes_value(rtxn, key)?;
let deladd: &KvReaderDelAdd = deladd.into();
let del = deladd.get(DelAdd::Deletion);
let add = deladd.get(DelAdd::Addition);
match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key);
let value = cbo_bitmap_serialize_into_vec(&bitmap, buffer);
docids_sender.write(key, value).unwrap();
}
Operation::Delete => {
facet_field_ids_delta.register_from_key(key);
docids_sender.delete(key).unwrap();
}
Operation::Ignore => (),
@@ -331,43 +342,84 @@ fn merge_and_send_facet_docids(
Ok(())
}
struct FacetDatabases {
/// Maps the facet field id and the docids for which this field exists
facet_id_exists_docids: Database<Bytes, Bytes>,
/// Maps the facet field id and the docids for which this field is set as null
facet_id_is_null_docids: Database<Bytes, Bytes>,
/// Maps the facet field id and the docids for which this field is considered empty
facet_id_is_empty_docids: Database<Bytes, Bytes>,
/// Maps the facet field id and ranges of numbers with the docids that corresponds to them.
facet_id_f64_docids: Database<Bytes, Bytes>,
/// Maps the facet field id and ranges of strings with the docids that corresponds to them.
facet_id_string_docids: Database<Bytes, Bytes>,
struct FacetDatabases<'a> {
index: &'a Index,
}
impl FacetDatabases {
fn new(index: &Index) -> Self {
Self {
facet_id_exists_docids: index.facet_id_exists_docids.remap_types(),
facet_id_is_null_docids: index.facet_id_is_null_docids.remap_types(),
facet_id_is_empty_docids: index.facet_id_is_empty_docids.remap_types(),
facet_id_f64_docids: index.facet_id_f64_docids.remap_types(),
facet_id_string_docids: index.facet_id_string_docids.remap_types(),
}
impl<'a> FacetDatabases<'a> {
fn new(index: &'a Index) -> Self {
Self { index }
}
fn get<'a>(&self, rtxn: &'a RoTxn<'_>, key: &[u8]) -> heed::Result<Option<&'a [u8]>> {
let (facet_kind, key) = self.extract_facet_kind(key);
fn get_cbo_roaring_bytes_value<'t>(
&self,
rtxn: &'t RoTxn<'_>,
key: &[u8],
) -> heed::Result<Option<&'t [u8]>> {
let (facet_kind, key) = FacetKind::extract_from_key(key);
let value =
super::channel::Database::from(facet_kind).database(self.index).get(rtxn, key)?;
match facet_kind {
FacetKind::Exists => self.facet_id_exists_docids.get(rtxn, key),
FacetKind::Null => self.facet_id_is_null_docids.get(rtxn, key),
FacetKind::Empty => self.facet_id_is_empty_docids.get(rtxn, key),
FacetKind::Number => self.facet_id_f64_docids.get(rtxn, key),
FacetKind::String => self.facet_id_string_docids.get(rtxn, key),
// skip level group size
FacetKind::String | FacetKind::Number => Ok(value.map(|v| &v[1..])),
_ => Ok(value),
}
}
}
fn extract_facet_kind<'a>(&self, key: &'a [u8]) -> (FacetKind, &'a [u8]) {
(FacetKind::from(key[0]), &key[1..])
#[derive(Debug)]
pub struct FacetFieldIdsDelta {
/// The field ids that have been modified
modified_facet_string_ids: HashSet<FieldId>,
modified_facet_number_ids: HashSet<FieldId>,
}
impl FacetFieldIdsDelta {
fn new() -> Self {
Self {
modified_facet_string_ids: HashSet::new(),
modified_facet_number_ids: HashSet::new(),
}
}
fn register_facet_string_id(&mut self, field_id: FieldId) {
self.modified_facet_string_ids.insert(field_id);
}
fn register_facet_number_id(&mut self, field_id: FieldId) {
self.modified_facet_number_ids.insert(field_id);
}
fn register_from_key(&mut self, key: &[u8]) {
let (facet_kind, field_id) = self.extract_key_data(key);
match facet_kind {
FacetKind::Number => self.register_facet_number_id(field_id),
FacetKind::String => self.register_facet_string_id(field_id),
_ => (),
}
}
fn extract_key_data<'a>(&self, key: &'a [u8]) -> (FacetKind, FieldId) {
let facet_kind = FacetKind::from(key[0]);
let field_id = FieldId::from_be_bytes([key[1], key[2]]);
(facet_kind, field_id)
}
pub fn modified_facet_string_ids(&self) -> Option<Vec<FieldId>> {
if self.modified_facet_string_ids.is_empty() {
None
} else {
Some(self.modified_facet_string_ids.iter().copied().collect())
}
}
pub fn modified_facet_number_ids(&self) -> Option<Vec<FieldId>> {
if self.modified_facet_number_ids.is_empty() {
None
} else {
Some(self.modified_facet_number_ids.iter().copied().collect())
}
}
}
@@ -396,11 +448,13 @@ fn merge_cbo_bitmaps(
(Some(current), None, Some(add)) => Ok(Operation::Write(current | add)),
(Some(current), Some(del), add) => {
let output = match add {
Some(add) => (current - del) | add,
None => current - del,
Some(add) => (&current - del) | add,
None => &current - del,
};
if output.is_empty() {
Ok(Operation::Delete)
} else if current == output {
Ok(Operation::Ignore)
} else {
Ok(Operation::Write(output))
}