mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-20 21:30:58 +00:00
Compare commits
23 Commits
reduce-pre
...
measure-so
Author | SHA1 | Date | |
---|---|---|---|
4d92df1b95 | |||
ca332883cc | |||
6264dbf326 | |||
2b7b18fb5f | |||
b03ec3f603 | |||
91ec0bdaf4 | |||
56329633d5 | |||
507bce791b | |||
7adc715783 | |||
f355cf6985 | |||
2603d8d0d0 | |||
9b3d303b08 | |||
16b4545d23 | |||
0e08906fcb | |||
a3beaa90c5 | |||
02fff51902 | |||
54e2e2aa4a | |||
092a383419 | |||
98d55e0d4d | |||
8319552e7d | |||
5d5769fd8a | |||
eafc097a85 | |||
f17cb2ef5b |
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -3280,6 +3280,15 @@ version = "0.4.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lzma-rs"
|
||||
version = "0.3.0"
|
||||
@ -3540,6 +3549,7 @@ dependencies = [
|
||||
"json-depth-checker",
|
||||
"levenshtein_automata",
|
||||
"liquid",
|
||||
"lru",
|
||||
"maplit",
|
||||
"md5",
|
||||
"meili-snap",
|
||||
|
@ -50,7 +50,7 @@ serde = { version = "1.0.204", features = ["derive"] }
|
||||
serde_json = { version = "1.0.120", features = ["preserve_order"] }
|
||||
slice-group-by = "0.3.1"
|
||||
smallstr = { version = "0.3.0", features = ["serde"] }
|
||||
smallvec = "1.13.2"
|
||||
smallvec = { version = "1.13.2", features = ["union"] }
|
||||
smartstring = "1.0.1"
|
||||
tempfile = "3.10.1"
|
||||
thiserror = "1.0.61"
|
||||
@ -86,6 +86,7 @@ tracing = "0.1.40"
|
||||
ureq = { version = "2.10.0", features = ["json"] }
|
||||
url = "2.5.2"
|
||||
rayon-par-bridge = "0.1.0"
|
||||
lru = "0.12.3"
|
||||
|
||||
[dev-dependencies]
|
||||
mimalloc = { version = "0.1.43", default-features = false }
|
||||
|
223
milli/src/update/index_documents/cache.rs
Normal file
223
milli/src/update/index_documents/cache.rs
Normal file
@ -0,0 +1,223 @@
|
||||
use std::borrow::Cow;
|
||||
use std::mem;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use lru::LruCache;
|
||||
use roaring::RoaringBitmap;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
||||
use crate::CboRoaringBitmapCodec;
|
||||
|
||||
const ENABLED: bool = true;
|
||||
|
||||
pub struct SorterCacheDelAddCboRoaringBitmap<const N: usize, MF> {
|
||||
cache: LruCache<SmallVec<[u8; N]>, DelAddRoaringBitmap>,
|
||||
sorter: grenad::Sorter<MF>,
|
||||
deladd_buffer: Vec<u8>,
|
||||
cbo_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<const N: usize, MF> SorterCacheDelAddCboRoaringBitmap<N, MF> {
|
||||
pub fn new(cap: NonZeroUsize, sorter: grenad::Sorter<MF>) -> Self {
|
||||
SorterCacheDelAddCboRoaringBitmap {
|
||||
cache: LruCache::new(cap),
|
||||
sorter,
|
||||
deladd_buffer: Vec::new(),
|
||||
cbo_buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<const N: usize, MF, U> SorterCacheDelAddCboRoaringBitmap<N, MF>
|
||||
where
|
||||
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>, U>,
|
||||
{
|
||||
pub fn insert_del_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error<U>> {
|
||||
if !ENABLED {
|
||||
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_del_u32(n));
|
||||
}
|
||||
|
||||
let cache = self.cache.get_mut(key);
|
||||
match cache {
|
||||
Some(DelAddRoaringBitmap { del, add: _ }) => {
|
||||
del.get_or_insert_with(RoaringBitmap::new).insert(n);
|
||||
}
|
||||
None => {
|
||||
let value = DelAddRoaringBitmap::new_del_u32(n);
|
||||
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
||||
self.write_entry_to_sorter(key, deladd)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_del(
|
||||
&mut self,
|
||||
key: &[u8],
|
||||
bitmap: RoaringBitmap,
|
||||
) -> Result<(), grenad::Error<U>> {
|
||||
if !ENABLED {
|
||||
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_del(bitmap));
|
||||
}
|
||||
|
||||
let cache = self.cache.get_mut(key);
|
||||
match cache {
|
||||
Some(DelAddRoaringBitmap { del, add: _ }) => {
|
||||
*del.get_or_insert_with(RoaringBitmap::new) |= bitmap;
|
||||
}
|
||||
None => {
|
||||
let value = DelAddRoaringBitmap::new_del(bitmap);
|
||||
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
||||
self.write_entry_to_sorter(key, deladd)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error<U>> {
|
||||
if !ENABLED {
|
||||
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_add_u32(n));
|
||||
}
|
||||
|
||||
let cache = self.cache.get_mut(key);
|
||||
match cache {
|
||||
Some(DelAddRoaringBitmap { del: _, add }) => {
|
||||
add.get_or_insert_with(RoaringBitmap::new).insert(n);
|
||||
}
|
||||
None => {
|
||||
let value = DelAddRoaringBitmap::new_add_u32(n);
|
||||
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
||||
self.write_entry_to_sorter(key, deladd)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_add(
|
||||
&mut self,
|
||||
key: &[u8],
|
||||
bitmap: RoaringBitmap,
|
||||
) -> Result<(), grenad::Error<U>> {
|
||||
if !ENABLED {
|
||||
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_add(bitmap));
|
||||
}
|
||||
|
||||
let cache = self.cache.get_mut(key);
|
||||
match cache {
|
||||
Some(DelAddRoaringBitmap { del: _, add }) => {
|
||||
*add.get_or_insert_with(RoaringBitmap::new) |= bitmap;
|
||||
}
|
||||
None => {
|
||||
let value = DelAddRoaringBitmap::new_add(bitmap);
|
||||
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
||||
self.write_entry_to_sorter(key, deladd)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_del_add_u32(&mut self, key: &[u8], n: u32) -> Result<(), grenad::Error<U>> {
|
||||
if !ENABLED {
|
||||
return self.write_entry_to_sorter(key, DelAddRoaringBitmap::new_del_add_u32(n));
|
||||
}
|
||||
|
||||
let cache = self.cache.get_mut(key);
|
||||
match cache {
|
||||
Some(DelAddRoaringBitmap { del, add }) => {
|
||||
del.get_or_insert_with(RoaringBitmap::new).insert(n);
|
||||
add.get_or_insert_with(RoaringBitmap::new).insert(n);
|
||||
}
|
||||
None => {
|
||||
let value = DelAddRoaringBitmap::new_del_add_u32(n);
|
||||
if let Some((key, deladd)) = self.cache.push(key.into(), value) {
|
||||
self.write_entry_to_sorter(key, deladd)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_entry_to_sorter<A: AsRef<[u8]>>(
|
||||
&mut self,
|
||||
key: A,
|
||||
deladd: DelAddRoaringBitmap,
|
||||
) -> Result<(), grenad::Error<U>> {
|
||||
self.deladd_buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut self.deladd_buffer);
|
||||
match deladd {
|
||||
DelAddRoaringBitmap { del: Some(del), add: None } => {
|
||||
self.cbo_buffer.clear();
|
||||
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
|
||||
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
|
||||
}
|
||||
DelAddRoaringBitmap { del: None, add: Some(add) } => {
|
||||
self.cbo_buffer.clear();
|
||||
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
|
||||
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
|
||||
}
|
||||
DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
|
||||
self.cbo_buffer.clear();
|
||||
CboRoaringBitmapCodec::serialize_into(&del, &mut self.cbo_buffer);
|
||||
value_writer.insert(DelAdd::Deletion, &self.cbo_buffer)?;
|
||||
|
||||
self.cbo_buffer.clear();
|
||||
CboRoaringBitmapCodec::serialize_into(&add, &mut self.cbo_buffer);
|
||||
value_writer.insert(DelAdd::Addition, &self.cbo_buffer)?;
|
||||
}
|
||||
DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
|
||||
}
|
||||
self.sorter.insert(key, value_writer.into_inner().unwrap())
|
||||
}
|
||||
|
||||
pub fn direct_insert(&mut self, key: &[u8], val: &[u8]) -> Result<(), grenad::Error<U>> {
|
||||
self.sorter.insert(key, val)
|
||||
}
|
||||
|
||||
pub fn into_sorter(mut self) -> Result<grenad::Sorter<MF>, grenad::Error<U>> {
|
||||
let default_arc = LruCache::new(NonZeroUsize::MIN);
|
||||
for (key, deladd) in mem::replace(&mut self.cache, default_arc) {
|
||||
self.write_entry_to_sorter(key, deladd)?;
|
||||
}
|
||||
Ok(self.sorter)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DelAddRoaringBitmap {
|
||||
pub del: Option<RoaringBitmap>,
|
||||
pub add: Option<RoaringBitmap>,
|
||||
}
|
||||
|
||||
impl DelAddRoaringBitmap {
|
||||
fn new_del_add_u32(n: u32) -> Self {
|
||||
DelAddRoaringBitmap {
|
||||
del: Some(RoaringBitmap::from([n])),
|
||||
add: Some(RoaringBitmap::from([n])),
|
||||
}
|
||||
}
|
||||
|
||||
fn new_del(bitmap: RoaringBitmap) -> Self {
|
||||
DelAddRoaringBitmap { del: Some(bitmap), add: None }
|
||||
}
|
||||
|
||||
fn new_del_u32(n: u32) -> Self {
|
||||
DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])), add: None }
|
||||
}
|
||||
|
||||
fn new_add(bitmap: RoaringBitmap) -> Self {
|
||||
DelAddRoaringBitmap { del: None, add: Some(bitmap) }
|
||||
}
|
||||
|
||||
fn new_add_u32(n: u32) -> Self {
|
||||
DelAddRoaringBitmap { del: None, add: Some(RoaringBitmap::from([n])) }
|
||||
}
|
||||
}
|
@ -148,6 +148,9 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
||||
for (field_id, value) in obkv.iter() {
|
||||
key_buffer.truncate(mem::size_of::<u32>());
|
||||
key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
||||
let mut key = b"dwp".to_vec();
|
||||
key.extend_from_slice(&key_buffer);
|
||||
// conn.merge(key, 1u32.to_ne_bytes()).unwrap();
|
||||
docid_word_positions_sorter.insert(&key_buffer, value)?;
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufReader};
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use heed::{BytesDecode, BytesEncode};
|
||||
|
||||
@ -9,8 +10,10 @@ use super::helpers::{
|
||||
use crate::heed_codec::facet::{
|
||||
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
|
||||
};
|
||||
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
||||
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::update::MergeFn;
|
||||
use crate::Result;
|
||||
|
||||
/// Extracts the facet number and the document ids where this facet number appears.
|
||||
@ -25,7 +28,7 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
let max_memory = indexer.max_memory_by_thread();
|
||||
|
||||
let mut facet_number_docids_sorter = create_sorter(
|
||||
let facet_number_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
@ -33,8 +36,12 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||
indexer.max_nb_chunks,
|
||||
max_memory,
|
||||
);
|
||||
let mut cached_facet_number_docids_sorter =
|
||||
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
facet_number_docids_sorter,
|
||||
);
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
let mut cursor = fid_docid_facet_number.into_cursor()?;
|
||||
while let Some((key_bytes, deladd_obkv_bytes)) = cursor.move_on_next()? {
|
||||
let (field_id, document_id, number) =
|
||||
@ -42,16 +49,17 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
|
||||
|
||||
let key = FacetGroupKey { field_id, level: 0, left_bound: number };
|
||||
let key_bytes = FacetGroupKeyCodec::<OrderedF64Codec>::bytes_encode(&key).unwrap();
|
||||
|
||||
buffer.clear();
|
||||
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
||||
for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() {
|
||||
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
|
||||
match deladd_key {
|
||||
DelAdd::Deletion => {
|
||||
cached_facet_number_docids_sorter.insert_del_u32(&key_bytes, document_id)?
|
||||
}
|
||||
DelAdd::Addition => {
|
||||
cached_facet_number_docids_sorter.insert_add_u32(&key_bytes, document_id)?
|
||||
}
|
||||
}
|
||||
}
|
||||
obkv.finish()?;
|
||||
|
||||
facet_number_docids_sorter.insert(key_bytes, &buffer)?;
|
||||
}
|
||||
|
||||
sorter_into_reader(facet_number_docids_sorter, indexer)
|
||||
sorter_into_reader(cached_facet_number_docids_sorter.into_sorter()?, indexer)
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ use std::collections::BTreeSet;
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::iter::FromIterator;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::{io, str};
|
||||
|
||||
use charabia::normalizer::{Normalize, NormalizerOption};
|
||||
@ -12,10 +13,12 @@ use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, Gren
|
||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
|
||||
use crate::heed_codec::{BEU16StrCodec, StrRefCodec};
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::index_documents::helpers::{
|
||||
merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps,
|
||||
};
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::update::MergeFn;
|
||||
use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
|
||||
|
||||
/// Extracts the facet string and the document ids where this facet string appears.
|
||||
@ -31,7 +34,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
let max_memory = indexer.max_memory_by_thread();
|
||||
let options = NormalizerOption { lossy: true, ..Default::default() };
|
||||
|
||||
let mut facet_string_docids_sorter = create_sorter(
|
||||
let facet_string_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Stable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
@ -39,6 +42,11 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
indexer.max_nb_chunks,
|
||||
max_memory.map(|m| m / 2),
|
||||
);
|
||||
let mut cached_facet_string_docids_sorter =
|
||||
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
facet_string_docids_sorter,
|
||||
);
|
||||
|
||||
let mut normalized_facet_string_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Stable,
|
||||
@ -94,21 +102,27 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
|
||||
let key = (field_id, hyper_normalized_value.as_ref());
|
||||
let key_bytes = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
|
||||
let mut key = b"nfs".to_vec();
|
||||
key.extend_from_slice(&key_bytes);
|
||||
// conn.merge(key, 1u32.to_ne_bytes()).unwrap();
|
||||
normalized_facet_string_docids_sorter.insert(key_bytes, &buffer)?;
|
||||
}
|
||||
|
||||
let key = FacetGroupKey { field_id, level: 0, left_bound: normalized_value };
|
||||
let key_bytes = FacetGroupKeyCodec::<StrRefCodec>::bytes_encode(&key).unwrap();
|
||||
|
||||
buffer.clear();
|
||||
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
||||
for (deladd_key, _) in deladd_reader.iter() {
|
||||
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
|
||||
match deladd_key {
|
||||
DelAdd::Deletion => {
|
||||
cached_facet_string_docids_sorter.insert_del_u32(&key_bytes, document_id)?
|
||||
}
|
||||
DelAdd::Addition => {
|
||||
cached_facet_string_docids_sorter.insert_add_u32(&key_bytes, document_id)?
|
||||
}
|
||||
}
|
||||
}
|
||||
obkv.finish()?;
|
||||
facet_string_docids_sorter.insert(&key_bytes, &buffer)?;
|
||||
}
|
||||
|
||||
let normalized = sorter_into_reader(normalized_facet_string_docids_sorter, indexer)?;
|
||||
sorter_into_reader(facet_string_docids_sorter, indexer).map(|s| (s, normalized))
|
||||
sorter_into_reader(cached_facet_string_docids_sorter.into_sorter()?, indexer)
|
||||
.map(|s| (s, normalized))
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufReader};
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use obkv::KvReaderU16;
|
||||
|
||||
@ -9,8 +10,10 @@ use super::helpers::{
|
||||
};
|
||||
use crate::error::SerializationError;
|
||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
||||
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::update::MergeFn;
|
||||
use crate::Result;
|
||||
|
||||
const MAX_COUNTED_WORDS: usize = 30;
|
||||
@ -28,7 +31,7 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
let max_memory = indexer.max_memory_by_thread();
|
||||
|
||||
let mut fid_word_count_docids_sorter = create_sorter(
|
||||
let fid_word_count_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
@ -36,9 +39,13 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
|
||||
indexer.max_nb_chunks,
|
||||
max_memory,
|
||||
);
|
||||
let mut cached_fid_word_count_docids_sorter =
|
||||
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
fid_word_count_docids_sorter,
|
||||
);
|
||||
|
||||
let mut key_buffer = Vec::new();
|
||||
let mut value_buffer = Vec::new();
|
||||
let mut cursor = docid_word_positions.into_cursor()?;
|
||||
while let Some((key, value)) = cursor.move_on_next()? {
|
||||
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
|
||||
@ -64,28 +71,20 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
|
||||
if deletion != addition {
|
||||
// Insert deleted word count in sorter if exist.
|
||||
if let Some(word_count) = deletion {
|
||||
value_buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(fid_bytes);
|
||||
key_buffer.push(word_count as u8);
|
||||
fid_word_count_docids_sorter
|
||||
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||
cached_fid_word_count_docids_sorter.insert_del_u32(&key_buffer, document_id)?;
|
||||
}
|
||||
// Insert added word count in sorter if exist.
|
||||
if let Some(word_count) = addition {
|
||||
value_buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(fid_bytes);
|
||||
key_buffer.push(word_count as u8);
|
||||
fid_word_count_docids_sorter
|
||||
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||
cached_fid_word_count_docids_sorter.insert_add_u32(&key_buffer, document_id)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sorter_into_reader(fid_word_count_docids_sorter, indexer)
|
||||
sorter_into_reader(cached_fid_word_count_docids_sorter.into_sorter()?, indexer)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::collections::BTreeSet;
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufReader};
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use heed::{BytesDecode, BytesEncode};
|
||||
use obkv::KvReaderU16;
|
||||
@ -14,6 +15,7 @@ use crate::error::SerializationError;
|
||||
use crate::heed_codec::StrBEU16Codec;
|
||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::index_documents::helpers::sorter_into_reader;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::update::MergeFn;
|
||||
@ -38,7 +40,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
)> {
|
||||
let max_memory = indexer.max_memory_by_thread();
|
||||
|
||||
let mut word_fid_docids_sorter = create_sorter(
|
||||
let word_fid_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
@ -46,6 +48,11 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
indexer.max_nb_chunks,
|
||||
max_memory.map(|m| m / 3),
|
||||
);
|
||||
let mut cached_word_fid_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, _>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
word_fid_docids_sorter,
|
||||
);
|
||||
|
||||
let mut key_buffer = Vec::new();
|
||||
let mut del_words = BTreeSet::new();
|
||||
let mut add_words = BTreeSet::new();
|
||||
@ -79,7 +86,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
&mut key_buffer,
|
||||
&del_words,
|
||||
&add_words,
|
||||
&mut word_fid_docids_sorter,
|
||||
&mut cached_word_fid_docids_sorter,
|
||||
)?;
|
||||
|
||||
del_words.clear();
|
||||
@ -92,7 +99,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
let mut word_docids_sorter = create_sorter(
|
||||
let word_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
@ -100,8 +107,12 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
indexer.max_nb_chunks,
|
||||
max_memory.map(|m| m / 3),
|
||||
);
|
||||
let mut cached_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
word_docids_sorter,
|
||||
);
|
||||
|
||||
let mut exact_word_docids_sorter = create_sorter(
|
||||
let exact_word_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
@ -109,9 +120,12 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
indexer.max_nb_chunks,
|
||||
max_memory.map(|m| m / 3),
|
||||
);
|
||||
let mut cached_exact_word_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
exact_word_docids_sorter,
|
||||
);
|
||||
|
||||
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
|
||||
let mut buffer = Vec::new();
|
||||
let mut iter = cached_word_fid_docids_sorter.into_sorter()?.into_stream_merger_iter()?;
|
||||
// NOTE: replacing sorters by bitmap merging is less efficient, so, use sorters.
|
||||
while let Some((key, value)) = iter.next()? {
|
||||
// only keep the value if there is a change to apply in the DB.
|
||||
@ -124,34 +138,31 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
|
||||
|
||||
// merge all deletions
|
||||
let obkv = KvReaderDelAdd::new(value);
|
||||
let w = w.as_bytes();
|
||||
if let Some(value) = obkv.get(DelAdd::Deletion) {
|
||||
let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid);
|
||||
buffer.clear();
|
||||
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
||||
obkv.insert(DelAdd::Deletion, value)?;
|
||||
let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
|
||||
if delete_from_exact {
|
||||
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
||||
cached_exact_word_docids_sorter.insert_del(w, bitmap)?;
|
||||
} else {
|
||||
word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
||||
cached_word_docids_sorter.insert_del(w, bitmap)?;
|
||||
}
|
||||
}
|
||||
// merge all additions
|
||||
if let Some(value) = obkv.get(DelAdd::Addition) {
|
||||
let add_in_exact = settings_diff.new.exact_attributes.contains(&fid);
|
||||
buffer.clear();
|
||||
let mut obkv = KvWriterDelAdd::new(&mut buffer);
|
||||
obkv.insert(DelAdd::Addition, value)?;
|
||||
let bitmap = CboRoaringBitmapCodec::deserialize_from(value)?;
|
||||
if add_in_exact {
|
||||
exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
||||
cached_exact_word_docids_sorter.insert_add(w, bitmap)?;
|
||||
} else {
|
||||
word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
|
||||
cached_word_docids_sorter.insert_add(w, bitmap)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
sorter_into_reader(word_docids_sorter, indexer)?,
|
||||
sorter_into_reader(exact_word_docids_sorter, indexer)?,
|
||||
sorter_into_reader(cached_word_docids_sorter.into_sorter()?, indexer)?,
|
||||
sorter_into_reader(cached_exact_word_docids_sorter.into_sorter()?, indexer)?,
|
||||
writer_into_reader(word_fid_docids_writer)?,
|
||||
))
|
||||
}
|
||||
@ -163,41 +174,40 @@ fn words_into_sorter(
|
||||
key_buffer: &mut Vec<u8>,
|
||||
del_words: &BTreeSet<Vec<u8>>,
|
||||
add_words: &BTreeSet<Vec<u8>>,
|
||||
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||
cached_word_fid_docids_sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
|
||||
) -> Result<()> {
|
||||
use itertools::merge_join_by;
|
||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) {
|
||||
buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
|
||||
let word_bytes = match eob {
|
||||
Left(word_bytes) => {
|
||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||
word_bytes
|
||||
}
|
||||
Right(word_bytes) => {
|
||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||
word_bytes
|
||||
}
|
||||
Both(word_bytes, _) => {
|
||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||
word_bytes
|
||||
}
|
||||
Left(word_bytes) => word_bytes,
|
||||
Right(word_bytes) => word_bytes,
|
||||
Both(word_bytes, _) => word_bytes,
|
||||
};
|
||||
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(word_bytes);
|
||||
key_buffer.push(0);
|
||||
key_buffer.extend_from_slice(&fid.to_be_bytes());
|
||||
word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||
|
||||
match eob {
|
||||
Left(_) => {
|
||||
cached_word_fid_docids_sorter.insert_del_u32(key_buffer, document_id)?;
|
||||
}
|
||||
Right(_) => {
|
||||
cached_word_fid_docids_sorter.insert_add_u32(key_buffer, document_id)?;
|
||||
}
|
||||
Both(_, _) => {
|
||||
cached_word_fid_docids_sorter.insert_del_add_u32(key_buffer, document_id)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO do we still use this?
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
|
||||
fn docids_into_writers<W>(
|
||||
word: &str,
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::{cmp, io};
|
||||
|
||||
use obkv::KvReaderU16;
|
||||
@ -12,7 +13,8 @@ use super::helpers::{
|
||||
use crate::error::SerializationError;
|
||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||
use crate::proximity::{index_proximity, ProximityPrecision, MAX_DISTANCE};
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
||||
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::{DocumentId, Result};
|
||||
|
||||
@ -40,15 +42,19 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
||||
let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
|
||||
|
||||
let max_memory = indexer.max_memory_by_thread();
|
||||
let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
|
||||
let mut cached_word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
|
||||
.map(|_| {
|
||||
create_sorter(
|
||||
let sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
indexer.chunk_compression_level,
|
||||
indexer.max_nb_chunks,
|
||||
max_memory.map(|m| m / MAX_DISTANCE as usize),
|
||||
);
|
||||
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
sorter,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
@ -77,7 +83,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
||||
current_document_id.unwrap(),
|
||||
&del_word_pair_proximity,
|
||||
&add_word_pair_proximity,
|
||||
&mut word_pair_proximity_docids_sorters,
|
||||
&mut cached_word_pair_proximity_docids_sorters,
|
||||
)?;
|
||||
del_word_pair_proximity.clear();
|
||||
add_word_pair_proximity.clear();
|
||||
@ -167,7 +173,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
||||
document_id,
|
||||
&del_word_pair_proximity,
|
||||
&add_word_pair_proximity,
|
||||
&mut word_pair_proximity_docids_sorters,
|
||||
&mut cached_word_pair_proximity_docids_sorters,
|
||||
)?;
|
||||
}
|
||||
{
|
||||
@ -181,8 +187,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
||||
tempfile::tempfile()?,
|
||||
);
|
||||
|
||||
for sorter in word_pair_proximity_docids_sorters {
|
||||
sorter.write_into_stream_writer(&mut writer)?;
|
||||
for cached_sorter in cached_word_pair_proximity_docids_sorters {
|
||||
cached_sorter.into_sorter()?.write_into_stream_writer(&mut writer)?;
|
||||
}
|
||||
|
||||
writer_into_reader(writer)
|
||||
@ -197,34 +203,24 @@ fn document_word_positions_into_sorter(
|
||||
document_id: DocumentId,
|
||||
del_word_pair_proximity: &BTreeMap<(String, String), u8>,
|
||||
add_word_pair_proximity: &BTreeMap<(String, String), u8>,
|
||||
word_pair_proximity_docids_sorters: &mut [grenad::Sorter<MergeFn>],
|
||||
cached_word_pair_proximity_docids_sorters: &mut [SorterCacheDelAddCboRoaringBitmap<
|
||||
20,
|
||||
MergeFn,
|
||||
>],
|
||||
) -> Result<()> {
|
||||
use itertools::merge_join_by;
|
||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
let mut key_buffer = Vec::new();
|
||||
for eob in
|
||||
merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| {
|
||||
d.cmp(a)
|
||||
})
|
||||
{
|
||||
buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
|
||||
let ((w1, w2), prox) = match eob {
|
||||
Left(key_value) => {
|
||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||
key_value
|
||||
}
|
||||
Right(key_value) => {
|
||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||
key_value
|
||||
}
|
||||
Both(key_value, _) => {
|
||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||
key_value
|
||||
}
|
||||
Left(key_value) => key_value,
|
||||
Right(key_value) => key_value,
|
||||
Both(key_value, _) => key_value,
|
||||
};
|
||||
|
||||
key_buffer.clear();
|
||||
@ -233,8 +229,20 @@ fn document_word_positions_into_sorter(
|
||||
key_buffer.push(0);
|
||||
key_buffer.extend_from_slice(w2.as_bytes());
|
||||
|
||||
word_pair_proximity_docids_sorters[*prox as usize - 1]
|
||||
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||
match eob {
|
||||
Left(_) => {
|
||||
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
|
||||
.insert_del_u32(&key_buffer, document_id)?;
|
||||
}
|
||||
Right(_) => {
|
||||
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
|
||||
.insert_add_u32(&key_buffer, document_id)?;
|
||||
}
|
||||
Both(_, _) => {
|
||||
cached_word_pair_proximity_docids_sorters[*prox as usize - 1]
|
||||
.insert_del_add_u32(&key_buffer, document_id)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::collections::BTreeSet;
|
||||
use std::fs::File;
|
||||
use std::io::{self, BufReader};
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use obkv::KvReaderU16;
|
||||
|
||||
@ -10,7 +11,8 @@ use super::helpers::{
|
||||
};
|
||||
use crate::error::SerializationError;
|
||||
use crate::index::db_name::DOCID_WORD_POSITIONS;
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
|
||||
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::update::MergeFn;
|
||||
use crate::{bucketed_position, DocumentId, Result};
|
||||
@ -27,7 +29,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
let max_memory = indexer.max_memory_by_thread();
|
||||
|
||||
let mut word_position_docids_sorter = create_sorter(
|
||||
let word_position_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
indexer.chunk_compression_type,
|
||||
@ -35,6 +37,11 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
||||
indexer.max_nb_chunks,
|
||||
max_memory,
|
||||
);
|
||||
let mut cached_word_position_docids_sorter =
|
||||
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
word_position_docids_sorter,
|
||||
);
|
||||
|
||||
let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
|
||||
let mut add_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
|
||||
@ -52,7 +59,7 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
||||
&mut key_buffer,
|
||||
&del_word_positions,
|
||||
&add_word_positions,
|
||||
&mut word_position_docids_sorter,
|
||||
&mut cached_word_position_docids_sorter,
|
||||
)?;
|
||||
del_word_positions.clear();
|
||||
add_word_positions.clear();
|
||||
@ -84,12 +91,13 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
|
||||
&mut key_buffer,
|
||||
&del_word_positions,
|
||||
&add_word_positions,
|
||||
&mut word_position_docids_sorter,
|
||||
&mut cached_word_position_docids_sorter,
|
||||
)?;
|
||||
}
|
||||
|
||||
// TODO remove noop DelAdd OBKV
|
||||
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
|
||||
let word_position_docids_reader =
|
||||
sorter_into_reader(cached_word_position_docids_sorter.into_sorter()?, indexer)?;
|
||||
|
||||
Ok(word_position_docids_reader)
|
||||
}
|
||||
@ -100,38 +108,38 @@ fn words_position_into_sorter(
|
||||
key_buffer: &mut Vec<u8>,
|
||||
del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
|
||||
add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
|
||||
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>,
|
||||
cached_word_position_docids_sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
|
||||
) -> Result<()> {
|
||||
use itertools::merge_join_by;
|
||||
use itertools::EitherOrBoth::{Both, Left, Right};
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
|
||||
{
|
||||
buffer.clear();
|
||||
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
|
||||
let (position, word_bytes) = match eob {
|
||||
Left(key) => {
|
||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||
key
|
||||
}
|
||||
Right(key) => {
|
||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||
key
|
||||
}
|
||||
Both(key, _) => {
|
||||
// both values need to be kept because they will be used in other extractors.
|
||||
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
|
||||
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
|
||||
key
|
||||
}
|
||||
Left(key) => key,
|
||||
Right(key) => key,
|
||||
Both(key, _) => key,
|
||||
};
|
||||
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(word_bytes);
|
||||
key_buffer.push(0);
|
||||
key_buffer.extend_from_slice(&position.to_be_bytes());
|
||||
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
|
||||
|
||||
match eob {
|
||||
Left(_) => {
|
||||
cached_word_position_docids_sorter
|
||||
.insert_del_u32(key_buffer.as_slice(), document_id)?;
|
||||
}
|
||||
Right(_) => {
|
||||
cached_word_position_docids_sorter
|
||||
.insert_add_u32(key_buffer.as_slice(), document_id)?;
|
||||
}
|
||||
Both(_, _) => {
|
||||
cached_word_position_docids_sorter
|
||||
.insert_del_add_u32(key_buffer.as_slice(), document_id)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -1,3 +1,4 @@
|
||||
pub mod cache;
|
||||
mod enrich;
|
||||
mod extract;
|
||||
mod helpers;
|
||||
|
@ -1,10 +1,12 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use grenad::CompressionType;
|
||||
use heed::types::{Bytes, Str};
|
||||
use heed::types::Str;
|
||||
use heed::Database;
|
||||
|
||||
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
|
||||
use super::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::del_add::deladd_serialize_add_side;
|
||||
use crate::update::index_documents::{
|
||||
create_sorter, merge_deladd_cbo_roaring_bitmaps,
|
||||
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
|
||||
@ -54,7 +56,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
|
||||
) -> Result<()> {
|
||||
// It is forbidden to keep a mutable reference into the database
|
||||
// and write into it at the same time, therefore we write into another file.
|
||||
let mut prefix_docids_sorter = create_sorter(
|
||||
let prefix_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
self.chunk_compression_type,
|
||||
@ -62,6 +64,10 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
|
||||
self.max_nb_chunks,
|
||||
self.max_memory,
|
||||
);
|
||||
let mut cached_prefix_docids_sorter = SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
prefix_docids_sorter,
|
||||
);
|
||||
|
||||
if !common_prefix_fst_words.is_empty() {
|
||||
let mut current_prefixes: Option<&&[String]> = None;
|
||||
@ -71,7 +77,10 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
|
||||
current_prefixes = match current_prefixes.take() {
|
||||
Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes),
|
||||
_otherwise => {
|
||||
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
|
||||
write_prefixes_in_sorter(
|
||||
&mut prefixes_cache,
|
||||
&mut cached_prefix_docids_sorter,
|
||||
)?;
|
||||
common_prefix_fst_words
|
||||
.iter()
|
||||
.find(|prefixes| word.starts_with(prefixes[0].as_bytes()))
|
||||
@ -93,21 +102,17 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
|
||||
}
|
||||
}
|
||||
|
||||
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_docids_sorter)?;
|
||||
write_prefixes_in_sorter(&mut prefixes_cache, &mut cached_prefix_docids_sorter)?;
|
||||
}
|
||||
|
||||
// We fetch the docids associated to the newly added word prefix fst only.
|
||||
let db = self.word_docids.remap_data_type::<Bytes>();
|
||||
let mut buffer = Vec::new();
|
||||
let db = self.word_docids.lazily_decode_data();
|
||||
for prefix in new_prefix_fst_words {
|
||||
let prefix = std::str::from_utf8(prefix.as_bytes())?;
|
||||
for result in db.prefix_iter(self.wtxn, prefix)? {
|
||||
let (_word, data) = result?;
|
||||
buffer.clear();
|
||||
let mut writer = KvWriterDelAdd::new(&mut buffer);
|
||||
writer.insert(DelAdd::Addition, data)?;
|
||||
|
||||
prefix_docids_sorter.insert(prefix, writer.into_inner()?)?;
|
||||
let (_word, lazy_data) = result?;
|
||||
cached_prefix_docids_sorter
|
||||
.insert_add(prefix.as_bytes(), lazy_data.decode().unwrap())?;
|
||||
}
|
||||
}
|
||||
|
||||
@ -125,7 +130,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
|
||||
|
||||
// We finally write the word prefix docids into the LMDB database.
|
||||
write_sorter_into_database(
|
||||
prefix_docids_sorter,
|
||||
cached_prefix_docids_sorter.into_sorter()?,
|
||||
&self.word_prefix_docids,
|
||||
self.wtxn,
|
||||
database_is_empty,
|
||||
@ -139,12 +144,12 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
|
||||
|
||||
fn write_prefixes_in_sorter(
|
||||
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
|
||||
sorter: &mut grenad::Sorter<MergeFn>,
|
||||
sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
|
||||
) -> Result<()> {
|
||||
for (key, data_slices) in prefixes.drain() {
|
||||
for data in data_slices {
|
||||
if valid_lmdb_key(&key) {
|
||||
sorter.insert(&key, data)?;
|
||||
sorter.direct_insert(&key, &data)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::str;
|
||||
|
||||
use grenad::CompressionType;
|
||||
@ -9,7 +10,8 @@ use tracing::debug;
|
||||
use crate::error::SerializationError;
|
||||
use crate::heed_codec::StrBEU16Codec;
|
||||
use crate::index::main_key::WORDS_PREFIXES_FST_KEY;
|
||||
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvWriterDelAdd};
|
||||
use crate::update::del_add::deladd_serialize_add_side;
|
||||
use crate::update::index_documents::cache::SorterCacheDelAddCboRoaringBitmap;
|
||||
use crate::update::index_documents::{
|
||||
create_sorter, merge_deladd_cbo_roaring_bitmaps,
|
||||
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap, valid_lmdb_key,
|
||||
@ -59,7 +61,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
|
||||
) -> Result<()> {
|
||||
debug!("Computing and writing the word levels integers docids into LMDB on disk...");
|
||||
|
||||
let mut prefix_integer_docids_sorter = create_sorter(
|
||||
let prefix_integer_docids_sorter = create_sorter(
|
||||
grenad::SortAlgorithm::Unstable,
|
||||
merge_deladd_cbo_roaring_bitmaps,
|
||||
self.chunk_compression_type,
|
||||
@ -67,6 +69,11 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
|
||||
self.max_nb_chunks,
|
||||
self.max_memory,
|
||||
);
|
||||
let mut cached_prefix_integer_docids_sorter =
|
||||
SorterCacheDelAddCboRoaringBitmap::<20, MergeFn>::new(
|
||||
NonZeroUsize::new(1000).unwrap(),
|
||||
prefix_integer_docids_sorter,
|
||||
);
|
||||
|
||||
if !common_prefix_fst_words.is_empty() {
|
||||
// We fetch all the new common prefixes between the previous and new prefix fst.
|
||||
@ -84,7 +91,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
|
||||
_otherwise => {
|
||||
write_prefixes_in_sorter(
|
||||
&mut prefixes_cache,
|
||||
&mut prefix_integer_docids_sorter,
|
||||
&mut cached_prefix_integer_docids_sorter,
|
||||
)?;
|
||||
common_prefix_fst_words
|
||||
.iter()
|
||||
@ -110,12 +117,14 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
|
||||
}
|
||||
}
|
||||
|
||||
write_prefixes_in_sorter(&mut prefixes_cache, &mut prefix_integer_docids_sorter)?;
|
||||
write_prefixes_in_sorter(
|
||||
&mut prefixes_cache,
|
||||
&mut cached_prefix_integer_docids_sorter,
|
||||
)?;
|
||||
}
|
||||
|
||||
// We fetch the docids associated to the newly added word prefix fst only.
|
||||
let db = self.word_database.remap_data_type::<Bytes>();
|
||||
let mut buffer = Vec::new();
|
||||
let db = self.word_database.lazily_decode_data();
|
||||
for prefix_bytes in new_prefix_fst_words {
|
||||
let prefix = str::from_utf8(prefix_bytes.as_bytes()).map_err(|_| {
|
||||
SerializationError::Decoding { db_name: Some(WORDS_PREFIXES_FST_KEY) }
|
||||
@ -127,15 +136,12 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
|
||||
.prefix_iter(self.wtxn, prefix_bytes.as_bytes())?
|
||||
.remap_key_type::<StrBEU16Codec>();
|
||||
for result in iter {
|
||||
let ((word, pos), data) = result?;
|
||||
let ((word, pos), lazy_data) = result?;
|
||||
if word.starts_with(prefix) {
|
||||
let key = (prefix, pos);
|
||||
let bytes = StrBEU16Codec::bytes_encode(&key).unwrap();
|
||||
|
||||
buffer.clear();
|
||||
let mut writer = KvWriterDelAdd::new(&mut buffer);
|
||||
writer.insert(DelAdd::Addition, data)?;
|
||||
prefix_integer_docids_sorter.insert(bytes, writer.into_inner()?)?;
|
||||
cached_prefix_integer_docids_sorter
|
||||
.insert_add(&bytes, lazy_data.decode().unwrap())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -159,7 +165,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
|
||||
|
||||
// We finally write all the word prefix integer docids into the LMDB database.
|
||||
write_sorter_into_database(
|
||||
prefix_integer_docids_sorter,
|
||||
cached_prefix_integer_docids_sorter.into_sorter()?,
|
||||
&self.prefix_database,
|
||||
self.wtxn,
|
||||
database_is_empty,
|
||||
@ -173,13 +179,13 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
|
||||
|
||||
fn write_prefixes_in_sorter(
|
||||
prefixes: &mut HashMap<Vec<u8>, Vec<Vec<u8>>>,
|
||||
sorter: &mut grenad::Sorter<MergeFn>,
|
||||
sorter: &mut SorterCacheDelAddCboRoaringBitmap<20, MergeFn>,
|
||||
) -> Result<()> {
|
||||
// TODO: Merge before insertion.
|
||||
for (key, data_slices) in prefixes.drain() {
|
||||
for data in data_slices {
|
||||
if valid_lmdb_key(&key) {
|
||||
sorter.insert(&key, data)?;
|
||||
sorter.direct_insert(&key, &data)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user