Implement the document merge function for the replace method

This commit is contained in:
Clément Renault
2024-08-29 14:08:31 +02:00
parent 637a9c8bdd
commit e6ffa4d454

View File

@@ -19,7 +19,7 @@ mod indexer {
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde_json::Value; use serde_json::Value;
use super::document_change::{self, DocumentChange, Insertion, Update}; use super::document_change::{Deletion, DocumentChange, Insertion, Update};
use super::items_pool::ItemsPool; use super::items_pool::ItemsPool;
use crate::documents::{ use crate::documents::{
obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey, obkv_to_object, DocumentIdExtractionError, DocumentsBatchReader, PrimaryKey,
@@ -88,7 +88,7 @@ mod indexer {
rtxn: &'a RoTxn, rtxn: &'a RoTxn,
mut fields_ids_map: FieldsIdsMap, mut fields_ids_map: FieldsIdsMap,
primary_key: &'a PrimaryKey<'a>, primary_key: &'a PrimaryKey<'a>,
) -> Result<impl ParallelIterator<Item = document_change::DocumentChange> + 'a> { ) -> Result<impl ParallelIterator<Item = DocumentChange> + 'a> {
let documents_ids = index.documents_ids(rtxn)?; let documents_ids = index.documents_ids(rtxn)?;
let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids); let mut available_docids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
let mut docids_version_offsets = HashMap::<String, _>::new(); let mut docids_version_offsets = HashMap::<String, _>::new();
@@ -185,9 +185,16 @@ mod indexer {
items, items,
|context_pool, (external_docid, (internal_docid, operations))| { |context_pool, (external_docid, (internal_docid, operations))| {
context_pool.with(|rtxn| match self.method { context_pool.with(|rtxn| match self.method {
IndexDocumentsMethod::ReplaceDocuments => todo!(), IndexDocumentsMethod::ReplaceDocuments => merge_document_for_replacements(
rtxn,
index,
&fields_ids_map,
internal_docid,
external_docid,
&operations,
),
// TODO Remap the documents to match the db fields_ids_map // TODO Remap the documents to match the db fields_ids_map
IndexDocumentsMethod::UpdateDocuments => merge_document_obkv_for_updates( IndexDocumentsMethod::UpdateDocuments => merge_document_for_updates(
rtxn, rtxn,
index, index,
&fields_ids_map, &fields_ids_map,
@@ -282,13 +289,12 @@ mod indexer {
index: &'a Index, index: &'a Index,
fields: &'a FieldsIdsMap, fields: &'a FieldsIdsMap,
primary_key: &'a PrimaryKey<'a>, primary_key: &'a PrimaryKey<'a>,
) -> Result<impl ParallelIterator<Item = Result<document_change::DocumentChange>> + 'a> ) -> Result<impl ParallelIterator<Item = Result<DocumentChange>> + 'a> {
{
let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from)));
Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| { Ok(self.to_delete.into_iter().par_bridge().map_with(items, |items, docid| {
items.with(|rtxn| { items.with(|rtxn| {
let document = index.document(rtxn, docid)?; let current = index.document(rtxn, docid)?;
let external_docid = match primary_key.document_id(&document, fields)? { let external_docid = match primary_key.document_id(&current, fields)? {
Ok(document_id) => Ok(document_id) as Result<_>, Ok(document_id) => Ok(document_id) as Result<_>,
Err(_) => Err(InternalError::DocumentsError( Err(_) => Err(InternalError::DocumentsError(
crate::documents::Error::InvalidDocumentFormat, crate::documents::Error::InvalidDocumentFormat,
@@ -297,12 +303,8 @@ mod indexer {
}?; }?;
/// TODO create a function for this /// TODO create a function for this
let document = document.as_bytes().to_vec().into_boxed_slice().into(); let current = current.as_bytes().to_vec().into_boxed_slice().into();
Ok(DocumentChange::Deletion(document_change::Deletion::create( Ok(DocumentChange::Deletion(Deletion::create(docid, external_docid, current)))
docid,
external_docid,
document,
)))
}) })
})) }))
} }
@@ -319,7 +321,7 @@ mod indexer {
self, self,
iter: I, iter: I,
index: &Index, index: &Index,
) -> impl ParallelIterator<Item = document_change::DocumentChange> ) -> impl ParallelIterator<Item = DocumentChange>
where where
I: IntoIterator<Item = Value>, I: IntoIterator<Item = Value>,
{ {
@@ -349,15 +351,13 @@ mod indexer {
} }
pub struct UpdateByFunctionIndexer; pub struct UpdateByFunctionIndexer;
// DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?
/// Reads the previous version of a document from the database, the new versions /// Reads the previous version of a document from the database, the new versions
/// in the grenad update files and merges them to generate a new boxed obkv. /// in the grenad update files and merges them to generate a new boxed obkv.
/// ///
/// This function is only meant to be used when doing an update and not a replacement. /// This function is only meant to be used when doing an update and not a replacement.
pub fn merge_document_obkv_for_updates( pub fn merge_document_for_updates(
rtxn: &RoTxn, rtxn: &RoTxn,
// Let's construct the new obkv in memory
index: &Index, index: &Index,
fields_ids_map: &FieldsIdsMap, fields_ids_map: &FieldsIdsMap,
docid: DocumentId, docid: DocumentId,
@@ -365,11 +365,11 @@ mod indexer {
operations: &[DocumentOperation], operations: &[DocumentOperation],
) -> Result<Option<DocumentChange>> { ) -> Result<Option<DocumentChange>> {
let mut document = BTreeMap::<_, Cow<_>>::new(); let mut document = BTreeMap::<_, Cow<_>>::new();
let original = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?; let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let original: Option<&KvReaderFieldId> = original.map(Into::into); let current: Option<&KvReaderFieldId> = current.map(Into::into);
if let Some(original) = original { if let Some(current) = current {
original.into_iter().for_each(|(k, v)| { current.into_iter().for_each(|(k, v)| {
document.insert(k, v.into()); document.insert(k, v.into());
}); });
} }
@@ -381,14 +381,12 @@ mod indexer {
let operations = &operations[last_deletion.map_or(0, |i| i + 1)..]; let operations = &operations[last_deletion.map_or(0, |i| i + 1)..];
if operations.is_empty() { if operations.is_empty() {
match original { match current {
Some(original_obkv) => { Some(current) => {
let current = original_obkv.as_bytes().to_vec().into_boxed_slice().into(); /// TODO create a function for this
return Ok(Some(DocumentChange::Deletion(document_change::Deletion::create( let current = current.as_bytes().to_vec().into_boxed_slice().into();
docid, let deletion = Deletion::create(docid, external_docid, current);
external_docid, return Ok(Some(DocumentChange::Deletion(deletion)));
current,
))));
} }
None => return Ok(None), None => return Ok(None),
} }
@@ -416,10 +414,10 @@ mod indexer {
/// TODO create a function for this conversion /// TODO create a function for this conversion
let new = writer.into_inner().unwrap().into_boxed_slice().into(); let new = writer.into_inner().unwrap().into_boxed_slice().into();
match original { match current {
Some(original) => { Some(current) => {
/// TODO create a function for this conversion /// TODO create a function for this conversion
let current = original.as_bytes().to_vec().into_boxed_slice().into(); let current = current.as_bytes().to_vec().into_boxed_slice().into();
let update = Update::create(docid, external_docid, current, new); let update = Update::create(docid, external_docid, current, new);
Ok(Some(DocumentChange::Update(update))) Ok(Some(DocumentChange::Update(update)))
} }
@@ -429,4 +427,68 @@ mod indexer {
} }
} }
} }
/// Returns only the most recent version of a document based on the updates from the payloads.
///
/// This function is only meant to be used when doing a replacement and not an update.
pub fn merge_document_for_replacements(
rtxn: &RoTxn,
index: &Index,
fields_ids_map: &FieldsIdsMap,
docid: DocumentId,
external_docid: String,
operations: &[DocumentOperation],
) -> Result<Option<DocumentChange>> {
let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
let current: Option<&KvReaderFieldId> = current.map(Into::into);
match operations.last() {
Some(DocumentOperation::Addition(DocumentOffset { content, offset })) => {
let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
let update = cursor.get(*offset)?.expect("must exists");
let mut document_entries = Vec::new();
update.into_iter().for_each(|(k, v)| {
let field_name = batch_index.name(k).unwrap();
let id = fields_ids_map.id(field_name).unwrap();
document_entries.push((id, v));
});
document_entries.sort_unstable_by_key(|(id, _)| *id);
let mut writer = KvWriterFieldId::memory();
document_entries
.into_iter()
.for_each(|(id, value)| writer.insert(id, value).unwrap());
/// TODO create a function for this conversion
let new = writer.into_inner().unwrap().into_boxed_slice().into();
match current {
Some(current) => {
/// TODO create a function for this conversion
let current = current.as_bytes().to_vec().into_boxed_slice().into();
let update = Update::create(docid, external_docid, current, new);
Ok(Some(DocumentChange::Update(update)))
}
None => {
let insertion = Insertion::create(docid, external_docid, new);
Ok(Some(DocumentChange::Insertion(insertion)))
}
}
}
Some(DocumentOperation::Deletion) => {
match current {
Some(current) => {
/// TODO create a function for this conversion
let current = current.as_bytes().to_vec().into_boxed_slice().into();
let deletion = Deletion::create(docid, external_docid, current);
Ok(Some(DocumentChange::Deletion(deletion)))
}
None => Ok(None),
}
}
None => Ok(None),
}
}
} }