Merge pull request #4207 from meilisearch/diff-indexing-prefix-databases

Diff indexing prefix databases
This commit is contained in:
Many the fish
2023-11-14 16:04:05 +01:00
committed by GitHub
20 changed files with 238 additions and 1563 deletions

View File

@ -1,14 +1,12 @@
use std::borrow::Cow;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Seek};
use std::time::Instant;
use grenad::{CompressionType, Sorter};
use heed::types::ByteSlice;
use log::debug;
use super::{ClonableMmap, MergeFn};
use crate::error::InternalError;
use crate::update::index_documents::valid_lmdb_key;
use crate::Result;
pub type CursorClonableMmap = io::Cursor<ClonableMmap>;
@ -240,45 +238,46 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
Ok(std::iter::from_fn(move || transposer().transpose()))
}
pub fn sorter_into_lmdb_database(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
/// Write provided sorter in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database.
pub fn write_sorter_into_database<K, V, FS, FM>(
sorter: Sorter<MergeFn>,
merge: MergeFn,
) -> Result<()> {
database: &heed::Database<K, V>,
wtxn: &mut heed::RwTxn,
index_is_empty: bool,
serialize_value: FS,
merge_values: FM,
) -> Result<()>
where
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
{
puffin::profile_function!();
debug!("Writing MTBL sorter...");
let before = Instant::now();
let mut buffer = Vec::new();
let database = database.remap_types::<ByteSlice, ByteSlice>();
let mut merger_iter = sorter.into_stream_merger_iter()?;
if database.is_empty(wtxn)? {
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;
while let Some((k, v)) = merger_iter.next()? {
// safety: we don't keep references from inside the LMDB database.
unsafe { out_iter.append(k, v)? };
}
} else {
while let Some((k, v)) = merger_iter.next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)];
let val = merge(k, &vals).map_err(|_| {
// TODO just wrap this error?
InternalError::IndexingMergingKeys { process: "get-put-merge" }
})?;
// safety: we don't keep references from inside the LMDB database.
unsafe { iter.put_current(k, &val)? };
while let Some((key, value)) = merger_iter.next()? {
if valid_lmdb_key(key) {
buffer.clear();
let value = if index_is_empty {
Some(serialize_value(value, &mut buffer)?)
} else {
match database.get(wtxn, key)? {
Some(prev_value) => merge_values(value, prev_value, &mut buffer)?,
None => Some(serialize_value(value, &mut buffer)?),
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
};
match value {
Some(value) => database.put(wtxn, key, value)?,
None => {
database.delete(wtxn, key)?;
}
}
}
}
debug!("MTBL sorter writen in {:.02?}!", before.elapsed());
Ok(())
}

View File

@ -239,3 +239,19 @@ pub fn merge_deladd_cbo_roaring_bitmaps<'a>(
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
}
}
/// A function that merges a DelAdd of bitmao into an already existing bitmap.
///
/// The first argument is the DelAdd obkv of CboRoaringBitmaps and
/// the second one is the CboRoaringBitmap to merge into.
pub fn merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap<'a>(
deladd_obkv: &[u8],
previous: &[u8],
buffer: &'a mut Vec<u8>,
) -> Result<Option<&'a [u8]>> {
Ok(CboRoaringBitmapCodec::merge_deladd_into(
KvReaderDelAdd::new(deladd_obkv),
previous,
buffer,
)?)
}

View File

@ -9,12 +9,13 @@ pub use clonable_mmap::{ClonableMmap, CursorClonableMmap};
use fst::{IntoStreamer, Streamer};
pub use grenad_helpers::{
as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks,
merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, writer_into_reader,
merge_ignore_values, sorter_into_reader, write_sorter_into_database, writer_into_reader,
GrenadParameters, MergeableReader,
};
pub use merge_functions::{
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_roaring_bitmaps,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_roaring_bitmaps,
obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions,
serialize_roaring_bitmap, MergeFn,
};

View File

