Compare commits

..

1 Commits

Author SHA1 Message Date
43989fe2e4 Reduce porximity range from 7 to 3 2023-10-03 12:16:48 +02:00
33 changed files with 690 additions and 1661 deletions

View File

@ -74,4 +74,4 @@ jobs:
echo "${{ steps.file.outputs.basename }}.json has just been pushed."
echo 'How to compare this benchmark with another one?'
echo ' - Check the available files with: ./benchmarks/scripts/list.sh'
echo " - Run the following command: ./benchmaks/scripts/compare.sh <file-to-compare-with> ${{ steps.file.outputs.basename }}.json"
echo " - Run the following command: ./benchmaks/scipts/compare.sh <file-to-compare-with> ${{ steps.file.outputs.basename }}.json"

View File

@ -57,10 +57,10 @@ jobs:
echo "date=$commit_date" >> $GITHUB_OUTPUT
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@v2
- name: Login to Docker Hub
uses: docker/login-action@v2
@ -70,7 +70,7 @@ jobs:
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@v4
with:
images: getmeili/meilisearch
# Prevent `latest` to be updated for each new tag pushed.
@ -83,7 +83,7 @@ jobs:
type=raw,value=latest,enable=${{ steps.check-tag-format.outputs.stable == 'true' && steps.check-tag-format.outputs.latest == 'true' }}
- name: Build and push
uses: docker/build-push-action@v5
uses: docker/build-push-action@v4
with:
push: true
platforms: linux/amd64,linux/arm64

View File

