Compare commits

...

8 Commits

4 changed files with 392 additions and 101 deletions

View File

@ -1,14 +1,16 @@
use std::collections::{BTreeMap, BTreeSet};
use either::Either;
use heed::RoTxn;
use raw_collections::RawMap;
use roaring::RoaringBitmap;
use serde_json::value::RawValue;
use super::vector_document::VectorDocument;
use super::{KvReaderFieldId, KvWriterFieldId};
use crate::documents::FieldIdMapper;
use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
/// A view into a document that can represent either the current version from the DB,
/// the update data from payload or other means, or the merged updated version.
@ -65,33 +67,7 @@ impl<'t, Mapper: FieldIdMapper> Copy for DocumentFromDb<'t, Mapper> {}
impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
let mut it = self.content.iter();
std::iter::from_fn(move || loop {
let (fid, value) = it.next()?;
let name = match self.fields_ids_map.name(fid).ok_or(
InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
field_id: fid,
process: "getting current document",
}),
) {
Ok(name) => name,
Err(error) => return Some(Err(error.into())),
};
if name == RESERVED_VECTORS_FIELD_NAME || name == "_geo" {
continue;
}
let res = (|| {
let value =
serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
Ok((name, value))
})();
return Some(res);
})
self.iter_top_level_fields_with_fid().map(|res| res.map(|(_, name, value)| (name, value)))
}
fn vectors_field(&self) -> Result<Option<&'t RawValue>> {
@ -140,6 +116,38 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
let Some(value) = self.content.get(fid) else { return Ok(None) };
Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson)?))
}
pub fn iter_top_level_fields_with_fid(
&self,
) -> impl Iterator<Item = Result<(FieldId, &'t str, &'t RawValue)>> + '_ {
let mut it = self.content.iter();
std::iter::from_fn(move || loop {
let (fid, value) = it.next()?;
let name = match self.fields_ids_map.name(fid).ok_or(
InternalError::FieldIdMapMissingEntry(crate::FieldIdMapMissingEntry::FieldId {
field_id: fid,
process: "getting current document",
}),
) {
Ok(name) => name,
Err(error) => return Some(Err(error.into())),
};
if name == RESERVED_VECTORS_FIELD_NAME || name == "_geo" {
continue;
}
let res = (|| {
let value =
serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
Ok((fid, name, value))
})();
return Some(res);
})
}
}
#[derive(Debug)]
@ -182,12 +190,130 @@ impl<'a, 'doc> Document<'doc> for DocumentFromVersions<'a, 'doc> {
}
}
/// A [`Document`] whose fields value are the [`DocumentFromVersions`] value if exists,
/// or else the [`DocumentFromDb`] value.
#[derive(Debug)]
pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
new_doc: DocumentFromVersions<'a, 'doc>,
db: Option<DocumentFromDb<'t, Mapper>>,
}
/// A pseudo-document that returns [`DeltaValue`]s.
#[derive(Debug)]
pub struct DeltaDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
new_doc: DocumentFromVersions<'a, 'doc>,
db_doc: Option<DocumentFromDb<'t, Mapper>>,
has_deletion: bool,
}
impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
pub fn new(
docid: DocumentId,
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
new_doc: DocumentFromVersions<'a, 'doc>,
has_deletion: bool,
) -> Result<Self> {
let db_doc = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
Ok(Self { db_doc, new_doc, has_deletion })
}
pub fn delta_top_level_fields<'either>(
&self,
) -> impl Iterator<Item = Result<(&'either str, DeltaValue<'either, 't, 'doc>)>> + '_
where
't: 'either,
'doc: 'either,
{
match &self.db_doc {
// since we'll be returning all db top level fields, it makes more sense to iterate on the db first:
// 1. random field access is faster on RawMap than on obkvs
// 2. we can store a roaring of fid instead of btree set of fields
Some(db_doc) => {
let mut new_doc_it = self.new_doc.iter_top_level_fields();
let mut db_iter = db_doc.iter_top_level_fields_with_fid();
let fid_map = db_doc.fields_ids_map;
let mut seen_fields = RoaringBitmap::new();
let has_deletion = self.has_deletion;
Either::Left(std::iter::from_fn(move || {
if let Some(entry) = db_iter.next() {
let (fid, name, db_value) = match entry {
Ok(entry) => entry,
Err(err) => return Some(Err(err)),
};
seen_fields.insert(fid.into());
let new_value = match self.new_doc.top_level_field(name) {
Ok(new_value) => new_value,
Err(err) => return Some(Err(err)),
};
match new_value {
Some(new_value) => {
if new_value.get() == db_value.get() {
return Some(Ok((name, DeltaValue::Unchanged(new_value))));
} else {
return Some(Ok((
name,
DeltaValue::Modified(db_value, new_value),
)));
}
}
None => {
if has_deletion {
return Some(Ok((name, DeltaValue::Deleted(db_value))));
} else {
return Some(Ok((name, DeltaValue::Unchanged(db_value))));
}
}
}
}
{
match new_doc_it.by_ref().find(|res| {
if let Ok((name, _)) = res {
if let Some(fid) = fid_map.id(name) {
return !seen_fields.contains(fid.into());
}
}
true
})? {
Ok((name, new_value)) => {
Some(Ok((name, DeltaValue::Inserted(new_value))))
}
Err(err) => Some(Err(err)),
}
}
}))
}
None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| {
let (k, v) = res?;
Ok((k, DeltaValue::Inserted(v)))
})),
}
}
pub fn delta_geo_field(&self) -> Result<Option<DeltaValue<'_, 't, 'doc>>> {
let db_geo_field = match self.db_doc {
Some(db) => db.geo_field()?,
None => None,
};
let new_doc_geo_field = self.new_doc.geo_field()?;
Ok(match (db_geo_field, new_doc_geo_field) {
(None, None) => None,
(None, Some(new_doc)) => Some(DeltaValue::Inserted(new_doc)),
(Some(db), None) => Some(if self.has_deletion {
DeltaValue::Deleted(db)
} else {
DeltaValue::Unchanged(db)
}),
(Some(db), Some(new_doc)) => Some(DeltaValue::Modified(db, new_doc)),
})
}
}
impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
pub fn with_db(
docid: DocumentId,
@ -205,6 +331,13 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
}
}
pub enum DeltaValue<'either, 't: 'either, 'doc: 'either> {
Deleted(&'t RawValue),
Inserted(&'doc RawValue),
Modified(&'t RawValue, &'doc RawValue),
Unchanged(&'either RawValue),
}
impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
for MergedDocument<'d, 'doc, 't, Mapper>
{

View File

@ -2,7 +2,7 @@ use bumpalo::Bump;
use heed::RoTxn;
use super::document::{
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
DeltaDocument, Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
};
use super::extract::perm_json_p;
use super::vector_document::{
@ -167,6 +167,22 @@ impl<'doc> Update<'doc> {
}
}
pub fn delta<'t, Mapper: FieldIdMapper>(
&self,
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
) -> Result<DeltaDocument<'_, 'doc, 't, Mapper>> {
DeltaDocument::new(
self.docid,
rtxn,
index,
mapper,
DocumentFromVersions::new(&self.new),
self.has_deletion,
)
}
/// Returns whether the updated version of the document is different from the current version for the passed subset of fields.
///
/// `true` if at least one top-level-field that is a exactly a member of field or a parent of a member of field changed.

View File

@ -9,7 +9,7 @@ use heed::RoTxn;
use serde_json::Value;
use super::super::cache::BalancedCaches;
use super::facet_document::extract_document_facets;
use super::facet_document::{extract_document_facets, extract_merged_document_facets};
use super::FacetKind;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::update::del_add::DelAdd;
@ -106,17 +106,19 @@ impl FacetedDocidsExtractor {
return Ok(());
}
extract_document_facets(
extract_merged_document_facets(
attributes_to_extract,
inner.current(rtxn, index, context.db_fields_ids_map)?,
inner.delta(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
&mut del_add_facet_value,
cached_sorter.deref_mut(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
&mut |fid, depth, value, del_add_facet_value, cached_sorter| {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
cached_sorter,
BalancedCaches::insert_del_u32,
&mut del_add_facet_value,
del_add_facet_value,
DelAddFacetValue::insert_del,
docid,
fid,
@ -124,19 +126,12 @@ impl FacetedDocidsExtractor {
value,
)
},
)?;
extract_document_facets(
attributes_to_extract,
inner.merged(rtxn, index, context.db_fields_ids_map)?,
inner.external_document_id(),
new_fields_ids_map.deref_mut(),
&mut |fid, depth, value| {
&mut |fid, depth, value, del_add_facet_value, cached_sorter| {
Self::facet_fn_with_options(
&context.doc_alloc,
cached_sorter.deref_mut(),
cached_sorter,
BalancedCaches::insert_add_u32,
&mut del_add_facet_value,
del_add_facet_value,
DelAddFacetValue::insert_add,
docid,
fid,
@ -282,7 +277,7 @@ impl FacetedDocidsExtractor {
}
}
struct DelAddFacetValue<'doc> {
pub(crate) struct DelAddFacetValue<'doc> {
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
}

View File

@ -1,9 +1,11 @@
use serde_json::value::RawValue;
use serde_json::Value;
use crate::update::new::document::Document;
use super::extract_facets::DelAddFacetValue;
use crate::update::new::document::{DeltaDocument, DeltaValue, Document};
use crate::update::new::extract::geo::extract_geo_coordinates;
use crate::update::new::extract::perm_json_p;
use crate::{FieldId, GlobalFieldsIdsMap, InternalError, Result, UserError};
use crate::update::new::extract::{perm_json_p, BalancedCaches};
use crate::{FieldId, FieldsIdsMap, GlobalFieldsIdsMap, InternalError, Result, UserError};
pub fn extract_document_facets<'doc>(
attributes_to_extract: &[&str],
@ -15,58 +17,7 @@ pub fn extract_document_facets<'doc>(
for res in document.iter_top_level_fields() {
let (field_name, value) = res?;
let mut tokenize_field =
|name: &str, depth: perm_json_p::Depth, value: &Value| match field_id_map
.id_or_insert(name)
{
Some(field_id) => facet_fn(field_id, depth, value),
None => Err(UserError::AttributeLimitReached.into()),
};
// if the current field is searchable or contains a searchable attribute
let selection = perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]);
if selection != perm_json_p::Selection::Skip {
// parse json.
match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? {
Value::Object(object) => {
perm_json_p::seek_leaf_values_in_object(
&object,
Some(attributes_to_extract),
&[], // skip no attributes
field_name,
perm_json_p::Depth::OnBaseKey,
&mut tokenize_field,
)?;
if selection == perm_json_p::Selection::Select {
tokenize_field(
field_name,
perm_json_p::Depth::OnBaseKey,
&Value::Object(object),
)?;
}
}
Value::Array(array) => {
perm_json_p::seek_leaf_values_in_array(
&array,
Some(attributes_to_extract),
&[], // skip no attributes
field_name,
perm_json_p::Depth::OnBaseKey,
&mut tokenize_field,
)?;
if selection == perm_json_p::Selection::Select {
tokenize_field(
field_name,
perm_json_p::Depth::OnBaseKey,
&Value::Array(array),
)?;
}
}
value => tokenize_field(field_name, perm_json_p::Depth::OnBaseKey, &value)?,
}
}
extract_document_facet(attributes_to_extract, field_id_map, facet_fn, field_name, value)?;
}
if attributes_to_extract.contains(&"_geo") {
@ -85,3 +36,199 @@ pub fn extract_document_facets<'doc>(
Ok(())
}
fn extract_document_facet(
attributes_to_extract: &[&str],
field_id_map: &mut GlobalFieldsIdsMap<'_>,
facet_fn: &mut impl FnMut(u16, perm_json_p::Depth, &Value) -> std::result::Result<(), crate::Error>,
field_name: &str,
value: &serde_json::value::RawValue,
) -> Result<()> {
let mut tokenize_field = |name: &str, depth: perm_json_p::Depth, value: &Value| {
match field_id_map.id_or_insert(name) {
Some(field_id) => facet_fn(field_id, depth, value),
None => Err(UserError::AttributeLimitReached.into()),
}
};
let selection = perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]);
if selection != perm_json_p::Selection::Skip {
// parse json.
match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? {
Value::Object(object) => {
perm_json_p::seek_leaf_values_in_object(
&object,
Some(attributes_to_extract),
&[], // skip no attributes
field_name,
perm_json_p::Depth::OnBaseKey,
&mut tokenize_field,
)?;
if selection == perm_json_p::Selection::Select {
tokenize_field(
field_name,
perm_json_p::Depth::OnBaseKey,
&Value::Object(object),
)?;
}
}
Value::Array(array) => {
perm_json_p::seek_leaf_values_in_array(
&array,
Some(attributes_to_extract),
&[], // skip no attributes
field_name,
perm_json_p::Depth::OnBaseKey,
&mut tokenize_field,
)?;
if selection == perm_json_p::Selection::Select {
tokenize_field(
field_name,
perm_json_p::Depth::OnBaseKey,
&Value::Array(array),
)?;
}
}
value => tokenize_field(field_name, perm_json_p::Depth::OnBaseKey, &value)?,
}
};
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn extract_merged_document_facets<'doc, 'del_add_facet_value, 'cache>(
attributes_to_extract: &[&str],
document: DeltaDocument<'doc, 'doc, 'doc, FieldsIdsMap>,
external_document_id: &str,
del_add_facet_value: &mut DelAddFacetValue<'del_add_facet_value>,
cached_sorter: &mut BalancedCaches<'cache>,
field_id_map: &mut GlobalFieldsIdsMap,
facet_fn_del: &mut impl FnMut(
FieldId,
perm_json_p::Depth,
&Value,
&mut DelAddFacetValue<'del_add_facet_value>,
&mut BalancedCaches<'cache>,
) -> Result<()>,
facet_fn_add: &mut impl FnMut(
FieldId,
perm_json_p::Depth,
&Value,
&mut DelAddFacetValue<'del_add_facet_value>,
&mut BalancedCaches<'cache>,
) -> Result<()>,
) -> Result<()> {
for res in document.delta_top_level_fields() {
let (field_name, value) = res?;
match value {
DeltaValue::Deleted(value) => {
extract_document_facet(
attributes_to_extract,
field_id_map,
&mut |fid, depth, value| {
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
},
field_name,
value,
)?;
}
DeltaValue::Inserted(value) => {
extract_document_facet(
attributes_to_extract,
field_id_map,
&mut |fid, depth, value| {
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
},
field_name,
value,
)?;
}
DeltaValue::Modified(current, updated) => {
extract_document_facet(
attributes_to_extract,
field_id_map,
&mut |fid, depth, value| {
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
},
field_name,
current,
)?;
extract_document_facet(
attributes_to_extract,
field_id_map,
&mut |fid, depth, value| {
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
},
field_name,
updated,
)?;
}
DeltaValue::Unchanged(_) => {}
}
}
if attributes_to_extract.contains(&"_geo") {
match document.delta_geo_field()? {
Some(DeltaValue::Deleted(deleted)) => {
extract_geo_facet(
external_document_id,
deleted,
field_id_map,
&mut |fid, depth, value| {
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
}
Some(DeltaValue::Inserted(inserted)) => {
extract_geo_facet(
external_document_id,
inserted,
field_id_map,
&mut |fid, depth, value| {
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
}
Some(DeltaValue::Modified(current, updated)) => {
extract_geo_facet(
external_document_id,
current,
field_id_map,
&mut |fid, depth, value| {
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
extract_geo_facet(
external_document_id,
updated,
field_id_map,
&mut |fid, depth, value| {
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
}
None | Some(DeltaValue::Unchanged(_)) => {}
}
}
Ok(())
}
fn extract_geo_facet(
external_document_id: &str,
geo_value: &RawValue,
field_id_map: &mut GlobalFieldsIdsMap<'_>,
facet_fn: &mut impl FnMut(FieldId, perm_json_p::Depth, &Value) -> Result<()>,
) -> Result<()> {
if let Some([lat, lng]) = extract_geo_coordinates(external_document_id, geo_value)? {
let (lat_fid, lng_fid) = field_id_map
.id_or_insert("_geo.lat")
.zip(field_id_map.id_or_insert("_geo.lng"))
.ok_or(UserError::AttributeLimitReached)?;
facet_fn(lat_fid, perm_json_p::Depth::OnBaseKey, &lat.into())?;
facet_fn(lng_fid, perm_json_p::Depth::OnBaseKey, &lng.into())?;
};
Ok(())
}