Compare commits

..

1 Commits

Author SHA1 Message Date
Clément Renault
02a40645e2 Add TODOs to the prefix functions 2023-10-19 17:58:52 +02:00
13 changed files with 245 additions and 144 deletions

View File

@@ -6,7 +6,6 @@ use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
use roaring::RoaringBitmap;
use crate::heed_codec::BytesDecodeOwned;
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
/// This is the limit where using a byteorder became less size efficient
/// than using a direct roaring encoding, it is also the point where we are able
@@ -100,28 +99,6 @@ impl CboRoaringBitmapCodec {
Ok(())
}
/// Merges a DelAdd delta into a CboRoaringBitmap.
pub fn merge_deladd_into(
deladd: KvReaderDelAdd<'_>,
previous: &[u8],
buffer: &mut Vec<u8>,
) -> io::Result<()> {
// Deserialize the bitmap that is already there
let mut previous = Self::deserialize_from(previous)?;
// Remove integers we no more want in the previous bitmap
if let Some(value) = deladd.get(DelAdd::Deletion) {
previous -= Self::deserialize_from(value)?;
}
// Insert the new integers we want in the previous bitmap
if let Some(value) = deladd.get(DelAdd::Addition) {
previous |= Self::deserialize_from(value)?;
}
previous.serialize_into(buffer)
}
}
impl heed::BytesDecode<'_> for CboRoaringBitmapCodec {

View File