@ -1,84 +0,0 @@
name: Benchmarks (PR)
on: issue_comment
permissions:
issues: write
env:
GH_TOKEN: ${{ secrets.MEILI_BOT_GH_PAT }}
jobs:
run-benchmarks-on-comment:
name: Run and upload benchmarks
runs-on: benchmarks
timeout-minutes: 4320 # 72h
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- name: Check for Command
id: command
uses: xt0rted/slash-command-action@v2
with:
command: benchmark
reaction-type: "eyes"
repo-token: ${{ env.GH_TOKEN }}
# Set variables
- name: Set current branch name
shell: bash
run: echo "name=$(echo ${GITHUB_REF#refs/heads/})" >> $GITHUB_OUTPUT
id: current_branch
- name: Set normalized current branch name # Replace `/` by `_` in branch name to avoid issues when pushing to S3
shell: bash
run: echo "name=$(echo ${GITHUB_REF#refs/heads/} | tr '/' '_')" >> $GITHUB_OUTPUT
id: normalized_current_branch
- name: Set shorter commit SHA
shell: bash
run: echo "short=$(echo $GITHUB_SHA | cut -c1-8)" >> $GITHUB_OUTPUT
id: commit_sha
- name: Set file basename with format "dataset_branch_commitSHA"
shell: bash
run: echo "basename=$(echo ${{ steps.command.outputs.command-arguments }}_${{ steps.normalized_current_branch.outputs.name }}_${{ steps.commit_sha.outputs.short }})" >> $GITHUB_OUTPUT
id: file
# Run benchmarks
- name: Run benchmarks - Dataset ${{ steps.command.outputs.command-arguments }} - Branch ${{ steps.current_branch.outputs.name }} - Commit ${{ steps.commit_sha.outputs.short }}
run: |
cd benchmarks
cargo bench --bench ${{ steps.command.outputs.command-arguments }} -- --save-baseline ${{ steps.file.outputs.basename }}
# Generate critcmp files
- name: Install critcmp
uses: taiki-e/install-action@v2
with:
tool: critcmp
- name: Export cripcmp file
run: |
critcmp --export ${{ steps.file.outputs.basename }} > ${{ steps.file.outputs.basename }}.json
# Upload benchmarks
- name: Upload ${{ steps.file.outputs.basename }}.json to DO Spaces # DigitalOcean Spaces = S3
uses: BetaHuhn/do-spaces-action@v2
with:
access_key: ${{ secrets.DO_SPACES_ACCESS_KEY }}
secret_key: ${{ secrets.DO_SPACES_SECRET_KEY }}
space_name: ${{ secrets.DO_SPACES_SPACE_NAME }}
space_region: ${{ secrets.DO_SPACES_SPACE_REGION }}
source: ${{ steps.file.outputs.basename }}.json
out_dir: critcmp_results
# Compute the diff of the benchmarks and send a message on the GitHub PR
- name: Compute and send a message in the PR
env:
GITHUB_TOKEN: ${{ secrets.MEILI_BOT_GH_PAT }}
run: |
export base=$(git log --pretty=%p -n 1)
echo 'Here are your benchmarks diff 👊' >> body.txt
echo '```' >> body.txt
./benchmarks/scripts/compare.sh $base ${{ steps.file.outputs.basename }}.json >> body.txt
echo '```' >> body.txt
gh pr comment ${GITHUB_REF#refs/heads/} --body-file body.txt

1
Cargo.lock generated
View File

@ -2704,7 +2704,6 @@ dependencies = [
"logging_timer",
"maplit",
"md5",
"meili-snap",
"memmap2",
"mimalloc",
"obkv",

View File

@ -79,7 +79,6 @@ big_s = "1.0.2"
insta = "1.29.0"
maplit = "1.0.2"
md5 = "0.7.0"
meili-snap = { path = "../meili-snap" }
rand = { version = "0.8.5", features = ["small_rng"] }
[features]

View File

@ -60,16 +60,12 @@ impl CboRoaringBitmapCodec {
/// if the merged values length is under the threshold, values are directly
/// serialized in the buffer else a RoaringBitmap is created from the
/// values and is serialized in the buffer.
pub fn merge_into<I, A>(slices: I, buffer: &mut Vec<u8>) -> io::Result<()>
where
I: IntoIterator<Item = A>,
A: AsRef<[u8]>,
{
pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> {
let mut roaring = RoaringBitmap::new();
let mut vec = Vec::new();
for bytes in slices {
if bytes.as_ref().len() <= THRESHOLD * size_of::<u32>() {
if bytes.len() <= THRESHOLD * size_of::<u32>() {
let mut reader = bytes.as_ref();
while let Ok(integer) = reader.read_u32::<NativeEndian>() {
vec.push(integer);
@ -89,7 +85,7 @@ impl CboRoaringBitmapCodec {
}
} else {
// We can unwrap safely because the vector is sorted upper.
let roaring = RoaringBitmap::from_sorted_iter(vec).unwrap();
let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter()).unwrap();
roaring.serialize_into(buffer)?;
}
} else {

View File

@ -119,16 +119,16 @@ pub struct Index {
pub(crate) main: PolyDatabase,
/// A word and all the documents ids containing the word.
pub word_docids: Database<Str, CboRoaringBitmapCodec>,
pub word_docids: Database<Str, RoaringBitmapCodec>,
/// A word and all the documents ids containing the word, from attributes for which typos are not allowed.
pub exact_word_docids: Database<Str, CboRoaringBitmapCodec>,
pub exact_word_docids: Database<Str, RoaringBitmapCodec>,
/// A prefix of word and all the documents ids containing this prefix.
pub word_prefix_docids: Database<Str, CboRoaringBitmapCodec>,
pub word_prefix_docids: Database<Str, RoaringBitmapCodec>,
/// A prefix of word and all the documents ids containing this prefix, from attributes for which typos are not allowed.
pub exact_word_prefix_docids: Database<Str, CboRoaringBitmapCodec>,
pub exact_word_prefix_docids: Database<Str, RoaringBitmapCodec>,
/// Maps the proximity between a pair of words with all the docids where this relation appears.
pub word_pair_proximity_docids: Database<U8StrStrCodec, CboRoaringBitmapCodec>,

View File

@ -2,7 +2,7 @@ use std::cmp;
use crate::{relative_from_absolute_position, Position};
pub const MAX_DISTANCE: u32 = 8;
pub const MAX_DISTANCE: u32 = 4;
pub fn index_proximity(lhs: u32, rhs: u32) -> u32 {
if lhs <= rhs {

View File

@ -11,7 +11,9 @@ use super::interner::Interned;
use super::Word;
use crate::heed_codec::{BytesDecodeOwned, StrBEU16Codec};
use crate::update::{merge_cbo_roaring_bitmaps, MergeFn};
use crate::{CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, Result, SearchContext};
use crate::{
CboRoaringBitmapCodec, CboRoaringBitmapLenCodec, Result, RoaringBitmapCodec, SearchContext,
};
/// A cache storing pointers to values in the LMDB databases.
///
@ -166,7 +168,7 @@ impl<'ctx> SearchContext<'ctx> {
merge_cbo_roaring_bitmaps,
)
}
None => DatabaseCache::get_value::<_, _, CboRoaringBitmapCodec>(
None => DatabaseCache::get_value::<_, _, RoaringBitmapCodec>(
self.txn,
word,
self.word_interner.get(word).as_str(),
@ -180,7 +182,7 @@ impl<'ctx> SearchContext<'ctx> {
&mut self,
word: Interned<String>,
) -> Result<Option<RoaringBitmap>> {
DatabaseCache::get_value::<_, _, CboRoaringBitmapCodec>(
DatabaseCache::get_value::<_, _, RoaringBitmapCodec>(
self.txn,
word,
self.word_interner.get(word).as_str(),
@ -228,7 +230,7 @@ impl<'ctx> SearchContext<'ctx> {
merge_cbo_roaring_bitmaps,
)
}
None => DatabaseCache::get_value::<_, _, CboRoaringBitmapCodec>(
None => DatabaseCache::get_value::<_, _, RoaringBitmapCodec>(
self.txn,
prefix,
self.word_interner.get(prefix).as_str(),
@ -242,7 +244,7 @@ impl<'ctx> SearchContext<'ctx> {
&mut self,
prefix: Interned<String>,
) -> Result<Option<RoaringBitmap>> {
DatabaseCache::get_value::<_, _, CboRoaringBitmapCodec>(
DatabaseCache::get_value::<_, _, RoaringBitmapCodec>(
self.txn,
prefix,
self.word_interner.get(prefix).as_str(),

View File

@ -1,6 +1,7 @@
#![allow(clippy::too_many_arguments)]
use super::ProximityCondition;
use crate::proximity::MAX_DISTANCE;
use crate::search::new::interner::{DedupInterner, Interned};
use crate::search::new::query_term::LocatedQueryTermSubset;
use crate::search::new::SearchContext;
@ -35,7 +36,7 @@ pub fn build_edges(
}
let mut conditions = vec![];
for cost in right_ngram_max..(7 + right_ngram_max) {
for cost in right_ngram_max..(((MAX_DISTANCE as usize) - 1) + right_ngram_max) {
conditions.push((
cost as u32,
conditions_interner.insert(ProximityCondition::Uninit {
@ -47,7 +48,7 @@ pub fn build_edges(
}
conditions.push((
(7 + right_ngram_max) as u32,
((MAX_DISTANCE - 1) + (right_ngram_max as u32)),
conditions_interner.insert(ProximityCondition::Term { term: right_term.clone() }),
));

View File

@ -13,7 +13,6 @@ This module tests the `sort` ranking rule:
use big_s::S;
use maplit::hashset;
use meili_snap::insta;
use crate::index::tests::TempIndex;
use crate::search::new::tests::collect_field_values;

View File

@ -1,104 +0,0 @@
use obkv::Key;
pub type KvWriterDelAdd<W> = obkv::KvWriter<W, DelAdd>;
pub type KvReaderDelAdd<'a> = obkv::KvReader<'a, DelAdd>;
/// DelAdd defines the new value to add in the database and old value to delete from the database.
///
/// Its used in an OBKV to be serialized in grenad files.
#[repr(u8)]
#[derive(Clone, Copy, PartialOrd, PartialEq, Debug)]
pub enum DelAdd {
Deletion = 0,
Addition = 1,
}
impl Key for DelAdd {
const BYTES_SIZE: usize = std::mem::size_of::<DelAdd>();
type BYTES = [u8; Self::BYTES_SIZE];
fn to_be_bytes(&self) -> Self::BYTES {
u8::to_be_bytes(*self as u8)
}
fn from_be_bytes(array: Self::BYTES) -> Self {
match u8::from_be_bytes(array) {
0 => Self::Deletion,
1 => Self::Addition,
otherwise => unreachable!("DelAdd has only 2 variants, unknown variant: {}", otherwise),
}
}
}
/// Creates a Kv<K, Kv<DelAdd, value>> from Kv<K, value>
///
/// if deletion is `true`, the value will be inserted behind a DelAdd::Deletion key.
/// if addition is `true`, the value will be inserted behind a DelAdd::Addition key.
/// if both deletion and addition are `true, the value will be inserted in both keys.
pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
reader: obkv::KvReader<K>,
deletion: bool,
addition: bool,
buffer: &mut Vec<u8>,
) -> Result<(), std::io::Error> {
let mut writer = obkv::KvWriter::new(buffer);
let mut value_buffer = Vec::new();
for (key, value) in reader.iter() {
value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
if deletion {
value_writer.insert(DelAdd::Deletion, value)?;
}
if addition {
value_writer.insert(DelAdd::Addition, value)?;
}
value_writer.finish()?;
writer.insert(key, &value_buffer)?;
}
writer.finish()
}
/// Creates a Kv<K, Kv<DelAdd, value>> from two Kv<K, value>
///
/// putting each deletion obkv's keys under an DelAdd::Deletion
/// and putting each addition obkv's keys under an DelAdd::Addition
pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
deletion: obkv::KvReader<K>,
addition: obkv::KvReader<K>,
buffer: &mut Vec<u8>,
) -> Result<(), std::io::Error> {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut writer = obkv::KvWriter::new(buffer);
let mut value_buffer = Vec::new();
for eob in merge_join_by(deletion.iter(), addition.iter(), |(b, _), (u, _)| b.cmp(u)) {
value_buffer.clear();
match eob {
Left((k, v)) => {
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Deletion, v).unwrap();
writer.insert(k, value_writer.into_inner()?).unwrap();
}
Right((k, v)) => {
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Addition, v).unwrap();
writer.insert(k, value_writer.into_inner()?).unwrap();
}
Both((k, deletion), (_, addition)) => {
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Deletion, deletion).unwrap();
value_writer.insert(DelAdd::Addition, addition).unwrap();
writer.insert(k, value_writer.into_inner()?).unwrap();
}
}
}
writer.finish()
}
pub fn is_noop_del_add_obkv(del_add: KvReaderDelAdd) -> bool {
del_add.get(DelAdd::Deletion) == del_add.get(DelAdd::Addition)
}

View File

@ -16,7 +16,9 @@ use crate::facet::FacetType;
use crate::heed_codec::facet::FieldDocIdFacetCodec;
use crate::heed_codec::CboRoaringBitmapCodec;
use crate::index::Hnsw;
use crate::{ExternalDocumentsIds, FieldId, FieldIdMapMissingEntry, Index, Result, BEU32};
use crate::{
ExternalDocumentsIds, FieldId, FieldIdMapMissingEntry, Index, Result, RoaringBitmapCodec, BEU32,
};
pub struct DeleteDocuments<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
@ -493,7 +495,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
fn remove_from_word_prefix_docids(
txn: &mut heed::RwTxn,
db: &Database<Str, CboRoaringBitmapCodec>,
db: &Database<Str, RoaringBitmapCodec>,
to_remove: &RoaringBitmap,
) -> Result<fst::Set<Vec<u8>>> {
let mut prefixes_to_delete = fst::SetBuilder::memory();
@ -521,7 +523,7 @@ fn remove_from_word_prefix_docids(
fn remove_from_word_docids(
txn: &mut heed::RwTxn,
db: &heed::Database<Str, CboRoaringBitmapCodec>,
db: &heed::Database<Str, RoaringBitmapCodec>,
to_remove: &RoaringBitmap,
words_to_keep: &mut BTreeSet<String>,
words_to_remove: &mut BTreeSet<String>,

View File

@ -132,8 +132,6 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
self.db.delete_range(wtxn, &range).map(drop)?;
Ok(())
}
// TODO the new_data is an Reader<Obkv<Key, Obkv<DelAdd, RoaringBitmap>>>
fn update_level0(&mut self, wtxn: &mut RwTxn) -> Result<()> {
let new_data = match self.new_data.take() {
Some(x) => x,

View File

@ -114,7 +114,6 @@ pub struct FacetsUpdate<'i> {
min_level_size: u8,
}
impl<'i> FacetsUpdate<'i> {
// TODO grenad::Reader<Key, Obkv<DelAdd, RoaringBitmap>>
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
let database = match facet_type {
FacetType::String => index

View File

@ -4,16 +4,18 @@ use std::fs::File;
use std::{io, mem, str};
use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
use obkv::{KvReader, KvWriterU16};
use obkv::KvReader;
use roaring::RoaringBitmap;
use serde_json::Value;
use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters};
use super::helpers::{concat_u32s_array, create_sorter, sorter_into_reader, GrenadParameters};
use crate::error::{InternalError, SerializationError};
use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd};
use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH};
use crate::update::index_documents::MergeFn;
use crate::{
absolute_from_relative_position, FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH,
};
pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>;
pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), RoaringBitmap>;
/// Extracts the word and positions where this word appear and
/// prefixes it by the document id.
@ -36,153 +38,18 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
let max_memory = indexer.max_memory_by_thread();
// initialize destination values.
let mut documents_ids = RoaringBitmap::new();
let mut script_language_docids = HashMap::new();
let mut docid_word_positions_sorter = create_sorter(
grenad::SortAlgorithm::Stable,
keep_latest_obkv,
concat_u32s_array,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
);
// initialize buffers.
let mut del_buffers = Buffers::default();
let mut add_buffers = Buffers::default();
let mut key_buffer = Vec::new();
let mut value_buffer = Vec::new();
// initialize tokenizer.
let mut builder = tokenizer_builder(stop_words, dictionary, allowed_separators, None);
let tokenizer = builder.build();
// iterate over documents.
let mut cursor = obkv_documents.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let document_id = key
.try_into()
.map(u32::from_be_bytes)
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
let obkv = KvReader::<FieldId>::new(value);
// if the searchable fields didn't change, skip the searchable indexing for this document.
if !searchable_fields_changed(&KvReader::<FieldId>::new(value), searchable_fields) {
continue;
}
documents_ids.push(document_id);
// Update key buffer prefix.
key_buffer.clear();
key_buffer.extend_from_slice(&document_id.to_be_bytes());
// Tokenize deletions and additions in 2 diffferent threads.
let (del, add): (Result<_>, Result<_>) = rayon::join(
|| {
// deletions
lang_safe_tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
stop_words,
allowed_separators,
dictionary,
max_positions_per_attributes,
DelAdd::Deletion,
&mut del_buffers,
)
},
|| {
// additions
lang_safe_tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
stop_words,
allowed_separators,
dictionary,
max_positions_per_attributes,
DelAdd::Addition,
&mut add_buffers,
)
},
);
let (del_obkv, del_script_language_word_count) = del?;
let (add_obkv, add_script_language_word_count) = add?;
// merge deletions and additions.
value_buffer.clear();
del_add_from_two_obkvs(
KvReader::<FieldId>::new(del_obkv),
KvReader::<FieldId>::new(add_obkv),
&mut value_buffer,
)?;
// write them into the sorter.
let obkv = KvReader::<FieldId>::new(value);
for (field_id, value) in obkv.iter() {
key_buffer.truncate(mem::size_of::<u32>());
key_buffer.extend_from_slice(&field_id.to_be_bytes());
docid_word_positions_sorter.insert(&key_buffer, value)?;
}
// update script_language_docids deletions.
for (script, languages_frequency) in del_script_language_word_count {
for (language, _) in languages_frequency {
let entry = script_language_docids
.entry((script, language))
.or_insert_with(|| (RoaringBitmap::new(), RoaringBitmap::new()));
entry.0.push(document_id);
}
}
// update script_language_docids additions.
for (script, languages_frequency) in add_script_language_word_count {
for (language, _) in languages_frequency {
let entry = script_language_docids
.entry((script, language))
.or_insert_with(|| (RoaringBitmap::new(), RoaringBitmap::new()));
entry.1.push(document_id);
}
}
}
sorter_into_reader(docid_word_positions_sorter, indexer)
.map(|reader| (documents_ids, reader, script_language_docids))
}
/// Check if any searchable fields of a document changed.
fn searchable_fields_changed(
obkv: &KvReader<FieldId>,
searchable_fields: &Option<HashSet<FieldId>>,
) -> bool {
for (field_id, field_bytes) in obkv.iter() {
if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {
let del_add = KvReaderDelAdd::new(field_bytes);
match (del_add.get(DelAdd::Deletion), del_add.get(DelAdd::Addition)) {
// if both fields are None, check the next field.
(None, None) => (),
// if both contains a value and values are the same, check the next field.
(Some(del), Some(add)) if del == add => (),
// otherwise the fields are different, return true.
_otherwise => return true,
}
}
}
false
}
/// Factorize tokenizer building.
fn tokenizer_builder<'a>(
stop_words: Option<&'a fst::Set<&[u8]>>,
allowed_separators: Option<&'a [&str]>,
dictionary: Option<&'a [&str]>,
script_language: Option<&'a HashMap<Script, Vec<Language>>>,
) -> TokenizerBuilder<'a, &'a [u8]> {
let mut buffers = Buffers::default();
let mut tokenizer_builder = TokenizerBuilder::new();
if let Some(stop_words) = stop_words {
tokenizer_builder.stop_words(stop_words);
@ -193,144 +60,130 @@ fn tokenizer_builder<'a>(
if let Some(separators) = allowed_separators {
tokenizer_builder.separators(separators);
}
let tokenizer = tokenizer_builder.build();
if let Some(script_language) = script_language {
tokenizer_builder.allow_list(&script_language);
}
let mut cursor = obkv_documents.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let document_id = key
.try_into()
.map(u32::from_be_bytes)
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
let obkv = KvReader::<FieldId>::new(value);
tokenizer_builder
}
documents_ids.push(document_id);
buffers.key_buffer.clear();
buffers.key_buffer.extend_from_slice(&document_id.to_be_bytes());
/// Extract words maped with their positions of a document,
/// ensuring no Language detection mistakes was made.
fn lang_safe_tokens_from_document<'a>(
obkv: &KvReader<FieldId>,
searchable_fields: &Option<HashSet<FieldId>>,
tokenizer: &Tokenizer,
stop_words: Option<&fst::Set<&[u8]>>,
allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>,
max_positions_per_attributes: u32,
del_add: DelAdd,
buffers: &'a mut Buffers,
) -> Result<(&'a [u8], HashMap<Script, Vec<(Language, usize)>>)> {
let mut script_language_word_count = HashMap::new();
let mut script_language_word_count = HashMap::new();
tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
max_positions_per_attributes,
del_add,
buffers,
&mut script_language_word_count,
)?;
extract_tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
max_positions_per_attributes,
&mut buffers,
&mut script_language_word_count,
&mut docid_word_positions_sorter,
)?;
// if we detect a potetial mistake in the language detection,
// we rerun the extraction forcing the tokenizer to detect the most frequently detected Languages.
// context: https://github.com/meilisearch/meilisearch/issues/3565
if script_language_word_count
.values()
.map(Vec::as_slice)
.any(potential_language_detection_error)
{
// build an allow list with the most frequent detected languages in the document.
let script_language: HashMap<_, _> =
script_language_word_count.iter().filter_map(most_frequent_languages).collect();
// if we detect a potetial mistake in the language detection,
// we rerun the extraction forcing the tokenizer to detect the most frequently detected Languages.
// context: https://github.com/meilisearch/meilisearch/issues/3565
if script_language_word_count
.values()
.map(Vec::as_slice)
.any(potential_language_detection_error)
{
// build an allow list with the most frequent detected languages in the document.
let script_language: HashMap<_, _> =
script_language_word_count.iter().filter_map(most_frequent_languages).collect();
// if the allow list is empty, meaning that no Language is considered frequent,
// then we don't rerun the extraction.
if !script_language.is_empty() {
// build a new temporary tokenizer including the allow list.
let mut builder = tokenizer_builder(
stop_words,
dictionary,
allowed_separators,
Some(&script_language),
);
let tokenizer = builder.build();
// if the allow list is empty, meaning that no Language is considered frequent,
// then we don't rerun the extraction.
if !script_language.is_empty() {
// build a new temporary tokenizer including the allow list.
let mut tokenizer_builder = TokenizerBuilder::new();
if let Some(stop_words) = stop_words {
tokenizer_builder.stop_words(stop_words);
}
tokenizer_builder.allow_list(&script_language);
let tokenizer = tokenizer_builder.build();
script_language_word_count.clear();
script_language_word_count.clear();
// rerun the extraction.
tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
max_positions_per_attributes,
del_add,
buffers,
&mut script_language_word_count,
)?;
// rerun the extraction.
extract_tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
max_positions_per_attributes,
&mut buffers,
&mut script_language_word_count,
&mut docid_word_positions_sorter,
)?;
}
}
for (script, languages_frequency) in script_language_word_count {
for (language, _) in languages_frequency {
let entry = script_language_docids
.entry((script, language))
.or_insert_with(RoaringBitmap::new);
entry.push(document_id);
}
}
}
Ok((&buffers.obkv_buffer, script_language_word_count))
sorter_into_reader(docid_word_positions_sorter, indexer)
.map(|reader| (documents_ids, reader, script_language_docids))
}
/// Extract words maped with their positions of a document.
fn tokens_from_document<'a>(
fn extract_tokens_from_document(
obkv: &KvReader<FieldId>,
searchable_fields: &Option<HashSet<FieldId>>,
tokenizer: &Tokenizer,
max_positions_per_attributes: u32,
del_add: DelAdd,
buffers: &'a mut Buffers,
buffers: &mut Buffers,
script_language_word_count: &mut HashMap<Script, Vec<(Language, usize)>>,
) -> Result<&'a [u8]> {
buffers.obkv_buffer.clear();
let mut document_writer = KvWriterU16::new(&mut buffers.obkv_buffer);
docid_word_positions_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
for (field_id, field_bytes) in obkv.iter() {
// if field is searchable.
if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {
// extract deletion or addition only.
if let Some(field_bytes) = KvReaderDelAdd::new(field_bytes).get(del_add) {
// parse json.
let value =
serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
let value = serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
buffers.field_buffer.clear();
if let Some(field) = json_to_string(&value, &mut buffers.field_buffer) {
let tokens = process_tokens(tokenizer.tokenize(field))
.take_while(|(p, _)| (*p as u32) < max_positions_per_attributes);
// prepare writting destination.
buffers.obkv_positions_buffer.clear();
let mut writer = KvWriterU16::new(&mut buffers.obkv_positions_buffer);
// convert json into an unique string.
buffers.field_buffer.clear();
if let Some(field) = json_to_string(&value, &mut buffers.field_buffer) {
// create an iterator of token with their positions.
let tokens = process_tokens(tokenizer.tokenize(field))
.take_while(|(p, _)| (*p as u32) < max_positions_per_attributes);
for (index, token) in tokens {
// if a language has been detected for the token, we update the counter.
if let Some(language) = token.language {
let script = token.script;
let entry =
script_language_word_count.entry(script).or_insert_with(Vec::new);
match entry.iter_mut().find(|(l, _)| *l == language) {
Some((_, n)) => *n += 1,
None => entry.push((language, 1)),
}
}
// keep a word only if it is not empty and fit in a LMDB key.
let token = token.lemma().trim();
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
let position: u16 = index
.try_into()
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
writer.insert(position, token.as_bytes())?;
for (index, token) in tokens {
// if a language has been detected for the token, we update the counter.
if let Some(language) = token.language {
let script = token.script;
let entry =
script_language_word_count.entry(script).or_insert_with(Vec::new);
match entry.iter_mut().find(|(l, _)| *l == language) {
Some((_, n)) => *n += 1,
None => entry.push((language, 1)),
}
}
let token = token.lemma().trim();
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
buffers.key_buffer.truncate(mem::size_of::<u32>());
buffers.key_buffer.extend_from_slice(token.as_bytes());
// write positions into document.
let positions = writer.into_inner()?;
document_writer.insert(field_id, positions)?;
let position: u16 = index
.try_into()
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
let position = absolute_from_relative_position(field_id, position);
docid_word_positions_sorter
.insert(&buffers.key_buffer, position.to_ne_bytes())?;
}
}
}
}
}
Ok(document_writer.into_inner().map(|v| v.as_slice())?)
Ok(())
}
/// Transform a JSON value into a string that can be indexed.
@ -433,10 +286,10 @@ fn compute_language_frequency_threshold(languages_frequency: &[(Language, usize)
#[derive(Default)]
struct Buffers {
// the key buffer is the concatenation of the internal document id with the field id.
// The buffer has to be completelly cleared between documents,
// and the field id part must be cleared between each field.
key_buffer: Vec<u8>,
// the field buffer for each fields desserialization, and must be cleared between each field.
field_buffer: String,
// buffer used to store the value data containing an obkv.
obkv_buffer: Vec<u8>,
// buffer used to store the value data containing an obkv of tokens with their positions.
obkv_positions_buffer: Vec<u8>,
}

View File

@ -4,12 +4,11 @@ use std::io;
use heed::{BytesDecode, BytesEncode};
use super::helpers::{
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters,
};
use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
};
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
use crate::Result;
/// Extracts the facet number and the documents ids where this facet number appear.
@ -18,7 +17,7 @@ use crate::Result;
/// documents ids from the given chunk of docid facet number positions.
#[logging_timer::time]
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
fid_docid_facet_number: grenad::Reader<R>,
docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
@ -27,30 +26,21 @@ pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
let mut facet_number_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
);
let mut buffer = Vec::new();
let mut cursor = fid_docid_facet_number.into_cursor()?;
while let Some((key_bytes, deladd_obkv_bytes)) = cursor.move_on_next()? {
let mut cursor = docid_fid_facet_number.into_cursor()?;
while let Some((key_bytes, _)) = cursor.move_on_next()? {
let (field_id, document_id, number) =
FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();
let key = FacetGroupKey { field_id, level: 0, left_bound: number };
let key_bytes = FacetGroupKeyCodec::<OrderedF64Codec>::bytes_encode(&key).unwrap();
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
for (deladd_key, _) in KvReaderDelAdd::new(deladd_obkv_bytes).iter() {
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
}
obkv.finish()?;
facet_number_docids_sorter.insert(key_bytes, &buffer)?;
facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?;
}
sorter_into_reader(facet_number_docids_sorter, indexer)

View File

@ -1,14 +1,13 @@
use std::fs::File;
use std::{io, str};
use std::io;
use heed::BytesEncode;
use super::helpers::{create_sorter, sorter_into_reader, try_split_array_at, GrenadParameters};
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec};
use crate::heed_codec::StrRefCodec;
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::helpers::merge_deladd_cbo_roaring_bitmaps;
use crate::{FieldId, Result};
use crate::update::index_documents::merge_cbo_roaring_bitmaps;
use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
/// Extracts the facet string and the documents ids where this facet string appear.
///
@ -25,16 +24,15 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
let mut facet_string_docids_sorter = create_sorter(
grenad::SortAlgorithm::Stable,
merge_deladd_cbo_roaring_bitmaps,
merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
);
let mut buffer = Vec::new();
let mut cursor = docid_fid_facet_string.into_cursor()?;
while let Some((key, deladd_original_value_bytes)) = cursor.move_on_next()? {
while let Some((key, _original_value_bytes)) = cursor.move_on_next()? {
let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
let field_id = FieldId::from_be_bytes(field_id_bytes);
@ -42,17 +40,21 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
try_split_array_at::<_, 4>(bytes).unwrap();
let document_id = u32::from_be_bytes(document_id_bytes);
let normalized_value = str::from_utf8(normalized_value_bytes)?;
let key = FacetGroupKey { field_id, level: 0, left_bound: normalized_value };
let key_bytes = FacetGroupKeyCodec::<StrRefCodec>::bytes_encode(&key).unwrap();
let mut normalised_value = std::str::from_utf8(normalized_value_bytes)?;
buffer.clear();
let mut obkv = KvWriterDelAdd::new(&mut buffer);
for (deladd_key, _) in KvReaderDelAdd::new(deladd_original_value_bytes).iter() {
obkv.insert(deladd_key, document_id.to_ne_bytes())?;
let normalised_truncated_value: String;
if normalised_value.len() > MAX_FACET_VALUE_LENGTH {
normalised_truncated_value = normalised_value
.char_indices()
.take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect();
normalised_value = normalised_truncated_value.as_str();
}
obkv.finish()?;
facet_string_docids_sorter.insert(&key_bytes, &buffer)?;
let key = FacetGroupKey { field_id, level: 0, left_bound: normalised_value };
let key_bytes = FacetGroupKeyCodec::<StrRefCodec>::bytes_encode(&key).unwrap();
// document id is encoded in native-endian because of the CBO roaring bitmap codec
facet_string_docids_sorter.insert(&key_bytes, document_id.to_ne_bytes())?;
}
sorter_into_reader(facet_string_docids_sorter, indexer)

View File

@ -1,36 +1,24 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::mem::size_of;
use std::result::Result as StdResult;
use grenad::Sorter;
use heed::zerocopy::AsBytes;
use heed::BytesEncode;
use itertools::EitherOrBoth;
use ordered_float::OrderedFloat;
use roaring::RoaringBitmap;
use serde_json::{from_slice, Value};
use FilterableValues::{Empty, Null, Values};
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
use crate::error::InternalError;
use crate::facet::value_encoding::f64_into_bytes;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{create_writer, writer_into_reader};
use crate::{
CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH,
};
/// The length of the elements that are always in the buffer when inserting new values.
const TRUNCATE_SIZE: usize = size_of::<FieldId>() + size_of::<DocumentId>();
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET_VALUE_LENGTH};
/// The extracted facet values stored in grenad files by type.
pub struct ExtractedFacetValues {
pub fid_docid_facet_numbers_chunk: grenad::Reader<File>,
pub fid_docid_facet_strings_chunk: grenad::Reader<File>,
pub docid_fid_facet_numbers_chunk: grenad::Reader<File>,
pub docid_fid_facet_strings_chunk: grenad::Reader<File>,
pub fid_facet_is_null_docids_chunk: grenad::Reader<File>,
pub fid_facet_is_empty_docids_chunk: grenad::Reader<File>,
pub fid_facet_exists_docids_chunk: grenad::Reader<File>,
@ -70,150 +58,71 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
max_memory.map(|m| m / 2),
);
// The tuples represents the Del and Add side for a bitmap
let mut facet_exists_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
let mut facet_is_null_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
let mut facet_is_empty_docids = BTreeMap::<FieldId, (RoaringBitmap, RoaringBitmap)>::new();
// We create two buffer for mutable ref issues with closures.
let mut numbers_key_buffer = Vec::new();
let mut strings_key_buffer = Vec::new();
let mut facet_exists_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
let mut facet_is_null_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
let mut facet_is_empty_docids = BTreeMap::<FieldId, RoaringBitmap>::new();
let mut key_buffer = Vec::new();
let mut cursor = obkv_documents.into_cursor()?;
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
let obkv = obkv::KvReader::new(value);
for (field_id, field_bytes) in obkv.iter() {
if faceted_fields.contains(&field_id) {
numbers_key_buffer.clear();
strings_key_buffer.clear();
key_buffer.clear();
// Set key to the field_id
// Note: this encoding is consistent with FieldIdCodec
numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes());
strings_key_buffer.extend_from_slice(&field_id.to_be_bytes());
key_buffer.extend_from_slice(&field_id.to_be_bytes());
// Here, we know already that the document must be added to the “field id exists” database
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
let document = BEU32::from(document).get();
facet_exists_docids.entry(field_id).or_default().insert(document);
// For the other extraction tasks, prefix the key with the field_id and the document_id
numbers_key_buffer.extend_from_slice(docid_bytes);
strings_key_buffer.extend_from_slice(docid_bytes);
key_buffer.extend_from_slice(docid_bytes);
let del_add_obkv = obkv::KvReader::new(field_bytes);
let del_value = match del_add_obkv.get(DelAdd::Deletion) {
Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?,
None => None,
};
let add_value = match del_add_obkv.get(DelAdd::Addition) {
Some(bytes) => from_slice(bytes).map_err(InternalError::SerdeJson)?,
None => None,
};
let value = from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
// We insert the document id on the Del and the Add side if the field exists.
let (ref mut del_exists, ref mut add_exists) =
facet_exists_docids.entry(field_id).or_default();
let (ref mut del_is_null, ref mut add_is_null) =
facet_is_null_docids.entry(field_id).or_default();
let (ref mut del_is_empty, ref mut add_is_empty) =
facet_is_empty_docids.entry(field_id).or_default();
match extract_facet_values(
&value,
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng),
) {
FilterableValues::Null => {
facet_is_null_docids.entry(field_id).or_default().insert(document);
}
FilterableValues::Empty => {
facet_is_empty_docids.entry(field_id).or_default().insert(document);
}
FilterableValues::Values { numbers, strings } => {
// insert facet numbers in sorter
for number in numbers {
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
if del_value.is_some() {
del_exists.insert(document);
}
if add_value.is_some() {
add_exists.insert(document);
}
fid_docid_facet_numbers_sorter
.insert(&key_buffer, ().as_bytes())?;
}
}
let geo_support =
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
let del_filterable_values =
del_value.map(|value| extract_facet_values(&value, geo_support));
let add_filterable_values =
add_value.map(|value| extract_facet_values(&value, geo_support));
// insert normalized and original facet string in sorter
for (normalized, original) in
strings.into_iter().filter(|(n, _)| !n.is_empty())
{
let normalized_truncated_value: String = normalized
.char_indices()
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect();
// Those closures are just here to simplify things a bit.
let mut insert_numbers_diff = |del_numbers, add_numbers| {
insert_numbers_diff(
&mut fid_docid_facet_numbers_sorter,
&mut numbers_key_buffer,
del_numbers,
add_numbers,
)
};
let mut insert_strings_diff = |del_strings, add_strings| {
insert_strings_diff(
&mut fid_docid_facet_strings_sorter,
&mut strings_key_buffer,
del_strings,
add_strings,
)
};
match (del_filterable_values, add_filterable_values) {
(None, None) => (),
(Some(del_filterable_values), None) => match del_filterable_values {
Null => {
del_is_null.insert(document);
}
Empty => {
del_is_empty.insert(document);
}
Values { numbers, strings } => {
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
}
},
(None, Some(add_filterable_values)) => match add_filterable_values {
Null => {
add_is_null.insert(document);
}
Empty => {
add_is_empty.insert(document);
}
Values { numbers, strings } => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
}
},
(Some(del_filterable_values), Some(add_filterable_values)) => {
match (del_filterable_values, add_filterable_values) {
(Null, Null) | (Empty, Empty) => (),
(Null, Empty) => {
del_is_null.insert(document);
add_is_empty.insert(document);
}
(Empty, Null) => {
del_is_empty.insert(document);
add_is_null.insert(document);
}
(Null, Values { numbers, strings }) => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
del_is_null.insert(document);
}
(Empty, Values { numbers, strings }) => {
insert_numbers_diff(vec![], numbers)?;
insert_strings_diff(vec![], strings)?;
del_is_empty.insert(document);
}
(Values { numbers, strings }, Null) => {
add_is_null.insert(document);
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
}
(Values { numbers, strings }, Empty) => {
add_is_empty.insert(document);
insert_numbers_diff(numbers, vec![])?;
insert_strings_diff(strings, vec![])?;
}
(
Values { numbers: del_numbers, strings: del_strings },
Values { numbers: add_numbers, strings: add_strings },
) => {
insert_numbers_diff(del_numbers, add_numbers)?;
insert_strings_diff(del_strings, add_strings)?;
}
key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>());
key_buffer.extend_from_slice(normalized_truncated_value.as_bytes());
fid_docid_facet_strings_sorter
.insert(&key_buffer, original.as_bytes())?;
}
}
}
@ -221,15 +130,14 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
}
}
let mut buffer = Vec::new();
let mut facet_exists_docids_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
for (fid, (del_bitmap, add_bitmap)) in facet_exists_docids.into_iter() {
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
facet_exists_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
for (fid, bitmap) in facet_exists_docids.into_iter() {
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
}
let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?;
@ -238,9 +146,9 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
for (fid, (del_bitmap, add_bitmap)) in facet_is_null_docids.into_iter() {
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
facet_is_null_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
for (fid, bitmap) in facet_is_null_docids.into_iter() {
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
facet_is_null_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
}
let facet_is_null_docids_reader = writer_into_reader(facet_is_null_docids_writer)?;
@ -249,156 +157,21 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
for (fid, (del_bitmap, add_bitmap)) in facet_is_empty_docids.into_iter() {
deladd_obkv_cbo_roaring_bitmaps(&mut buffer, &del_bitmap, &add_bitmap)?;
facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &buffer)?;
for (fid, bitmap) in facet_is_empty_docids.into_iter() {
let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap();
facet_is_empty_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?;
}
let facet_is_empty_docids_reader = writer_into_reader(facet_is_empty_docids_writer)?;
Ok(ExtractedFacetValues {
fid_docid_facet_numbers_chunk: sorter_into_reader(fid_docid_facet_numbers_sorter, indexer)?,
fid_docid_facet_strings_chunk: sorter_into_reader(fid_docid_facet_strings_sorter, indexer)?,
docid_fid_facet_numbers_chunk: sorter_into_reader(fid_docid_facet_numbers_sorter, indexer)?,
docid_fid_facet_strings_chunk: sorter_into_reader(fid_docid_facet_strings_sorter, indexer)?,
fid_facet_is_null_docids_chunk: facet_is_null_docids_reader,
fid_facet_is_empty_docids_chunk: facet_is_empty_docids_reader,
fid_facet_exists_docids_chunk: facet_exists_docids_reader,
})
}
/// Generates a vector of bytes containing a DelAdd obkv with two bitmaps.
fn deladd_obkv_cbo_roaring_bitmaps(
buffer: &mut Vec<u8>,
del_bitmap: &RoaringBitmap,
add_bitmap: &RoaringBitmap,
) -> io::Result<()> {
buffer.clear();
let mut obkv = KvWriterDelAdd::new(buffer);
let del_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(del_bitmap).unwrap();
let add_bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(add_bitmap).unwrap();
obkv.insert(DelAdd::Deletion, del_bitmap_bytes)?;
obkv.insert(DelAdd::Addition, add_bitmap_bytes)?;
obkv.finish()
}
/// Truncates a string to the biggest valid LMDB key size.
fn truncate_string(s: String) -> String {
s.char_indices()
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect()
}
/// Computes the diff between both Del and Add numbers and
/// only inserts the parts that differ in the sorter.
fn insert_numbers_diff<MF>(
fid_docid_facet_numbers_sorter: &mut Sorter<MF>,
key_buffer: &mut Vec<u8>,
mut del_numbers: Vec<f64>,
mut add_numbers: Vec<f64>,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
{
// We sort and dedup the float numbers
del_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
add_numbers.sort_unstable_by_key(|f| OrderedFloat(*f));
del_numbers.dedup_by_key(|f| OrderedFloat(*f));
add_numbers.dedup_by_key(|f| OrderedFloat(*f));
let merged_numbers_iter = itertools::merge_join_by(
del_numbers.into_iter().map(OrderedFloat),
add_numbers.into_iter().map(OrderedFloat),
|del, add| del.cmp(add),
);
// insert facet numbers in sorter
for eob in merged_numbers_iter {
key_buffer.truncate(TRUNCATE_SIZE);
match eob {
EitherOrBoth::Both(_, _) => (), // no need to touch anything
EitherOrBoth::Left(OrderedFloat(number)) => {
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those numbers.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, ().as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
EitherOrBoth::Right(OrderedFloat(number)) => {
if let Some(value_bytes) = f64_into_bytes(number) {
key_buffer.extend_from_slice(&value_bytes);
key_buffer.extend_from_slice(&number.to_be_bytes());
// We insert only the Del part of the Obkv to inform
// that we only want to remove all those numbers.
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, ().as_bytes())?;
let bytes = obkv.into_inner()?;
fid_docid_facet_numbers_sorter.insert(&key_buffer, bytes)?;
}
}
}
}
Ok(())
}
/// Computes the diff between both Del and Add strings and
/// only inserts the parts that differ in the sorter.
fn insert_strings_diff<MF>(
fid_docid_facet_strings_sorter: &mut Sorter<MF>,
key_buffer: &mut Vec<u8>,
mut del_strings: Vec<(String, String)>,
mut add_strings: Vec<(String, String)>,
) -> Result<()>
where
MF: for<'a> Fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Cow<'a, [u8]>, Error>,
{
// We sort and dedup the normalized and original strings
del_strings.sort_unstable();
add_strings.sort_unstable();
del_strings.dedup();
add_strings.dedup();
let merged_strings_iter = itertools::merge_join_by(
del_strings.into_iter().filter(|(n, _)| !n.is_empty()),
add_strings.into_iter().filter(|(n, _)| !n.is_empty()),
|del, add| del.cmp(add),
);
// insert normalized and original facet string in sorter
for eob in merged_strings_iter {
key_buffer.truncate(TRUNCATE_SIZE);
match eob {
EitherOrBoth::Both(_, _) => (), // no need to touch anything
EitherOrBoth::Left((normalized, original)) => {
let truncated = truncate_string(normalized);
key_buffer.extend_from_slice(truncated.as_bytes());
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Deletion, original)?;
let bytes = obkv.into_inner()?;
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
EitherOrBoth::Right((normalized, original)) => {
let truncated = truncate_string(normalized);
key_buffer.extend_from_slice(truncated.as_bytes());
let mut obkv = KvWriterDelAdd::memory();
obkv.insert(DelAdd::Addition, original)?;
let bytes = obkv.into_inner()?;
fid_docid_facet_strings_sorter.insert(&key_buffer, bytes)?;
}
}
}
Ok(())
}
/// Represent what a document field contains.
enum FilterableValues {
/// Corresponds to the JSON `null` value.
@ -409,7 +182,6 @@ enum FilterableValues {
Values { numbers: Vec<f64>, strings: Vec<(String, String)> },
}
/// Extracts the facet values of a JSON field.
fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues {
fn inner_extract_facet_values(
value: &Value,

View File

@ -1,17 +1,16 @@
use std::collections::HashMap;
use std::fs::File;
use std::io;
use obkv::KvReaderU16;
use grenad::Sorter;
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
GrenadParameters,
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
try_split_array_at, GrenadParameters, MergeFn,
};
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::Result;
const MAX_COUNTED_WORDS: usize = 30;
use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
/// Extracts the field id word count and the documents ids where
/// this field id with this amount of words appear.
@ -36,21 +35,63 @@ pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
max_memory,
);
let mut key_buffer = Vec::new();
// This map is assumed to not consume a lot of memory.
let mut document_fid_wordcount = HashMap::new();
let mut current_document_id = None;
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
let (document_id_bytes, _word_bytes) = try_split_array_at(key)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = u32::from_be_bytes(document_id_bytes);
let word_count = KvReaderU16::new(&value).iter().take(MAX_COUNTED_WORDS + 1).count();
if word_count <= MAX_COUNTED_WORDS {
key_buffer.clear();
key_buffer.extend_from_slice(fid_bytes);
key_buffer.push(word_count as u8);
fid_word_count_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
let curr_document_id = *current_document_id.get_or_insert(document_id);
if curr_document_id != document_id {
drain_document_fid_wordcount_into_sorter(
&mut fid_word_count_docids_sorter,
&mut document_fid_wordcount,
curr_document_id,
)?;
current_document_id = Some(document_id);
}
for position in read_u32_ne_bytes(value) {
let (field_id, _) = relative_from_absolute_position(position);
let value = document_fid_wordcount.entry(field_id as FieldId).or_insert(0);
*value += 1;
}
}
if let Some(document_id) = current_document_id {
// We must make sure that don't lose the current document field id
// word count map if we break because we reached the end of the chunk.
drain_document_fid_wordcount_into_sorter(
&mut fid_word_count_docids_sorter,
&mut document_fid_wordcount,
document_id,
)?;
}
sorter_into_reader(fid_word_count_docids_sorter, indexer)
}
fn drain_document_fid_wordcount_into_sorter(
fid_word_count_docids_sorter: &mut Sorter<MergeFn>,
document_fid_wordcount: &mut HashMap<FieldId, u32>,
document_id: DocumentId,
) -> Result<()> {
let mut key_buffer = Vec::new();
for (fid, count) in document_fid_wordcount.drain() {
if count <= 30 {
key_buffer.clear();
key_buffer.extend_from_slice(&fid.to_be_bytes());
key_buffer.push(count as u8);
fid_word_count_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
}
}
Ok(())
}

View File

@ -1,20 +1,18 @@
use std::collections::{BTreeSet, HashSet};
use std::collections::HashSet;
use std::fs::File;
use std::io;
use std::iter::FromIterator;
use heed::BytesDecode;
use obkv::KvReaderU16;
use roaring::RoaringBitmap;
use super::helpers::{
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader,
try_split_array_at, writer_into_reader, GrenadParameters,
create_sorter, merge_roaring_bitmaps, serialize_roaring_bitmap, sorter_into_reader,
try_split_array_at, GrenadParameters,
};
use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::MergeFn;
use crate::{DocumentId, FieldId, Result};
use crate::update::index_documents::helpers::read_u32_ne_bytes;
use crate::{relative_from_absolute_position, FieldId, Result};
/// Extracts the word and the documents ids where this word appear.
///
@ -28,148 +26,65 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
exact_attributes: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)> {
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_fid_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory.map(|x| x / 3),
);
let mut key_buffer = Vec::new();
let mut del_words = BTreeSet::new();
let mut add_words = BTreeSet::new();
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, fid_bytes) = try_split_array_at(key)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let (fid_bytes, _) = try_split_array_at(fid_bytes)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = u32::from_be_bytes(document_id_bytes);
let fid = u16::from_be_bytes(fid_bytes);
let del_add_reader = KvReaderDelAdd::new(&value);
// extract all unique words to remove.
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
for (_pos, word) in KvReaderU16::new(&deletion).iter() {
del_words.insert(word.to_vec());
}
}
// extract all unique additional words.
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
for (_pos, word) in KvReaderU16::new(&addition).iter() {
add_words.insert(word.to_vec());
}
}
words_into_sorter(
document_id,
fid,
&mut key_buffer,
&del_words,
&add_words,
&mut word_fid_docids_sorter,
)?;
del_words.clear();
add_words.clear();
}
let mut word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
merge_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory.map(|x| x / 3),
max_memory.map(|x| x / 2),
);
let mut exact_word_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
merge_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory.map(|x| x / 3),
max_memory.map(|x| x / 2),
);
let mut word_fid_docids_writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
// TODO: replace sorters by writers by accumulating values into a buffer before inserting them.
while let Some((key, value)) = iter.next()? {
// only keep the value if their is a change to apply in the DB.
if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) {
word_fid_docids_writer.insert(key, value)?;
}
let (word, fid) = StrBEU16Codec::bytes_decode(key)
let mut value_buffer = Vec::new();
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, positions)) = cursor.move_on_next()? {
let (document_id_bytes, word_bytes) = try_split_array_at(key)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = u32::from_be_bytes(document_id_bytes);
// every words contained in an attribute set to exact must be pushed in the exact_words list.
if exact_attributes.contains(&fid) {
exact_word_docids_sorter.insert(word.as_bytes(), &value)?;
let bitmap = RoaringBitmap::from_iter(Some(document_id));
serialize_roaring_bitmap(&bitmap, &mut value_buffer)?;
// If there are no exact attributes, we do not need to iterate over positions.
if exact_attributes.is_empty() {
word_docids_sorter.insert(word_bytes, &value_buffer)?;
} else {
word_docids_sorter.insert(word.as_bytes(), &value)?;
let mut added_to_exact = false;
let mut added_to_word_docids = false;
for position in read_u32_ne_bytes(positions) {
// as soon as we know that this word had been to both readers, we don't need to
// iterate over the positions.
if added_to_exact && added_to_word_docids {
break;
}
let (fid, _) = relative_from_absolute_position(position);
if exact_attributes.contains(&fid) && !added_to_exact {
exact_word_docids_sorter.insert(word_bytes, &value_buffer)?;
added_to_exact = true;
} else if !added_to_word_docids {
word_docids_sorter.insert(word_bytes, &value_buffer)?;
added_to_word_docids = true;
}
}
}
}
Ok((
sorter_into_reader(word_docids_sorter, indexer)?,
sorter_into_reader(exact_word_docids_sorter, indexer)?,
writer_into_reader(word_fid_docids_writer)?,
))
}
fn words_into_sorter(
document_id: DocumentId,
fid: FieldId,
key_buffer: &mut Vec<u8>,
del_words: &BTreeSet<Vec<u8>>,
add_words: &BTreeSet<Vec<u8>>,
word_fid_docids_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
puffin::profile_function!();
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
for eob in merge_join_by(del_words.iter(), add_words.iter(), |d, a| d.cmp(a)) {
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let word_bytes = match eob {
Left(word_bytes) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
word_bytes
}
Right(word_bytes) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
word_bytes
}
Both(word_bytes, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
word_bytes
}
};
key_buffer.clear();
key_buffer.extend_from_slice(&word_bytes);
key_buffer.push(0);
key_buffer.extend_from_slice(&fid.to_be_bytes());
word_fid_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
}
Ok(())
}

View File

@ -0,0 +1,51 @@
use std::fs::File;
use std::io;
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
try_split_array_at, GrenadParameters,
};
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::{relative_from_absolute_position, DocumentId, Result};
/// Extracts the word, field id, and the documents ids where this word appear at this field id.
#[logging_timer::time]
pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();
let mut word_fid_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
);
let mut key_buffer = Vec::new();
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, word_bytes) = try_split_array_at(key)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = DocumentId::from_be_bytes(document_id_bytes);
for position in read_u32_ne_bytes(value) {
key_buffer.clear();
key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0);
let (fid, _) = relative_from_absolute_position(position);
key_buffer.extend_from_slice(&fid.to_be_bytes());
word_fid_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
}
}
let word_fid_docids_reader = sorter_into_reader(word_fid_docids_sorter, indexer)?;
Ok(word_fid_docids_reader)
}

