Take updates into account to distinguish between values in the DB that are unchanged and values that are deleted

This commit is contained in:
Louis Dureuil 2024-12-07 15:14:12 +01:00
parent b365150137
commit bd686991b9
No known key found for this signature in database
3 changed files with 80 additions and 59 deletions

View File

@ -202,6 +202,7 @@ pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
pub struct DeltaDocument<'a, 'doc, 't, Mapper: FieldIdMapper> { pub struct DeltaDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
new_doc: DocumentFromVersions<'a, 'doc>, new_doc: DocumentFromVersions<'a, 'doc>,
db_doc: Option<DocumentFromDb<'t, Mapper>>, db_doc: Option<DocumentFromDb<'t, Mapper>>,
has_deletion: bool,
} }
impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> { impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
@ -211,17 +212,18 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
index: &'t Index, index: &'t Index,
db_fields_ids_map: &'t Mapper, db_fields_ids_map: &'t Mapper,
new_doc: DocumentFromVersions<'a, 'doc>, new_doc: DocumentFromVersions<'a, 'doc>,
has_deletion: bool,
) -> Result<Self> { ) -> Result<Self> {
let db_doc = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?; let db_doc = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
Ok(Self { db_doc, new_doc }) Ok(Self { db_doc, new_doc, has_deletion })
} }
pub fn delta_top_level_fields<'d>( pub fn delta_top_level_fields<'either>(
&self, &self,
) -> impl Iterator<Item = Result<(&'d str, DeltaValue<'t, 'doc>)>> + '_ ) -> impl Iterator<Item = Result<(&'either str, DeltaValue<'either, 't, 'doc>)>> + '_
where where
't: 'd, 't: 'either,
'doc: 'd, 'doc: 'either,
{ {
match &self.db_doc { match &self.db_doc {
// since we'll be returning all db top level fields, it makes more sense to iterate on the db first: // since we'll be returning all db top level fields, it makes more sense to iterate on the db first:
@ -232,6 +234,7 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
let mut db_iter = db_doc.iter_top_level_fields_with_fid(); let mut db_iter = db_doc.iter_top_level_fields_with_fid();
let fid_map = db_doc.fields_ids_map; let fid_map = db_doc.fields_ids_map;
let mut seen_fields = RoaringBitmap::new(); let mut seen_fields = RoaringBitmap::new();
let has_deletion = self.has_deletion;
Either::Left(std::iter::from_fn(move || { Either::Left(std::iter::from_fn(move || {
if let Some(entry) = db_iter.next() { if let Some(entry) = db_iter.next() {
@ -247,12 +250,22 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
match new_value { match new_value {
Some(new_value) => { Some(new_value) => {
return Some(Ok(( if new_value.get() == db_value.get() {
name, return Some(Ok((name, DeltaValue::Unchanged(new_value))));
DeltaValue::CurrentAndUpdated(db_value, new_value), } else {
))) return Some(Ok((
name,
DeltaValue::Modified(db_value, new_value),
)));
}
}
None => {
if has_deletion {
return Some(Ok((name, DeltaValue::Deleted(db_value))));
} else {
return Some(Ok((name, DeltaValue::Unchanged(db_value))));
}
} }
None => return Some(Ok((name, DeltaValue::Current(db_value)))),
} }
} }
{ {
@ -265,7 +278,7 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
true true
})? { })? {
Ok((name, new_value)) => { Ok((name, new_value)) => {
Some(Ok((name, DeltaValue::Updated(new_value)))) Some(Ok((name, DeltaValue::Inserted(new_value))))
} }
Err(err) => Some(Err(err)), Err(err) => Some(Err(err)),
} }
@ -274,12 +287,12 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
} }
None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| { None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| {
let (k, v) = res?; let (k, v) = res?;
Ok((k, DeltaValue::Updated(v))) Ok((k, DeltaValue::Inserted(v)))
})), })),
} }
} }
pub fn delta_geo_field(&self) -> Result<Option<DeltaValue<'t, 'doc>>> { pub fn delta_geo_field(&self) -> Result<Option<DeltaValue<'_, 't, 'doc>>> {
let db_geo_field = match self.db_doc { let db_geo_field = match self.db_doc {
Some(db) => db.geo_field()?, Some(db) => db.geo_field()?,
None => None, None => None,
@ -289,9 +302,13 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
Ok(match (db_geo_field, new_doc_geo_field) { Ok(match (db_geo_field, new_doc_geo_field) {
(None, None) => None, (None, None) => None,
(None, Some(new_doc)) => Some(DeltaValue::Updated(new_doc)), (None, Some(new_doc)) => Some(DeltaValue::Inserted(new_doc)),
(Some(db), None) => Some(DeltaValue::Current(db)), (Some(db), None) => Some(if self.has_deletion {
(Some(db), Some(new_doc)) => Some(DeltaValue::CurrentAndUpdated(db, new_doc)), DeltaValue::Deleted(db)
} else {
DeltaValue::Unchanged(db)
}),
(Some(db), Some(new_doc)) => Some(DeltaValue::Modified(db, new_doc)),
}) })
} }
} }
@ -313,10 +330,11 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
} }
} }
pub enum DeltaValue<'t, 'doc> { pub enum DeltaValue<'either, 't: 'either, 'doc: 'either> {
Current(&'t RawValue), Deleted(&'t RawValue),
Updated(&'doc RawValue), Inserted(&'doc RawValue),
CurrentAndUpdated(&'t RawValue, &'doc RawValue), Modified(&'t RawValue, &'doc RawValue),
Unchanged(&'either RawValue),
} }
impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d> impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>

