Compare commits

...

5 Commits

3 changed files with 106 additions and 112 deletions

View File

@ -61,6 +61,7 @@ impl FacetsUpdateIncremental {
}
}
#[logging_timer::time("FacetsUpdateIncremental::{}")]
pub fn execute(self, wtxn: &mut RwTxn) -> crate::Result<()> {
let mut cursor = self.delta_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {

View File

@ -76,26 +76,18 @@ pub const FACET_MAX_GROUP_SIZE: u8 = 8;
pub const FACET_GROUP_SIZE: u8 = 4;
pub const FACET_MIN_LEVEL_SIZE: u8 = 5;
use std::collections::BTreeSet;
use std::fs::File;
use std::io::BufReader;
use std::iter::FromIterator;
use charabia::normalizer::{Normalize, NormalizerOption};
use grenad::{CompressionType, SortAlgorithm};
use heed::types::{Bytes, DecodeIgnore, SerdeJson};
use heed::BytesEncode;
use log::debug;
use time::OffsetDateTime;
use self::incremental::FacetsUpdateIncremental;
use super::FacetsUpdateBulk;
use crate::facet::FacetType;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::facet::{FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::BytesRefCodec;
use crate::update::index_documents::create_sorter;
use crate::update::merge_btreeset_string;
use crate::{BEU16StrCodec, Index, Result, MAX_FACET_VALUE_LENGTH};
use crate::{Index, Result};
pub mod bulk;
pub mod incremental;
@ -146,115 +138,114 @@ impl<'i> FacetsUpdate<'i> {
self.index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
// See self::comparison_bench::benchmark_facet_indexing
if self.delta_data.len() >= (self.database.len(wtxn)? / 50) {
let field_ids =
self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::<Vec<_>>();
let bulk_update = FacetsUpdateBulk::new(
self.index,
field_ids,
self.facet_type,
self.delta_data,
self.group_size,
self.min_level_size,
);
bulk_update.execute(wtxn)?;
} else {
let incremental_update = FacetsUpdateIncremental::new(
self.index,
self.facet_type,
self.delta_data,
self.group_size,
self.min_level_size,
self.max_group_size,
);
incremental_update.execute(wtxn)?;
}
// We clear the list of normalized-for-search facets
// and the previous FSTs to compute everything from scratch
self.index.facet_id_normalized_string_strings.clear(wtxn)?;
self.index.facet_id_string_fst.clear(wtxn)?;
// As we can't use the same write transaction to read and write in two different databases
// we must create a temporary sorter that we will write into LMDB afterward.
// As multiple unnormalized facet values can become the same normalized facet value
// we must merge them together.
let mut sorter = create_sorter(
SortAlgorithm::Unstable,
merge_btreeset_string,
CompressionType::None,
None,
None,
None,
// if self.delta_data.len() >= (self.database.len(wtxn)? / 50) {
let field_ids = self.index.faceted_fields_ids(wtxn)?.iter().copied().collect::<Vec<_>>();
let bulk_update = FacetsUpdateBulk::new(
self.index,
field_ids,
self.facet_type,
self.delta_data,
self.group_size,
self.min_level_size,
);
bulk_update.execute(wtxn)?;
// } else {
// let incremental_update = FacetsUpdateIncremental::new(
// self.index,
// self.facet_type,
// self.delta_data,
// self.group_size,
// self.min_level_size,
// self.max_group_size,
// );
// incremental_update.execute(wtxn)?;
// }
// We iterate on the list of original, semi-normalized, facet values
// and normalize them for search, inserting them in LMDB in any given order.
let options = NormalizerOption { lossy: true, ..Default::default() };
let database = self.index.facet_id_string_docids.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let (facet_group_key, ()) = result?;
if let FacetGroupKey { field_id, level: 0, left_bound } = facet_group_key {
let mut normalized_facet = left_bound.normalize(&options);
let normalized_truncated_facet: String;
if normalized_facet.len() > MAX_FACET_VALUE_LENGTH {
normalized_truncated_facet = normalized_facet
.char_indices()
.take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect();
normalized_facet = normalized_truncated_facet.into();
}
let set = BTreeSet::from_iter(std::iter::once(left_bound));
let key = (field_id, normalized_facet.as_ref());
let key = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?;
sorter.insert(key, val)?;
}
}
// // We clear the list of normalized-for-search facets
// // and the previous FSTs to compute everything from scratch
// self.index.facet_id_normalized_string_strings.clear(wtxn)?;
// self.index.facet_id_string_fst.clear(wtxn)?;
// In this loop we don't need to take care of merging bitmaps
// as the grenad sorter already merged them for us.
let mut merger_iter = sorter.into_stream_merger_iter()?;
while let Some((key_bytes, btreeset_bytes)) = merger_iter.next()? {
self.index.facet_id_normalized_string_strings.remap_types::<Bytes, Bytes>().put(
wtxn,
key_bytes,
btreeset_bytes,
)?;
}
// // As we can't use the same write transaction to read and write in two different databases
// // we must create a temporary sorter that we will write into LMDB afterward.
// // As multiple unnormalized facet values can become the same normalized facet value
// // we must merge them together.
// let mut sorter = create_sorter(
// SortAlgorithm::Unstable,
// merge_btreeset_string,
// CompressionType::None,
// None,
// None,
// None,
// );
// We compute one FST by string facet
let mut text_fsts = vec![];
let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
let database =
self.index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let ((field_id, normalized_facet), _) = result?;
current_fst = match current_fst.take() {
Some((fid, fst_builder)) if fid != field_id => {
let fst = fst_builder.into_set();
text_fsts.push((fid, fst));
Some((field_id, fst::SetBuilder::memory()))
}
Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
None => Some((field_id, fst::SetBuilder::memory())),
};
// // We iterate on the list of original, semi-normalized, facet values
// // and normalize them for search, inserting them in LMDB in any given order.
// let options = NormalizerOption { lossy: true, ..Default::default() };
// let database = self.index.facet_id_string_docids.remap_data_type::<DecodeIgnore>();
// for result in database.iter(wtxn)? {
// let (facet_group_key, ()) = result?;
// if let FacetGroupKey { field_id, level: 0, left_bound } = facet_group_key {
// let mut normalized_facet = left_bound.normalize(&options);
// let normalized_truncated_facet: String;
// if normalized_facet.len() > MAX_FACET_VALUE_LENGTH {
// normalized_truncated_facet = normalized_facet
// .char_indices()
// .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
// .map(|(_, c)| c)
// .collect();
// normalized_facet = normalized_truncated_facet.into();
// }
// let set = BTreeSet::from_iter(std::iter::once(left_bound));
// let key = (field_id, normalized_facet.as_ref());
// let key = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
// let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?;
// sorter.insert(key, val)?;
// }
// }
if let Some((_, fst_builder)) = current_fst.as_mut() {
fst_builder.insert(normalized_facet)?;
}
}
// // In this loop we don't need to take care of merging bitmaps
// // as the grenad sorter already merged them for us.
// let mut merger_iter = sorter.into_stream_merger_iter()?;
// while let Some((key_bytes, btreeset_bytes)) = merger_iter.next()? {
// self.index.facet_id_normalized_string_strings.remap_types::<Bytes, Bytes>().put(
// wtxn,
// key_bytes,
// btreeset_bytes,
// )?;
// }
if let Some((field_id, fst_builder)) = current_fst {
let fst = fst_builder.into_set();
text_fsts.push((field_id, fst));
}
// // We compute one FST by string facet
// let mut text_fsts = vec![];
// let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
// let database =
// self.index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
// for result in database.iter(wtxn)? {
// let ((field_id, normalized_facet), _) = result?;
// current_fst = match current_fst.take() {
// Some((fid, fst_builder)) if fid != field_id => {
// let fst = fst_builder.into_set();
// text_fsts.push((fid, fst));
// Some((field_id, fst::SetBuilder::memory()))
// }
// Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
// None => Some((field_id, fst::SetBuilder::memory())),
// };
// We write those FSTs in LMDB now
for (field_id, fst) in text_fsts {
self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
}
// if let Some((_, fst_builder)) = current_fst.as_mut() {
// fst_builder.insert(normalized_facet)?;
// }
// }
// if let Some((field_id, fst_builder)) = current_fst {
// let fst = fst_builder.into_set();
// text_fsts.push((field_id, fst));
// }
// // We write those FSTs in LMDB now
// for (field_id, fst) in text_fsts {
// self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
// }
Ok(())
}

View File

@ -123,6 +123,8 @@ pub(crate) fn write_typed_chunk_into_index(
) -> Result<(RoaringBitmap, bool)> {
puffin::profile_function!(typed_chunk.to_debug_string());
log::debug!("Received a chunk to process: {}", typed_chunk.to_debug_string());
let mut is_merged_database = false;
match typed_chunk {
TypedChunk::Documents(obkv_documents_iter) => {