View File

@ -1,17 +1,15 @@
use std::collections::{BTreeMap, VecDeque};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::fs::File;
use std::{cmp, io};
use obkv::KvReaderU16;
use std::{cmp, io, mem, str, vec};
use super::helpers::{
create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
writer_into_reader, GrenadParameters, MergeFn,
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
try_split_array_at, GrenadParameters, MergeFn,
};
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::proximity::{index_proximity, MAX_DISTANCE};
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::proximity::{positions_proximity, MAX_DISTANCE};
use crate::{DocumentId, Result};
/// Extracts the best proximity between pairs of words and the documents ids where this pair appear.
@ -27,138 +25,58 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
let max_memory = indexer.max_memory_by_thread();
let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
.into_iter()
.map(|_| {
create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory.map(|m| m / MAX_DISTANCE as usize),
)
})
.collect();
let mut word_pair_proximity_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory.map(|m| m / 2),
);
let mut del_word_positions: VecDeque<(String, u16)> =
VecDeque::with_capacity(MAX_DISTANCE as usize);
let mut add_word_positions: VecDeque<(String, u16)> =
VecDeque::with_capacity(MAX_DISTANCE as usize);
let mut del_word_pair_proximity = BTreeMap::new();
let mut add_word_pair_proximity = BTreeMap::new();
// This map is assumed to not consume a lot of memory.
let mut document_word_positions_heap = BinaryHeap::new();
let mut current_document_id = None;
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, _fid_bytes) = try_split_array_at(key)
let (document_id_bytes, word_bytes) = try_split_array_at(key)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = u32::from_be_bytes(document_id_bytes);
let word = str::from_utf8(word_bytes)?;
// if we change document, we fill the sorter
if current_document_id.map_or(false, |id| id != document_id) {
puffin::profile_scope!("Document into sorter");
let curr_document_id = *current_document_id.get_or_insert(document_id);
if curr_document_id != document_id {
let document_word_positions_heap = mem::take(&mut document_word_positions_heap);
document_word_positions_into_sorter(
current_document_id.unwrap(),
&del_word_pair_proximity,
&add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters,
curr_document_id,
document_word_positions_heap,
&mut word_pair_proximity_docids_sorter,
)?;
del_word_pair_proximity.clear();
add_word_pair_proximity.clear();
current_document_id = Some(document_id);
}
current_document_id = Some(document_id);
let (del, add): (Result<_>, Result<_>) = rayon::join(
|| {
// deletions
if let Some(deletion) = KvReaderDelAdd::new(&value).get(DelAdd::Deletion) {
for (position, word) in KvReaderU16::new(deletion).iter() {
// drain the proximity window until the head word is considered close to the word we are inserting.
while del_word_positions.get(0).map_or(false, |(_w, p)| {
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
}) {
word_positions_into_word_pair_proximity(
&mut del_word_positions,
&mut del_word_pair_proximity,
)?;
}
// insert the new word.
let word = std::str::from_utf8(word)?;
del_word_positions.push_back((word.to_string(), position));
}
while !del_word_positions.is_empty() {
word_positions_into_word_pair_proximity(
&mut del_word_positions,
&mut del_word_pair_proximity,
)?;
}
}
Ok(())
},
|| {
// additions
if let Some(addition) = KvReaderDelAdd::new(&value).get(DelAdd::Addition) {
for (position, word) in KvReaderU16::new(addition).iter() {
// drain the proximity window until the head word is considered close to the word we are inserting.
while add_word_positions.get(0).map_or(false, |(_w, p)| {
index_proximity(*p as u32, position as u32) >= MAX_DISTANCE
}) {
word_positions_into_word_pair_proximity(
&mut add_word_positions,
&mut add_word_pair_proximity,
)?;
}
// insert the new word.
let word = std::str::from_utf8(word)?;
add_word_positions.push_back((word.to_string(), position));
}
while !add_word_positions.is_empty() {
word_positions_into_word_pair_proximity(
&mut add_word_positions,
&mut add_word_pair_proximity,
)?;
}
}
Ok(())
},
);
del?;
add?;
let word = word.to_string();
let mut positions: Vec<_> = read_u32_ne_bytes(value).collect();
positions.sort_unstable();
let mut iter = positions.into_iter();
if let Some(position) = iter.next() {
document_word_positions_heap.push(PeekedWordPosition { word, position, iter });
}
}
if let Some(document_id) = current_document_id {
puffin::profile_scope!("Final document into sorter");
// We must make sure that don't lose the current document field id
// word count map if we break because we reached the end of the chunk.
let document_word_positions_heap = mem::take(&mut document_word_positions_heap);
document_word_positions_into_sorter(
document_id,
&del_word_pair_proximity,
&add_word_pair_proximity,
&mut word_pair_proximity_docids_sorters,
document_word_positions_heap,
&mut word_pair_proximity_docids_sorter,
)?;
}
{
puffin::profile_scope!("sorter_into_reader");
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
for sorter in word_pair_proximity_docids_sorters {
sorter.write_into_stream_writer(&mut writer)?;
}
writer_into_reader(writer)
}
sorter_into_reader(word_pair_proximity_docids_sorter, indexer)
}
/// Fills the list of all pairs of words with the shortest proximity between 1 and 7 inclusive.
@ -167,66 +85,96 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
/// close to each other.
fn document_word_positions_into_sorter(
document_id: DocumentId,
del_word_pair_proximity: &BTreeMap<(String, String), u8>,
add_word_pair_proximity: &BTreeMap<(String, String), u8>,
word_pair_proximity_docids_sorters: &mut Vec<grenad::Sorter<MergeFn>>,
mut word_positions_heap: BinaryHeap<PeekedWordPosition<vec::IntoIter<u32>>>,
word_pair_proximity_docids_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut word_pair_proximity = HashMap::new();
let mut ordered_peeked_word_positions = Vec::new();
while !word_positions_heap.is_empty() {
while let Some(peeked_word_position) = word_positions_heap.pop() {
ordered_peeked_word_positions.push(peeked_word_position);
if ordered_peeked_word_positions.len() == 7 {
break;
}
}
if let Some((head, tail)) = ordered_peeked_word_positions.split_first() {
for PeekedWordPosition { word, position, .. } in tail {
let prox = positions_proximity(head.position, *position);
if prox > 0 && prox < MAX_DISTANCE {
word_pair_proximity
.entry((head.word.clone(), word.clone()))
.and_modify(|p| {
*p = cmp::min(*p, prox);
})
.or_insert(prox);
}
}
// Push the tail in the heap.
let tail_iter = ordered_peeked_word_positions.drain(1..);
word_positions_heap.extend(tail_iter);
// Advance the head and push it in the heap.
if let Some(mut head) = ordered_peeked_word_positions.pop() {
if let Some(next_position) = head.iter.next() {
let prox = positions_proximity(head.position, next_position);
if prox > 0 && prox < MAX_DISTANCE {
word_pair_proximity
.entry((head.word.clone(), head.word.clone()))
.and_modify(|p| {
*p = cmp::min(*p, prox);
})
.or_insert(prox);
}
word_positions_heap.push(PeekedWordPosition {
word: head.word,
position: next_position,
iter: head.iter,
});
}
}
}
}
let mut buffer = Vec::new();
let mut key_buffer = Vec::new();
for eob in
merge_join_by(del_word_pair_proximity.iter(), add_word_pair_proximity.iter(), |d, a| {
d.cmp(a)
})
{
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let ((w1, w2), prox) = match eob {
Left(key_value) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key_value
}
Right(key_value) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
Both(key_value, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key_value
}
};
for ((w1, w2), prox) in word_pair_proximity {
key_buffer.clear();
key_buffer.push(*prox as u8);
key_buffer.push(prox as u8);
key_buffer.extend_from_slice(w1.as_bytes());
key_buffer.push(0);
key_buffer.extend_from_slice(w2.as_bytes());
word_pair_proximity_docids_sorters[*prox as usize - 1]
.insert(&key_buffer, value_writer.into_inner().unwrap())?;
word_pair_proximity_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
}
Ok(())
}
fn word_positions_into_word_pair_proximity(
word_positions: &mut VecDeque<(String, u16)>,
word_pair_proximity: &mut BTreeMap<(String, String), u8>,
) -> Result<()> {
let (head_word, head_position) = word_positions.pop_front().unwrap();
for (word, position) in word_positions.iter() {
let prox = index_proximity(head_position as u32, *position as u32) as u8;
if prox > 0 && prox < MAX_DISTANCE as u8 {
word_pair_proximity
.entry((head_word.clone(), word.clone()))
.and_modify(|p| {
*p = cmp::min(*p, prox);
})
.or_insert(prox);
}
}
Ok(())
struct PeekedWordPosition<I> {
word: String,
position: u32,
iter: I,
}
impl<I> Ord for PeekedWordPosition<I> {
fn cmp(&self, other: &Self) -> Ordering {
self.position.cmp(&other.position).reverse()
}
}
impl<I> PartialOrd for PeekedWordPosition<I> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<I> Eq for PeekedWordPosition<I> {}
impl<I> PartialEq for PeekedWordPosition<I> {
fn eq(&self, other: &Self) -> bool {
self.position == other.position
}
}

View File

@ -1,18 +1,13 @@
use std::collections::BTreeSet;
use std::fs::File;
use std::io;
use obkv::KvReaderU16;
use super::helpers::{
create_sorter, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, try_split_array_at,
GrenadParameters,
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
try_split_array_at, GrenadParameters,
};
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::MergeFn;
use crate::{bucketed_position, DocumentId, Result};
use crate::{bucketed_position, relative_from_absolute_position, DocumentId, Result};
/// Extracts the word positions and the documents ids where this word appear.
///
@ -29,110 +24,32 @@ pub fn extract_word_position_docids<R: io::Read + io::Seek>(
let mut word_position_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
merge_deladd_cbo_roaring_bitmaps,
merge_cbo_roaring_bitmaps,
indexer.chunk_compression_type,
indexer.chunk_compression_level,
indexer.max_nb_chunks,
max_memory,
);
let mut del_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
let mut add_word_positions: BTreeSet<(u16, Vec<u8>)> = BTreeSet::new();
let mut current_document_id: Option<u32> = None;
let mut key_buffer = Vec::new();
let mut cursor = docid_word_positions.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let (document_id_bytes, _fid_bytes) = try_split_array_at(key)
let (document_id_bytes, word_bytes) = try_split_array_at(key)
.ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
let document_id = DocumentId::from_be_bytes(document_id_bytes);
if current_document_id.map_or(false, |id| document_id != id) {
words_position_into_sorter(
current_document_id.unwrap(),
&mut key_buffer,
&del_word_positions,
&add_word_positions,
&mut word_position_docids_sorter,
)?;
del_word_positions.clear();
add_word_positions.clear();
}
current_document_id = Some(document_id);
let del_add_reader = KvReaderDelAdd::new(&value);
// extract all unique words to remove.
if let Some(deletion) = del_add_reader.get(DelAdd::Deletion) {
for (position, word_bytes) in KvReaderU16::new(deletion).iter() {
let position = bucketed_position(position);
del_word_positions.insert((position, word_bytes.to_vec()));
}
}
// extract all unique additional words.
if let Some(addition) = del_add_reader.get(DelAdd::Addition) {
for (position, word_bytes) in KvReaderU16::new(addition).iter() {
let position = bucketed_position(position);
add_word_positions.insert((position, word_bytes.to_vec()));
}
for position in read_u32_ne_bytes(value) {
key_buffer.clear();
key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0);
let (_, position) = relative_from_absolute_position(position);
let position = bucketed_position(position);
key_buffer.extend_from_slice(&position.to_be_bytes());
word_position_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?;
}
}
if let Some(document_id) = current_document_id {
words_position_into_sorter(
document_id,
&mut key_buffer,
&del_word_positions,
&add_word_positions,
&mut word_position_docids_sorter,
)?;
}
// TODO remove noop DelAdd OBKV
let word_position_docids_reader = sorter_into_reader(word_position_docids_sorter, indexer)?;
Ok(word_position_docids_reader)
}
fn words_position_into_sorter(
document_id: DocumentId,
key_buffer: &mut Vec<u8>,
del_word_positions: &BTreeSet<(u16, Vec<u8>)>,
add_word_positions: &BTreeSet<(u16, Vec<u8>)>,
word_position_docids_sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
puffin::profile_function!();
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut buffer = Vec::new();
for eob in merge_join_by(del_word_positions.iter(), add_word_positions.iter(), |d, a| d.cmp(a))
{
buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut buffer);
let (position, word_bytes) = match eob {
Left(key) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
key
}
Right(key) => {
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
Both(key, _) => {
value_writer.insert(DelAdd::Deletion, document_id.to_ne_bytes()).unwrap();
value_writer.insert(DelAdd::Addition, document_id.to_ne_bytes()).unwrap();
key
}
};
key_buffer.clear();
key_buffer.extend_from_slice(word_bytes);
key_buffer.push(0);
key_buffer.extend_from_slice(&position.to_be_bytes());
word_position_docids_sorter.insert(&key_buffer, value_writer.into_inner().unwrap())?;
}
Ok(())
}

