From 05179d1264e062e6eca045662906f0aa58762557 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Sat, 7 Dec 2024 14:45:39 +0100 Subject: [PATCH] Delta document --- crates/milli/src/update/new/document.rs | 177 ++++++++++-------- .../milli/src/update/new/document_change.rs | 11 +- 2 files changed, 114 insertions(+), 74 deletions(-) diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs index 84501f125..4442155c9 100644 --- a/crates/milli/src/update/new/document.rs +++ b/crates/milli/src/update/new/document.rs @@ -2,13 +2,14 @@ use std::collections::{BTreeMap, BTreeSet}; use heed::RoTxn; use raw_collections::RawMap; +use roaring::RoaringBitmap; use serde_json::value::RawValue; use super::vector_document::VectorDocument; use super::{KvReaderFieldId, KvWriterFieldId}; use crate::documents::FieldIdMapper; use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME; -use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError}; +use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError}; /// A view into a document that can represent either the current version from the DB, /// the update data from payload or other means, or the merged updated version. @@ -188,12 +189,113 @@ impl<'a, 'doc> Document<'doc> for DocumentFromVersions<'a, 'doc> { } } +/// A [`Document`] whose fields value are the [`DocumentFromVersions`] value if exists, +/// or else the [`DocumentFromDb`] value. #[derive(Debug)] pub struct MergedDocument<'a, 'doc, 't, Mapper: FieldIdMapper> { new_doc: DocumentFromVersions<'a, 'doc>, db: Option>, } +/// A pseudo-document that returns [`DeltaValue`]s. +#[derive(Debug)] +pub struct DeltaDocument<'a, 'doc, 't, Mapper: FieldIdMapper> { + new_doc: DocumentFromVersions<'a, 'doc>, + db_doc: Option>, +} + +impl<'a, 'doc, 't, Mapper: FieldIdMapper> DeltaDocument<'a, 'doc, 't, Mapper> { + pub fn new( + docid: DocumentId, + rtxn: &'t RoTxn, + index: &'t Index, + db_fields_ids_map: &'t Mapper, + new_doc: DocumentFromVersions<'a, 'doc>, + ) -> Result { + let db_doc = DocumentFromDb::new(docid, rtxn, index, db_fields_ids_map)?; + Ok(Self { db_doc, new_doc }) + } + + pub fn delta_top_level_fields<'d>( + &self, + ) -> impl Iterator)>> + '_ + where + 't: 'd, + 'doc: 'd, + { + match &self.db_doc { + // since we'll be returning all db top level fields, it makes more sense to iterate on the db first: + // 1. random field access is faster on RawMap than on obkvs + // 2. we can store a roaring of fid instead of btree set of fields + Some(db_doc) => { + let mut new_doc_it = self.new_doc.iter_top_level_fields(); + let mut db_iter = db_doc.iter_top_level_fields_with_fid(); + let fid_map = db_doc.fields_ids_map; + let mut seen_fields = RoaringBitmap::new(); + + Either::Left(std::iter::from_fn(move || { + if let Some(entry) = db_iter.next() { + let (fid, name, db_value) = match entry { + Ok(entry) => entry, + Err(err) => return Some(Err(err)), + }; + seen_fields.insert(fid.into()); + let new_value = match self.new_doc.top_level_field(name) { + Ok(new_value) => new_value, + Err(err) => return Some(Err(err)), + }; + + match new_value { + Some(new_value) => { + return Some(Ok(( + name, + DeltaValue::CurrentAndUpdated(db_value, new_value), + ))) + } + None => return Some(Ok((name, DeltaValue::Current(db_value)))), + } + } + { + match new_doc_it.by_ref().find(|res| { + if let Ok((name, _)) = res { + if let Some(fid) = fid_map.id(name) { + return !seen_fields.contains(fid.into()); + } + } + true + })? { + Ok((name, new_value)) => { + Some(Ok((name, DeltaValue::Updated(new_value)))) + } + Err(err) => Some(Err(err)), + } + } + })) + } + None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| { + let (k, v) = res?; + Ok((k, DeltaValue::Updated(v))) + })), + } + } + + pub fn delta_geo_field(&self) -> Result>> { + let db_geo_field = match self.db_doc { + Some(db) => db.geo_field()?, + None => None, + }; + + let new_doc_geo_field = self.new_doc.geo_field()?; + + Ok(match (db_geo_field, new_doc_geo_field) { + (None, None) => None, + (None, Some(new_doc)) => Some(DeltaValue::Updated(new_doc)), + (Some(db), None) => Some(DeltaValue::Current(db)), + (Some(db), Some(new_doc)) => Some(DeltaValue::CurrentAndUpdated(db, new_doc)), + }) + } +} + impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> { pub fn with_db( docid: DocumentId, @@ -209,80 +311,9 @@ impl<'a, 'doc, 't, Mapper: FieldIdMapper> MergedDocument<'a, 'doc, 't, Mapper> { pub fn without_db(new_doc: DocumentFromVersions<'a, 'doc>) -> Self { Self { new_doc, db: None } } - - pub fn iter_merged_top_level_fields<'d>( - &self, - ) -> impl Iterator)>> + '_ - where - 't: 'd, - 'doc: 'd, - { - match &self.db { - Some(db) => { - let mut new_doc_it = self.new_doc.iter_top_level_fields(); - let mut db_it = db.iter_top_level_fields(); - let mut seen_fields = BTreeSet::new(); - - Either::Left(std::iter::from_fn(move || { - if let Some(next) = new_doc_it.next() { - let (name, updated_value) = match next { - Ok((name, updated_value)) => (name, updated_value), - Err(err) => return Some(Err(err)), - }; - seen_fields.insert(name); - let current = match db.top_level_field(name) { - Ok(current) => current, - Err(err) => return Some(Err(err)), - }; - - match current { - Some(current) => { - return Some(Ok(( - name, - MergedValue::CurrentAndUpdated(current, updated_value), - ))) - } - None => return Some(Ok((name, MergedValue::Updated(updated_value)))), - } - } - loop { - match db_it.next()? { - Ok((name, value)) => { - if seen_fields.contains(name) { - continue; - } - return Some(Ok((name, MergedValue::Current(value)))); - } - Err(err) => return Some(Err(err)), - } - } - })) - } - None => Either::Right(self.new_doc.iter_top_level_fields().map(|res| { - let (k, v) = res?; - Ok((k, MergedValue::Updated(v))) - })), - } - } - - pub fn merged_geo_field(&self) -> Result>> { - let db_geo_field = match self.db { - Some(db) => db.geo_field()?, - None => None, - }; - - let new_doc_geo_field = self.new_doc.geo_field()?; - - Ok(match (db_geo_field, new_doc_geo_field) { - (None, None) => None, - (None, Some(new_doc)) => Some(MergedValue::Updated(new_doc)), - (Some(db), None) => Some(MergedValue::Current(db)), - (Some(db), Some(new_doc)) => Some(MergedValue::CurrentAndUpdated(db, new_doc)), - }) - } } -pub enum MergedValue<'t, 'doc> { +pub enum DeltaValue<'t, 'doc> { Current(&'t RawValue), Updated(&'doc RawValue), CurrentAndUpdated(&'t RawValue, &'doc RawValue), diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs index 1644b2254..abc538533 100644 --- a/crates/milli/src/update/new/document_change.rs +++ b/crates/milli/src/update/new/document_change.rs @@ -2,7 +2,7 @@ use bumpalo::Bump; use heed::RoTxn; use super::document::{ - Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions, + DeltaDocument, Document as _, DocumentFromDb, DocumentFromVersions, MergedDocument, Versions, }; use super::extract::perm_json_p; use super::vector_document::{ @@ -167,6 +167,15 @@ impl<'doc> Update<'doc> { } } + pub fn delta<'t, Mapper: FieldIdMapper>( + &self, + rtxn: &'t RoTxn, + index: &'t Index, + mapper: &'t Mapper, + ) -> Result> { + DeltaDocument::new(self.docid, rtxn, index, mapper, DocumentFromVersions::new(&self.new)) + } + /// Returns whether the updated version of the document is different from the current version for the passed subset of fields. /// /// `true` if at least one top-level-field that is a exactly a member of field or a parent of a member of field changed.