View File

@ -173,7 +173,14 @@ impl<'doc> Update<'doc> {
index: &'t Index, index: &'t Index,
mapper: &'t Mapper, mapper: &'t Mapper,
) -> Result<DeltaDocument<'_, 'doc, 't, Mapper>> { ) -> Result<DeltaDocument<'_, 'doc, 't, Mapper>> {
DeltaDocument::new(self.docid, rtxn, index, mapper, DocumentFromVersions::new(&self.new)) DeltaDocument::new(
self.docid,
rtxn,
index,
mapper,
DocumentFromVersions::new(&self.new),
self.has_deletion,
)
} }
/// Returns whether the updated version of the document is different from the current version for the passed subset of fields. /// Returns whether the updated version of the document is different from the current version for the passed subset of fields.

View File

@ -104,14 +104,14 @@ pub fn extract_merged_document_facets<'doc, 'del_add_facet_value, 'cache>(
del_add_facet_value: &mut DelAddFacetValue<'del_add_facet_value>, del_add_facet_value: &mut DelAddFacetValue<'del_add_facet_value>,
cached_sorter: &mut BalancedCaches<'cache>, cached_sorter: &mut BalancedCaches<'cache>,
field_id_map: &mut GlobalFieldsIdsMap, field_id_map: &mut GlobalFieldsIdsMap,
facet_fn_current: &mut impl FnMut( facet_fn_del: &mut impl FnMut(
FieldId, FieldId,
perm_json_p::Depth, perm_json_p::Depth,
&Value, &Value,
&mut DelAddFacetValue<'del_add_facet_value>, &mut DelAddFacetValue<'del_add_facet_value>,
&mut BalancedCaches<'cache>, &mut BalancedCaches<'cache>,
) -> Result<()>, ) -> Result<()>,
facet_fn_updated: &mut impl FnMut( facet_fn_add: &mut impl FnMut(
FieldId, FieldId,
perm_json_p::Depth, perm_json_p::Depth,
&Value, &Value,
@ -122,37 +122,34 @@ pub fn extract_merged_document_facets<'doc, 'del_add_facet_value, 'cache>(
for res in document.delta_top_level_fields() { for res in document.delta_top_level_fields() {
let (field_name, value) = res?; let (field_name, value) = res?;
match value { match value {
DeltaValue::Current(value) => { DeltaValue::Deleted(value) => {
extract_document_facet( extract_document_facet(
attributes_to_extract, attributes_to_extract,
field_id_map, field_id_map,
&mut |fid, depth, value| { &mut |fid, depth, value| {
facet_fn_current(fid, depth, value, del_add_facet_value, cached_sorter) facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
}, },
field_name, field_name,
value, value,
)?; )?;
} }
DeltaValue::Updated(value) => { DeltaValue::Inserted(value) => {
extract_document_facet( extract_document_facet(
attributes_to_extract, attributes_to_extract,
field_id_map, field_id_map,
&mut |fid, depth, value| { &mut |fid, depth, value| {
facet_fn_updated(fid, depth, value, del_add_facet_value, cached_sorter) facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
}, },
field_name, field_name,
value, value,
)?; )?;
} }
DeltaValue::CurrentAndUpdated(current, updated) => { DeltaValue::Modified(current, updated) => {
if current.get() == updated.get() {
continue;
}
extract_document_facet( extract_document_facet(
attributes_to_extract, attributes_to_extract,
field_id_map, field_id_map,
&mut |fid, depth, value| { &mut |fid, depth, value| {
facet_fn_current(fid, depth, value, del_add_facet_value, cached_sorter) facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
}, },
field_name, field_name,
current, current,
@ -161,46 +158,45 @@ pub fn extract_merged_document_facets<'doc, 'del_add_facet_value, 'cache>(
attributes_to_extract, attributes_to_extract,
field_id_map, field_id_map,
&mut |fid, depth, value| { &mut |fid, depth, value| {
facet_fn_updated(fid, depth, value, del_add_facet_value, cached_sorter) facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
}, },
field_name, field_name,
updated, updated,
)?; )?;
} }
DeltaValue::Unchanged(_) => {}
} }
} }
if attributes_to_extract.contains(&"_geo") { if attributes_to_extract.contains(&"_geo") {
match document.delta_geo_field()? { match document.delta_geo_field()? {
Some(DeltaValue::Current(current)) => { Some(DeltaValue::Deleted(deleted)) => {
extract_geo_facet(
external_document_id,
deleted,
field_id_map,
&mut |fid, depth, value| {
facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
}
Some(DeltaValue::Inserted(inserted)) => {
extract_geo_facet(
external_document_id,
inserted,
field_id_map,
&mut |fid, depth, value| {
facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
}
Some(DeltaValue::Modified(current, updated)) => {
extract_geo_facet( extract_geo_facet(
external_document_id, external_document_id,
current, current,
field_id_map, field_id_map,
&mut |fid, depth, value| { &mut |fid, depth, value| {
facet_fn_current(fid, depth, value, del_add_facet_value, cached_sorter) facet_fn_del(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
}
Some(DeltaValue::Updated(updated)) => {
extract_geo_facet(
external_document_id,
updated,
field_id_map,
&mut |fid, depth, value| {
facet_fn_updated(fid, depth, value, del_add_facet_value, cached_sorter)
},
)?;
}
Some(DeltaValue::CurrentAndUpdated(current, updated))
if current.get() != updated.get() =>
{
extract_geo_facet(
external_document_id,
current,
field_id_map,
&mut |fid, depth, value| {
facet_fn_current(fid, depth, value, del_add_facet_value, cached_sorter)
}, },
)?; )?;
extract_geo_facet( extract_geo_facet(
@ -208,11 +204,11 @@ pub fn extract_merged_document_facets<'doc, 'del_add_facet_value, 'cache>(
updated, updated,
field_id_map, field_id_map,
&mut |fid, depth, value| { &mut |fid, depth, value| {
facet_fn_updated(fid, depth, value, del_add_facet_value, cached_sorter) facet_fn_add(fid, depth, value, del_add_facet_value, cached_sorter)
}, },
)?; )?;
} }
None | Some(DeltaValue::CurrentAndUpdated(_, _)) => {} None | Some(DeltaValue::Unchanged(_)) => {}
} }
} }