View File

@ -6,6 +6,7 @@ mod extract_fid_word_count_docids;
mod extract_geo_points;
mod extract_vector_points;
mod extract_word_docids;
mod extract_word_fid_docids;
mod extract_word_pair_proximity_docids;
mod extract_word_position_docids;
@ -24,11 +25,12 @@ use self::extract_fid_word_count_docids::extract_fid_word_count_docids;
use self::extract_geo_points::extract_geo_points;
use self::extract_vector_points::extract_vector_points;
use self::extract_word_docids::extract_word_docids;
use self::extract_word_fid_docids::extract_word_fid_docids;
use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids;
use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{
as_cloneable_grenad, merge_cbo_roaring_bitmaps, CursorClonableMmap, GrenadParameters, MergeFn,
MergeableReader,
as_cloneable_grenad, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, CursorClonableMmap,
GrenadParameters, MergeFn, MergeableReader,
};
use super::{helpers, TypedChunk};
use crate::{FieldId, Result};
@ -91,9 +93,9 @@ pub(crate) fn data_from_obkv_documents(
let (
docid_word_positions_chunks,
(
fid_docid_facet_numbers_chunks,
docid_fid_facet_numbers_chunks,
(
fid_docid_facet_strings_chunks,
docid_fid_facet_strings_chunks,
(
facet_is_null_docids_chunks,
(facet_is_empty_docids_chunks, facet_exists_docids_chunks),
@ -170,22 +172,15 @@ pub(crate) fn data_from_obkv_documents(
"field-id-wordcount-docids",
);
spawn_extraction_task::<
_,
_,
Vec<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)>,
>(
spawn_extraction_task::<_, _, Vec<(grenad::Reader<File>, grenad::Reader<File>)>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes),
merge_cbo_roaring_bitmaps,
|(word_docids_reader, exact_word_docids_reader, word_fid_docids_reader)| {
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
}
merge_roaring_bitmaps,
|(word_docids_reader, exact_word_docids_reader)| TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
},
"word-docids",
);
@ -199,9 +194,18 @@ pub(crate) fn data_from_obkv_documents(
TypedChunk::WordPositionDocids,
"word-position-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
docid_word_positions_chunks,
indexer,
lmdb_writer_sx.clone(),
extract_word_fid_docids,
merge_cbo_roaring_bitmaps,
TypedChunk::WordFidDocids,
"word-fid-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
fid_docid_facet_strings_chunks,
docid_fid_facet_strings_chunks,
indexer,
lmdb_writer_sx.clone(),
extract_facet_string_docids,
@ -211,7 +215,7 @@ pub(crate) fn data_from_obkv_documents(
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
fid_docid_facet_numbers_chunks,
docid_fid_facet_numbers_chunks,
indexer,
lmdb_writer_sx,
extract_facet_number_docids,
@ -344,7 +348,7 @@ fn send_and_extract_flattened_documents_data(
});
}
let (docid_word_positions_chunk, fid_docid_facet_values_chunks): (Result<_>, Result<_>) =
let (docid_word_positions_chunk, docid_fid_facet_values_chunks): (Result<_>, Result<_>) =
rayon::join(
|| {
let (documents_ids, docid_word_positions_chunk, script_language_pair) =
@ -372,8 +376,8 @@ fn send_and_extract_flattened_documents_data(
},
|| {
let ExtractedFacetValues {
fid_docid_facet_numbers_chunk,
fid_docid_facet_strings_chunk,
docid_fid_facet_numbers_chunk,
docid_fid_facet_strings_chunk,
fid_facet_is_null_docids_chunk,
fid_facet_is_empty_docids_chunk,
fid_facet_exists_docids_chunk,
@ -384,26 +388,26 @@ fn send_and_extract_flattened_documents_data(
geo_fields_ids,
)?;
// send fid_docid_facet_numbers_chunk to DB writer
let fid_docid_facet_numbers_chunk =
unsafe { as_cloneable_grenad(&fid_docid_facet_numbers_chunk)? };
// send docid_fid_facet_numbers_chunk to DB writer
let docid_fid_facet_numbers_chunk =
unsafe { as_cloneable_grenad(&docid_fid_facet_numbers_chunk)? };
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers(
fid_docid_facet_numbers_chunk.clone(),
docid_fid_facet_numbers_chunk.clone(),
)));
// send fid_docid_facet_strings_chunk to DB writer
let fid_docid_facet_strings_chunk =
unsafe { as_cloneable_grenad(&fid_docid_facet_strings_chunk)? };
// send docid_fid_facet_strings_chunk to DB writer
let docid_fid_facet_strings_chunk =
unsafe { as_cloneable_grenad(&docid_fid_facet_strings_chunk)? };
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings(
fid_docid_facet_strings_chunk.clone(),
docid_fid_facet_strings_chunk.clone(),
)));
Ok((
fid_docid_facet_numbers_chunk,
docid_fid_facet_numbers_chunk,
(
fid_docid_facet_strings_chunk,
docid_fid_facet_strings_chunk,
(
fid_facet_is_null_docids_chunk,
(fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk),
@ -413,5 +417,5 @@ fn send_and_extract_flattened_documents_data(
},
);
Ok((docid_word_positions_chunk?, fid_docid_facet_values_chunks?))
Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?))
}

View File

@ -54,7 +54,6 @@ pub fn sorter_into_reader(
sorter: grenad::Sorter<MergeFn>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
puffin::profile_function!();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
@ -114,22 +113,6 @@ impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>)> {
}
}
impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>)> {
type Output = (grenad::Reader<File>, grenad::Reader<File>, grenad::Reader<File>);
fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut m1 = MergerBuilder::new(merge_fn);
let mut m2 = MergerBuilder::new(merge_fn);
let mut m3 = MergerBuilder::new(merge_fn);
for (r1, r2, r3) in self.into_iter() {
m1.push(r1)?;
m2.push(r2)?;
m3.push(r3)?;
}
Ok((m1.finish(params)?, m2.finish(params)?, m3.finish(params)?))
}
}
struct MergerBuilder<R>(grenad::MergerBuilder<R, MergeFn>);
impl<R: io::Read + io::Seek> MergerBuilder<R> {

View File

@ -6,13 +6,11 @@ use std::result::Result as StdResult;
use roaring::RoaringBitmap;
use crate::heed_codec::CboRoaringBitmapCodec;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::transform::Operation;
use crate::Result;
pub type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>>;
#[allow(unused)]
pub fn concat_u32s_array<'a>(_key: &[u8], values: &[Cow<'a, [u8]>]) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 {
Ok(values[0].clone())
@ -77,123 +75,57 @@ pub fn keep_latest_obkv<'a>(_key: &[u8], obkvs: &[Cow<'a, [u8]>]) -> Result<Cow<
Ok(obkvs.last().unwrap().clone())
}
pub fn merge_two_del_add_obkvs(
base: obkv::KvReaderU16,
update: obkv::KvReaderU16,
merge_additions: bool,
buffer: &mut Vec<u8>,
) {
pub fn merge_two_obkvs(base: obkv::KvReaderU16, update: obkv::KvReaderU16, buffer: &mut Vec<u8>) {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
buffer.clear();
let mut writer = obkv::KvWriter::new(buffer);
let mut value_buffer = Vec::new();
for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) {
match eob {
Left((k, v)) => {
if merge_additions {
writer.insert(k, v).unwrap()
} else {
// If merge_additions is false, recreate an obkv keeping the deletions only.
value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
let base_reader = KvReaderDelAdd::new(v);
if let Some(deletion) = base_reader.get(DelAdd::Deletion) {
value_writer.insert(DelAdd::Deletion, deletion).unwrap();
value_writer.finish().unwrap();
writer.insert(k, &value_buffer).unwrap()
}
}
}
Right((k, v)) => writer.insert(k, v).unwrap(),
Both((k, base), (_, update)) => {
// merge deletions and additions.
value_buffer.clear();
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
let base_reader = KvReaderDelAdd::new(base);
let update_reader = KvReaderDelAdd::new(update);
// keep newest deletion.
if let Some(deletion) = update_reader
.get(DelAdd::Deletion)
.or_else(|| base_reader.get(DelAdd::Deletion))
{
value_writer.insert(DelAdd::Deletion, deletion).unwrap();
}
// keep base addition only if merge_additions is true.
let base_addition =
merge_additions.then(|| base_reader.get(DelAdd::Addition)).flatten();
// keep newest addition.
// TODO use or_else
if let Some(addition) = update_reader.get(DelAdd::Addition).or(base_addition) {
value_writer.insert(DelAdd::Addition, addition).unwrap();
}
value_writer.finish().unwrap();
writer.insert(k, &value_buffer).unwrap()
}
Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(),
}
}
writer.finish().unwrap();
}
/// Merge all the obkvs from the newest to the oldest.
fn inner_merge_del_add_obkvs<'a>(
/// Merge all the obks in the order we see them.
pub fn merge_obkvs_and_operations<'a>(
_key: &[u8],
obkvs: &[Cow<'a, [u8]>],
merge_additions: bool,
) -> Result<Cow<'a, [u8]>> {
// pop the newest operation from the list.
let (newest, obkvs) = obkvs.split_last().unwrap();
// keep the operation type for the returned value.
let newest_operation_type = newest[0];
// [add, add, delete, add, add]
// we can ignore everything that happened before the last delete.
let starting_position =
obkvs.iter().rposition(|obkv| obkv[0] == Operation::Deletion as u8).unwrap_or(0);
// treat the newest obkv as the starting point of the merge.
let mut acc_operation_type = newest_operation_type;
let mut acc = newest[1..].to_vec();
let mut buffer = Vec::new();
// reverse iter from the most recent to the oldest.
for current in obkvs.into_iter().rev() {
// if in the previous iteration there was a complete deletion,
// stop the merge process.
if acc_operation_type == Operation::Deletion as u8 {
break;
}
let newest = obkv::KvReader::new(&acc);
let oldest = obkv::KvReader::new(&current[1..]);
merge_two_del_add_obkvs(oldest, newest, merge_additions, &mut buffer);
// we want the result of the merge into our accumulator.
std::mem::swap(&mut acc, &mut buffer);
acc_operation_type = current[0];
// [add, add, delete]
// if the last operation was a deletion then we simply return the deletion
if starting_position == obkvs.len() - 1 && obkvs.last().unwrap()[0] == Operation::Deletion as u8
{
return Ok(obkvs[obkvs.len() - 1].clone());
}
let mut buffer = Vec::new();
acc.insert(0, newest_operation_type);
Ok(Cow::from(acc))
// (add, add, delete) [add, add]
// in the other case, no deletion will be encountered during the merge
let mut ret =
obkvs[starting_position..].iter().cloned().fold(Vec::new(), |mut acc, current| {
let first = obkv::KvReader::new(&acc);
let second = obkv::KvReader::new(&current[1..]);
merge_two_obkvs(first, second, &mut buffer);
// we want the result of the merge into our accumulator
std::mem::swap(&mut acc, &mut buffer);
acc
});
ret.insert(0, Operation::Addition as u8);
Ok(Cow::from(ret))
}
/// Merge all the obkvs from the newest to the oldest.
pub fn obkvs_merge_additions_and_deletions<'a>(
_key: &[u8],
obkvs: &[Cow<'a, [u8]>],
) -> Result<Cow<'a, [u8]>> {
inner_merge_del_add_obkvs(obkvs, true)
}
/// Merge all the obkvs deletions from the newest to the oldest and keep only the newest additions.
pub fn obkvs_keep_last_addition_merge_deletions<'a>(
_key: &[u8],
obkvs: &[Cow<'a, [u8]>],
) -> Result<Cow<'a, [u8]>> {
inner_merge_del_add_obkvs(obkvs, false)
}
/// Do a union of all the CboRoaringBitmaps in the values.
pub fn merge_cbo_roaring_bitmaps<'a>(
_key: &[u8],
values: &[Cow<'a, [u8]>],
@ -206,36 +138,3 @@ pub fn merge_cbo_roaring_bitmaps<'a>(
Ok(Cow::from(vec))
}
}
/// Do a union of CboRoaringBitmaps on both sides of a DelAdd obkv
/// separately and outputs a new DelAdd with both unions.
pub fn merge_deladd_cbo_roaring_bitmaps<'a>(
_key: &[u8],
values: &[Cow<'a, [u8]>],
) -> Result<Cow<'a, [u8]>> {
if values.len() == 1 {
Ok(values[0].clone())
} else {
// Retrieve the bitmaps from both sides
let mut del_bitmaps_bytes = Vec::new();
let mut add_bitmaps_bytes = Vec::new();
for value in values {
let obkv = KvReaderDelAdd::new(value);
if let Some(bitmap_bytes) = obkv.get(DelAdd::Deletion) {
del_bitmaps_bytes.push(bitmap_bytes);
}
if let Some(bitmap_bytes) = obkv.get(DelAdd::Addition) {
add_bitmaps_bytes.push(bitmap_bytes);
}
}
let mut output_deladd_obkv = KvWriterDelAdd::memory();
let mut buffer = Vec::new();
CboRoaringBitmapCodec::merge_into(del_bitmaps_bytes, &mut buffer)?;
output_deladd_obkv.insert(DelAdd::Deletion, &buffer)?;
buffer.clear();
CboRoaringBitmapCodec::merge_into(add_bitmaps_bytes, &mut buffer)?;
output_deladd_obkv.insert(DelAdd::Addition, &buffer)?;
output_deladd_obkv.into_inner().map(Cow::from).map_err(Into::into)
}
}

