Delta document

This commit is contained in:
Louis Dureuil 2024-12-07 14:45:39 +01:00
parent e580a8e27c
commit 05179d1264
No known key found for this signature in database
2 changed files with 114 additions and 74 deletions

View File

@ -2,13 +2,14 @@ use std::collections::{BTreeMap, BTreeSet};
use heed::RoTxn;
use raw_collections::RawMap;
use roaring::RoaringBitmap;
use serde_json::value::RawValue;
use super::vector_document::VectorDocument;
use super::{KvReaderFieldId, KvWriterFieldId};
use crate::documents::FieldIdMapper;
use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME;
use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
/// A view into a document that can represent either the current version from the DB,
/// the update data from payload or other means, or the merged updated version.
@ -188,12 +189,113 @@ impl<'a, 'doc> Document<'doc> for DocumentFromVersions<'a, 'doc> {
}
}
/// A [`Document`] whose fields value are the [`DocumentFromVersions`] value if exists,
/// or else the [`DocumentFromDb`] value.
#[derive(Debug)]
pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
new_doc: DocumentFromVersions<'a, 'doc>,
db: Option<DocumentFromDb<'t, Mapper>>,
}
/// A pseudo-document that returns [`DeltaValue`]s.
#[derive(Debug)]
pub struct DeltaDocument<'a, 'doc, 't, Mapper: FieldIdMapper> {
new_doc: DocumentFromVersions<'a, 'doc>,
db_doc: Option<DocumentFromDb<'t, Mapper>>,
}
impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> {
pub fn new(
docid: DocumentId,
rtxn: &'t RoTxn,
index: &'t Index,
db_fields_ids_map: &'t Mapper,
new_doc: DocumentFromVersions<'a, 'doc>,
) -> Result<Self> {
let db_doc = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?;
Ok(Self { db_doc, new_doc })
}
pub fn delta_top_level_fields<'d>(
&self,
) -> impl Iterator<Item = Result<(&'d str, DeltaValue<'t, 'doc>)>> + '_
where
't: 'd,
'doc: 'd,
{
match &self.db_doc {
// since we'll be returning all db top level fields, it makes more sense to iterate on the db first:
// 1. random field access is faster on RawMap than on obkvs
// 2. we can store a roaring of fid instead of btree set of fields
Some(db_doc) => {
let mut new_doc_it = self.new_doc.iter_top_level_fields();
let mut db_iter = db_doc.iter_top_level_fields_with_fid();
let fid_map = db_doc.fields_ids_map;
let mut seen_fields = RoaringBitmap::new();
Either::Left(std::iter::from_fn(move || {
if let Some(entry) = db_iter.next() {
let (fid, name, db_value) = match entry {
Ok(entry) => entry,
Err(err) => return Some(Err(err)),
};
seen_fields.insert(fid.into());
let new_value = match self.new_doc.top_level_field(name) {
Ok(new_value) => new_value,
Err(err) => return Some(Err(err)),
};
match new_value {
Some(new_value) => {
return Some(Ok((
name,
DeltaValue::CurrentAndUpdated(db_value, new_value),
)))
}
None => return Some(Ok((name, DeltaValue::Current(db_value)))),
}
}
{
match new_doc_it.by_ref().find(|res| {
if let Ok((name, _)) = res {
if let Some(fid) = fid_map.id(name) {
return !seen_fields.contains(fid.into());
}
}
true
})? {
Ok((name, new_value)) => {
Some(Ok((name, DeltaValue::Updated(new_value))))
}
Err(err) => Some(Err(err)),
}
}
}))
}
None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| {
let (k, v) = res?;
Ok((k, DeltaValue::Updated(v)))
})),
}
}
pub fn delta_geo_field(&self) -> Result<Option<DeltaValue<'t, 'doc>>> {
let db_geo_field = match self.db_doc {
Some(db) => db.geo_field()?,
None => None,
};
let new_doc_geo_field = self.new_doc.geo_field()?;
Ok(match (db_geo_field, new_doc_geo_field) {
(None, None) => None,
(None, Some(new_doc)) => Some(DeltaValue::Updated(new_doc)),
(Some(db), None) => Some(DeltaValue::Current(db)),
(Some(db), Some(new_doc)) => Some(DeltaValue::CurrentAndUpdated(db, new_doc)),
})
}
}
impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
pub fn with_db(
docid: DocumentId,
@ -209,80 +311,9 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> {
pub fn without_db(new_doc: DocumentFromVersions<'a, 'doc>) -> Self {
Self { new_doc, db: None }
}
pub fn iter_merged_top_level_fields<'d>(
&self,
) -> impl Iterator<Item = Result<(&'d str, MergedValue<'t, 'doc>)>> + '_
where
't: 'd,
'doc: 'd,
{
match &self.db {
Some(db) => {
let mut new_doc_it = self.new_doc.iter_top_level_fields();
let mut db_it = db.iter_top_level_fields();
let mut seen_fields = BTreeSet::new();
Either::Left(std::iter::from_fn(move || {
if let Some(next) = new_doc_it.next() {
let (name, updated_value) = match next {
Ok((name, updated_value)) => (name, updated_value),
Err(err) => return Some(Err(err)),
};
seen_fields.insert(name);
let current = match db.top_level_field(name) {
Ok(current) => current,
Err(err) => return Some(Err(err)),
};
match current {
Some(current) => {
return Some(Ok((
name,
MergedValue::CurrentAndUpdated(current, updated_value),
)))
}
None => return Some(Ok((name, MergedValue::Updated(updated_value)))),
}
}
loop {
match db_it.next()? {
Ok((name, value)) => {
if seen_fields.contains(name) {
continue;
}
return Some(Ok((name, MergedValue::Current(value))));
}
Err(err) => return Some(Err(err)),
}
}
}))
}
None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| {
let (k, v) = res?;
Ok((k, MergedValue::Updated(v)))
})),
}
}
pub fn merged_geo_field(&self) -> Result<Option<MergedValue<'t, 'doc>>> {
let db_geo_field = match self.db {
Some(db) => db.geo_field()?,
None => None,
};
let new_doc_geo_field = self.new_doc.geo_field()?;
Ok(match (db_geo_field, new_doc_geo_field) {
(None, None) => None,
(None, Some(new_doc)) => Some(MergedValue::Updated(new_doc)),
(Some(db), None) => Some(MergedValue::Current(db)),
(Some(db), Some(new_doc)) => Some(MergedValue::CurrentAndUpdated(db, new_doc)),
})
}
}
pub enum MergedValue<'t, 'doc> {
pub enum DeltaValue<'t, 'doc> {
Current(&'t RawValue),
Updated(&'doc RawValue),
CurrentAndUpdated(&'t RawValue, &'doc RawValue),

View File

@ -2,7 +2,7 @@ use bumpalo::Bump;
use heed::RoTxn;
use super::document::{
Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
DeltaDocument, Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions,
};
use super::extract::perm_json_p;
use super::vector_document::{
@ -167,6 +167,15 @@ impl<'doc> Update<'doc> {
}
}
pub fn delta<'t, Mapper: FieldIdMapper>(
&self,
rtxn: &'t RoTxn,
index: &'t Index,
mapper: &'t Mapper,
) -> Result<DeltaDocument<'_, 'doc, 't, Mapper>> {
DeltaDocument::new(self.docid, rtxn, index, mapper, DocumentFromVersions::new(&self.new))
}
/// Returns whether the updated version of the document is different from the current version for the passed subset of fields.
///
/// `true` if at least one top-level-field that is a exactly a member of field or a parent of a member of field changed.