mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-22 14:21:03 +00:00
Compare commits
8 Commits
more-effic
...
finer-face
Author | SHA1 | Date | |
---|---|---|---|
b65a1ec4b5 | |||
922c915ce5 | |||
bd686991b9 | |||
b365150137 | |||
05179d1264 | |||
e580a8e27c | |||
9ac4a69fe4 | |||
4e280534a2 |
@ -1,14 +1,16 @@
|
|||||||
use std::collections::{BTreeMap, BTreeSet};
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
|
|
||||||
|
use either::Either;
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
use raw_collections::RawMap;
|
use raw_collections::RawMap;
|
||||||
|
use roaring::RoaringBitmap;
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
|
|
||||||
use super::vector_document::VectorDocument;
|
use super::vector_document::VectorDocument;
|
||||||
use super::{KvReaderFieldId, KvWriterFieldId};
|
use super::{KvReaderFieldId, KvWriterFieldId};
|
||||||
use crate::documents::FieldIdMapper;
|
use crate::documents::FieldIdMapper;
|
||||||
use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
|
use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
|
||||||
use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
|
use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
|
||||||
|
|
||||||
/// A view into a document that can represent either the current version from the DB,
|
/// A view into a document that can represent either the current version from the DB,
|
||||||
/// the update data from payload or other means, or the merged updated version.
|
/// the update data from payload or other means, or the merged updated version.
|
||||||
@ -65,33 +67,7 @@ impl<'t, Mapper: FieldIdMapper> Copy for DocumentFromDb<'t, Mapper> {}
|
|||||||
|
|
||||||
impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
|
impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
|
||||||
fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
|
fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
|
||||||
let mut it = self.content.iter();
|
self.iter_top_level_fields_with_fid().map(|res| res.map(|(_, name, value)| (name, value)))
|
||||||
|
|
||||||
std::iter::from_fn(move || loop {
|
|
||||||
let (fid, value) = it.next()?;
|
|
||||||
let name = match self.fields_ids_map.name(fid).ok_or(
|
|
||||||
InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
|
|
||||||
field_id: fid,
|
|
||||||
process: "getting current document",
|
|
||||||
}),
|
|
||||||
) {
|
|
||||||
Ok(name) => name,
|
|
||||||
Err(error) => return Some(Err(error.into())),
|
|
||||||
};
|
|
||||||
|
|
||||||
if name == RESERVED_VECTORS_FIELD_NAME || name == "_geo" {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let res = (|| {
|
|
||||||
let value =
|
|
||||||
serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
|
|
||||||
|
|
||||||
Ok((name, value))
|
|
||||||
})();
|
|
||||||
|
|
||||||
return Some(res);
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn vectors_field(&self) -> Result<Option<&'t RawValue>> {
|
fn vectors_field(&self) -> Result<Option<&'t RawValue>> {
|
||||||
@ -140,6 +116,38 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
|
|||||||
let Some(value) = self.content.get(fid) else { return Ok(None) };
|
let Some(value) = self.content.get(fid) else { return Ok(None) };
|
||||||
Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson)?))
|
Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson)?))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn iter_top_level_fields_with_fid(
|
||||||
|
&self,
|
||||||
|
) -> impl Iterator<Item = Result<(FieldId, &'t str, &'t RawValue)>> + '_ {
|
||||||
|
let mut it = self.content.iter();
|
||||||
|
|
||||||
|
std::iter::from_fn(move || loop {
|
||||||
|
let (fid, value) = it.next()?;
|
||||||
|
let name = match self.fields_ids_map.name(fid).ok_or(
|
||||||
|
InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
|
||||||
|
field_id: fid,
|
||||||
|
process: "getting current document",
|
||||||
|
}),
|
||||||
|
) {
|
||||||
|
Ok(name) => name,
|
||||||
|
Err(error) => return Some(Err(error.into())),
|
||||||
|
};
|
||||||
|
|
||||||
|
if name == RESERVED_VECTORS_FIELD_NAME || name == "_geo" {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let res = (|| {
|
||||||
|
let value =
|
||||||
|
serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
|
||||||
|
|
||||||
|
Ok((fid, name, value))
|
||||||
|
})();
|
||||||
|
|
||||||
|
return Some(res);
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -182,12 +190,130 @@ impl<'a, 'doc> Document<'doc> for DocumentFromVersions<'a, 'doc> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A [`Document`] whose fields value are the [`DocumentFromVersions`] value if exists,
|
||||||
|
/// or else the [`DocumentFromDb`] value.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
|
pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
|
||||||
new_doc: DocumentFromVersions<'a, 'doc>,
|
new_doc: DocumentFromVersions<'a, 'doc>,
|
||||||
db: Option<DocumentFromDb<'t, Mapper>>,
|
db: Option<DocumentFromDb<'t, Mapper>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A pseudo-document that returns [`DeltaValue`]s.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DeltaDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
|
||||||
|
new_doc: DocumentFromVersions<'a, 'doc>,
|
||||||
|
db_doc: Option<DocumentFromDb<'t, Mapper>>,
|
||||||
|
has_deletion: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
|
||||||
|
pub fn new(
|
||||||
|
docid: DocumentId,
|
||||||
|
rtxn: &'t RoTxn,
|
||||||
|
index: &'t Index,
|
||||||
|
db_fields_ids_map: &'t Mapper,
|
||||||
|
new_doc: DocumentFromVersions<'a, 'doc>,
|
||||||
|
has_deletion: bool,
|
||||||
|
) -> Result<Self> {
|
||||||
|
let db_doc = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
|
||||||
|
Ok(Self { db_doc, new_doc, has_deletion })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delta_top_level_fields<'either>(
|
||||||
|
&self,
|
||||||
|
) -> impl Iterator<Item = Result<(&'either str, DeltaValue<'either, 't, 'doc>)>> + '_
|
||||||
|
where
|
||||||
|
't: 'either,
|
||||||
|
'doc: 'either,
|
||||||
|
{
|
||||||
|
match &self.db_doc {
|
||||||
|
// since we'll be returning all db top level fields, it makes more sense to iterate on the db first:
|
||||||
|
// 1. random field access is faster on RawMap than on obkvs
|
||||||
|
// 2. we can store a roaring of fid instead of btree set of fields
|
||||||
|
Some(db_doc) => {
|
||||||
|
let mut new_doc_it = self.new_doc.iter_top_level_fields();
|
||||||
|
let mut db_iter = db_doc.iter_top_level_fields_with_fid();
|
||||||
|
let fid_map = db_doc.fields_ids_map;
|
||||||
|
let mut seen_fields = RoaringBitmap::new();
|
||||||
|
let has_deletion = self.has_deletion;
|
||||||
|
|
||||||
|
Either::Left(std::iter::from_fn(move || {
|
||||||
|
if let Some(entry) = db_iter.next() {
|
||||||
|
let (fid, name, db_value) = match entry {
|
||||||
|
Ok(entry) => entry,
|
||||||
|
Err(err) => return Some(Err(err)),
|
||||||
|
};
|
||||||
|
seen_fields.insert(fid.into());
|
||||||
|
let new_value = match self.new_doc.top_level_field(name) {
|
||||||
|
Ok(new_value) => new_value,
|
||||||
|
Err(err) => return Some(Err(err)),
|
||||||
|
};
|
||||||
|
|
||||||
|
match new_value {
|
||||||
|
Some(new_value) => {
|
||||||
|
if new_value.get() == db_value.get() {
|
||||||
|
return Some(Ok((name, DeltaValue::Unchanged(new_value))));
|
||||||
|
} else {
|
||||||
|
return Some(Ok((
|
||||||
|
name,
|
||||||
|
DeltaValue::Modified(db_value, new_value),
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
if has_deletion {
|
||||||
|
return Some(Ok((name, DeltaValue::Deleted(db_value))));
|
||||||
|
} else {
|
||||||
|
return Some(Ok((name, DeltaValue::Unchanged(db_value))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
match new_doc_it.by_ref().find(|res| {
|
||||||
|
if let Ok((name, _)) = res {
|
||||||
|
if let Some(fid) = fid_map.id(name) {
|
||||||
|
return !seen_fields.contains(fid.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
true
|
||||||
|
})? {
|
||||||
|
Ok((name, new_value)) => {
|
||||||
|
Some(Ok((name, DeltaValue::Inserted(new_value))))
|
||||||
|
}
|
||||||
|
Err(err) => Some(Err(err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| {
|
||||||
|
let (k, v) = res?;
|
||||||
|
Ok((k, DeltaValue::Inserted(v)))
|
||||||
|
})),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delta_geo_field(&self) -> Result<Option<DeltaValue<'_, 't, 'doc>>> {
|
||||||
|
let db_geo_field = match self.db_doc {
|
||||||
|
Some(db) => db.geo_field()?,
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_doc_geo_field = self.new_doc.geo_field()?;
|
||||||
|
|
||||||
|
Ok(match (db_geo_field, new_doc_geo_field) {
|
||||||
|
(None, None) => None,
|
||||||
|
(None, Some(new_doc)) => Some(DeltaValue::Inserted(new_doc)),
|
||||||
|
(Some(db), None) => Some(if self.has_deletion {
|
||||||
|
DeltaValue::Deleted(db)
|
||||||
|
} else {
|
||||||
|
DeltaValue::Unchanged(db)
|
||||||
|
}),
|
||||||
|
(Some(db), Some(new_doc)) => Some(DeltaValue::Modified(db, new_doc)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
|
impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
|
||||||
pub fn with_db(
|
pub fn with_db(
|
||||||
docid: DocumentId,
|
docid: DocumentId,
|
||||||
@ -205,6 +331,13 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum DeltaValue<'either, 't: 'either, 'doc: 'either> {
|
||||||
|
Deleted(&'t RawValue),
|
||||||
|
Inserted(&'doc RawValue),
|
||||||
|
Modified(&'t RawValue, &'doc RawValue),
|
||||||
|
Unchanged(&'either RawValue),
|
||||||
|
}
|
||||||
|
|
||||||
impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
|
impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
|
||||||
for MergedDocument<'d, 'doc, 't, Mapper>
|
for MergedDocument<'d, 'doc, 't, Mapper>
|
||||||
{
|
{
|
||||||
|
@ -2,7 +2,7 @@ use bumpalo::Bump;
|
|||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
|
|
||||||
use super::document::{
|
use super::document::{
|
||||||
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
|
DeltaDocument, Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
|
||||||
};
|
};
|
||||||
use super::extract::perm_json_p;
|
use super::extract::perm_json_p;
|
||||||
use super::vector_document::{
|
use super::vector_document::{
|
||||||
@ -167,6 +167,22 @@ impl<'doc> Update<'doc> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn delta<'t, Mapper: FieldIdMapper>(
|
||||||
|
&self,
|
||||||
|
rtxn: &'t RoTxn,
|
||||||
|
index: &'t Index,
|
||||||
|
mapper: &'t Mapper,
|
||||||
|
) -> Result<DeltaDocument<'_, 'doc, 't, Mapper>> {
|
||||||
|
DeltaDocument::new(
|
||||||
|
self.docid,
|
||||||
|
rtxn,
|
||||||
|
index,
|
||||||
|
mapper,
|
||||||
|
DocumentFromVersions::new(&self.new),
|
||||||
|
self.has_deletion,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns whether the updated version of the document is different from the current version for the passed subset of fields.
|
/// Returns whether the updated version of the document is different from the current version for the passed subset of fields.
|
||||||
///
|
///
|
||||||
/// `true` if at least one top-level-field that is a exactly a member of field or a parent of a member of field changed.
|
/// `true` if at least one top-level-field that is a exactly a member of field or a parent of a member of field changed.
|
||||||
|
@ -9,7 +9,7 @@ use heed::RoTxn;
|
|||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use super::super::cache::BalancedCaches;
|
use super::super::cache::BalancedCaches;
|
||||||
use super::facet_document::extract_document_facets;
|
use super::facet_document::{extract_document_facets, extract_merged_document_facets};
|
||||||
use super::FacetKind;
|
use super::FacetKind;
|
||||||
use crate::heed_codec::facet::OrderedF64Codec;
|
use crate::heed_codec::facet::OrderedF64Codec;
|
||||||
use crate::update::del_add::DelAdd;
|
use crate::update::del_add::DelAdd;
|
||||||
@ -106,17 +106,19 @@ impl FacetedDocidsExtractor {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
extract_document_facets(
|
extract_merged_document_facets(
|
||||||
attributes_to_extract,
|
attributes_to_extract,
|
||||||
inner.current(rtxn, index, context.db_fields_ids_map)?,
|
inner.delta(rtxn, index, context.db_fields_ids_map)?,
|
||||||
inner.external_document_id(),
|
inner.external_document_id(),
|
||||||
|
&mut del_add_facet_value,
|
||||||
|
cached_sorter.deref_mut(),
|
||||||
new_fields_ids_map.deref_mut(),
|
new_fields_ids_map.deref_mut(),
|
||||||
&mut |fid, depth, value| {
|
&mut |fid, depth, value, del_add_facet_value, cached_sorter| {
|
||||||
Self::facet_fn_with_options(
|
Self::facet_fn_with_options(
|
||||||
&context.doc_alloc,
|
&context.doc_alloc,
|
||||||
cached_sorter.deref_mut(),
|
cached_sorter,
|
||||||
BalancedCaches::insert_del_u32,
|
BalancedCaches::insert_del_u32,
|
||||||
&mut del_add_facet_value,
|
del_add_facet_value,
|
||||||
DelAddFacetValue::insert_del,
|
DelAddFacetValue::insert_del,
|
||||||
docid,
|
docid,
|
||||||
fid,
|
fid,
|
||||||
@ -124,19 +126,12 @@ impl FacetedDocidsExtractor {
|
|||||||
value,
|
value,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
)?;
|
&mut |fid, depth, value, del_add_facet_value, cached_sorter| {
|
||||||
|
|
||||||
extract_document_facets(
|
|
||||||
attributes_to_extract,
|
|
||||||
inner.merged(rtxn, index, context.db_fields_ids_map)?,
|
|
||||||
inner.external_document_id(),
|
|
||||||
new_fields_ids_map.deref_mut(),
|
|
||||||
&mut |fid, depth, value| {
|
|
||||||
Self::facet_fn_with_options(
|
Self::facet_fn_with_options(
|
||||||
&context.doc_alloc,
|
&context.doc_alloc,
|
||||||
cached_sorter.deref_mut(),
|
cached_sorter,
|
||||||
BalancedCaches::insert_add_u32,
|
BalancedCaches::insert_add_u32,
|
||||||
&mut del_add_facet_value,
|
del_add_facet_value,
|
||||||
DelAddFacetValue::insert_add,
|
DelAddFacetValue::insert_add,
|
||||||
docid,
|
docid,
|
||||||
fid,
|
fid,
|
||||||
@ -282,7 +277,7 @@ impl FacetedDocidsExtractor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DelAddFacetValue<'doc> {
|
pub(crate) struct DelAddFacetValue<'doc> {
|
||||||
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
||||||
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
|
use serde_json::value::RawValue;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::update::new::document::Document;
|
use super::extract_facets::DelAddFacetValue;
|
||||||
|
use crate::update::new::document::{DeltaDocument, DeltaValue, Document};
|
||||||
use crate::update::new::extract::geo::extract_geo_coordinates;
|
use crate::update::new::extract::geo::extract_geo_coordinates;
|
||||||
use crate::update::new::extract::perm_json_p;
|
use crate::update::new::extract::{perm_json_p, BalancedCaches};
|
||||||
use crate::{FieldId, GlobalFieldsIdsMap, InternalError, Result, UserError};
|
use crate::{FieldId, FieldsIdsMap, GlobalFieldsIdsMap, InternalError, Result, UserError};
|
||||||
|
|
||||||
pub fn extract_document_facets<'doc>(
|
pub fn extract_document_facets<'doc>(
|
||||||
attributes_to_extract: &[&str],
|
attributes_to_extract: &[&str],
|
||||||
@ -15,58 +17,7 @@ pub fn extract_document_facets<'doc>(
|
|||||||
for res in document.iter_top_level_fields() {
|
for res in document.iter_top_level_fields() {
|
||||||
let (field_name, value) = res?;
|
let (field_name, value) = res?;
|
||||||
|
|
||||||
let mut tokenize_field =
|
extract_document_facet(attributes_to_extract, field_id_map, facet_fn, field_name, value)?;
|
||||||
|name: &str, depth: perm_json_p::Depth, value: &Value| match field_id_map
|
|
||||||
.id_or_insert(name)
|
|
||||||
{
|
|
||||||
Some(field_id) => facet_fn(field_id, depth, value),
|
|
||||||
None => Err(UserError::AttributeLimitReached.into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
// if the current field is searchable or contains a searchable attribute
|
|
||||||
let selection = perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]);
|
|
||||||
if selection != perm_json_p::Selection::Skip {
|
|
||||||
// parse json.
|
|
||||||
match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? {
|
|
||||||
Value::Object(object) => {
|
|
||||||
perm_json_p::seek_leaf_values_in_object(
|
|
||||||
&object,
|
|
||||||
Some(attributes_to_extract),
|
|
||||||
&[], // skip no attributes
|
|
||||||
field_name,
|
|
||||||
perm_json_p::Depth::OnBaseKey,
|
|
||||||
&mut tokenize_field,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
if selection == perm_json_p::Selection::Select {
|
|
||||||
tokenize_field(
|
|
||||||
field_name,
|
|
||||||
perm_json_p::Depth::OnBaseKey,
|
|
||||||
&Value::Object(object),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Value::Array(array) => {
|
|
||||||
perm_json_p::seek_leaf_values_in_array(
|
|
||||||
&array,
|
|
||||||
Some(attributes_to_extract),
|
|
||||||
&[], // skip no attributes
|
|
||||||
field_name,
|
|
||||||
perm_json_p::Depth::OnBaseKey,
|
|
||||||
&mut tokenize_field,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
if selection == perm_json_p::Selection::Select {
|
|
||||||
tokenize_field(
|
|
||||||
field_name,
|
|
||||||
perm_json_p::Depth::OnBaseKey,
|
|
||||||
&Value::Array(array),
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
value => tokenize_field(field_name, perm_json_p::Depth::OnBaseKey, &value)?,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if attributes_to_extract.contains(&"_geo") {
|
if attributes_to_extract.contains(&"_geo") {
|
||||||
@ -85,3 +36,199 @@ pub fn extract_document_facets<'doc>(
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn extract_document_facet(
|
||||||
|
attributes_to_extract: &[&str],
|
||||||
|
field_id_map: &mut GlobalFieldsIdsMap<'_>,
|
||||||
|
facet_fn: &mut impl FnMut(u16, perm_json_p::Depth, &Value) -> std::result::Result<(), crate::Error>,
|
||||||
|
field_name: &str,
|
||||||
|
value: &serde_json::value::RawValue,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut tokenize_field = |name: &str, depth: perm_json_p::Depth, value: &Value| {
|
||||||
|
match field_id_map.id_or_insert(name) {
|
||||||
|
Some(field_id) => facet_fn(field_id, depth, value),
|
||||||
|
None => Err(UserError::AttributeLimitReached.into()),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let selection = perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]);
|
||||||
|
if selection != perm_json_p::Selection::Skip {
|
||||||
|
// parse json.
|
||||||
|
match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? {
|
||||||
|
Value::Object(object) => {
|
||||||
|
perm_json_p::seek_leaf_values_in_object(
|
||||||
|
&object,
|
||||||
|
Some(attributes_to_extract),
|
||||||
|
&[], // skip no attributes
|
||||||
|
field_name,
|
||||||
|
perm_json_p::Depth::OnBaseKey,
|
||||||
|
&mut tokenize_field,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if selection == perm_json_p::Selection::Select {
|
||||||
|
tokenize_field(
|
||||||
|
field_name,
|
||||||
|
perm_json_p::Depth::OnBaseKey,
|
||||||
|
&Value::Object(object),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Value::Array(array) => {
|
||||||
|
perm_json_p::seek_leaf_values_in_array(
|
||||||
|
&array,
|
||||||
|
Some(attributes_to_extract),
|
||||||
|
&[], // skip no attributes
|
||||||
|
field_name,
|
||||||
|
perm_json_p::Depth::OnBaseKey,
|
||||||
|
&mut tokenize_field,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if selection == perm_json_p::Selection::Select {
|
||||||
|
tokenize_field(
|
||||||
|
field_name,
|
||||||
|
perm_json_p::Depth::OnBaseKey,
|
||||||
|
&Value::Array(array),
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
value => tokenize_field(field_name, perm_json_p::Depth::OnBaseKey, &value)?,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub fn extract_merged_document_facets<'doc, 'del_add_facet_value, 'cache>(
|
||||||
|
attributes_to_extract: &[&str],
|
||||||
|
document: DeltaDocument<'doc, 'doc, 'doc, FieldsIdsMap>,
|
||||||
|
external_document_id: &str,
|
||||||
|
del_add_facet_value: &mut DelAddFacetValue<'del_add_facet_value>,
|
||||||
|
cached_sorter: &mut BalancedCaches<'cache>,
|
||||||
|
field_id_map: &mut GlobalFieldsIdsMap,
|
||||||
|
facet_fn_del: &mut impl FnMut(
|
||||||
|
FieldId,
|
||||||
|
perm_json_p::Depth,
|
||||||
|
&Value,
|
||||||
|
&mut DelAddFacetValue<'del_add_facet_value>,
|
||||||
|
&mut BalancedCaches<'cache>,
|
||||||
|
) -> Result<()>,
|
||||||
|
facet_fn_add: &mut impl FnMut(
|
||||||
|
FieldId,
|
||||||
|
perm_json_p::Depth,
|
||||||
|
&Value,
|
||||||
|
&mut DelAddFacetValue<'del_add_facet_value>,
|
||||||
|
&mut BalancedCaches<'cache>,
|
||||||
|
) -> Result<()>,
|
||||||
|
) -> Result<()> {
|
||||||
|
for res in document.delta_top_level_fields() {
|
||||||
|
let (field_name, value) = res?;
|
||||||
|
match value {
|
||||||
|
DeltaValue::Deleted(value) => {
|
||||||
|
extract_document_facet(
|
||||||
|
attributes_to_extract,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
field_name,
|
||||||
|
value,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
DeltaValue::Inserted(value) => {
|
||||||
|
extract_document_facet(
|
||||||
|
attributes_to_extract,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
field_name,
|
||||||
|
value,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
DeltaValue::Modified(current, updated) => {
|
||||||
|
extract_document_facet(
|
||||||
|
attributes_to_extract,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
field_name,
|
||||||
|
current,
|
||||||
|
)?;
|
||||||
|
extract_document_facet(
|
||||||
|
attributes_to_extract,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
field_name,
|
||||||
|
updated,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
DeltaValue::Unchanged(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if attributes_to_extract.contains(&"_geo") {
|
||||||
|
match document.delta_geo_field()? {
|
||||||
|
Some(DeltaValue::Deleted(deleted)) => {
|
||||||
|
extract_geo_facet(
|
||||||
|
external_document_id,
|
||||||
|
deleted,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
Some(DeltaValue::Inserted(inserted)) => {
|
||||||
|
extract_geo_facet(
|
||||||
|
external_document_id,
|
||||||
|
inserted,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
Some(DeltaValue::Modified(current, updated)) => {
|
||||||
|
extract_geo_facet(
|
||||||
|
external_document_id,
|
||||||
|
current,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
extract_geo_facet(
|
||||||
|
external_document_id,
|
||||||
|
updated,
|
||||||
|
field_id_map,
|
||||||
|
&mut |fid, depth, value| {
|
||||||
|
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
None | Some(DeltaValue::Unchanged(_)) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_geo_facet(
|
||||||
|
external_document_id: &str,
|
||||||
|
geo_value: &RawValue,
|
||||||
|
field_id_map: &mut GlobalFieldsIdsMap<'_>,
|
||||||
|
facet_fn: &mut impl FnMut(FieldId, perm_json_p::Depth, &Value) -> Result<()>,
|
||||||
|
) -> Result<()> {
|
||||||
|
if let Some([lat, lng]) = extract_geo_coordinates(external_document_id, geo_value)? {
|
||||||
|
let (lat_fid, lng_fid) = field_id_map
|
||||||
|
.id_or_insert("_geo.lat")
|
||||||
|
.zip(field_id_map.id_or_insert("_geo.lng"))
|
||||||
|
.ok_or(UserError::AttributeLimitReached)?;
|
||||||
|
|
||||||
|
facet_fn(lat_fid, perm_json_p::Depth::OnBaseKey, &lat.into())?;
|
||||||
|
facet_fn(lng_fid, perm_json_p::Depth::OnBaseKey, &lng.into())?;
|
||||||
|
};
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user