View File

@ -14,8 +14,7 @@ pub use grenad_helpers::{
};
pub use merge_functions::{
concat_u32s_array, keep_first, keep_latest_obkv, merge_btreeset_string,
merge_cbo_roaring_bitmaps, merge_deladd_cbo_roaring_bitmaps, merge_roaring_bitmaps,
obkvs_keep_last_addition_merge_deletions, obkvs_merge_additions_and_deletions,
merge_cbo_roaring_bitmaps, merge_obkvs_and_operations, merge_roaring_bitmaps, merge_two_obkvs,
serialize_roaring_bitmap, MergeFn,
};
@ -45,7 +44,6 @@ where
Some((head, tail))
}
#[allow(unused)]
pub fn read_u32_ne_bytes(bytes: &[u8]) -> impl Iterator<Item = u32> + '_ {
bytes.chunks_exact(4).flat_map(TryInto::try_into).map(u32::from_ne_bytes)
}

View File

@ -38,7 +38,7 @@ use crate::update::{
self, DeletionStrategy, IndexerConfig, PrefixWordPairsProximityDocids, UpdateIndexingStep,
WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
};
use crate::{CboRoaringBitmapCodec, Index, Result};
use crate::{Index, Result, RoaringBitmapCodec};
static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 5;
@ -406,23 +406,13 @@ where
}
let typed_chunk = match result? {
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
} => {
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
word_docids = Some(cloneable_chunk);
let cloneable_chunk =
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
exact_word_docids = Some(cloneable_chunk);
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
word_fid_docids = Some(cloneable_chunk);
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
}
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader }
}
TypedChunk::WordPairProximityDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
@ -434,6 +424,11 @@ where
word_position_docids = Some(cloneable_chunk);
TypedChunk::WordPositionDocids(chunk)
}
TypedChunk::WordFidDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_fid_docids = Some(cloneable_chunk);
TypedChunk::WordFidDocids(chunk)
}
otherwise => otherwise,
};
@ -475,14 +470,13 @@ where
let all_documents_ids = index_documents_ids | new_documents_ids;
self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
// TODO: reactivate prefix DB with diff-indexing
// self.execute_prefix_databases(
// word_docids,
// exact_word_docids,
// word_pair_proximity_docids,
// word_position_docids,
// word_fid_docids,
// )?;
self.execute_prefix_databases(
word_docids,
exact_word_docids,
word_pair_proximity_docids,
word_position_docids,
word_fid_docids,
)?;
Ok(all_documents_ids.len())
}
@ -696,8 +690,8 @@ where
fn execute_word_prefix_docids(
txn: &mut heed::RwTxn,
reader: grenad::Reader<Cursor<ClonableMmap>>,
word_docids_db: Database<Str, CboRoaringBitmapCodec>,
word_prefix_docids_db: Database<Str, CboRoaringBitmapCodec>,
word_docids_db: Database<Str, RoaringBitmapCodec>,
word_prefix_docids_db: Database<Str, RoaringBitmapCodec>,
indexer_config: &IndexerConfig,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],

