Make the transform struct return diff-based documents obkvs

This commit is contained in:
ManyTheFish
2023-10-12 11:46:56 +02:00
committed by Louis Dureuil
parent f5ef69293b
commit 1dd97578a8
5 changed files with 349 additions and 95 deletions

View File

@ -7,18 +7,20 @@ use std::io::{Read, Seek};
use fxhash::FxHashMap;
use heed::RoTxn;
use itertools::Itertools;
use obkv::{KvReader, KvWriter};
use obkv::{KvReader, KvReaderU16, KvWriter};
use roaring::RoaringBitmap;
use serde_json::Value;
use smartstring::SmartString;
use super::helpers::{
create_sorter, create_writer, keep_latest_obkv, merge_obkvs_and_operations, MergeFn,
create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions,
obkvs_merge_additions_and_deletions, MergeFn,
};
use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key};
use crate::update::del_add::into_del_add_obkv;
use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
use crate::{
FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
@ -106,8 +108,8 @@ impl<'a, 'i> Transform<'a, 'i> {
// We must choose the appropriate merge function for when two or more documents
// with the same user id must be merged or fully replaced in the same batch.
let merge_function = match index_documents_method {
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations,
IndexDocumentsMethod::ReplaceDocuments => obkvs_keep_last_addition_merge_deletions,
IndexDocumentsMethod::UpdateDocuments => obkvs_merge_additions_and_deletions,
};
// We initialize the sorter with the user indexing settings.
@ -223,19 +225,21 @@ impl<'a, 'i> Transform<'a, 'i> {
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
Entry::Occupied(entry) => *entry.get() as u32,
Entry::Vacant(entry) => {
// If the document was already in the db we mark it as a replaced document.
// It'll be deleted later.
if let Some(docid) = external_documents_ids.get(entry.key()) {
// If it was already in the list of replaced documents it means it was deleted
// by the remove_document method. We should starts as if it never existed.
if self.replaced_documents_ids.insert(docid) {
original_docid = Some(docid);
let docid = match external_documents_ids.get(entry.key()) {
Some(docid) => {
// If it was already in the list of replaced documents it means it was deleted
// by the remove_document method. We should starts as if it never existed.
if self.replaced_documents_ids.insert(docid) {
original_docid = Some(docid);
}
docid
}
}
let docid = self
.available_documents_ids
.next()
.ok_or(UserError::DocumentLimitReached)?;
None => self
.available_documents_ids
.next()
.ok_or(UserError::DocumentLimitReached)?,
};
entry.insert(docid as u64);
docid
}
@ -263,16 +267,28 @@ impl<'a, 'i> Transform<'a, 'i> {
skip_insertion = true;
} else {
// we associate the base document with the new key, everything will get merged later.
let keep_original_version =
self.index_documents_method == IndexDocumentsMethod::UpdateDocuments;
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(base_obkv);
into_del_add_obkv(
KvReaderU16::new(base_obkv),
true,
keep_original_version,
&mut document_sorter_buffer,
)?;
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
Some(flattened_obkv) => {
// we recreate our buffer with the flattened documents
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(&flattened_obkv);
into_del_add_obkv(
KvReaderU16::new(&flattened_obkv),
true,
keep_original_version,
&mut document_sorter_buffer,
)?;
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
@ -288,7 +304,12 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(&obkv_buffer);
into_del_add_obkv(
KvReaderU16::new(&obkv_buffer),
false,
true,
&mut document_sorter_buffer,
)?;
// We use the extracted/generated user id as the key for this document.
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
@ -296,7 +317,12 @@ impl<'a, 'i> Transform<'a, 'i> {
Some(flattened_obkv) => {
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
document_sorter_buffer.extend_from_slice(&flattened_obkv);
into_del_add_obkv(
KvReaderU16::new(&flattened_obkv),
false,
true,
&mut document_sorter_buffer,
)?;
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
@ -354,19 +380,25 @@ impl<'a, 'i> Transform<'a, 'i> {
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
let mut documents_deleted = 0;
let mut document_sorter_buffer = Vec::new();
for to_remove in to_remove {
if should_abort() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
// Check if the document has been added in the current indexing process.
let deleted_from_current = match self
.new_external_documents_ids_builder
.entry((*to_remove).into())
{
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
Entry::Occupied(entry) => {
let doc_id = *entry.get() as u32;
self.original_sorter
.insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
self.flattened_sorter
.insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8);
obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap();
self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
// we must NOT update the list of replaced_documents_ids
// Either:
@ -375,21 +407,69 @@ impl<'a, 'i> Transform<'a, 'i> {
// we're removing it there is nothing to do.
self.new_documents_ids.remove(doc_id);
entry.remove_entry();
true
}
Entry::Vacant(entry) => {
// If the document was already in the db we mark it as a `to_delete` document.
// It'll be deleted later. We don't need to push anything to the sorters.
if let Some(docid) = external_documents_ids.get(entry.key()) {
self.replaced_documents_ids.insert(docid);
} else {
// if the document is nowehere to be found, there is nothing to do and we must NOT
// increment the count of documents_deleted
continue;
}
}
Entry::Vacant(_) => false,
};
documents_deleted += 1;
// If the document was already in the db we mark it as a `to_delete` document.
// Then we push the document in sorters in deletion mode.
let deleted_from_db = match external_documents_ids.get(&to_remove) {
Some(docid) => {
self.replaced_documents_ids.insert(docid);
// fetch the obkv document
let original_key = BEU32::new(docid);
let base_obkv = self
.index
.documents
.remap_data_type::<heed::types::ByteSlice>()
.get(wtxn, &original_key)?
.ok_or(InternalError::DatabaseMissingEntry {
db_name: db_name::DOCUMENTS,
key: None,
})?;
// push it as to delete in the original_sorter
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(
KvReaderU16::new(base_obkv),
true,
false,
&mut document_sorter_buffer,
)?;
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
// flatten it and push it as to delete in the flattened_sorter
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
Some(flattened_obkv) => {
// we recreate our buffer with the flattened documents
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(
KvReaderU16::new(&flattened_obkv),
true,
false,
&mut document_sorter_buffer,
)?;
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
None => self
.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
}
true
}
None => false,
};
// increase counter only if the document existed somewhere before.
if deleted_from_current || deleted_from_db {
documents_deleted += 1;
}
}
Ok(documents_deleted)
@ -589,9 +669,7 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut documents_count = 0;
while let Some((key, val)) = iter.next()? {
if val[0] == Operation::Deletion as u8 {
continue;
}
// skip first byte corresponding to the operation type (Deletion or Addition).
let val = &val[1..];
// send a callback to show at which step we are
@ -631,9 +709,7 @@ impl<'a, 'i> Transform<'a, 'i> {
// We get rids of the `Operation` byte and skip the deleted documents as well.
let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
while let Some((key, val)) = iter.next()? {
if val[0] == Operation::Deletion as u8 {
continue;
}
// skip first byte corresponding to the operation type (Deletion or Addition).
let val = &val[1..];
writer.insert(key, val)?;
}
@ -713,6 +789,7 @@ impl<'a, 'i> Transform<'a, 'i> {
);
let mut obkv_buffer = Vec::new();
let mut document_sorter_buffer = Vec::new();
for result in self.index.all_documents(wtxn)? {
let (docid, obkv) = result?;
@ -727,7 +804,9 @@ impl<'a, 'i> Transform<'a, 'i> {
}
let buffer = obkv_writer.into_inner()?;
original_writer.insert(docid.to_be_bytes(), &buffer)?;
document_sorter_buffer.clear();
into_del_add_obkv(KvReaderU16::new(buffer), true, true, &mut document_sorter_buffer)?;
original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
// Once we have the document. We're going to flatten it
// and insert it in the flattened sorter.
@ -762,7 +841,9 @@ impl<'a, 'i> Transform<'a, 'i> {
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
writer.insert(fid, &value)?;
}
flattened_writer.insert(docid.to_be_bytes(), &buffer)?;
document_sorter_buffer.clear();
into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut document_sorter_buffer)?;
flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
}
// Once we have written all the documents, we extract
@ -828,38 +909,86 @@ mod test {
#[test]
fn merge_obkvs() {
let mut doc_0 = Vec::new();
let mut kv_writer = KvWriter::new(&mut doc_0);
let mut additive_doc_0 = Vec::new();
let mut deletive_doc_0 = Vec::new();
let mut del_add_doc_0 = Vec::new();
let mut kv_writer = KvWriter::memory();
kv_writer.insert(0_u8, [0]).unwrap();
kv_writer.finish().unwrap();
doc_0.insert(0, Operation::Addition as u8);
let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0).unwrap();
additive_doc_0.insert(0, Operation::Addition as u8);
into_del_add_obkv(KvReaderU16::new(&buffer), true, false, &mut deletive_doc_0).unwrap();
deletive_doc_0.insert(0, Operation::Deletion as u8);
into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut del_add_doc_0).unwrap();
del_add_doc_0.insert(0, Operation::Addition as u8);
let ret = merge_obkvs_and_operations(&[], &[Cow::from(doc_0.as_slice())]).unwrap();
assert_eq!(*ret, doc_0);
let mut additive_doc_1 = Vec::new();
let mut kv_writer = KvWriter::memory();
kv_writer.insert(1_u8, [1]).unwrap();
let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_1).unwrap();
additive_doc_1.insert(0, Operation::Addition as u8);
let ret = merge_obkvs_and_operations(
let mut additive_doc_0_1 = Vec::new();
let mut kv_writer = KvWriter::memory();
kv_writer.insert(0_u8, [0]).unwrap();
kv_writer.insert(1_u8, [1]).unwrap();
let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0_1).unwrap();
additive_doc_0_1.insert(0, Operation::Addition as u8);
let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())])
.unwrap();
assert_eq!(*ret, additive_doc_0);
let ret = obkvs_merge_additions_and_deletions(
&[],
&[Cow::from([Operation::Deletion as u8].as_slice()), Cow::from(doc_0.as_slice())],
&[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())],
)
.unwrap();
assert_eq!(*ret, doc_0);
assert_eq!(*ret, del_add_doc_0);
let ret = merge_obkvs_and_operations(
let ret = obkvs_merge_additions_and_deletions(
&[],
&[Cow::from(doc_0.as_slice()), Cow::from([Operation::Deletion as u8].as_slice())],
&[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())],
)
.unwrap();
assert_eq!(*ret, [Operation::Deletion as u8]);
assert_eq!(*ret, deletive_doc_0);
let ret = merge_obkvs_and_operations(
let ret = obkvs_merge_additions_and_deletions(
&[],
&[
Cow::from([Operation::Addition as u8, 1].as_slice()),
Cow::from([Operation::Deletion as u8].as_slice()),
Cow::from(doc_0.as_slice()),
Cow::from(additive_doc_1.as_slice()),
Cow::from(deletive_doc_0.as_slice()),
Cow::from(additive_doc_0.as_slice()),
],
)
.unwrap();
assert_eq!(*ret, doc_0);
assert_eq!(*ret, del_add_doc_0);
let ret = obkvs_merge_additions_and_deletions(
&[],
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
)
.unwrap();
assert_eq!(*ret, additive_doc_0_1);
let ret = obkvs_keep_last_addition_merge_deletions(
&[],
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
)
.unwrap();
assert_eq!(*ret, additive_doc_0);
let ret = obkvs_keep_last_addition_merge_deletions(
&[],
&[
Cow::from(deletive_doc_0.as_slice()),
Cow::from(additive_doc_1.as_slice()),
Cow::from(additive_doc_0.as_slice()),
],
)
.unwrap();
assert_eq!(*ret, del_add_doc_0);
}
}