Compare commits

...

23 Commits

SHA1 Message Date
4d92df1b95 Disable sled logging 2024-07-22 14:38:33 +02:00
ca332883cc Use an LRU only with 1000 entries 2024-07-22 13:39:20 +02:00
6264dbf326 Increase the ARCache to 500 entries 2024-07-21 23:08:11 +02:00
2b7b18fb5f Fix one and for all the ARC cache 2024-07-21 22:32:14 +02:00
b03ec3f603 Another algorithm for the ARC cache 2024-07-21 18:56:50 +02:00
91ec0bdaf4 Replace the SmallVec by a Vec 2024-07-21 18:22:50 +02:00
56329633d5 Implement a really adaptative cache 2024-07-21 17:46:51 +02:00
507bce791b Make sure all keys are prefixed 2024-07-21 15:40:32 +02:00
7adc715783 Also count the direct_inserts 2024-07-20 17:15:33 +02:00
f355cf6985 Use sled to count the write insertions 2024-07-20 11:16:57 +02:00
2603d8d0d0 Add a way to disable the cache 2024-07-20 10:45:16 +02:00
9b3d303b08 First ArcCache version 2024-07-20 10:45:15 +02:00
16b4545d23 Prefix Redis keys to avoid false negative excessive writes 2024-07-20 10:45:15 +02:00
0e08906fcb Use the sorter cache when collection prefix docids 2024-07-20 10:45:15 +02:00
a3beaa90c5 Use the sorter cache when extracting the facet number docids 2024-07-20 10:45:15 +02:00
02fff51902 Use the sorter cache when extracting the facet string docids 2024-07-20 10:45:15 +02:00
54e2e2aa4a Use the sorter cache when extracting the word counts 2024-07-20 10:45:14 +02:00
092a383419 Use the sorter cache when extracting the word pair proximity docids 2024-07-20 10:45:14 +02:00
98d55e0d4d Use the sorter cache when extracting the word position docids 2024-07-20 10:45:14 +02:00
8319552e7d Use the sorter cache in the word docids extractor 2024-07-20 10:45:14 +02:00
5d5769fd8a Introduce a new Sorter Cache for CboRoaringBitmaps 2024-07-20 10:45:14 +02:00
eafc097a85 Measure much more places where we insert in sorters 2024-07-20 10:45:13 +02:00
f17cb2ef5b Use Redis to measure the Sorter insertions 2024-07-20 10:45:13 +02:00
13 changed files with 447 additions and 151 deletions

Cargo.lock (generated, 10 changes)
View File

@@ -3280,6 +3280,15 @@ version = "0.4.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
[[package]]
name = "lru"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc"
dependencies = [
"hashbrown 0.14.3",
]
[[package]]
name = "lzma-rs"
version = "0.3.0"
@@ -3540,6 +3549,7 @@ dependencies = [
"json-depth-checker",
"levenshtein_automata",
"liquid",
"lru",
"maplit",
"md5",
"meili-snap",

View File

@@ -50,7 +50,7 @@ serde = { version = "1.0.204", features = ["derive"] }
serde_json = { version = "1.0.120", features = ["preserve_order"] }
slice-group-by = "0.3.1"
smallstr = { version = "0.3.0", features = ["serde"] }
smallvec = "1.13.2"
smallvec = { version = "1.13.2", features = ["union"] }
smartstring = "1.0.1"
tempfile = "3.10.1"
thiserror = "1.0.61"
@@ -86,6 +86,7 @@ tracing = "0.1.40"
ureq = { version = "2.10.0", features = ["json"] }
url = "2.5.2"
rayon-par-bridge = "0.1.0"
lru = "0.12.3"
[dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false }

View File

@@ -0,0 +1,223 @@
use std::borrow::Cow;
use std::mem;
use std::num::NonZeroUsize;
use lru::LruCache;
use roaring::RoaringBitmap;
use smallvec::SmallVec;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::CboRoaringBitmapCodec;
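// Global switch used while benchmarking: when set to false, every insert
// bypasses the cache and goes straight to the underlying sorter.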
const ENABLED: bool = true;
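/// An LRU cache of `DelAddRoaringBitmap`s sitting in front of a grenad sorter:
/// hot keys accumulate their deleted/added document ids in memory, and entries
/// evicted from the cache are serialized and spilled into the sorter.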
pub struct SorterCacheDelAddCboRoaringBitmap<const N: usize, MF> {
cache: LruCache<SmallVec<[u8; N]>, DelAddRoaringBitmap>,
sorter: grenad::Sorter<MF>,
deladd_buffer: Vec<u8>,
cbo_buffer: Vec<u8>,
}
impl<const N: usize, MF> SorterCacheDelAddCboRoaringBitmap<N, MF> {
pub fn new(cap: NonZeroUsize, sorter: grenad::Sorter<MF>) -> Self {
SorterCacheDelAddCboRoaringBitmap {
cache: LruCache::new(cap),
sorter,
deladd_buffer: Vec::new(),
cbo_buffer: Vec::new(),
}
}
}
impl<const N: usize, MF, U> SorterCacheDelAddCboRoaringBitmap<N, MF>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>,
{
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error<U>> {
if !ENABLED {
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_del_u32(n));
}
let cache = self.cache.get_mut(key);
match cache {
Some(DelAddRoaringBitmap { del, add: _ }) => {
del.get_or_insert_with(RoaringBitmap::new).insert(n);
}
None => {
let value = DelAddRoaringBitmap::new_del_u32(n);
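// `push` returns the least recently used entry once the cache is full
// (the key itself is absent, see the `get_mut` above), so spill it.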
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry_to_sorter(key, deladd)?;
}
}
}
Ok(())
}
pub fn insert_del(
&mut self,
key: &[u8],
bitmap: RoaringBitmap,
) -> Result<(), grenad::Error<U>> {
if !ENABLED {
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_del(bitmap));
}
let cache = self.cache.get_mut(key);
match cache {
Some(DelAddRoaringBitmap { del, add: _ }) => {
*del.get_or_insert_with(RoaringBitmap::new) |= bitmap;
}
None => {
let value = DelAddRoaringBitmap::new_del(bitmap);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry_to_sorter(key, deladd)?;
}
}
}
Ok(())
}
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error<U>> {
if !ENABLED {
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_add_u32(n));
}
let cache = self.cache.get_mut(key);
match cache {
Some(DelAddRoaringBitmap { del: _, add }) => {
add.get_or_insert_with(RoaringBitmap::new).insert(n);
}
None => {
let value = DelAddRoaringBitmap::new_add_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry_to_sorter(key, deladd)?;
}
}
}
Ok(())
}
pub fn insert_add(
&mut self,
key: &[u8],
bitmap: RoaringBitmap,
) -> Result<(), grenad::Error<U>> {
if !ENABLED {
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_add(bitmap));
}
let cache = self.cache.get_mut(key);
match cache {
Some(DelAddRoaringBitmap { del: _, add }) => {
*add.get_or_insert_with(RoaringBitmap::new) |= bitmap;
}
None => {
let value = DelAddRoaringBitmap::new_add(bitmap);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry_to_sorter(key, deladd)?;
}
}
}
Ok(())
}
pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error<U>> {
if !ENABLED {
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_del_add_u32(n));
}
let cache = self.cache.get_mut(key);
match cache {
Some(DelAddRoaringBitmap { del, add }) => {
del.get_or_insert_with(RoaringBitmap::new).insert(n);
add.get_or_insert_with(RoaringBitmap::new).insert(n);
}
None => {
let value = DelAddRoaringBitmap::new_del_add_u32(n);
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
self.write_entry_to_sorter(key, deladd)?;
}
}
}
Ok(())
}
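// Serializes the del and/or add bitmaps of an entry into a single
// KvWriterDelAdd value and inserts it into the underlying sorter.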
fn write_entry_to_sorter<A: AsRef<[u8]>>(
&mut self,
key: A,
deladd: DelAddRoaringBitmap,
) -> Result<(), grenad::Error<U>> {
self.deladd_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer);
match deladd {
DelAddRoaringBitmap { del: Some(del), add: None } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
}
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
self.cbo_buffer.clear();
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
}
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
}
self.sorter.insert(key, value_writer.into_inner().unwrap())
}
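// Writes a key/value pair straight into the sorter, without going
// through the cache.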
pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> Result<(), grenad::Error<U>> {
self.sorter.insert(key, val)
}
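// Drains the entries that are still in the cache into the sorter
// and returns the sorter.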
pub fn into_sorter(mut self) -> Result<grenad::Sorter<MF>, grenad::Error<U>> {
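// Swap the cache out for a minimal one-entry placeholder so that we can
// consume it while still calling `write_entry_to_sorter` on `&mut self`.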
let default_arc = LruCache::new(NonZeroUsize::MIN);
for (key, deladd) in mem::replace(&mut self.cache, default_arc) {
self.write_entry_to_sorter(key, deladd)?;
}
Ok(self.sorter)
}
}
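/// The deleted and added document ids accumulated for a single key.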
pub struct DelAddRoaringBitmap {
pub del: Option<RoaringBitmap>,
pub add: Option<RoaringBitmap>,
}
impl DelAddRoaringBitmap {
fn new_del_add_u32(n: u32) -> Self {
DelAddRoaringBitmap {
del: Some(RoaringBitmap::from([n])),
add: Some(RoaringBitmap::from([n])),
}
}
fn new_del(bitmap: RoaringBitmap) -> Self {
DelAddRoaringBitmap { del: Some(bitmap), add: None }
}
fn new_del_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None }
}
fn new_add(bitmap: RoaringBitmap) -> Self {
DelAddRoaringBitmap { del: None, add: Some(bitmap) }
}
fn new_add_u32(n: u32) -> Self {
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
}
}
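For orientation, here is a minimal sketch of how the extractors below wire this cache around their grenad sorters; it is not part of the diff. The `example_extractor` function, its `indexer: GrenadParameters` argument, and the exact import paths are assumptions made for illustration, while the types and calls mirror the changes in the following files.

use std::num::NonZeroUsize;

use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::index_documents::{create_sorter, merge_deladd_cbo_roaring_bitmaps, GrenadParameters};
use crate::update::MergeFn;
use crate::Result;

fn example_extractor(indexer: GrenadParameters) -> Result<grenad::Sorter<MergeFn>> {
    // Build the plain grenad sorter exactly as before this change.
    let sorter = create_sorter(
        grenad::SortAlgorithm::Unstable,
        merge_deladd_cbo_roaring_bitmaps,
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        indexer.max_nb_chunks,
        indexer.max_memory_by_thread(),
    );

    // Wrap it: keys of up to 20 bytes stay inline in the SmallVec and the
    // LRU keeps the 1000 most recently touched keys in memory.
    let mut cached_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
        NonZeroUsize::new(1000).unwrap(),
        sorter,
    );

    // Document 42 gains this key and document 7 loses it; both updates are
    // merged in memory instead of producing two separate sorter entries.
    cached_sorter.insert_add_u32(b"some key", 42)?;
    cached_sorter.insert_del_u32(b"some key", 7)?;

    // Flush whatever is still cached and hand back the sorter.
    Ok(cached_sorter.into_sorter()?)
}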

View File

@@ -148,6 +148,9 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
for (field_id, value) in obkv.iter() {
key_buffer.truncate(mem::size_of::<u32>());
key_buffer.extend_from_slice(&field_id.to_be_bytes());
let mut key = b"dwp".to_vec();
key.extend_from_slice(&key_buffer);
// conn.merge(key, 1u32.to_ne_bytes()).unwrap();
docid_word_positions_sorter.insert(&key_buffer, value)?;
}

View File

@@ -1,5 +1,6 @@
use std::fs::File;
use std::io::{self, BufReader};
use std::num::NonZeroUsize;
use heed::{BytesDecode, BytesEncode};
@@ -9,8 +10,10 @@ use super::helpers::{
use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
};
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::Result;
/// Extracts the facet number and the document ids where this facet number appears.
@@ -25,7 +28,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
) -> Result<grenad::Reader<BufReader<File>>> {
let max_memory = indexer.max_memory_by_thread();
let mut facet_number_docids_sorter = create_sorter(
let facet_number_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
@@ -33,8 +36,12 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks,
max_memory,
);
let mut cached_facet_number_docids_sorter =
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
facet_number_docids_sorter,
);
let mut buffer = Vec::new();
let mut cursor = fid_docid_facet_number.into_cursor()?;
while let Some((key_bytes, deladd_obkv_bytes)) = cursor.move_on_next()? {
let (field_id, document_id, number) =
@@ -42,16 +49,17 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
let key = FacetGroupKey { field_id, level: 0, left_bound: number };
let key_bytes = FacetGroupKeyCodec::<OrderedF64Codec>::bytes_encode(&key).unwrap();
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() {
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
match deladd_key {
DelAdd::Deletion => {
cached_facet_number_docids_sorter.insert_del_u32(&key_bytes, document_id)?
}
DelAdd::Addition => {
cached_facet_number_docids_sorter.insert_add_u32(&key_bytes, document_id)?
}
}
}
obkv.finish()?;
facet_number_docids_sorter.insert(key_bytes, &buffer)?;
}
sorter_into_reader(facet_number_docids_sorter, indexer)
sorter_into_reader(cached_facet_number_docids_sorter.into_sorter()?, indexer)
}

View File

@@ -2,6 +2,7 @@ use std::collections::BTreeSet;
use std::fs::File;
use std::io::BufReader;
use std::iter::FromIterator;
use std::num::NonZeroUsize;
use std::{io, str};
use charabia::normalizer::{Normalize, NormalizerOption};
@@ -12,10 +13,12 @@ use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, Gren
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
use crate::heed_codec::{BEU16StrCodec, StrRefCodec};
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::index_documents::helpers::{
merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps,
};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
/// Extracts the facet string and the document ids where this facet string appears.
@@ -31,7 +34,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
let max_memory = indexer.max_memory_by_thread();
let options = NormalizerOption { lossy: true, ..Default::default() };
let mut facet_string_docids_sorter = create_sorter(
let facet_string_docids_sorter = create_sorter(
grenad::SortAlgorithm::Stable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
@@ -39,6 +42,11 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks,
max_memory.map(|m| m / 2),
);
let mut cached_facet_string_docids_sorter =
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
facet_string_docids_sorter,
);
let mut normalized_facet_string_docids_sorter = create_sorter(
grenad::SortAlgorithm::Stable,
@@ -94,21 +102,27 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
let key = (field_id, hyper_normalized_value.as_ref());
let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
let mut key = b"nfs".to_vec();
key.extend_from_slice(&key_bytes);
// conn.merge(key, 1u32.to_ne_bytes()).unwrap();
normalized_facet_string_docids_sorter.insert(key_bytes, &buffer)?;
}
let key = FacetGroupKey { field_id, level: 0, left_bound: normalized_value };
let key_bytes = FacetGroupKeyCodec::<StrRefCodec>::bytes_encode(&key).unwrap();
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
for (deladd_key, _) in deladd_reader.iter() {
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
match deladd_key {
DelAdd::Deletion => {
cached_facet_string_docids_sorter.insert_del_u32(&key_bytes, document_id)?
}
DelAdd::Addition => {
cached_facet_string_docids_sorter.insert_add_u32(&key_bytes, document_id)?
}
}
}
obkv.finish()?;
facet_string_docids_sorter.insert(&key_bytes, &buffer)?;
}
let normalized = sorter_into_reader(normalized_facet_string_docids_sorter, indexer)?;
sorter_into_reader(facet_string_docids_sorter, indexer).map(|s| (s, normalized))
sorter_into_reader(cached_facet_string_docids_sorter.into_sorter()?, indexer)
.map(|s| (s, normalized))
}

View File

@@ -1,5 +1,6 @@
use std::fs::File;
use std::io::{self, BufReader};
use std::num::NonZeroUsize;
use obkv::KvReaderU16;
@@ -9,8 +10,10 @@
};
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::Result;
const MAX_COUNTED_WORDS: usize = 30;
@@ -28,7 +31,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
) -> Result<grenad::Reader<BufReader<File>>> {
let max_memory = indexer.max_memory_by_thread();
let mut fid_word_count_docids_sorter = create_sorter(
let fid_word_count_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
@@ -36,9 +39,13 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks,
max_memory,
);
let mut cached_fid_word_count_docids_sorter =
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
fid_word_count_docids_sorter,
);
let mut key_buffer = Vec::new();
let mut value_buffer = Vec::new();
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
@@ -64,28 +71,20 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
if deletion != addition {
// Insert the deleted word count in the sorter if it exists.
if let Some(word_count) = deletion {
value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key_buffer.clear();
key_buffer.extend_from_slice(fid_bytes);
key_buffer.push(word_count as u8);
fid_word_count_docids_sorter
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
cached_fid_word_count_docids_sorter.insert_del_u32(&key_buffer, document_id)?;
}
// Insert the added word count in the sorter if it exists.
if let Some(word_count) = addition {
value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_buffer.clear();
key_buffer.extend_from_slice(fid_bytes);
key_buffer.push(word_count as u8);
fid_word_count_docids_sorter
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
cached_fid_word_count_docids_sorter.insert_add_u32(&key_buffer, document_id)?;
}
}
}
sorter_into_reader(fid_word_count_docids_sorter, indexer)
sorter_into_reader(cached_fid_word_count_docids_sorter.into_sorter()?, indexer)
}

View File

@@ -1,6 +1,7 @@
use std::collections::BTreeSet;
use std::fs::File;
use std::io::{self, BufReader};
use std::num::NonZeroUsize;
use heed::{BytesDecode, BytesEncode};
use obkv::KvReaderU16;
@@ -14,6 +15,7 @@ use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::index_documents::helpers::sorter_into_reader;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
@@ -38,7 +40,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
)> {
let max_memory = indexer.max_memory_by_thread();
let mut word_fid_docids_sorter = create_sorter(
let word_fid_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
@@ -46,6 +48,11 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks,
max_memory.map(|m| m / 3),
);
let mut cached_word_fid_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, _>::new(
NonZeroUsize::new(1000).unwrap(),
word_fid_docids_sorter,
);
let mut key_buffer = Vec::new();
let mut del_words = BTreeSet::new();
let mut add_words = BTreeSet::new();
@@ -79,7 +86,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
&mut key_buffer,
&del_words,
&add_words,
&mut word_fid_docids_sorter,
&mut cached_word_fid_docids_sorter,
)?;
del_words.clear();
@@ -92,7 +99,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
tempfile::tempfile()?,
);
let mut word_docids_sorter = create_sorter(
let word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
@@ -100,8 +107,12 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks,
max_memory.map(|m| m / 3),
);
let mut cached_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
word_docids_sorter,
);
let mut exact_word_docids_sorter = create_sorter(
let exact_word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
@@ -109,9 +120,12 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks,
max_memory.map(|m| m / 3),
);
let mut cached_exact_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
exact_word_docids_sorter,
);
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
let mut buffer = Vec::new();
let mut iter = cached_word_fid_docids_sorter.into_sorter()?.into_stream_merger_iter()?;
// NOTE: replacing sorters with bitmap merging is less efficient, so use sorters.
while let Some((key, value)) = iter.next()? {
// only keep the value if there is a change to apply in the DB.
@@ -124,34 +138,31 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
// merge all deletions
let obkv = KvReaderDelAdd::new(value);
let w = w.as_bytes();
if let Some(value) = obkv.get(DelAdd::Deletion) {
let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid);
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
obkv.insert(DelAdd::Deletion, value)?;
let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
if delete_from_exact {
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
cached_exact_word_docids_sorter.insert_del(w, bitmap)?;
} else {
word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
cached_word_docids_sorter.insert_del(w, bitmap)?;
}
}
// merge all additions
if let Some(value) = obkv.get(DelAdd::Addition) {
let add_in_exact = settings_diff.new.exact_attributes.contains(&fid);
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
obkv.insert(DelAdd::Addition, value)?;
let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
if add_in_exact {
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
cached_exact_word_docids_sorter.insert_add(w, bitmap)?;
} else {
word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
cached_word_docids_sorter.insert_add(w, bitmap)?;
}
}
}
Ok((
sorter_into_reader(word_docids_sorter, indexer)?,
sorter_into_reader(exact_word_docids_sorter, indexer)?,
sorter_into_reader(cached_word_docids_sorter.into_sorter()?, indexer)?,
sorter_into_reader(cached_exact_word_docids_sorter.into_sorter()?, indexer)?,
writer_into_reader(word_fid_docids_writer)?,
))
}
@@ -163,41 +174,40 @@ fn words_into_sorter(
key_buffer: &mut Vec<u8>,
del_words: &BTreeSet<Vec<u8>>,
add_words: &BTreeSet<Vec<u8>>,
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
cached_word_fid_docids_sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
) -> Result<()> {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) {
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let word_bytes = match eob {
Left(word_bytes) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
word_bytes
}
Right(word_bytes) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
word_bytes
}
Both(word_bytes, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
word_bytes
}
Left(word_bytes) => word_bytes,
Right(word_bytes) => word_bytes,
Both(word_bytes, _) => word_bytes,
};
key_buffer.clear();
key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0);
key_buffer.extend_from_slice(&fid.to_be_bytes());
word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
match eob {
Left(_) => {
cached_word_fid_docids_sorter.insert_del_u32(key_buffer, document_id)?;
}
Right(_) => {
cached_word_fid_docids_sorter.insert_add_u32(key_buffer, document_id)?;
}
Both(_, _) => {
cached_word_fid_docids_sorter.insert_del_add_u32(key_buffer, document_id)?;
}
}
}
Ok(())
}
// TODO do we still use this?
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
fn docids_into_writers<W>(
word: &str,

View File

@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, VecDeque};
use std::fs::File;
use std::io::BufReader;
use std::num::NonZeroUsize;
use std::{cmp, io};
use obkv::KvReaderU16;
@@ -12,7 +13,8 @@ use super::helpers::{
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::proximity::{index_proximity, ProximityPrecision, MAX_DISTANCE};
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{DocumentId, Result};
@@ -40,15 +42,19 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
let max_memory = indexer.max_memory_by_thread();
let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
let mut cached_word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
.map(|_| {
create_sorter(
let sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory.map(|m| m / MAX_DISTANCE as usize),
);
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
sorter,
)
})
.collect();
@@ -77,7 +83,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
current_document_id.unwrap(),
&del_word_pair_proximity,
&add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters,
&mut cached_word_pair_proximity_docids_sorters,
)?;
del_word_pair_proximity.clear();
add_word_pair_proximity.clear();
@@ -167,7 +173,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
document_id,
&del_word_pair_proximity,
&add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters,
&mut cached_word_pair_proximity_docids_sorters,
)?;
}
{
@@ -181,8 +187,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
tempfile::tempfile()?,
);
for sorter in word_pair_proximity_docids_sorters {
sorter.write_into_stream_writer(&mut writer)?;
for cached_sorter in cached_word_pair_proximity_docids_sorters {
cached_sorter.into_sorter()?.write_into_stream_writer(&mut writer)?;
}
writer_into_reader(writer)
@@ -197,34 +203,24 @@ fn document_word_positions_into_sorter(
document_id: DocumentId,
del_word_pair_proximity: &BTreeMap<(String, String), u8>,
add_word_pair_proximity: &BTreeMap<(String, String), u8>,
word_pair_proximity_docids_sorters: &mut [grenad::Sorter<MergeFn>],
cached_word_pair_proximity_docids_sorters: &mut [SorterCacheDelAddCboRoaringBitmap<
20,
MergeFn,
>],
) -> Result<()> {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
let mut key_buffer = Vec::new();
for eob in
merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| {
d.cmp(a)
})
{
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let ((w1, w2), prox) = match eob {
Left(key_value) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key_value
}
Right(key_value) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
Both(key_value, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
Left(key_value) => key_value,
Right(key_value) => key_value,
Both(key_value, _) => key_value,
};
key_buffer.clear();
@@ -233,8 +229,20 @@ fn document_word_positions_into_sorter(
key_buffer.push(0);
key_buffer.extend_from_slice(w2.as_bytes());
word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
match eob {
Left(_) => {
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert_del_u32(&key_buffer, document_id)?;
}
Right(_) => {
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert_add_u32(&key_buffer, document_id)?;
}
Both(_, _) => {
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert_del_add_u32(&key_buffer, document_id)?;
}
}
}
Ok(())

View File

@@ -1,6 +1,7 @@
use std::collections::BTreeSet;
use std::fs::File;
use std::io::{self, BufReader};
use std::num::NonZeroUsize;
use obkv::KvReaderU16;
@@ -10,7 +11,8 @@ use super::helpers::{
};
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::{bucketed_position, DocumentId, Result};
@@ -27,7 +29,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
) -> Result<grenad::Reader<BufReader<File>>> {
let max_memory = indexer.max_memory_by_thread();
let mut word_position_docids_sorter = create_sorter(
let word_position_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
@@ -35,6 +37,11 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
indexer.max_nb_chunks,
max_memory,
);
let mut cached_word_position_docids_sorter =
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
word_position_docids_sorter,
);
let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
let mut add_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
@@ -52,7 +59,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
&mut key_buffer,
&del_word_positions,
&add_word_positions,
&mut word_position_docids_sorter,
&mut cached_word_position_docids_sorter,
)?;
del_word_positions.clear();
add_word_positions.clear();
@@ -84,12 +91,13 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
&mut key_buffer,
&del_word_positions,
&add_word_positions,
&mut word_position_docids_sorter,
&mut cached_word_position_docids_sorter,
)?;
}
// TODO remove noop DelAdd OBKV
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
let word_position_docids_reader =
sorter_into_reader(cached_word_position_docids_sorter.into_sorter()?, indexer)?;
Ok(word_position_docids_reader)
}
@@ -100,38 +108,38 @@ fn words_position_into_sorter(
key_buffer: &mut Vec<u8>,
del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>,
cached_word_position_docids_sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
) -> Result<()> {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
{
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let (position, word_bytes) = match eob {
Left(key) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key
}
Right(key) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
Both(key, _) => {
// both values need to be kept because they will be used in other extractors.
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
Left(key) => key,
Right(key) => key,
Both(key, _) => key,
};
key_buffer.clear();
key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0);
key_buffer.extend_from_slice(&position.to_be_bytes());
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
match eob {
Left(_) => {
cached_word_position_docids_sorter
.insert_del_u32(key_buffer.as_slice(), document_id)?;
}
Right(_) => {
cached_word_position_docids_sorter
.insert_add_u32(key_buffer.as_slice(), document_id)?;
}
Both(_, _) => {
cached_word_position_docids_sorter
.insert_del_add_u32(key_buffer.as_slice(), document_id)?;
}
}
}
Ok(())

View File

@@ -1,3 +1,4 @@
pub mod cache;
mod enrich;
mod extract;
mod helpers;

View File

@@ -1,10 +1,12 @@
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use grenad::CompressionType;
use heed::types::{Bytes, Str};
use heed::types::Str;
use heed::Database;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
use super::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::del_add::deladd_serialize_add_side;
use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
@@ -54,7 +56,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
) -> Result<()> {
// It is forbidden to keep a mutable reference into the database
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(
let prefix_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
self.chunk_compression_type,
@@ -62,6 +64,10 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
self.max_nb_chunks,
self.max_memory,
);
let mut cached_prefix_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
prefix_docids_sorter,
);
if !common_prefix_fst_words.is_empty() {
let mut current_prefixes: Option<&&[String]> = None;
@@ -71,7 +77,10 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes),
_otherwise => {
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut cached_prefix_docids_sorter,
)?;
common_prefix_fst_words
.iter()
.find(|prefixes| word.starts_with(prefixes[0].as_bytes()))
@@ -93,21 +102,17 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
write_prefixes_in_sorter(&mut prefixes_cache, &mut cached_prefix_docids_sorter)?;
}
// We fetch the docids associated with the newly added word prefix fst only.
let db = self.word_docids.remap_data_type::<Bytes>();
let mut buffer = Vec::new();
let db = self.word_docids.lazily_decode_data();
for prefix in new_prefix_fst_words {
let prefix = std::str::from_utf8(prefix.as_bytes())?;
for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?;
buffer.clear();
let mut writer = KvWriterDelAdd::new(&mut buffer);
writer.insert(DelAdd::Addition, data)?;
prefix_docids_sorter.insert(prefix, writer.into_inner()?)?;
let (_word, lazy_data) = result?;
cached_prefix_docids_sorter
.insert_add(prefix.as_bytes(), lazy_data.decode().unwrap())?;
}
}
@@ -125,7 +130,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
// We finally write the word prefix docids into the LMDB database.
write_sorter_into_database(
prefix_docids_sorter,
cached_prefix_docids_sorter.into_sorter()?,
&self.word_prefix_docids,
self.wtxn,
database_is_empty,
@ -139,12 +144,12 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>,
sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
) -> Result<()> {
for (key, data_slices) in prefixes.drain() {
for data in data_slices {
if valid_lmdb_key(&key) {
sorter.insert(&key, data)?;
sorter.direct_insert(&key, &data)?;
}
}
}

View File

@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::str;
use grenad::CompressionType;
@@ -9,7 +10,8 @@ use tracing::debug;
use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec;
use crate::index::main_key::WORDS_PREFIXES_FST_KEY;
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
use crate::update::del_add::deladd_serialize_add_side;
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
use crate::update::index_documents::{
create_sorter, merge_deladd_cbo_roaring_bitmaps,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
@@ -59,7 +61,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
) -> Result<()> {
debug!("Computing and writing the word levels integers docids into LMDB on disk...");
let mut prefix_integer_docids_sorter = create_sorter(
let prefix_integer_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
self.chunk_compression_type,
@@ -67,6 +69,11 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
self.max_nb_chunks,
self.max_memory,
);
let mut cached_prefix_integer_docids_sorter =
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
NonZeroUsize::new(1000).unwrap(),
prefix_integer_docids_sorter,
);
if !common_prefix_fst_words.is_empty() {
// We fetch all the new common prefixes between the previous and new prefix fst.
@@ -84,7 +91,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
_otherwise => {
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut prefix_integer_docids_sorter,
&mut cached_prefix_integer_docids_sorter,
)?;
common_prefix_fst_words
.iter()
@@ -110,12 +117,14 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
}
}
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_integer_docids_sorter)?;
write_prefixes_in_sorter(
&mut prefixes_cache,
&mut cached_prefix_integer_docids_sorter,
)?;
}
// We fetch the docids associated with the newly added word prefix fst only.
let db = self.word_database.remap_data_type::<Bytes>();
let mut buffer = Vec::new();
let db = self.word_database.lazily_decode_data();
for prefix_bytes in new_prefix_fst_words {
let prefix = str::from_utf8(prefix_bytes.as_bytes()).map_err(|_| {
SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) }
@@ -127,15 +136,12 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
.prefix_iter(self.wtxn, prefix_bytes.as_bytes())?
.remap_key_type::<StrBEU16Codec>();
for result in iter {
let ((word, pos), data) = result?;
let ((word, pos), lazy_data) = result?;
if word.starts_with(prefix) {
let key = (prefix, pos);
let bytes = StrBEU16Codec::bytes_encode(&key).unwrap();
buffer.clear();
let mut writer = KvWriterDelAdd::new(&mut buffer);
writer.insert(DelAdd::Addition, data)?;
prefix_integer_docids_sorter.insert(bytes, writer.into_inner()?)?;
cached_prefix_integer_docids_sorter
.insert_add(&bytes, lazy_data.decode().unwrap())?;
}
}
}
@@ -159,7 +165,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
// We finally write all the word prefix integer docids into the LMDB database.
write_sorter_into_database(
prefix_integer_docids_sorter,
cached_prefix_integer_docids_sorter.into_sorter()?,
&self.prefix_database,
self.wtxn,
database_is_empty,
@@ -173,13 +179,13 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
fn write_prefixes_in_sorter(
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
sorter: &mut grenad::Sorter<MergeFn>,
sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
) -> Result<()> {
// TODO: Merge before insertion.
for (key, data_slices) in prefixes.drain() {
for data in data_slices {
if valid_lmdb_key(&key) {
sorter.insert(&key, data)?;
sorter.direct_insert(&key, &data)?;
}
}
}