View File

@ -7,20 +7,18 @@ use std::io::{Read, Seek};
use fxhash::FxHashMap;
use heed::RoTxn;
use itertools::Itertools;
use obkv::{KvReader, KvReaderU16, KvWriter};
use obkv::{KvReader, KvWriter};
use roaring::RoaringBitmap;
use serde_json::Value;
use smartstring::SmartString;
use super::helpers::{
create_sorter, create_writer, obkvs_keep_last_addition_merge_deletions,
obkvs_merge_additions_and_deletions, MergeFn,
create_sorter, create_writer, keep_latest_obkv, merge_obkvs_and_operations, MergeFn,
};
use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key};
use crate::update::del_add::into_del_add_obkv;
use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
use crate::{
FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32,
@ -108,8 +106,8 @@ impl<'a, 'i> Transform<'a, 'i> {
// We must choose the appropriate merge function for when two or more documents
// with the same user id must be merged or fully replaced in the same batch.
let merge_function = match index_documents_method {
IndexDocumentsMethod::ReplaceDocuments => obkvs_keep_last_addition_merge_deletions,
IndexDocumentsMethod::UpdateDocuments => obkvs_merge_additions_and_deletions,
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
IndexDocumentsMethod::UpdateDocuments => merge_obkvs_and_operations,
};
// We initialize the sorter with the user indexing settings.
@ -225,21 +223,19 @@ impl<'a, 'i> Transform<'a, 'i> {
let docid = match self.new_external_documents_ids_builder.entry((*external_id).into()) {
Entry::Occupied(entry) => *entry.get() as u32,
Entry::Vacant(entry) => {
let docid = match external_documents_ids.get(entry.key()) {
Some(docid) => {
// If it was already in the list of replaced documents it means it was deleted
// by the remove_document method. We should starts as if it never existed.
if self.replaced_documents_ids.insert(docid) {
original_docid = Some(docid);
}
docid
// If the document was already in the db we mark it as a replaced document.
// It'll be deleted later.
if let Some(docid) = external_documents_ids.get(entry.key()) {
// If it was already in the list of replaced documents it means it was deleted
// by the remove_document method. We should starts as if it never existed.
if self.replaced_documents_ids.insert(docid) {
original_docid = Some(docid);
}
None => self
.available_documents_ids
.next()
.ok_or(UserError::DocumentLimitReached)?,
};
}
let docid = self
.available_documents_ids
.next()
.ok_or(UserError::DocumentLimitReached)?;
entry.insert(docid as u64);
docid
}
@ -267,28 +263,16 @@ impl<'a, 'i> Transform<'a, 'i> {
skip_insertion = true;
} else {
// we associate the base document with the new key, everything will get merged later.
let keep_original_version =
self.index_documents_method == IndexDocumentsMethod::UpdateDocuments;
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
into_del_add_obkv(
KvReaderU16::new(base_obkv),
true,
keep_original_version,
&mut document_sorter_buffer,
)?;
document_sorter_buffer.extend_from_slice(base_obkv);
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
Some(flattened_obkv) => {
// we recreate our buffer with the flattened documents
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
into_del_add_obkv(
KvReaderU16::new(&flattened_obkv),
true,
keep_original_version,
&mut document_sorter_buffer,
)?;
document_sorter_buffer.extend_from_slice(&flattened_obkv);
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
@ -304,12 +288,7 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
into_del_add_obkv(
KvReaderU16::new(&obkv_buffer),
false,
true,
&mut document_sorter_buffer,
)?;
document_sorter_buffer.extend_from_slice(&obkv_buffer);
// We use the extracted/generated user id as the key for this document.
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
@ -317,12 +296,7 @@ impl<'a, 'i> Transform<'a, 'i> {
Some(flattened_obkv) => {
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Addition as u8);
into_del_add_obkv(
KvReaderU16::new(&flattened_obkv),
false,
true,
&mut document_sorter_buffer,
)?;
document_sorter_buffer.extend_from_slice(&flattened_obkv);
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
@ -380,25 +354,19 @@ impl<'a, 'i> Transform<'a, 'i> {
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
let mut documents_deleted = 0;
let mut document_sorter_buffer = Vec::new();
for to_remove in to_remove {
if should_abort() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
// Check if the document has been added in the current indexing process.
let deleted_from_current = match self
.new_external_documents_ids_builder
.entry((*to_remove).into())
{
match self.new_external_documents_ids_builder.entry((*to_remove).into()) {
// if the document was added in a previous iteration of the transform we make it as deleted in the sorters.
Entry::Occupied(entry) => {
let doc_id = *entry.get() as u32;
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8);
obkv::KvWriterU16::new(&mut document_sorter_buffer).finish().unwrap();
self.original_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
self.flattened_sorter.insert(doc_id.to_be_bytes(), &document_sorter_buffer)?;
self.original_sorter
.insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
self.flattened_sorter
.insert(doc_id.to_be_bytes(), [Operation::Deletion as u8])?;
// we must NOT update the list of replaced_documents_ids
// Either:
@ -407,69 +375,21 @@ impl<'a, 'i> Transform<'a, 'i> {
// we're removing it there is nothing to do.
self.new_documents_ids.remove(doc_id);
entry.remove_entry();
true
}
Entry::Vacant(_) => false,
};
// If the document was already in the db we mark it as a `to_delete` document.
// Then we push the document in sorters in deletion mode.
let deleted_from_db = match external_documents_ids.get(&to_remove) {
Some(docid) => {
self.replaced_documents_ids.insert(docid);
// fetch the obkv document
let original_key = BEU32::new(docid);
let base_obkv = self
.index
.documents
.remap_data_type::<heed::types::ByteSlice>()
.get(wtxn, &original_key)?
.ok_or(InternalError::DatabaseMissingEntry {
db_name: db_name::DOCUMENTS,
key: None,
})?;
// push it as to delete in the original_sorter
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(
KvReaderU16::new(base_obkv),
true,
false,
&mut document_sorter_buffer,
)?;
self.original_sorter.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
// flatten it and push it as to delete in the flattened_sorter
match self.flatten_from_fields_ids_map(KvReader::new(base_obkv))? {
Some(flattened_obkv) => {
// we recreate our buffer with the flattened documents
document_sorter_buffer.clear();
document_sorter_buffer.push(Operation::Deletion as u8);
into_del_add_obkv(
KvReaderU16::new(&flattened_obkv),
true,
false,
&mut document_sorter_buffer,
)?;
self.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?
}
None => self
.flattened_sorter
.insert(docid.to_be_bytes(), &document_sorter_buffer)?,
Entry::Vacant(entry) => {
// If the document was already in the db we mark it as a `to_delete` document.
// It'll be deleted later. We don't need to push anything to the sorters.
if let Some(docid) = external_documents_ids.get(entry.key()) {
self.replaced_documents_ids.insert(docid);
} else {
// if the document is nowehere to be found, there is nothing to do and we must NOT
// increment the count of documents_deleted
continue;
}
true
}
None => false,
};
// increase counter only if the document existed somewhere before.
if deleted_from_current || deleted_from_db {
documents_deleted += 1;
}
documents_deleted += 1;
}
Ok(documents_deleted)
@ -669,7 +589,9 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut documents_count = 0;
while let Some((key, val)) = iter.next()? {
// skip first byte corresponding to the operation type (Deletion or Addition).
if val[0] == Operation::Deletion as u8 {
continue;
}
let val = &val[1..];
// send a callback to show at which step we are
@ -709,7 +631,9 @@ impl<'a, 'i> Transform<'a, 'i> {
// We get rids of the `Operation` byte and skip the deleted documents as well.
let mut iter = self.flattened_sorter.into_stream_merger_iter()?;
while let Some((key, val)) = iter.next()? {
// skip first byte corresponding to the operation type (Deletion or Addition).
if val[0] == Operation::Deletion as u8 {
continue;
}
let val = &val[1..];
writer.insert(key, val)?;
}
@ -787,7 +711,6 @@ impl<'a, 'i> Transform<'a, 'i> {
);
let mut obkv_buffer = Vec::new();
let mut document_sorter_buffer = Vec::new();
for result in self.index.all_documents(wtxn)? {
let (docid, obkv) = result?;
@ -802,9 +725,7 @@ impl<'a, 'i> Transform<'a, 'i> {
}
let buffer = obkv_writer.into_inner()?;
document_sorter_buffer.clear();
into_del_add_obkv(KvReaderU16::new(buffer), false, true, &mut document_sorter_buffer)?;
original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
original_writer.insert(docid.to_be_bytes(), &buffer)?;
// Once we have the document. We're going to flatten it
// and insert it in the flattened sorter.
@ -839,9 +760,7 @@ impl<'a, 'i> Transform<'a, 'i> {
let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
writer.insert(fid, &value)?;
}
document_sorter_buffer.clear();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut document_sorter_buffer)?;
flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
flattened_writer.insert(docid.to_be_bytes(), &buffer)?;
}
// Once we have written all the documents, we extract
@ -905,86 +824,38 @@ mod test {
#[test]
fn merge_obkvs() {
let mut additive_doc_0 = Vec::new();
let mut deletive_doc_0 = Vec::new();
let mut del_add_doc_0 = Vec::new();
let mut kv_writer = KvWriter::memory();
let mut doc_0 = Vec::new();
let mut kv_writer = KvWriter::new(&mut doc_0);
kv_writer.insert(0_u8, [0]).unwrap();
let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0).unwrap();
additive_doc_0.insert(0, Operation::Addition as u8);
into_del_add_obkv(KvReaderU16::new(&buffer), true, false, &mut deletive_doc_0).unwrap();
deletive_doc_0.insert(0, Operation::Deletion as u8);
into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut del_add_doc_0).unwrap();
del_add_doc_0.insert(0, Operation::Addition as u8);
kv_writer.finish().unwrap();
doc_0.insert(0, Operation::Addition as u8);
let mut additive_doc_1 = Vec::new();
let mut kv_writer = KvWriter::memory();
kv_writer.insert(1_u8, [1]).unwrap();
let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_1).unwrap();
additive_doc_1.insert(0, Operation::Addition as u8);
let ret = merge_obkvs_and_operations(&[], &[Cow::from(doc_0.as_slice())]).unwrap();
assert_eq!(*ret, doc_0);
let mut additive_doc_0_1 = Vec::new();
let mut kv_writer = KvWriter::memory();
kv_writer.insert(0_u8, [0]).unwrap();
kv_writer.insert(1_u8, [1]).unwrap();
let buffer = kv_writer.into_inner().unwrap();
into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut additive_doc_0_1).unwrap();
additive_doc_0_1.insert(0, Operation::Addition as u8);
let ret = obkvs_merge_additions_and_deletions(&[], &[Cow::from(additive_doc_0.as_slice())])
.unwrap();
assert_eq!(*ret, additive_doc_0);
let ret = obkvs_merge_additions_and_deletions(
let ret = merge_obkvs_and_operations(
&[],
&[Cow::from(deletive_doc_0.as_slice()), Cow::from(additive_doc_0.as_slice())],
&[Cow::from([Operation::Deletion as u8].as_slice()), Cow::from(doc_0.as_slice())],
)
.unwrap();
assert_eq!(*ret, del_add_doc_0);
assert_eq!(*ret, doc_0);
let ret = obkvs_merge_additions_and_deletions(
let ret = merge_obkvs_and_operations(
&[],
&[Cow::from(additive_doc_0.as_slice()), Cow::from(deletive_doc_0.as_slice())],
&[Cow::from(doc_0.as_slice()), Cow::from([Operation::Deletion as u8].as_slice())],
)
.unwrap();
assert_eq!(*ret, deletive_doc_0);
assert_eq!(*ret, [Operation::Deletion as u8]);
let ret = obkvs_merge_additions_and_deletions(
let ret = merge_obkvs_and_operations(
&[],
&[
Cow::from(additive_doc_1.as_slice()),
Cow::from(deletive_doc_0.as_slice()),
Cow::from(additive_doc_0.as_slice()),
Cow::from([Operation::Addition as u8, 1].as_slice()),
Cow::from([Operation::Deletion as u8].as_slice()),
Cow::from(doc_0.as_slice()),
],
)
.unwrap();
assert_eq!(*ret, del_add_doc_0);
let ret = obkvs_merge_additions_and_deletions(
&[],
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
)
.unwrap();
assert_eq!(*ret, additive_doc_0_1);
let ret = obkvs_keep_last_addition_merge_deletions(
&[],
&[Cow::from(additive_doc_1.as_slice()), Cow::from(additive_doc_0.as_slice())],
)
.unwrap();
assert_eq!(*ret, additive_doc_0);
let ret = obkvs_keep_last_addition_merge_deletions(
&[],
&[
Cow::from(deletive_doc_0.as_slice()),
Cow::from(additive_doc_1.as_slice()),
Cow::from(additive_doc_0.as_slice()),
],
)
.unwrap();
assert_eq!(*ret, del_add_doc_0);
assert_eq!(*ret, doc_0);
}
}

View File

@ -32,9 +32,9 @@ pub(crate) enum TypedChunk {
WordDocids {
word_docids_reader: grenad::Reader<File>,
exact_word_docids_reader: grenad::Reader<File>,
word_fid_docids_reader: grenad::Reader<File>,
},
WordPositionDocids(grenad::Reader<File>),
WordFidDocids(grenad::Reader<File>),
WordPairProximityDocids(grenad::Reader<File>),
FieldIdFacetStringDocids(grenad::Reader<File>),
FieldIdFacetNumberDocids(grenad::Reader<File>),
@ -43,7 +43,7 @@ pub(crate) enum TypedChunk {
FieldIdFacetIsEmptyDocids(grenad::Reader<File>),
GeoPoints(grenad::Reader<File>),
VectorPoints(grenad::Reader<File>),
ScriptLanguageDocids(HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>),
ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>),
}
impl TypedChunk {
@ -64,19 +64,17 @@ impl TypedChunk {
TypedChunk::NewDocumentsIds(grenad) => {
format!("NewDocumentsIds {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
} => format!(
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {}, word_fid_docids_reader: {} }}",
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => format!(
"WordDocids {{ word_docids_reader: {}, exact_word_docids_reader: {} }}",
word_docids_reader.len(),
exact_word_docids_reader.len(),
word_fid_docids_reader.len()
exact_word_docids_reader.len()
),
TypedChunk::WordPositionDocids(grenad) => {
format!("WordPositionDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordFidDocids(grenad) => {
format!("WordFidDocids {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::WordPairProximityDocids(grenad) => {
format!("WordPairProximityDocids {{ number_of_entries: {} }}", grenad.len())
}
@ -101,8 +99,8 @@ impl TypedChunk {
TypedChunk::VectorPoints(grenad) => {
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
}
TypedChunk::ScriptLanguageDocids(sl_map) => {
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", sl_map.len())
TypedChunk::ScriptLanguageDocids(grenad) => {
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", grenad.len())
}
}
}
@ -140,11 +138,7 @@ pub(crate) fn write_typed_chunk_into_index(
TypedChunk::NewDocumentsIds(documents_ids) => {
return Ok((documents_ids, is_merged_database))
}
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
} => {
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => {
let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?;
append_entries_into_database(
word_docids_iter.clone(),
@ -152,7 +146,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
merge_roaring_bitmaps,
)?;
let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?;
@ -162,17 +156,7 @@ pub(crate) fn write_typed_chunk_into_index(
wtxn,
index_is_empty,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?;
append_entries_into_database(
word_fid_docids_iter,
&index.word_fid_docids,
wtxn,
index_is_empty,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
merge_roaring_bitmaps,
)?;
// create fst from word docids
@ -198,6 +182,17 @@ pub(crate) fn write_typed_chunk_into_index(
)?;
is_merged_database = true;
}
TypedChunk::WordFidDocids(word_fid_docids_iter) => {
append_entries_into_database(
word_fid_docids_iter,
&index.word_fid_docids,
wtxn,
index_is_empty,
|value, _buffer| Ok(value),
merge_cbo_roaring_bitmaps,
)?;
is_merged_database = true;
}
TypedChunk::FieldIdFacetNumberDocids(facet_id_number_docids_iter) => {
let indexer = FacetsUpdate::new(index, FacetType::Number, facet_id_number_docids_iter);
indexer.execute(wtxn)?;
@ -344,25 +339,22 @@ pub(crate) fn write_typed_chunk_into_index(
log::debug!("There are {} entries in the HNSW so far", hnsw_length);
index.put_vector_hnsw(wtxn, &new_hnsw)?;
}
TypedChunk::ScriptLanguageDocids(sl_map) => {
for (key, (deletion, addition)) in sl_map {
let mut db_key_exists = false;
TypedChunk::ScriptLanguageDocids(hash_pair) => {
let mut buffer = Vec::new();
for (key, value) in hash_pair {
buffer.clear();
let final_value = match index.script_language_docids.get(wtxn, &key)? {
Some(db_values) => {
db_key_exists = true;
(db_values - deletion) | addition
let mut db_value_buffer = Vec::new();
serialize_roaring_bitmap(&db_values, &mut db_value_buffer)?;
let mut new_value_buffer = Vec::new();
serialize_roaring_bitmap(&value, &mut new_value_buffer)?;
merge_roaring_bitmaps(&new_value_buffer, &db_value_buffer, &mut buffer)?;
RoaringBitmap::deserialize_from(&buffer[..])?
}
None => addition,
None => value,
};
if final_value.is_empty() {
// If the database entry exists, delete it.
if db_key_exists == true {
index.script_language_docids.delete(wtxn, &key)?;
}
} else {
index.script_language_docids.put(wtxn, &key, &final_value)?;
}
index.script_language_docids.put(wtxn, &key, &final_value)?;
}
}
}
@ -387,6 +379,13 @@ fn merge_word_docids_reader_into_fst(
Ok(builder.into_set())
}
fn merge_roaring_bitmaps(new_value: &[u8], db_value: &[u8], buffer: &mut Vec<u8>) -> Result<()> {
let new_value = RoaringBitmap::deserialize_from(new_value)?;
let db_value = RoaringBitmap::deserialize_from(db_value)?;
let value = new_value | db_value;
Ok(serialize_roaring_bitmap(&value, buffer)?)
}
fn merge_cbo_roaring_bitmaps(
new_value: &[u8],
db_value: &[u8],
@ -456,7 +455,6 @@ where
R: io::Read + io::Seek,
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
K: for<'a> heed::BytesDecode<'a>,
{
puffin::profile_function!(format!("number of entries: {}", data.len()));
@ -477,12 +475,6 @@ where
let mut cursor = data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
debug_assert!(
K::bytes_decode(&key).is_some(),
"Couldn't decode key with the database decoder, key length: {} - key bytes: {:x?}",
key.len(),
&key
);
buffer.clear();
let value = serialize_value(value, &mut buffer)?;
unsafe { database.append(key, value)? };

View File

@ -21,7 +21,6 @@ pub use self::words_prefixes_fst::WordsPrefixesFst;
mod available_documents_ids;
mod clear_documents;
pub(crate) mod del_add;
mod delete_documents;
pub(crate) mod facet;
mod index_documents;

View File

@ -5,15 +5,15 @@ use heed::types::{ByteSlice, Str};
use heed::Database;
use crate::update::index_documents::{
create_sorter, merge_cbo_roaring_bitmaps, sorter_into_lmdb_database, valid_lmdb_key,
create_sorter, merge_roaring_bitmaps, sorter_into_lmdb_database, valid_lmdb_key,
CursorClonableMmap, MergeFn,
};
use crate::{CboRoaringBitmapCodec, Result};
use crate::{Result, RoaringBitmapCodec};
pub struct WordPrefixDocids<'t, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
word_docids: Database<Str, CboRoaringBitmapCodec>,
word_prefix_docids: Database<Str, CboRoaringBitmapCodec>,
word_docids: Database<Str, RoaringBitmapCodec>,
word_prefix_docids: Database<Str, RoaringBitmapCodec>,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) max_nb_chunks: Option<usize>,
@ -23,8 +23,8 @@ pub struct WordPrefixDocids<'t, 'u, 'i> {
impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
word_docids: Database<Str, CboRoaringBitmapCodec>,
word_prefix_docids: Database<Str, CboRoaringBitmapCodec>,
word_docids: Database<Str, RoaringBitmapCodec>,
word_prefix_docids: Database<Str, RoaringBitmapCodec>,
) -> WordPrefixDocids<'t, 'u, 'i> {
WordPrefixDocids {
wtxn,
@ -40,7 +40,6 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
#[logging_timer::time("WordPrefixDocids::{}")]
pub fn execute(
self,
// TODO grenad::Reader<onkv::Reader<Word, obkv::Reader<DelAdd, CboRoaringBitmap>>>
mut new_word_docids_iter: grenad::ReaderCursor<CursorClonableMmap>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
@ -52,8 +51,7 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
// and write into it at the same time, therefore we write into another file.
let mut prefix_docids_sorter = create_sorter(
grenad::SortAlgorithm::Unstable,
// TODO change to merge_deladd_cbo_roaring_bitmaps
merge_cbo_roaring_bitmaps,
merge_roaring_bitmaps,
self.chunk_compression_type,
self.chunk_compression_level,
self.max_nb_chunks,
@ -98,7 +96,6 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
let prefix = std::str::from_utf8(prefix.as_bytes())?;
for result in db.prefix_iter(self.wtxn, prefix)? {
let (_word, data) = result?;
// TODO fake a DelAdd -> Add(`data`)
prefix_docids_sorter.insert(prefix, data)?;
}
}
@ -114,14 +111,11 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
drop(iter);
// We finally write the word prefix docids into the LMDB database.
// TODO introduce a new function that is similar to `append_entries_into_database`
// and accepts the `merge_deladd_cbo_roaring_bitmaps` function
sorter_into_lmdb_database(
self.wtxn,
*self.word_prefix_docids.as_polymorph(),
prefix_docids_sorter,
// TODO change to `merge_deladd_cbo_roaring_bitmaps`
merge_cbo_roaring_bitmaps,
merge_roaring_bitmaps,
)?;
Ok(())
@ -133,7 +127,6 @@ fn write_prefixes_in_sorter(
sorter: &mut grenad::Sorter<MergeFn>,
) -> Result<()> {
for (key, data_slices) in prefixes.drain() {
// TODO merge keys before inserting them in the sorter
for data in data_slices {
if valid_lmdb_key(&key) {
sorter.insert(&key, data)?;