@@ -1,6 +1,7 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::mem::size_of;
use std::path::Path;
use charabia::{Language, Script};
@@ -13,6 +14,7 @@ use time::OffsetDateTime;
use crate::distance::NDotProductPoint;
use crate::error::{InternalError, UserError};
use crate::facet::FacetType;
use crate::fields_ids_map::FieldsIdsMap;
use crate::heed_codec::facet::{
FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec,
@@ -53,6 +55,7 @@ pub mod main_key {
/// e.g. vector-hnsw0x0032.
pub const VECTOR_HNSW_KEY_PREFIX: &str = "vector-hnsw";
pub const HARD_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "hard-external-documents-ids";
pub const NUMBER_FACETED_DOCUMENTS_IDS_PREFIX: &str = "number-faceted-documents-ids";
pub const PRIMARY_KEY_KEY: &str = "primary-key";
pub const SEARCHABLE_FIELDS_KEY: &str = "searchable-fields";
pub const USER_DEFINED_SEARCHABLE_FIELDS_KEY: &str = "user-defined-searchable-fields";
@@ -61,6 +64,7 @@ pub mod main_key {
pub const NON_SEPARATOR_TOKENS_KEY: &str = "non-separator-tokens";
pub const SEPARATOR_TOKENS_KEY: &str = "separator-tokens";
pub const DICTIONARY_KEY: &str = "dictionary";
pub const STRING_FACETED_DOCUMENTS_IDS_PREFIX: &str = "string-faceted-documents-ids";
pub const SYNONYMS_KEY: &str = "synonyms";
pub const USER_DEFINED_SYNONYMS_KEY: &str = "user-defined-synonyms";
pub const WORDS_FST_KEY: &str = "words-fst";
@@ -922,6 +926,44 @@ impl Index {
/* faceted documents ids */
/// Writes the documents ids that are faceted under this field id for the given facet type.
pub fn put_faceted_documents_ids(
&self,
wtxn: &mut RwTxn,
field_id: FieldId,
facet_type: FacetType,
docids: &RoaringBitmap,
) -> heed::Result<()> {
let key = match facet_type {
FacetType::String => main_key::STRING_FACETED_DOCUMENTS_IDS_PREFIX,
FacetType::Number => main_key::NUMBER_FACETED_DOCUMENTS_IDS_PREFIX,
};
let mut buffer = vec![0u8; key.len() + size_of::<FieldId>()];
buffer[..key.len()].copy_from_slice(key.as_bytes());
buffer[key.len()..].copy_from_slice(&field_id.to_be_bytes());
self.main.put::<_, ByteSlice, RoaringBitmapCodec>(wtxn, &buffer, docids)
}
/// Retrieve all the documents ids that are faceted under this field id for the given facet type.
pub fn faceted_documents_ids(
&self,
rtxn: &RoTxn,
field_id: FieldId,
facet_type: FacetType,
) -> heed::Result<RoaringBitmap> {
let key = match facet_type {
FacetType::String => main_key::STRING_FACETED_DOCUMENTS_IDS_PREFIX,
FacetType::Number => main_key::NUMBER_FACETED_DOCUMENTS_IDS_PREFIX,
};
let mut buffer = vec![0u8; key.len() + size_of::<FieldId>()];
buffer[..key.len()].copy_from_slice(key.as_bytes());
buffer[key.len()..].copy_from_slice(&field_id.to_be_bytes());
match self.main.get::<_, ByteSlice, RoaringBitmapCodec>(rtxn, &buffer)? {
Some(docids) => Ok(docids),
None => Ok(RoaringBitmap::new()),
}
}
/// Retrieve all the documents which contain this field id set as null
pub fn null_faceted_documents_ids(
&self,

View File

@@ -359,7 +359,31 @@ pub fn snap_external_documents_ids(index: &Index) -> String {
snap
}
pub fn snap_number_faceted_documents_ids(index: &Index) -> String {
let rtxn = index.read_txn().unwrap();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let mut snap = String::new();
for field_id in fields_ids_map.ids() {
let number_faceted_documents_ids =
index.faceted_documents_ids(&rtxn, field_id, FacetType::Number).unwrap();
writeln!(&mut snap, "{field_id:<3} {}", display_bitmap(&number_faceted_documents_ids))
.unwrap();
}
snap
}
pub fn snap_string_faceted_documents_ids(index: &Index) -> String {
let rtxn = index.read_txn().unwrap();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let mut snap = String::new();
for field_id in fields_ids_map.ids() {
let string_faceted_documents_ids =
index.faceted_documents_ids(&rtxn, field_id, FacetType::String).unwrap();
writeln!(&mut snap, "{field_id:<3} {}", display_bitmap(&string_faceted_documents_ids))
.unwrap();
}
snap
}
pub fn snap_words_fst(index: &Index) -> String {
let rtxn = index.read_txn().unwrap();
let words_fst = index.words_fst(&rtxn).unwrap();
@@ -507,6 +531,12 @@ macro_rules! full_snap_of_db {
($index:ident, external_documents_ids) => {{
$crate::snapshot_tests::snap_external_documents_ids(&$index)
}};
($index:ident, number_faceted_documents_ids) => {{
$crate::snapshot_tests::snap_number_faceted_documents_ids(&$index)
}};
($index:ident, string_faceted_documents_ids) => {{
$crate::snapshot_tests::snap_string_faceted_documents_ids(&$index)
}};
($index:ident, words_fst) => {{
$crate::snapshot_tests::snap_words_fst(&$index)
}};

View File

@@ -1,6 +1,7 @@
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use crate::facet::FacetType;
use crate::{ExternalDocumentsIds, FieldDistribution, Index, Result};
pub struct ClearDocuments<'t, 'u, 'i> {
@@ -50,6 +51,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
// We retrieve the number of documents ids that we are deleting.
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
let faceted_fields = self.index.faceted_fields_ids(self.wtxn)?;
// We clean some of the main engine datastructures.
self.index.put_words_fst(self.wtxn, &fst::Set::default())?;
@@ -62,6 +64,22 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> {
self.index.delete_geo_faceted_documents_ids(self.wtxn)?;
self.index.delete_vector_hnsw(self.wtxn)?;
// We clean all the faceted documents ids.
for field_id in faceted_fields {
self.index.put_faceted_documents_ids(
self.wtxn,
field_id,
FacetType::Number,
&empty_roaring,
)?;
self.index.put_faceted_documents_ids(
self.wtxn,
field_id,
FacetType::String,
&empty_roaring,
)?;
}
// Clear the other databases.
word_docids.clear(self.wtxn)?;
exact_word_docids.clear(self.wtxn)?;

View File

@@ -382,6 +382,12 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
for facet_type in [FacetType::Number, FacetType::String] {
let mut affected_facet_values = HashMap::new();
for field_id in self.index.faceted_fields_ids(self.wtxn)? {
// Remove docids from the number faceted documents ids
let mut docids =
self.index.faceted_documents_ids(self.wtxn, field_id, facet_type)?;
docids -= &self.to_delete_docids;
self.index.put_faceted_documents_ids(self.wtxn, field_id, facet_type, &docids)?;
let facet_values = remove_docids_from_field_id_docid_facet_value(
self.index,
self.wtxn,

View File

@@ -1,9 +1,9 @@
use std::borrow::Cow;
use std::fs::File;
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::{BytesEncode, Error, RoTxn, RwTxn};
use obkv::KvReader;
use roaring::RoaringBitmap;
use super::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
@@ -12,7 +12,6 @@ use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
use crate::heed_codec::ByteSliceRefCodec;
use crate::update::del_add::DelAdd;
use crate::update::index_documents::{create_writer, valid_lmdb_key, writer_into_reader};
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
@@ -21,6 +20,9 @@ use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
///
/// First, the new elements are inserted into the level 0 of the database. Then, the
/// higher levels are cleared and recomputed from the content of level 0.
///
/// Finally, the `faceted_documents_ids` value in the main database of `Index`
/// is updated to contain the new set of faceted documents.
pub struct FacetsUpdateBulk<'i> {
index: &'i Index,
group_size: u8,
@@ -28,7 +30,7 @@ pub struct FacetsUpdateBulk<'i> {
facet_type: FacetType,
field_ids: Vec<FieldId>,
// None if level 0 does not need to be updated
delta_data: Option<grenad::Reader<File>>,
new_data: Option<grenad::Reader<File>>,
}
impl<'i> FacetsUpdateBulk<'i> {
@@ -36,7 +38,7 @@ impl<'i> FacetsUpdateBulk<'i> {
index: &'i Index,
field_ids: Vec<FieldId>,
facet_type: FacetType,
delta_data: grenad::Reader<File>,
new_data: grenad::Reader<File>,
group_size: u8,
min_level_size: u8,
) -> FacetsUpdateBulk<'i> {
@@ -46,7 +48,7 @@ impl<'i> FacetsUpdateBulk<'i> {
group_size,
min_level_size,
facet_type,
delta_data: Some(delta_data),
new_data: Some(new_data),
}
}
@@ -61,13 +63,13 @@ impl<'i> FacetsUpdateBulk<'i> {
group_size: FACET_GROUP_SIZE,
min_level_size: FACET_MIN_LEVEL_SIZE,
facet_type,
delta_data: None,
new_data: None,
}
}
#[logging_timer::time("FacetsUpdateBulk::{}")]
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
let Self { index, field_ids, group_size, min_level_size, facet_type, delta_data } = self;
let Self { index, field_ids, group_size, min_level_size, facet_type, new_data } = self;
let db = match facet_type {
FacetType::String => index
@@ -78,9 +80,12 @@ impl<'i> FacetsUpdateBulk<'i> {
}
};
let inner = FacetsUpdateBulkInner { db, delta_data, group_size, min_level_size };
let inner = FacetsUpdateBulkInner { db, new_data, group_size, min_level_size };
inner.update(wtxn, &field_ids)?;
inner.update(wtxn, &field_ids, |wtxn, field_id, all_docids| {
index.put_faceted_documents_ids(wtxn, field_id, facet_type, &all_docids)?;
Ok(())
})?;
Ok(())
}
@@ -89,19 +94,26 @@ impl<'i> FacetsUpdateBulk<'i> {
/// Implementation of `FacetsUpdateBulk` that is independent of milli's `Index` type
pub(crate) struct FacetsUpdateBulkInner<R: std::io::Read + std::io::Seek> {
pub db: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
pub delta_data: Option<grenad::Reader<R>>,
pub new_data: Option<grenad::Reader<R>>,
pub group_size: u8,
pub min_level_size: u8,
}
impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
pub fn update(mut self, wtxn: &mut RwTxn, field_ids: &[u16]) -> Result<()> {
pub fn update(
mut self,
wtxn: &mut RwTxn,
field_ids: &[u16],
mut handle_all_docids: impl FnMut(&mut RwTxn, FieldId, RoaringBitmap) -> Result<()>,
) -> Result<()> {
self.update_level0(wtxn)?;
for &field_id in field_ids.iter() {
self.clear_levels(wtxn, field_id)?;
}
for &field_id in field_ids.iter() {
let level_readers = self.compute_levels_for_field_id(field_id, wtxn)?;
let (level_readers, all_docids) = self.compute_levels_for_field_id(field_id, wtxn)?;
handle_all_docids(wtxn, field_id, all_docids)?;
for level_reader in level_readers {
let mut cursor = level_reader.into_cursor()?;
@@ -121,26 +133,20 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
Ok(())
}
// TODO the new_data is an Reader<Obkv<Key, Obkv<DelAdd, RoaringBitmap>>>
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
let delta_data = match self.delta_data.take() {
let new_data = match self.new_data.take() {
Some(x) => x,
None => return Ok(()),
};
if self.db.is_empty(wtxn)? {
let mut buffer = Vec::new();
let mut database = self.db.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = delta_data.into_cursor()?;
let mut cursor = new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if !valid_lmdb_key(key) {
continue;
}
let value: KvReader<DelAdd> = KvReader::new(value);
// DB is empty, it is safe to ignore Del operations
let Some(value) = value.get(DelAdd::Addition) else {
continue;
};
buffer.clear();
// the group size for level 0
buffer.push(1);
@@ -152,14 +158,11 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
let mut buffer = Vec::new();
let database = self.db.remap_types::<ByteSlice, ByteSlice>();
let mut cursor = delta_data.into_cursor()?;
let mut cursor = new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if !valid_lmdb_key(key) {
continue;
}
let value: KvReader<DelAdd> = KvReader::new(value);
// the value is a CboRoaringBitmap, but I still need to prepend the
// group size for level 0 (= 1) to it
buffer.clear();
@@ -168,15 +171,12 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
match database.get(wtxn, key)? {
Some(prev_value) => {
let old_bitmap = &prev_value[1..];
CboRoaringBitmapCodec::merge_deladd_into(value, old_bitmap, &mut buffer)?;
CboRoaringBitmapCodec::merge_into(
&[Cow::Borrowed(value), Cow::Borrowed(old_bitmap)],
&mut buffer,
)?;
}
None => {
// it is safe to ignore the del in that case.
let Some(value) = value.get(DelAdd::Addition) else {
// won't put the key in DB as the value would be empty
continue;
};
buffer.extend_from_slice(value);
}
};
@@ -189,10 +189,16 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
&self,
field_id: FieldId,
txn: &RoTxn,
) -> Result<Vec<grenad::Reader<File>>> {
let subwriters = self.compute_higher_levels(txn, field_id, 32, &mut |_, _| Ok(()))?;
) -> Result<(Vec<grenad::Reader<File>>, RoaringBitmap)> {
let mut all_docids = RoaringBitmap::new();
let subwriters = self.compute_higher_levels(txn, field_id, 32, &mut |bitmaps, _| {
for bitmap in bitmaps {
all_docids |= bitmap;
}
Ok(())
})?;
Ok(subwriters)
Ok((subwriters, all_docids))
}
#[allow(clippy::type_complexity)]
fn read_level_0<'t>(
@@ -486,6 +492,7 @@ mod tests {
index.add_documents(documents).unwrap();
db_snap!(index, facet_id_f64_docids, "initial", @"c34f499261f3510d862fa0283bbe843a");
db_snap!(index, number_faceted_documents_ids, "initial", @"01594fecbb316798ce3651d6730a4521");
}
#[test]

View File

@@ -160,6 +160,7 @@ mod tests {
index.add_documents(documents).unwrap();
db_snap!(index, facet_id_f64_docids, 1, @"550cd138d6fe31ccdd42cd5392fbd576");
db_snap!(index, number_faceted_documents_ids, 1, @"9a0ea88e7c9dcf6dc0ef0b601736ffcf");
let mut wtxn = index.env.write_txn().unwrap();
@@ -177,6 +178,7 @@ mod tests {
db_snap!(index, soft_deleted_documents_ids, @"[]");
db_snap!(index, facet_id_f64_docids, 2, @"d4d5f14e7f1e1f09b86821a0b6defcc6");
db_snap!(index, number_faceted_documents_ids, 2, @"3570e0ac0fdb21be9ebe433f59264b56");
}
// Same test as above but working with string values for the facets
@@ -217,6 +219,7 @@ mod tests {
// Note that empty strings are not stored in the facet db due to commit 4860fd452965 (comment written on 29 Nov 2022)
db_snap!(index, facet_id_string_docids, 1, @"5fd1bd0724c65a6dc1aafb6db93c7503");
db_snap!(index, string_faceted_documents_ids, 1, @"54bc15494fa81d93339f43c08fd9d8f5");
let mut wtxn = index.env.write_txn().unwrap();
@@ -234,6 +237,7 @@ mod tests {
db_snap!(index, soft_deleted_documents_ids, @"[]");
db_snap!(index, facet_id_string_docids, 2, @"7f9c00b29e04d58c1821202a5dda0ebc");
db_snap!(index, string_faceted_documents_ids, 2, @"504152afa5c94fd4e515dcdfa4c7161f");
}
#[test]
@@ -270,6 +274,7 @@ mod tests {
// Note that empty strings are not stored in the facet db due to commit 4860fd452965 (comment written on 29 Nov 2022)
db_snap!(index, facet_id_string_docids, 1, @"5fd1bd0724c65a6dc1aafb6db93c7503");
db_snap!(index, string_faceted_documents_ids, 1, @"54bc15494fa81d93339f43c08fd9d8f5");
let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);
@@ -286,6 +291,12 @@ mod tests {
db_snap!(index, soft_deleted_documents_ids, @"[]");
db_snap!(index, facet_id_string_docids, 2, @"ece56086e76d50e661fb2b58475b9f7d");
db_snap!(index, string_faceted_documents_ids, 2, @r###"
0 []
1 [11, 20, 73, 292, 324, 358, 381, 493, 839, 852, ]
2 [292, 324, 358, 381, 493, 839, 852, ]
3 [11, 20, 73, 292, 324, 358, 381, 493, 839, 852, ]
"###);
}
}

View File

@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::fs::File;
use heed::types::{ByteSlice, DecodeIgnore};
use heed::{BytesDecode, Error, RoTxn, RwTxn};
use obkv::KvReader;
use roaring::RoaringBitmap;
use crate::facet::FacetType;
@@ -11,9 +11,8 @@ use crate::heed_codec::facet::{
};
use crate::heed_codec::ByteSliceRefCodec;
use crate::search::facet::get_highest_level;
use crate::update::del_add::DelAdd;
use crate::update::index_documents::valid_lmdb_key;
use crate::{CboRoaringBitmapCodec, Index, Result};
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
enum InsertionResult {
InPlace,
@@ -28,21 +27,27 @@ enum DeletionResult {
/// Algorithm to incrementally insert and delete elememts into the
/// `facet_id_(string/f64)_docids` databases.
pub struct FacetsUpdateIncremental {
///
/// Rhe `faceted_documents_ids` value in the main database of `Index`
/// is also updated to contain the new set of faceted documents.
pub struct FacetsUpdateIncremental<'i> {
index: &'i Index,
inner: FacetsUpdateIncrementalInner,
delta_data: grenad::Reader<File>,
facet_type: FacetType,
new_data: grenad::Reader<File>,
}
impl FacetsUpdateIncremental {
impl<'i> FacetsUpdateIncremental<'i> {
pub fn new(
index: &Index,
index: &'i Index,
facet_type: FacetType,
delta_data: grenad::Reader<File>,
new_data: grenad::Reader<File>,
group_size: u8,
min_level_size: u8,
max_group_size: u8,
) -> Self {
FacetsUpdateIncremental {
index,
inner: FacetsUpdateIncrementalInner {
db: match facet_type {
FacetType::String => index
@@ -56,41 +61,31 @@ impl FacetsUpdateIncremental {
max_group_size,
min_level_size,
},
delta_data,
facet_type,
new_data,
}
}
pub fn execute(self, wtxn: &mut RwTxn) -> crate::Result<()> {
let mut cursor = self.delta_data.into_cursor()?;
pub fn execute(self, wtxn: &'i mut RwTxn) -> crate::Result<()> {
let mut new_faceted_docids = HashMap::<FieldId, RoaringBitmap>::default();
let mut cursor = self.new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if !valid_lmdb_key(key) {
continue;
}
let key = FacetGroupKeyCodec::<ByteSliceRefCodec>::bytes_decode(key)
.ok_or(heed::Error::Encoding)?;
let value = KvReader::new(value);
let docids_to_delete = value
.get(DelAdd::Deletion)
.map(CboRoaringBitmapCodec::bytes_decode)
.map(|o| o.ok_or(heed::Error::Encoding));
let docids_to_add = value
.get(DelAdd::Addition)
.map(CboRoaringBitmapCodec::bytes_decode)
.map(|o| o.ok_or(heed::Error::Encoding));
if let Some(docids_to_delete) = docids_to_delete {
let docids_to_delete = docids_to_delete?;
self.inner.delete(wtxn, key.field_id, key.left_bound, &docids_to_delete)?;
}
if let Some(docids_to_add) = docids_to_add {
let docids_to_add = docids_to_add?;
self.inner.insert(wtxn, key.field_id, key.left_bound, &docids_to_add)?;
}
let docids = CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
self.inner.insert(wtxn, key.field_id, key.left_bound, &docids)?;
*new_faceted_docids.entry(key.field_id).or_default() |= docids;
}
for (field_id, new_docids) in new_faceted_docids {
let mut docids = self.index.faceted_documents_ids(wtxn, field_id, self.facet_type)?;
docids |= new_docids;
self.index.put_faceted_documents_ids(wtxn, field_id, self.facet_type, &docids)?;
}
Ok(())
}
}

View File

@@ -108,14 +108,14 @@ pub struct FacetsUpdate<'i> {
index: &'i Index,
database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
facet_type: FacetType,
delta_data: grenad::Reader<File>,
new_data: grenad::Reader<File>,
group_size: u8,
max_group_size: u8,
min_level_size: u8,
}
impl<'i> FacetsUpdate<'i> {
// TODO grenad::Reader<Key, Obkv<DelAdd, RoaringBitmap>>
pub fn new(index: &'i Index, facet_type: FacetType, delta_data: grenad::Reader<File>) -> Self {
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
let database = match facet_type {
FacetType::String => index
.facet_id_string_docids
@@ -131,26 +131,26 @@ impl<'i> FacetsUpdate<'i> {
max_group_size: FACET_MAX_GROUP_SIZE,
min_level_size: FACET_MIN_LEVEL_SIZE,
facet_type,
delta_data,
new_data,
}
}
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
if self.delta_data.is_empty() {
if self.new_data.is_empty() {
return Ok(());
}
debug!("Computing and writing the facet values levels docids into LMDB on disk...");
self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
// See self::comparison_bench::benchmark_facet_indexing
if self.delta_data.len() >= (self.database.len(wtxn)? as u64 / 50) {
if self.new_data.len() >= (self.database.len(wtxn)? as u64 / 50) {
let field_ids =
self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::<Vec<_>>();
let bulk_update = FacetsUpdateBulk::new(
self.index,
field_ids,
self.facet_type,
self.delta_data,
self.new_data,
self.group_size,
self.min_level_size,
);
@@ -159,7 +159,7 @@ impl<'i> FacetsUpdate<'i> {
let incremental_update = FacetsUpdateIncremental::new(
self.index,
self.facet_type,
self.delta_data,
self.new_data,
self.group_size,
self.min_level_size,
self.max_group_size,
@@ -459,7 +459,7 @@ pub(crate) mod test_helpers {
let update = FacetsUpdateBulkInner {
db: self.content,
delta_data: Some(reader),
new_data: Some(reader),
group_size: self.group_size.get(),
min_level_size: self.min_level_size.get(),
};
@@ -594,6 +594,7 @@ mod tests {
index.add_documents(documents).unwrap();
db_snap!(index, facet_id_f64_docids, "initial", @"777e0e221d778764b472c512617eeb3b");
db_snap!(index, number_faceted_documents_ids, "initial", @"bd916ef32b05fd5c3c4c518708f431a9");
db_snap!(index, soft_deleted_documents_ids, "initial", @"[]");
let mut documents = vec![];
@@ -616,6 +617,7 @@ mod tests {
index.add_documents(documents).unwrap();
db_snap!(index, facet_id_f64_docids, "replaced_1_soft", @"abba175d7bed727d0efadaef85a4388f");
db_snap!(index, number_faceted_documents_ids, "replaced_1_soft", @"de76488bd05ad94c6452d725acf1bd06");
db_snap!(index, soft_deleted_documents_ids, "replaced_1_soft", @"6c975deb900f286d2f6456d2d5c3a123");
// Then replace the last document while disabling soft_deletion
@@ -640,6 +642,7 @@ mod tests {
index.add_documents(documents).unwrap();
db_snap!(index, facet_id_f64_docids, "replaced_2_hard", @"029e27a46d09c574ae949aa4289b45e6");
db_snap!(index, number_faceted_documents_ids, "replaced_2_hard", @"60b19824f136affe6b240a7200779028");
db_snap!(index, soft_deleted_documents_ids, "replaced_2_hard", @"[]");
}
}

View File

@@ -166,7 +166,7 @@ pub(crate) fn data_from_obkv_documents(
lmdb_writer_sx.clone(),
extract_fid_word_count_docids,
merge_cbo_roaring_bitmaps,
TypedChunk::FieldIdWordCountDocids,
TypedChunk::FieldIdWordcountDocids,
"field-id-wordcount-docids",
);

View File

@@ -1499,6 +1499,12 @@ mod tests {
3 2 second second
3 3 third third
"###);
db_snap!(index, string_faceted_documents_ids, @r###"
0 []
1 []
2 []
3 [0, 1, 2, 3, ]
"###);
let rtxn = index.read_txn().unwrap();
@@ -1522,6 +1528,12 @@ mod tests {
db_snap!(index, facet_id_string_docids, @"");
db_snap!(index, field_id_docid_facet_strings, @"");
db_snap!(index, string_faceted_documents_ids, @r###"
0 []
1 []
2 []
3 [0, 1, 2, 3, ]
"###);
let rtxn = index.read_txn().unwrap();
@@ -1548,6 +1560,12 @@ mod tests {
3 2 second second
3 3 third third
"###);
db_snap!(index, string_faceted_documents_ids, @r###"
0 []
1 []
2 []
3 [0, 1, 2, 3, ]
"###);
let rtxn = index.read_txn().unwrap();

View File

@@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
@@ -10,13 +11,14 @@ use heed::types::ByteSlice;
use heed::RwTxn;
use roaring::RoaringBitmap;
use super::helpers::{self, merge_ignore_values, valid_lmdb_key, CursorClonableMmap};
use super::helpers::{
self, merge_ignore_values, serialize_roaring_bitmap, valid_lmdb_key, CursorClonableMmap,
};
use super::{ClonableMmap, MergeFn};
use crate::distance::NDotProductPoint;
use crate::error::UserError;
use crate::facet::FacetType;
use crate::index::Hnsw;
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::facet::FacetsUpdate;
use crate::update::index_documents::helpers::{as_cloneable_grenad, try_split_array_at};
use crate::{lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, GeoPoint, Index, Result, BEU32};
@@ -25,7 +27,7 @@ pub(crate) enum TypedChunk {
FieldIdDocidFacetStrings(grenad::Reader<CursorClonableMmap>),
FieldIdDocidFacetNumbers(grenad::Reader<CursorClonableMmap>),
Documents(grenad::Reader<CursorClonableMmap>),
FieldIdWordCountDocids(grenad::Reader<File>),
FieldIdWordcountDocids(grenad::Reader<File>),
NewDocumentsIds(RoaringBitmap),
WordDocids {
word_docids_reader: grenad::Reader<File>,
@@ -56,7 +58,7 @@ impl TypedChunk {
TypedChunk::Documents(grenad) => {
format!("Documents {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::FieldIdWordCountDocids(grenad) => {
TypedChunk::FieldIdWordcountDocids(grenad) => {
format!("FieldIdWordcountDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::NewDocumentsIds(grenad) => {
@@ -124,14 +126,14 @@ pub(crate) fn write_typed_chunk_into_index(
index.documents.remap_types::<ByteSlice, ByteSlice>().put(wtxn, key, value)?;
}
}
TypedChunk::FieldIdWordCountDocids(fid_word_count_docids_iter) => {
TypedChunk::FieldIdWordcountDocids(fid_word_count_docids_iter) => {
append_entries_into_database(
fid_word_count_docids_iter,
&index.field_id_word_count_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
is_merged_database = true;
}
@@ -149,8 +151,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.word_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?;
@@ -159,8 +161,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.exact_word_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?;
@@ -169,8 +171,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.word_fid_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
// create fst from word docids
@@ -191,8 +193,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.word_position_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
is_merged_database = true;
}
@@ -212,8 +214,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.facet_id_exists_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
is_merged_database = true;
}
@@ -223,8 +225,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.facet_id_is_null_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
is_merged_database = true;
}
@@ -234,8 +236,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.facet_id_is_empty_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
is_merged_database = true;
}
@@ -245,8 +247,8 @@ pub(crate) fn write_typed_chunk_into_index(
&index.word_pair_proximity_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
is_merged_database = true;
}
@@ -318,7 +320,7 @@ pub(crate) fn write_typed_chunk_into_index(
let found = vector.len();
let expected = *expected_dimensions.get_or_insert(found);
if expected != found {
return Err(UserError::InvalidVectorDimensions { expected, found }.into());
return Err(UserError::InvalidVectorDimensions { expected, found })?;
}
points.push(NDotProductPoint::new(vector));
@@ -385,28 +387,13 @@ fn merge_word_docids_reader_into_fst(
Ok(builder.into_set())
}
/// A function that extracts and returns the Add side of a DelAdd obkv.
/// This is useful when there are no previous value in the database and
/// therefore we don't need to do a diff with what's already there.
///
/// If there is no Add side we currently write an empty buffer
/// which is a valid CboRoaringBitmap.
fn deladd_serialize_add_side<'a>(obkv: &'a [u8], _buffer: &mut Vec<u8>) -> Result<&'a [u8]> {
Ok(KvReaderDelAdd::new(obkv).get(DelAdd::Addition).unwrap_or_default())
}
/// A function that merges a DelAdd of bitmao into an already existing bitmap.
///
/// The first argument is the DelAdd obkv of CboRoaringBitmaps and
/// the second one is the CboRoaringBitmap to merge into.
fn merge_deladd_cbo_roaring_bitmaps(
deladd_obkv: &[u8],
previous: &[u8],
fn merge_cbo_roaring_bitmaps(
new_value: &[u8],
db_value: &[u8],
buffer: &mut Vec<u8>,
) -> Result<()> {
Ok(CboRoaringBitmapCodec::merge_deladd_into(
KvReaderDelAdd::new(deladd_obkv),
previous,
Ok(CboRoaringBitmapCodec::merge_into(
&[Cow::Borrowed(db_value), Cow::Borrowed(new_value)],
buffer,
)?)
}
@@ -491,7 +478,7 @@ where
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
debug_assert!(
K::bytes_decode(key).is_some(),
K::bytes_decode(&key).is_some(),
"Couldn't decode key with the database decoder, key length: {} - key bytes: {:x?}",
key.len(),
&key

View File

@@ -40,6 +40,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
#[logging_timer::time("WordPrefixDocids::{}")]
pub fn execute(
self,
// TODO grenad::Reader<onkv::Reader<Word, obkv::Reader<DelAdd, CboRoaringBitmap>>>
mut new_word_docids_iter: grenad::ReaderCursor<CursorClonableMmap>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
@@ -51,6 +52,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
// TODO change to merge_deladd_cbo_roaring_bitmaps
merge_cbo_roaring_bitmaps,
self.chunk_compression_type,
self.chunk_compression_level,
@@ -96,6 +98,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
let prefix = std::str::from_utf8(prefix.as_bytes())?;
for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?;
// TODO fake a DelAdd -> Add(`data`)
prefix_docids_sorter.insert(prefix, data)?;
}
}
@@ -111,10 +114,13 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
drop(iter);
// We finally write the word prefix docids into the LMDB database.
// TODO introduce a new function that is similar to `append_entries_into_database`
// and accepts the `merge_deladd_cbo_roaring_bitmaps` function
sorter_into_lmdb_database(
self.wtxn,
*self.word_prefix_docids.as_polymorph(),
prefix_docids_sorter,
// TODO change to `merge_deladd_cbo_roaring_bitmaps`
merge_cbo_roaring_bitmaps,
)?;
@@ -127,6 +133,7 @@ fn write_prefixes_in_sorter(
sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
for (key, data_slices) in prefixes.drain() {
// TODO merge keys before inserting them in the sorter
for data in data_slices {
if valid_lmdb_key(&key) {
sorter.insert(&key, data)?;