Give same interface to bulk and incremental facet indexing types

+ cargo fmt, oops, sorry for the bad history :(
Loïc Lecrenier
2022-09-05 17:31:26 +02:00
committed by Loïc Lecrenier
parent 330c9eb1b2
commit 9026867d17
27 changed files with 333 additions and 174 deletions
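Before the per-file diffs, here is a minimal sketch of the call pattern the commit converges on, reconstructed from the hunks below. The type and method names come from the diff; the surrounding variables are hypothetical, and new_data is consumed, so only one of the two updaters would run on it:

// Both updaters are now built from (index, facet_type, new_data), tuned
// through builder-style setters, and driven by execute(wtxn).
let bulk = FacetsUpdateBulk::new(index, facet_type, new_data)
    .level_group_size(level_group_size)
    .min_level_size(min_level_size);
bulk.execute(wtxn)?;

// The incremental updater mirrors the same shape with its own knobs.
let incremental = FacetsUpdateIncremental::new(index, facet_type, new_data)
    .group_size(group_size)
    .max_group_size(max_group_size)
    .min_level_size(min_level_size);
incremental.execute(wtxn)?;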

View File

@@ -1,7 +1,8 @@
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use crate::{facet::FacetType, ExternalDocumentsIds, FieldDistribution, Index, Result};
use crate::facet::FacetType;
use crate::{ExternalDocumentsIds, FieldDistribution, Index, Result};
pub struct ClearDocuments<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,

View File

@@ -11,7 +11,7 @@ use time::OffsetDateTime;
use super::{ClearDocuments, FacetsUpdateBulk};
use crate::error::{InternalError, UserError};
use crate::facet::FacetType;
use crate::heed_codec::facet::{FacetGroupValueCodec, FacetGroupKeyCodec, ByteSliceRef};
use crate::heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::CboRoaringBitmapCodec;
use crate::index::{db_name, main_key};
use crate::{

View File

@@ -1,18 +1,20 @@
use std::borrow::Cow;
use std::cmp;
use std::fs::File;
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::{BytesEncode, Error, RoTxn, RwTxn};
use log::debug;
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use crate::facet::FacetType;
use crate::heed_codec::facet::{
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
use crate::update::index_documents::{create_writer, writer_into_reader};
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
use grenad::CompressionType;
use heed::types::ByteSlice;
use heed::{BytesEncode, Error, RoTxn, RwTxn};
use log::debug;
use roaring::RoaringBitmap;
use std::borrow::Cow;
use std::cmp;
use std::fs::File;
use time::OffsetDateTime;
pub struct FacetsUpdateBulk<'i> {
index: &'i Index,
@@ -367,9 +369,7 @@ mod tests {
documents.push(serde_json::json!({ "facet2": i }).as_object().unwrap().clone());
}
let documents = documents_batch_reader_from_objects(documents);
dbg!();
index.add_documents(documents).unwrap();
dbg!();
db_snap!(index, facet_id_f64_docids, name);
};
@@ -421,4 +421,100 @@ mod tests {
test("default", None, None);
test("tiny_groups_tiny_levels", NonZeroUsize::new(1), NonZeroUsize::new(1));
}
#[test]
fn test_facets_number_incremental_update() {
let test =
|name: &str, group_size: Option<NonZeroUsize>, min_level_size: Option<NonZeroUsize>| {
let mut index = TempIndex::new_with_map_size(4096 * 1000 * 10); // 40MB
index.index_documents_config.autogenerate_docids = true;
index.index_documents_config.facet_level_group_size = group_size;
index.index_documents_config.facet_min_level_size = min_level_size;
index
.update_settings(|settings| {
settings.set_filterable_fields(
IntoIterator::into_iter(["facet".to_owned(), "facet2".to_owned()])
.collect(),
);
})
.unwrap();
let mut documents = vec![];
for i in 0..1000 {
documents.push(serde_json::json!({ "facet": i }).as_object().unwrap().clone());
}
for i in 0..100 {
documents.push(serde_json::json!({ "facet2": i }).as_object().unwrap().clone());
}
let documents_batch = documents_batch_reader_from_objects(documents.clone());
index.add_documents(documents_batch).unwrap();
let mut documents = vec![];
for i in 1000..1010 {
documents.push(serde_json::json!({ "facet": i }).as_object().unwrap().clone());
}
for i in 100..110 {
documents.push(serde_json::json!({ "facet2": i }).as_object().unwrap().clone());
}
let documents_batch = documents_batch_reader_from_objects(documents.clone());
index.add_documents(documents_batch).unwrap();
db_snap!(index, facet_id_f64_docids, name);
};
test("default", None, None);
test("tiny_groups_tiny_levels", NonZeroUsize::new(1), NonZeroUsize::new(1));
}
#[test]
fn test_facets_number_delete_facet_id_then_bulk_update() {
let test =
|name: &str, group_size: Option<NonZeroUsize>, min_level_size: Option<NonZeroUsize>| {
let mut index = TempIndex::new_with_map_size(4096 * 1000 * 10); // 40MB
index.index_documents_config.autogenerate_docids = true;
index.index_documents_config.facet_level_group_size = group_size;
index.index_documents_config.facet_min_level_size = min_level_size;
index
.update_settings(|settings| {
settings.set_filterable_fields(
IntoIterator::into_iter(["facet".to_owned(), "facet2".to_owned()])
.collect(),
);
})
.unwrap();
let mut documents = vec![];
for i in 0..1000 {
documents.push(serde_json::json!({ "facet": i }).as_object().unwrap().clone());
}
for i in 0..100 {
documents.push(serde_json::json!({ "facet2": i }).as_object().unwrap().clone());
}
let documents_batch = documents_batch_reader_from_objects(documents.clone());
index.add_documents(documents_batch).unwrap();
// 1100 facets -> how large is the DB?
let mut documents = vec![];
for i in 1000..1010 {
documents.push(serde_json::json!({ "facet": i }).as_object().unwrap().clone());
}
for i in 100..110 {
documents.push(serde_json::json!({ "facet2": i }).as_object().unwrap().clone());
}
let documents_batch = documents_batch_reader_from_objects(documents.clone());
index.add_documents(documents_batch).unwrap();
db_snap!(index, facet_id_f64_docids, name);
};
test("default", None, None);
test("tiny_groups_tiny_levels", NonZeroUsize::new(1), NonZeroUsize::new(1));
}
}

View File

@@ -1,12 +1,16 @@
use std::collections::HashMap;
use std::fs::File;
use heed::types::ByteSlice;
use heed::{BytesDecode, Error, RoTxn, RwTxn};
use roaring::RoaringBitmap;
use crate::facet::FacetType;
use crate::heed_codec::facet::{
ByteSliceRef, FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
use crate::search::facet::get_highest_level;
use crate::Result;
use crate::{CboRoaringBitmapCodec, FieldId, Index, Result};
enum InsertionResult {
InPlace,
@@ -18,30 +22,79 @@ enum DeletionResult {
Remove { prev: Option<Vec<u8>>, next: Option<Vec<u8>> },
}
pub struct FacetsUpdateIncremental {
pub struct FacetsUpdateIncremental<'i> {
index: &'i Index,
inner: FacetsUpdateIncrementalInner,
facet_type: FacetType,
new_data: grenad::Reader<File>,
}
impl<'i> FacetsUpdateIncremental<'i> {
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
FacetsUpdateIncremental {
index,
inner: FacetsUpdateIncrementalInner {
db: match facet_type {
FacetType::String => index
.facet_id_string_docids
.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>(),
FacetType::Number => index
.facet_id_f64_docids
.remap_key_type::<FacetGroupKeyCodec<ByteSliceRef>>(),
},
group_size: 4,
max_group_size: 8,
min_level_size: 5,
},
facet_type,
new_data,
}
}
pub fn group_size(mut self, size: u8) -> Self {
self.inner.group_size = size;
self
}
pub fn min_level_size(mut self, size: u8) -> Self {
self.inner.min_level_size = size;
self
}
pub fn max_group_size(mut self, size: u8) -> Self {
self.inner.max_group_size = size;
self
}
pub fn execute(self, wtxn: &'i mut RwTxn) -> crate::Result<()> {
let mut new_faceted_docids = HashMap::<FieldId, RoaringBitmap>::default();
let mut cursor = self.new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let key = FacetGroupKeyCodec::<ByteSliceRef>::bytes_decode(key)
.ok_or(heed::Error::Encoding)?;
let docids = CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
self.inner.insert(wtxn, key.field_id, key.left_bound, &docids)?;
*new_faceted_docids.entry(key.field_id).or_default() |= docids;
}
for (field_id, new_docids) in new_faceted_docids {
let mut docids = self.index.faceted_documents_ids(wtxn, field_id, self.facet_type)?;
docids |= new_docids;
self.index.put_faceted_documents_ids(wtxn, field_id, self.facet_type, &docids)?;
}
Ok(())
}
}
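A hypothetical end-to-end use of this wrapper, spelling out the defaults set in the constructor above (group_size 4, max_group_size 8, min_level_size 5); index, wtxn and the grenad reader new_data are assumed to already exist:

// Insert the new facet data into the number facets of `index`.
FacetsUpdateIncremental::new(&index, FacetType::Number, new_data)
    .group_size(4)
    .max_group_size(8)
    .min_level_size(5)
    .execute(&mut wtxn)?;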
pub struct FacetsUpdateIncrementalInner {
db: heed::Database<FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>,
group_size: u8,
min_level_size: u8,
max_group_size: u8,
}
impl FacetsUpdateIncremental {
impl FacetsUpdateIncrementalInner {
pub fn new(db: heed::Database<FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>) -> Self {
Self { db, group_size: 4, min_level_size: 5, max_group_size: 8 }
}
pub fn group_size(mut self, size: u8) -> Self {
self.group_size = size;
self
}
pub fn min_level_size(mut self, size: u8) -> Self {
self.min_level_size = size;
self
}
pub fn max_group_size(mut self, size: u8) -> Self {
self.max_group_size = size;
self
}
}
impl FacetsUpdateIncremental {
impl FacetsUpdateIncrementalInner {
fn find_insertion_key_value(
&self,
field_id: u16,
@@ -481,9 +534,9 @@ mod tests {
use rand::{Rng, SeedableRng};
use roaring::RoaringBitmap;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::heed_codec::facet::StrRefCodec;
use crate::heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::facet::{
ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec, OrderedF64Codec, StrRefCodec,
};
use crate::milli_snap;
use crate::search::facet::get_highest_level;
use crate::search::facet::test::FacetIndex;
@@ -534,7 +587,7 @@ mod tests {
FacetGroupKeyCodec::<ByteSliceRef>::bytes_decode(&key_bytes).unwrap()
};
assert!(value.size > 0 && (value.size as usize) < db.max_group_size);
assert!(value.size > 0 && value.size < db.max_group_size);
let mut actual_size = 0;
let mut values_below = RoaringBitmap::new();
@@ -553,7 +606,7 @@
}
#[test]
fn append() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
for i in 0..256u16 {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(i as u32);
@@ -566,7 +619,7 @@
}
#[test]
fn many_field_ids_append() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
for i in 0..256u16 {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(i as u32);
@@ -595,7 +648,7 @@
}
#[test]
fn many_field_ids_prepend() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
for i in (0..256).into_iter().rev() {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(i as u32);
@@ -625,7 +678,7 @@
#[test]
fn prepend() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
let mut txn = index.env.write_txn().unwrap();
for i in (0..256).into_iter().rev() {
@@ -640,7 +693,7 @@
#[test]
fn shuffled() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
let mut txn = index.env.write_txn().unwrap();
let mut keys = (0..256).into_iter().collect::<Vec<_>>();
@@ -659,7 +712,7 @@
#[test]
fn merge_values() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
let mut keys = (0..256).into_iter().collect::<Vec<_>>();
let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);
@@ -680,7 +733,7 @@
#[test]
fn delete_from_end() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
for i in 0..256 {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(i);
@@ -745,7 +798,7 @@ mod tests {
#[test]
fn delete_from_start() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
for i in 0..256 {
let mut bitmap = RoaringBitmap::new();
@@ -783,7 +836,7 @@
#[test]
fn delete_shuffled() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
for i in 0..256 {
let mut bitmap = RoaringBitmap::new();
@@ -829,7 +882,7 @@
#[test]
fn in_place_level0_insert() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
let mut keys = (0..16).into_iter().collect::<Vec<_>>();
let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);
keys.shuffle(&mut rng);
@@ -849,7 +902,7 @@
#[test]
fn in_place_level0_delete() {
let index = FacetIndex::<OrderedF64Codec>::new(4, 8);
let index = FacetIndex::<OrderedF64Codec>::new(4, 8, 5);
let mut keys = (0..64).into_iter().collect::<Vec<_>>();
let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);
@@ -879,7 +932,7 @@
#[test]
fn shuffle_merge_string_and_delete() {
let index = FacetIndex::<StrRefCodec>::new(4, 8);
let index = FacetIndex::<StrRefCodec>::new(4, 8, 5);
let mut keys = (1000..1064).into_iter().collect::<Vec<_>>();
let mut rng = rand::rngs::SmallRng::from_seed([0; 32]);

View File

@@ -1,12 +1,9 @@
use super::{FacetsUpdateBulk, FacetsUpdateIncremental};
use crate::{
facet::FacetType,
heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec},
CboRoaringBitmapCodec, FieldId, Index, Result,
};
use heed::BytesDecode;
use roaring::RoaringBitmap;
use std::{collections::HashMap, fs::File};
use self::incremental::FacetsUpdateIncremental;
use super::FacetsUpdateBulk;
use crate::facet::FacetType;
use crate::heed_codec::facet::{ByteSliceRef, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::{Index, Result};
use std::fs::File;
pub mod bulk;
pub mod incremental;
@@ -14,11 +11,13 @@ pub mod incremental;
pub struct FacetsUpdate<'i> {
index: &'i Index,
database: heed::Database<FacetGroupKeyCodec<ByteSliceRef>, FacetGroupValueCodec>,
facet_type: FacetType,
new_data: grenad::Reader<File>,
// Options:
// there's no way to change these for now
level_group_size: u8,
max_level_group_size: u8,
min_level_size: u8,
facet_type: FacetType,
new_data: grenad::Reader<File>,
}
impl<'i> FacetsUpdate<'i> {
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
@@ -42,36 +41,37 @@ impl<'i> FacetsUpdate<'i> {
}
pub fn execute(self, wtxn: &mut heed::RwTxn) -> Result<()> {
if self.new_data.is_empty() {
return Ok(());
}
// here, come up with a better condition!
if self.database.is_empty(wtxn)? {
// ideally we'd choose which method to use for each field id individually
// but I don't think it's worth the effort yet
// As a first requirement, we ask that the length of the new data is less
// than a 1/50th of the length of the database in order to use the incremental
// method.
if self.new_data.len() >= (self.database.len(wtxn)? as u64 / 50) {
let bulk_update = FacetsUpdateBulk::new(self.index, self.facet_type, self.new_data)
.level_group_size(self.level_group_size)
.min_level_size(self.min_level_size);
bulk_update.execute(wtxn)?;
} else {
let indexer = FacetsUpdateIncremental::new(self.database)
.max_group_size(self.max_level_group_size)
.min_level_size(self.min_level_size);
let mut new_faceted_docids = HashMap::<FieldId, RoaringBitmap>::default();
let mut cursor = self.new_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let key = FacetGroupKeyCodec::<ByteSliceRef>::bytes_decode(key)
.ok_or(heed::Error::Encoding)?;
let docids =
CboRoaringBitmapCodec::bytes_decode(value).ok_or(heed::Error::Encoding)?;
indexer.insert(wtxn, key.field_id, key.left_bound, &docids)?;
*new_faceted_docids.entry(key.field_id).or_default() |= docids;
}
for (field_id, new_docids) in new_faceted_docids {
let mut docids =
self.index.faceted_documents_ids(wtxn, field_id, self.facet_type)?;
docids |= new_docids;
self.index.put_faceted_documents_ids(wtxn, field_id, self.facet_type, &docids)?;
}
let incremental_update =
FacetsUpdateIncremental::new(self.index, self.facet_type, self.new_data)
.group_size(self.level_group_size)
.max_group_size(self.max_level_group_size)
.min_level_size(self.min_level_size);
incremental_update.execute(wtxn)?;
}
Ok(())
}
}
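The 1/50 condition in execute() reduces to a single predicate; a hypothetical standalone form for clarity (the function name is illustrative, not part of this diff):

// Bulk indexing is chosen unless the new data holds fewer entries than
// 1/50th of the database. With integer division, an empty or tiny database
// makes the right-hand side 0, so the bulk path is always taken there.
fn use_bulk_method(new_data_len: u64, database_len: u64) -> bool {
    new_data_len >= database_len / 50
}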
#[cfg(test)]
mod tests {
// here I want to create a benchmark
// to find out at which point it is faster to do it incrementally
#[test]
fn update() {}
}

View File

@@ -0,0 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
9e9175e0a56db39f0dc04fb8f15c28fe

View File

@@ -0,0 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
9e9175e0a56db39f0dc04fb8f15c28fe

View File

@@ -0,0 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
b494fb6565707ce401f6d6ac03f46b93

View File

@@ -0,0 +1,4 @@
---
source: milli/src/update/facet/bulk.rs
---
b494fb6565707ce401f6d6ac03f46b93

View File

@@ -6,9 +6,9 @@ use heed::{BytesDecode, BytesEncode};
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
};
use crate::heed_codec::facet::FieldDocIdFacetF64Codec;
use crate::heed_codec::facet::OrderedF64Codec;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
};
use crate::Result;
/// Extracts the facet number and the documents ids where this facet number appear.

View File

@@ -4,8 +4,7 @@ use std::io;
use heed::BytesEncode;
use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters};
use crate::heed_codec::facet::StrRefCodec;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, StrRefCodec};
use crate::update::index_documents::merge_cbo_roaring_bitmaps;
use crate::{FieldId, Result};

View File

@@ -3,7 +3,7 @@ use std::fs::File;
use std::io::{self, Seek, SeekFrom};
use std::time::Instant;
use grenad::{CompressionType, Reader, Sorter};
use grenad::{CompressionType, Sorter};
use heed::types::ByteSlice;
use log::debug;
@@ -208,36 +208,6 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
Ok(std::iter::from_fn(move || transposer().transpose()))
}
pub fn write_into_lmdb_database(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
reader: Reader<File>,
merge: MergeFn,
) -> Result<()> {
debug!("Writing MTBL stores...");
let before = Instant::now();
let mut cursor = reader.into_cursor()?;
while let Some((k, v)) = cursor.move_on_next()? {
let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
match iter.next().transpose()? {
Some((key, old_val)) if key == k => {
let vals = &[Cow::Borrowed(old_val), Cow::Borrowed(v)][..];
let val = merge(k, vals)?;
// safety: we don't keep references from inside the LMDB database.
unsafe { iter.put_current(k, &val)? };
}
_ => {
drop(iter);
database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?;
}
}
}
debug!("MTBL stores merged in {:.02?}!", before.elapsed());
Ok(())
}
pub fn sorter_into_lmdb_database(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,

View File

@@ -9,8 +9,8 @@ pub use clonable_mmap::{ClonableMmap, CursorClonableMmap};
use fst::{IntoStreamer, Streamer};
pub use grenad_helpers::{
as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks,
merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, write_into_lmdb_database,
writer_into_reader, GrenadParameters, MergeableReader,
merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, writer_into_reader,
GrenadParameters, MergeableReader,
};
pub use merge_functions::{
concat_u32s_array, keep_first, keep_latest_obkv, merge_cbo_roaring_bitmaps, merge_obkvs,

View File

@@ -27,8 +27,7 @@ pub use self::enrich::{
pub use self::helpers::{
as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset,
fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps,
sorter_into_lmdb_database, valid_lmdb_key, write_into_lmdb_database, writer_into_reader,
ClonableMmap, MergeFn,
sorter_into_lmdb_database, valid_lmdb_key, writer_into_reader, ClonableMmap, MergeFn,
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};

View File

@@ -2,7 +2,7 @@ pub use self::available_documents_ids::AvailableDocumentsIds;
pub use self::clear_documents::ClearDocuments;
pub use self::delete_documents::{DeleteDocuments, DocumentDeletionResult};
pub use self::facet::bulk::FacetsUpdateBulk;
pub use self::facet::incremental::FacetsUpdateIncremental;
pub use self::facet::incremental::FacetsUpdateIncrementalInner;
pub use self::index_documents::{
DocumentAdditionResult, DocumentId, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod,
};