@ -23,8 +23,10 @@ use self::enrich::enrich_documents_batch;
pub use self::enrich::{extract_finite_float_from_value, validate_geo_from_json, DocumentId};
pub use self::helpers::{
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps,
sorter_into_lmdb_database, valid_lmdb_key, writer_into_reader, ClonableMmap, MergeFn,
fst_stream_into_vec, merge_btreeset_string, merge_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
merge_roaring_bitmaps, valid_lmdb_key, write_sorter_into_database, writer_into_reader,
ClonableMmap, MergeFn,
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
@ -32,13 +34,12 @@ use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
IndexerConfig, PrefixWordPairsProximityDocids, UpdateIndexingStep, WordPrefixDocids,
WordPrefixIntegerDocids, WordsPrefixesFst,
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
};
use crate::{CboRoaringBitmapCodec, Index, Result};
static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 5;
static PREFIX_DATABASE_COUNT: usize = 4;
static TOTAL_POSTING_DATABASE_COUNT: usize = MERGED_DATABASE_COUNT + PREFIX_DATABASE_COUNT;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -411,12 +412,42 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
let mut word_position_docids = None;
let mut word_fid_docids = None;
let mut word_docids = None;
let mut exact_word_docids = None;
for result in lmdb_writer_rx {
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let typed_chunk = result?;
let typed_chunk = match result? {
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
} => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
word_docids = Some(cloneable_chunk);
let cloneable_chunk =
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
exact_word_docids = Some(cloneable_chunk);
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
word_fid_docids = Some(cloneable_chunk);
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
}
}
TypedChunk::WordPositionDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_position_docids = Some(cloneable_chunk);
TypedChunk::WordPositionDocids(chunk)
}
otherwise => otherwise,
};
// FIXME: return newly added as well as newly deleted documents
let (docids, is_merged_database) =
@ -447,17 +478,16 @@ where
// We write the primary key field id into the main database
self.index.put_primary_key(self.wtxn, &primary_key)?;
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
// TODO: reactivate prefix DB with diff-indexing
// self.execute_prefix_databases(
// word_docids,
// exact_word_docids,
// word_pair_proximity_docids,
// word_position_docids,
// word_fid_docids,
// )?;
self.execute_prefix_databases(
word_docids,
exact_word_docids,
word_position_docids,
word_fid_docids,
)?;
self.index.number_of_documents(self.wtxn)
Ok(number_of_documents)
}
#[logging_timer::time("IndexDocuments::{}")]
@ -465,7 +495,6 @@ where
self,
word_docids: Option<grenad::Reader<CursorClonableMmap>>,
exact_word_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_pair_proximity_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_position_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_fid_docids: Option<grenad::Reader<CursorClonableMmap>>,
) -> Result<()>
@ -586,32 +615,6 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
if let Some(word_pair_proximity_docids) = word_pair_proximity_docids {
// Run the word prefix pair proximity docids update operation.
PrefixWordPairsProximityDocids::new(
self.wtxn,
self.index,
self.indexer_config.chunk_compression_type,
self.indexer_config.chunk_compression_level,
)
.execute(
word_pair_proximity_docids,
&new_prefix_fst_words,
&common_prefix_fst_words,
&del_prefix_fst_words,
)?;
}
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
if let Some(word_position_docids) = word_position_docids {
// Run the words prefix position docids update operation.
let mut builder = WordPrefixIntegerDocids::new(

View File

@ -13,7 +13,10 @@ use obkv::{KvReader, KvWriter};
use ordered_float::OrderedFloat;
use roaring::RoaringBitmap;
use super::helpers::{self, merge_ignore_values, valid_lmdb_key, CursorClonableMmap};
use super::helpers::{
self, merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, merge_ignore_values,
valid_lmdb_key, CursorClonableMmap,
};
use super::{ClonableMmap, MergeFn};
use crate::distance::NDotProductPoint;
use crate::error::UserError;
@ -21,12 +24,11 @@ use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType;
use crate::index::db_name::DOCUMENTS;
use crate::index::Hnsw;
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
use crate::update::facet::FacetsUpdate;
use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at};
use crate::{
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, Result,
SerializationError, BEU32,
lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, Result, SerializationError, BEU32,
};
pub(crate) enum TypedChunk {
@ -186,7 +188,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
@ -202,7 +204,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?;
@ -212,7 +214,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?;
@ -222,7 +224,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
// create fst from word docids
@ -244,7 +246,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
@ -265,7 +267,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
@ -276,7 +278,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
@ -287,7 +289,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
@ -298,7 +300,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
@ -495,33 +497,6 @@ fn merge_word_docids_reader_into_fst(
Ok(builder.into_set())
}
/// A function that extracts and returns the Add side of a DelAdd obkv.
/// This is useful when there are no previous value in the database and
/// therefore we don't need to do a diff with what's already there.
///
/// If there is no Add side we currently write an empty buffer
/// which is a valid CboRoaringBitmap.
#[allow(clippy::ptr_arg)] // required to avoid signature mismatch
fn deladd_serialize_add_side<'a>(obkv: &'a [u8], _buffer: &mut Vec<u8>) -> Result<&'a [u8]> {
Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default())
}
/// A function that merges a DelAdd of bitmao into an already existing bitmap.
///
/// The first argument is the DelAdd obkv of CboRoaringBitmaps and
/// the second one is the CboRoaringBitmap to merge into.
fn merge_deladd_cbo_roaring_bitmaps<'a>(
deladd_obkv: &[u8],
previous: &[u8],
buffer: &'a mut Vec<u8>,
) -> Result<Option<&'a [u8]>> {
Ok(CboRoaringBitmapCodec::merge_deladd_into(
KvReaderDelAdd::new(deladd_obkv),
previous,
buffer,
)?)
}
/// Write provided entries in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database.
fn write_entries_into_database<R, K, V, FS, FM>(