Refactor Document indexing process (Facets)

**Changes:**
The document changes now take a selector closure, instead of a list of fields, to match the fields to extract.
The seek_leaf_values_in_object function now uses a selector closure, instead of a list of fields, to match the fields to extract.
The facet database extraction now relies on the FilterableAttributesRule to match the fields to extract.
The facet-search database extraction now relies on the FieldIdMapWithMetadata to select the fields to index.
The facet level database extraction now relies on the FieldIdMapWithMetadata to select the fields to index.

**Important:**
Because the filterable attributes are now patterns,
the fieldIdMap will only register the fields that exist in at least one document.
If a field doesn't exist in any document, it will not be registered, even if it has been specified in the filterable fields.

**Impact:**
- Document Addition/modification facet indexing
- Document deletion facet indexing
This commit is contained in:
ManyTheFish
2025-03-03 10:30:42 +01:00
parent 659855c88e
commit 95bccaf5f5
8 changed files with 233 additions and 179 deletions

View File

@ -5,12 +5,13 @@ use std::ops::DerefMut as _;
use bumpalo::collections::Vec as BVec;
use bumpalo::Bump;
use hashbrown::HashMap;
use heed::RoTxn;
use serde_json::Value;
use super::super::cache::BalancedCaches;
use super::facet_document::extract_document_facets;
use super::FacetKind;
use crate::fields_ids_map::metadata::Metadata;
use crate::filterable_attributes_rules::match_faceted_field;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::update::del_add::DelAdd;
use crate::update::new::channel::FieldIdDocidFacetSender;
@ -23,13 +24,17 @@ use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::{FullySend, ThreadLocal};
use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
use crate::{DocumentId, FieldId, FilterableAttributesRule, Result, MAX_FACET_VALUE_LENGTH};
pub struct FacetedExtractorData<'a, 'b> {
attributes_to_extract: &'a [&'a str],
sender: &'a FieldIdDocidFacetSender<'a, 'b>,
grenad_parameters: &'a GrenadParameters,
buckets: usize,
filterable_attributes: Vec<FilterableAttributesRule>,
sortable_fields: HashSet<String>,
asc_desc_fields: HashSet<String>,
distinct_field: Option<String>,
is_geo_enabled: bool,
}
impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b> {
@ -52,7 +57,11 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b>
let change = change?;
FacetedDocidsExtractor::extract_document_change(
context,
self.attributes_to_extract,
&self.filterable_attributes,
&self.sortable_fields,
&self.asc_desc_fields,
&self.distinct_field,
self.is_geo_enabled,
change,
self.sender,
)?
@ -64,13 +73,18 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for FacetedExtractorData<'a, 'b>
pub struct FacetedDocidsExtractor;
impl FacetedDocidsExtractor {
#[allow(clippy::too_many_arguments)]
fn extract_document_change(
context: &DocumentChangeContext<RefCell<BalancedCaches>>,
attributes_to_extract: &[&str],
filterable_attributes: &[FilterableAttributesRule],
sortable_fields: &HashSet<String>,
asc_desc_fields: &HashSet<String>,
distinct_field: &Option<String>,
is_geo_enabled: bool,
document_change: DocumentChange,
sender: &FieldIdDocidFacetSender,
) -> Result<()> {
let index = &context.index;
let index = context.index;
let rtxn = &context.rtxn;
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
let mut cached_sorter = context.data.borrow_mut_or_yield();
@ -78,11 +92,15 @@ impl FacetedDocidsExtractor {
let docid = document_change.docid();
let res = match document_change {
DocumentChange::Deletion(inner) => extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
filterable_attributes,
sortable_fields,
asc_desc_fields,
distinct_field,
is_geo_enabled,
&mut |fid, meta, depth, value| {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
@ -91,6 +109,8 @@ impl FacetedDocidsExtractor {
DelAddFacetValue::insert_del,
docid,
fid,
meta,
filterable_attributes,
depth,
value,
)
@ -98,7 +118,15 @@ impl FacetedDocidsExtractor {
),
DocumentChange::Update(inner) => {
if !inner.has_changed_for_fields(
Some(attributes_to_extract),
&mut |field_name| {
match_faceted_field(
field_name,
filterable_attributes,
sortable_fields,
asc_desc_fields,
distinct_field,
)
},
rtxn,
index,
context.db_fields_ids_map,
@ -107,11 +135,15 @@ impl FacetedDocidsExtractor {
}
extract_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
filterable_attributes,
sortable_fields,
asc_desc_fields,
distinct_field,
is_geo_enabled,
&mut |fid, meta, depth, value| {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
@ -120,6 +152,8 @@ impl FacetedDocidsExtractor {
DelAddFacetValue::insert_del,
docid,
fid,
meta,
filterable_attributes,
depth,
value,
)
@ -127,11 +161,15 @@ impl FacetedDocidsExtractor {
)?;
extract_document_facets(
attributes_to_extract,
inner.merged(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
filterable_attributes,
sortable_fields,
asc_desc_fields,
distinct_field,
is_geo_enabled,
&mut |fid, meta, depth, value| {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
@ -140,6 +178,8 @@ impl FacetedDocidsExtractor {
DelAddFacetValue::insert_add,
docid,
fid,
meta,
filterable_attributes,
depth,
value,
)
@ -147,11 +187,15 @@ impl FacetedDocidsExtractor {
)
}
DocumentChange::Insertion(inner) => extract_document_facets(
attributes_to_extract,
inner.inserted(),
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
filterable_attributes,
sortable_fields,
asc_desc_fields,
distinct_field,
is_geo_enabled,
&mut |fid, meta, depth, value| {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
@ -160,6 +204,8 @@ impl FacetedDocidsExtractor {
DelAddFacetValue::insert_add,
docid,
fid,
meta,
filterable_attributes,
depth,
value,
)
@ -180,9 +226,18 @@ impl FacetedDocidsExtractor {
facet_fn: impl Fn(&mut DelAddFacetValue<'doc>, FieldId, BVec<'doc, u8>, FacetKind),
docid: DocumentId,
fid: FieldId,
meta: Metadata,
filterable_attributes: &[FilterableAttributesRule],
depth: perm_json_p::Depth,
value: &Value,
) -> Result<()> {
// if the field is not faceted, do nothing
if !meta.is_faceted(filterable_attributes) {
return Ok(());
}
let features = meta.filterable_attributes_features(filterable_attributes);
let mut buffer = BVec::new_in(doc_alloc);
// Exists
// key: fid
@ -246,7 +301,9 @@ impl FacetedDocidsExtractor {
}
// Null
// key: fid
Value::Null if depth == perm_json_p::Depth::OnBaseKey => {
Value::Null
if depth == perm_json_p::Depth::OnBaseKey && features.is_filterable_null() =>
{
buffer.clear();
buffer.push(FacetKind::Null as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
@ -254,19 +311,29 @@ impl FacetedDocidsExtractor {
}
// Empty
// key: fid
Value::Array(a) if a.is_empty() && depth == perm_json_p::Depth::OnBaseKey => {
Value::Array(a)
if a.is_empty()
&& depth == perm_json_p::Depth::OnBaseKey
&& features.is_filterable_empty() =>
{
buffer.clear();
buffer.push(FacetKind::Empty as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid)
}
Value::String(_) if depth == perm_json_p::Depth::OnBaseKey => {
Value::String(_)
if depth == perm_json_p::Depth::OnBaseKey && features.is_filterable_empty() =>
{
buffer.clear();
buffer.push(FacetKind::Empty as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
cache_fn(cached_sorter, &buffer, docid)
}
Value::Object(o) if o.is_empty() && depth == perm_json_p::Depth::OnBaseKey => {
Value::Object(o)
if o.is_empty()
&& depth == perm_json_p::Depth::OnBaseKey
&& features.is_filterable_empty() =>
{
buffer.clear();
buffer.push(FacetKind::Empty as u8);
buffer.extend_from_slice(&fid.to_be_bytes());
@ -276,10 +343,6 @@ impl FacetedDocidsExtractor {
_ => Ok(()),
}
}
fn attributes_to_extract<'a>(rtxn: &'a RoTxn, index: &'a Index) -> Result<HashSet<String>> {
index.user_defined_faceted_fields(rtxn)
}
}
struct DelAddFacetValue<'doc> {
@ -399,9 +462,11 @@ impl FacetedDocidsExtractor {
{
let index = indexing_context.index;
let rtxn = index.read_txn()?;
let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?;
let attributes_to_extract: Vec<_> =
attributes_to_extract.iter().map(|s| s.as_ref()).collect();
let filterable_attributes = index.filterable_attributes_rules(&rtxn)?;
let sortable_fields = index.sortable_fields(&rtxn)?;
let asc_desc_fields = index.asc_desc_fields(&rtxn)?;
let distinct_field = index.distinct_field(&rtxn)?.map(|s| s.to_string());
let is_geo_enabled = index.is_geo_enabled(&rtxn)?;
let datastore = ThreadLocal::new();
{
@ -410,10 +475,14 @@ impl FacetedDocidsExtractor {
let _entered = span.enter();
let extractor = FacetedExtractorData {
attributes_to_extract: &attributes_to_extract,
grenad_parameters: indexing_context.grenad_parameters,
buckets: rayon::current_num_threads(),
sender,
filterable_attributes,
sortable_fields,
asc_desc_fields,
distinct_field,
is_geo_enabled,
};
extract(
document_changes,

View File

@ -1,46 +1,80 @@
use std::collections::HashSet;
use serde_json::Value;
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::attribute_patterns::PatternMatch;
use crate::fields_ids_map::metadata::Metadata;
use crate::update::new::document::Document;
use crate::update::new::extract::geo::extract_geo_coordinates;
use crate::update::new::extract::perm_json_p;
use crate::{FieldId, GlobalFieldsIdsMap, InternalError, Result, UserError};
use crate::{
FieldId, FilterableAttributesRule, GlobalFieldsIdsMap, InternalError, Result, UserError,
};
use crate::filterable_attributes_rules::match_faceted_field;
#[allow(clippy::too_many_arguments)]
pub fn extract_document_facets<'doc>(
attributes_to_extract: &[&str],
document: impl Document<'doc>,
external_document_id: &str,
field_id_map: &mut GlobalFieldsIdsMap,
facet_fn: &mut impl FnMut(FieldId, perm_json_p::Depth, &Value) -> Result<()>,
filterable_attributes: &[FilterableAttributesRule],
sortable_fields: &HashSet<String>,
asc_desc_fields: &HashSet<String>,
distinct_field: &Option<String>,
is_geo_enabled: bool,
facet_fn: &mut impl FnMut(FieldId, Metadata, perm_json_p::Depth, &Value) -> Result<()>,
) -> Result<()> {
// return the match result for the given field name.
let match_field = |field_name: &str| -> PatternMatch {
match_faceted_field(
field_name,
filterable_attributes,
sortable_fields,
asc_desc_fields,
distinct_field,
)
};
// extract the field if it is faceted (facet searchable, filterable, sortable)
let mut extract_field = |name: &str, depth: perm_json_p::Depth, value: &Value| -> Result<()> {
match field_id_map.id_with_metadata_or_insert(name) {
Some((field_id, meta)) => {
facet_fn(field_id, meta, depth, value)?;
Ok(())
}
None => Err(UserError::AttributeLimitReached.into()),
}
};
for res in document.iter_top_level_fields() {
let (field_name, value) = res?;
let selection = match_field(field_name);
let mut tokenize_field =
|name: &str, depth: perm_json_p::Depth, value: &Value| match field_id_map
.id_or_insert(name)
{
Some(field_id) => facet_fn(field_id, depth, value),
None => Err(UserError::AttributeLimitReached.into()),
};
// extract the field if it matches a pattern and if it is faceted (facet searchable, filterable, sortable)
let mut match_and_extract = |name: &str, depth: perm_json_p::Depth, value: &Value| {
let selection = match_field(name);
if selection == PatternMatch::Match {
extract_field(name, depth, value)?;
}
// if the current field is searchable or contains a searchable attribute
let selection = perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]);
if selection != perm_json_p::Selection::Skip {
Ok(selection)
};
if selection != PatternMatch::NoMatch {
// parse json.
match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? {
Value::Object(object) => {
perm_json_p::seek_leaf_values_in_object(
&object,
Some(attributes_to_extract),
&[], // skip no attributes
field_name,
perm_json_p::Depth::OnBaseKey,
&mut tokenize_field,
&mut match_and_extract,
)?;
if selection == perm_json_p::Selection::Select {
tokenize_field(
if selection == PatternMatch::Match {
extract_field(
field_name,
perm_json_p::Depth::OnBaseKey,
&Value::Object(object),
@ -50,36 +84,34 @@ pub fn extract_document_facets<'doc>(
Value::Array(array) => {
perm_json_p::seek_leaf_values_in_array(
&array,
Some(attributes_to_extract),
&[], // skip no attributes
field_name,
perm_json_p::Depth::OnBaseKey,
&mut tokenize_field,
&mut match_and_extract,
)?;
if selection == perm_json_p::Selection::Select {
tokenize_field(
if selection == PatternMatch::Match {
extract_field(
field_name,
perm_json_p::Depth::OnBaseKey,
&Value::Array(array),
)?;
}
}
value => tokenize_field(field_name, perm_json_p::Depth::OnBaseKey, &value)?,
value => extract_field(field_name, perm_json_p::Depth::OnBaseKey, &value)?,
}
}
}
if attributes_to_extract.contains(&RESERVED_GEO_FIELD_NAME) {
if is_geo_enabled {
if let Some(geo_value) = document.geo_field()? {
if let Some([lat, lng]) = extract_geo_coordinates(external_document_id, geo_value)? {
let (lat_fid, lng_fid) = field_id_map
.id_or_insert("_geo.lat")
.zip(field_id_map.id_or_insert("_geo.lng"))
let ((lat_fid, lat_meta), (lng_fid, lng_meta)) = field_id_map
.id_with_metadata_or_insert("_geo.lat")
.zip(field_id_map.id_with_metadata_or_insert("_geo.lng"))
.ok_or(UserError::AttributeLimitReached)?;
facet_fn(lat_fid, perm_json_p::Depth::OnBaseKey, &lat.into())?;
facet_fn(lng_fid, perm_json_p::Depth::OnBaseKey, &lng.into())?;
facet_fn(lat_fid, lat_meta, perm_json_p::Depth::OnBaseKey, &lat.into())?;
facet_fn(lng_fid, lng_meta, perm_json_p::Depth::OnBaseKey, &lng.into())?;
}
}
}