Mirror of https://github.com/meilisearch/meilisearch.git — synced 2025-11-04 01:46:28 +00:00

	refactor faceted and searchable pipeline
@@ -678,6 +678,23 @@ impl Index {
            .get(rtxn, main_key::USER_DEFINED_SEARCHABLE_FIELDS_KEY)
    }

    /// Identical to `user_defined_searchable_fields`, but returns ids instead.
    pub fn user_defined_searchable_fields_ids(&self, rtxn: &RoTxn) -> Result<Option<Vec<FieldId>>> {
        match self.user_defined_searchable_fields(rtxn)? {
            Some(fields) => {
                let fields_ids_map = self.fields_ids_map(rtxn)?;
                let mut fields_ids = Vec::new();
                for name in fields {
                    if let Some(field_id) = fields_ids_map.id(name) {
                        fields_ids.push(field_id);
                    }
                }
                Ok(Some(fields_ids))
            }
            None => Ok(None),
        }
    }

    /* filterable fields */

    /// Writes the filterable fields names in the database.

@@ -824,11 +841,11 @@ impl Index {

    /// Identical to `user_defined_faceted_fields`, but returns ids instead.
    pub fn user_defined_faceted_fields_ids(&self, rtxn: &RoTxn) -> Result<HashSet<FieldId>> {
        let fields = self.faceted_fields(rtxn)?;
        let fields = self.user_defined_faceted_fields(rtxn)?;
        let fields_ids_map = self.fields_ids_map(rtxn)?;

        let mut fields_ids = HashSet::new();
        for name in fields.into_iter() {
        for name in fields {
            if let Some(field_id) = fields_ids_map.id(&name) {
                fields_ids.insert(field_id);
            }

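Both helpers above follow the same pattern: the list of user-defined field names stored in the main database is resolved against the fields-ids map, and names that have no id yet are silently skipped. A minimal stand-in sketch of that resolution, using a plain `HashMap` instead of milli's `FieldsIdsMap` (the field names are made up):

```rust
use std::collections::HashMap;

/// Resolve user-defined field names to their numeric ids, skipping unknown names.
/// `fields_ids_map` is a stand-in for milli's `FieldsIdsMap`.
fn resolve_field_ids(names: &[&str], fields_ids_map: &HashMap<String, u16>) -> Vec<u16> {
    names.iter().filter_map(|name| fields_ids_map.get(*name).copied()).collect()
}

fn main() {
    let mut map = HashMap::new();
    map.insert("title".to_string(), 0u16);
    map.insert("overview".to_string(), 1u16);

    // "genres" is declared searchable but no document contains it yet, so it has
    // no id and is skipped, exactly like the `if let Some(field_id)` branches above.
    assert_eq!(resolve_field_ids(&["title", "genres", "overview"], &map), vec![0, 1]);
}
```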
@@ -71,8 +71,8 @@ pub enum DelAddOperation {
/// putting each deletion obkv's keys under an DelAdd::Deletion
/// and putting each addition obkv's keys under an DelAdd::Addition
pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
    deletion: obkv::KvReader<K>,
    addition: obkv::KvReader<K>,
    deletion: &obkv::KvReader<K>,
    addition: &obkv::KvReader<K>,
    buffer: &mut Vec<u8>,
) -> Result<(), std::io::Error> {
    use itertools::merge_join_by;

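Beyond the switch to borrowed readers, it helps to recall what this function does: both obkvs are key-sorted, and they are zipped into a single obkv where each key carries its value from the deletion side, the addition side, or both. A rough std-only analogue of that merge over two sorted key/value lists (plain tuples instead of obkv, no itertools):

```rust
/// Merge two key-sorted lists into (key, old_value, new_value) triples,
/// mirroring how `del_add_from_two_obkvs` pairs Deletion/Addition entries.
fn del_add_merge<'a>(
    deletion: &'a [(u16, &'a str)],
    addition: &'a [(u16, &'a str)],
) -> Vec<(u16, Option<&'a str>, Option<&'a str>)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < deletion.len() || j < addition.len() {
        match (deletion.get(i), addition.get(j)) {
            // same key on both sides: keep both values under that key
            (Some(&(dk, dv)), Some(&(ak, av))) if dk == ak => {
                out.push((dk, Some(dv), Some(av)));
                i += 1;
                j += 1;
            }
            // key only present in the deletion side
            (Some(&(dk, dv)), Some(&(ak, _))) if dk < ak => {
                out.push((dk, Some(dv), None));
                i += 1;
            }
            // key only present in the addition side
            (Some(_), Some(&(ak, av))) => {
                out.push((ak, None, Some(av)));
                j += 1;
            }
            (Some(&(dk, dv)), None) => {
                out.push((dk, Some(dv), None));
                i += 1;
            }
            (None, Some(&(ak, av))) => {
                out.push((ak, None, Some(av)));
                j += 1;
            }
            (None, None) => unreachable!(),
        }
    }
    out
}

fn main() {
    let old = [(0u16, "old title"), (2, "old overview")];
    let new = [(0u16, "new title"), (1, "new tagline")];
    for (key, del, add) in del_add_merge(&old, &new) {
        println!("field {key}: del={del:?} add={add:?}");
    }
}
```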
@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::BufReader;

@@ -12,6 +12,7 @@ use serde_json::Value;
use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters};
use crate::error::{InternalError, SerializationError};
use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd};
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH};

pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>;

@@ -25,10 +26,7 @@ pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, R
pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
    obkv_documents: grenad::Reader<R>,
    indexer: GrenadParameters,
    searchable_fields: &Option<HashSet<FieldId>>,
    stop_words: Option<&fst::Set<Vec<u8>>>,
    allowed_separators: Option<&[&str]>,
    dictionary: Option<&[&str]>,
    settings_diff: &InnerIndexSettingsDiff,
    max_positions_per_attributes: Option<u32>,
) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
    puffin::profile_function!();

@@ -56,8 +54,33 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
    let mut value_buffer = Vec::new();

    // initialize tokenizer.
    let mut builder = tokenizer_builder(stop_words, allowed_separators, dictionary, None);
    let tokenizer = builder.build();
    // TODO: Fix ugly allocation
    let old_stop_words = settings_diff.old.stop_words.as_ref();
    let old_separators: Option<Vec<_>> =
        settings_diff.old.allowed_separators.map(|s| s.iter().map(String::as_str).collect());
    let old_dictionary: Option<Vec<_>> =
        settings_diff.old.dictionary.map(|s| s.iter().map(String::as_str).collect());
    let mut del_builder = tokenizer_builder(
        old_stop_words,
        old_separators.as_deref(),
        old_dictionary.as_deref(),
        None,
    );
    let del_tokenizer = del_builder.build();

    // TODO: Fix ugly allocation
    let new_stop_words = settings_diff.new.stop_words.as_ref();
    let new_separators: Option<Vec<_>> =
        settings_diff.new.allowed_separators.map(|s| s.iter().map(String::as_str).collect());
    let new_dictionary: Option<Vec<_>> =
        settings_diff.new.dictionary.map(|s| s.iter().map(String::as_str).collect());
    let mut add_builder = tokenizer_builder(
        new_stop_words,
        new_separators.as_deref(),
        new_dictionary.as_deref(),
        None,
    );
    let add_tokenizer = add_builder.build();

    // iterate over documents.
    let mut cursor = obkv_documents.into_cursor()?;

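This is the heart of the change in this extractor: rather than one tokenizer built from loose parameters, a `del_tokenizer` is derived from `settings_diff.old` and an `add_tokenizer` from `settings_diff.new`, so deleted text is re-tokenized with the same settings it was indexed under. A compact sketch of the idea with a made-up `TokenizerSettings` type (charabia's real builder API is not reproduced here):

```rust
use std::collections::HashSet;

/// Stand-in for the tokenizer-relevant part of the index settings.
#[derive(Clone)]
struct TokenizerSettings {
    stop_words: HashSet<String>,
}

impl TokenizerSettings {
    /// Stand-in "tokenizer": lowercase, split on whitespace, drop stop words.
    fn tokenize(&self, text: &str) -> Vec<String> {
        text.split_whitespace()
            .map(|word| word.to_lowercase())
            .filter(|word| !self.stop_words.contains(word))
            .collect()
    }
}

fn main() {
    // The old settings had "the" as a stop word, the new ones do not.
    let old = TokenizerSettings { stop_words: ["the".to_string()].into_iter().collect() };
    let new = TokenizerSettings { stop_words: HashSet::new() };

    // del side: re-tokenize deleted text exactly as it was indexed under the old settings.
    println!("{:?}", old.tokenize("The Dark Knight")); // ["dark", "knight"]
    // add side: tokenize added text with the new settings.
    println!("{:?}", new.tokenize("The Dark Knight")); // ["the", "dark", "knight"]
}
```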
@@ -69,7 +92,10 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
        let obkv = KvReader::<FieldId>::new(value);

        // if the searchable fields didn't change, skip the searchable indexing for this document.
        if !searchable_fields_changed(&KvReader::<FieldId>::new(value), searchable_fields) {
        if !searchable_fields_changed(
            &KvReader::<FieldId>::new(value),
            &settings_diff.new.searchable_fields_ids,
        ) {
            continue;
        }

@@ -85,11 +111,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
                // deletions
                lang_safe_tokens_from_document(
                    &obkv,
                    searchable_fields,
                    &tokenizer,
                    stop_words,
                    allowed_separators,
                    dictionary,
                    &settings_diff.old,
                    &del_tokenizer,
                    max_positions_per_attributes,
                    DelAdd::Deletion,
                    &mut del_buffers,

@@ -99,11 +122,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
                // additions
                lang_safe_tokens_from_document(
                    &obkv,
                    searchable_fields,
                    &tokenizer,
                    stop_words,
                    allowed_separators,
                    dictionary,
                    &settings_diff.new,
                    &add_tokenizer,
                    max_positions_per_attributes,
                    DelAdd::Addition,
                    &mut add_buffers,

@@ -118,8 +138,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
        // transforming two KV<FieldId, KV<u16, String>> into one KV<FieldId, KV<DelAdd, KV<u16, String>>>
        value_buffer.clear();
        del_add_from_two_obkvs(
            KvReader::<FieldId>::new(del_obkv),
            KvReader::<FieldId>::new(add_obkv),
            &KvReader::<FieldId>::new(del_obkv),
            &KvReader::<FieldId>::new(add_obkv),
            &mut value_buffer,
        )?;

@@ -160,7 +180,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
/// Check if any searchable fields of a document changed.
fn searchable_fields_changed(
    obkv: &KvReader<FieldId>,
    searchable_fields: &Option<HashSet<FieldId>>,
    searchable_fields: &Option<Vec<FieldId>>,
) -> bool {
    for (field_id, field_bytes) in obkv.iter() {
        if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {

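The parameter type changes from `HashSet<FieldId>` to `Vec<FieldId>`, presumably because the list of searchable fields is small enough that a linear `contains` is cheap and the ids keep their declared order. The `None` convention is preserved: no list means every field is searchable. A tiny sketch of that convention (hypothetical helper, not the milli function):

```rust
type FieldId = u16;

/// `None` = all fields are searchable; `Some(ids)` = only the listed ids are.
fn is_searchable(searchable_fields: &Option<Vec<FieldId>>, field_id: FieldId) -> bool {
    searchable_fields.as_ref().map_or(true, |ids| ids.contains(&field_id))
}

fn main() {
    assert!(is_searchable(&None, 7));              // no restriction: everything matches
    assert!(is_searchable(&Some(vec![0, 3]), 3));  // listed id matches
    assert!(!is_searchable(&Some(vec![0, 3]), 7)); // unlisted id is ignored
    println!("ok");
}
```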
@@ -206,14 +226,10 @@ fn tokenizer_builder<'a>(

/// Extract words mapped with their positions of a document,
/// ensuring no Language detection mistakes was made.
#[allow(clippy::too_many_arguments)] // FIXME: consider grouping arguments in a struct
fn lang_safe_tokens_from_document<'a>(
    obkv: &KvReader<FieldId>,
    searchable_fields: &Option<HashSet<FieldId>>,
    settings: &InnerIndexSettings,
    tokenizer: &Tokenizer,
    stop_words: Option<&fst::Set<Vec<u8>>>,
    allowed_separators: Option<&[&str]>,
    dictionary: Option<&[&str]>,
    max_positions_per_attributes: u32,
    del_add: DelAdd,
    buffers: &'a mut Buffers,

@@ -222,7 +238,7 @@ fn lang_safe_tokens_from_document<'a>(

    tokens_from_document(
        obkv,
        searchable_fields,
        &settings.searchable_fields_ids,
        tokenizer,
        max_positions_per_attributes,
        del_add,

@@ -246,12 +262,14 @@ fn lang_safe_tokens_from_document<'a>(
        // then we don't rerun the extraction.
        if !script_language.is_empty() {
            // build a new temporary tokenizer including the allow list.
            let mut builder = tokenizer_builder(
                stop_words,
                allowed_separators,
                dictionary,
                Some(&script_language),
            );
            // TODO: Fix ugly allocation
            let stop_words = settings.stop_words.as_ref();
            let separators: Option<Vec<_>> =
                settings.allowed_separators.map(|s| s.iter().map(String::as_str).collect());
            let dictionary: Option<Vec<_>> =
                settings.dictionary.map(|s| s.iter().map(String::as_str).collect());
            let mut builder =
                tokenizer_builder(stop_words, separators.as_deref(), dictionary.as_deref(), None);
            let tokenizer = builder.build();

            script_language_word_count.clear();

@@ -259,7 +277,7 @@ fn lang_safe_tokens_from_document<'a>(
            // rerun the extraction.
            tokens_from_document(
                obkv,
                searchable_fields,
                &settings.searchable_fields_ids,
                &tokenizer,
                max_positions_per_attributes,
                del_add,

@@ -276,7 +294,7 @@ fn lang_safe_tokens_from_document<'a>(
/// Extract words mapped with their positions of a document.
fn tokens_from_document<'a>(
    obkv: &KvReader<FieldId>,
    searchable_fields: &Option<HashSet<FieldId>>,
    searchable_fields: &Option<Vec<FieldId>>,
    tokenizer: &Tokenizer,
    max_positions_per_attributes: u32,
    del_add: DelAdd,

@@ -10,6 +10,7 @@ use crate::heed_codec::facet::{
    FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
};
use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::Result;

/// Extracts the facet number and the documents ids where this facet number appear.

@@ -20,6 +21,7 @@ use crate::Result;
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
    fid_docid_facet_number: grenad::Reader<R>,
    indexer: GrenadParameters,
    _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
    puffin::profile_function!();

@@ -15,6 +15,7 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::helpers::{
    merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps,
};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};

/// Extracts the facet string and the documents ids where this facet string appear.

@@ -25,6 +26,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
    docid_fid_facet_string: grenad::Reader<R>,
    indexer: GrenadParameters,
    _settings_diff: &InnerIndexSettingsDiff,
) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
    puffin::profile_function!();

@@ -1,5 +1,5 @@
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::{self, BufReader};

@@ -20,6 +20,7 @@ use crate::error::InternalError;
use crate::facet::value_encoding::f64_into_bytes;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::index_documents::{create_writer, writer_into_reader};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, MAX_FACET_VALUE_LENGTH};

/// The length of the elements that are always in the buffer when inserting new values.

@@ -43,7 +44,7 @@ pub struct ExtractedFacetValues {
pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
    obkv_documents: grenad::Reader<R>,
    indexer: GrenadParameters,
    faceted_fields: &HashSet<FieldId>,
    settings_diff: &InnerIndexSettingsDiff,
    geo_fields_ids: Option<(FieldId, FieldId)>,
) -> Result<ExtractedFacetValues> {
    puffin::profile_function!();

@@ -82,7 +83,9 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
        let obkv = obkv::KvReader::new(value);

        for (field_id, field_bytes) in obkv.iter() {
            if faceted_fields.contains(&field_id) {
            let delete_faceted = settings_diff.old.faceted_fields_ids.contains(&field_id);
            let add_faceted = settings_diff.new.faceted_fields_ids.contains(&field_id);
            if delete_faceted || add_faceted {
                numbers_key_buffer.clear();
                strings_key_buffer.clear();

@@ -99,11 +102,12 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
                strings_key_buffer.extend_from_slice(docid_bytes);

                let del_add_obkv = obkv::KvReader::new(field_bytes);
                let del_value = match del_add_obkv.get(DelAdd::Deletion) {
                let del_value = match del_add_obkv.get(DelAdd::Deletion).filter(|_| delete_faceted)
                {
                    Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?),
                    None => None,
                };
                let add_value = match del_add_obkv.get(DelAdd::Addition) {
                let add_value = match del_add_obkv.get(DelAdd::Addition).filter(|_| add_faceted) {
                    Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?),
                    None => None,
                };

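A field can be faceted in the old settings, the new settings, or both, and the `.filter(|_| delete_faceted)` / `.filter(|_| add_faceted)` calls keep only the side of the del/add pair that is faceted under the corresponding settings. A small sketch of that gating with a hypothetical `DelAddValue` pair in place of the obkv encoding:

```rust
/// Raw del/add pair for one field of one document (stand-in for the obkv encoding).
struct DelAddValue<'a> {
    deletion: Option<&'a str>,
    addition: Option<&'a str>,
}

/// Keep each side only if the field is faceted under the matching settings.
fn facet_sides<'a>(
    value: &DelAddValue<'a>,
    delete_faceted: bool, // field is in the old faceted set
    add_faceted: bool,    // field is in the new faceted set
) -> (Option<&'a str>, Option<&'a str>) {
    (
        value.deletion.filter(|_| delete_faceted),
        value.addition.filter(|_| add_faceted),
    )
}

fn main() {
    // Field just became faceted: only the addition side is extracted,
    // so the facet databases receive the new value without a spurious deletion.
    let value = DelAddValue { deletion: Some("Action"), addition: Some("Drama") };
    assert_eq!(facet_sides(&value, false, true), (None, Some("Drama")));
    println!("ok");
}
```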
@@ -10,6 +10,7 @@ use super::helpers::{
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::Result;

const MAX_COUNTED_WORDS: usize = 30;

@@ -23,6 +24,7 @@ const MAX_COUNTED_WORDS: usize = 30;
pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
    docid_word_positions: grenad::Reader<R>,
    indexer: GrenadParameters,
    _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
    puffin::profile_function!();

@@ -1,20 +1,22 @@
use std::collections::{BTreeSet, HashSet};
use std::collections::BTreeSet;
use std::fs::File;
use std::io::{self, BufReader};

use heed::BytesDecode;
use heed::{BytesDecode, BytesEncode};
use obkv::KvReaderU16;
use roaring::RoaringBitmap;

use super::helpers::{
    create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader,
    try_split_array_at, writer_into_reader, GrenadParameters,
    create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
    writer_into_reader, GrenadParameters,
};
use crate::error::SerializationError;
use crate::heed_codec::StrBEU16Codec;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::{DocumentId, FieldId, Result};
use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result};

/// Extracts the word and the documents ids where this word appear.
///

@@ -27,7 +29,7 @@ use crate::{DocumentId, FieldId, Result};
pub fn extract_word_docids<R: io::Read + io::Seek>(
    docid_word_positions: grenad::Reader<R>,
    indexer: GrenadParameters,
    exact_attributes: &HashSet<FieldId>,
    settings_diff: &InnerIndexSettingsDiff,
) -> Result<(
    grenad::Reader<BufReader<File>>,
    grenad::Reader<BufReader<File>>,

@@ -43,7 +45,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        indexer.max_nb_chunks,
        max_memory.map(|x| x / 3),
        max_memory,
    );
    let mut key_buffer = Vec::new();
    let mut del_words = BTreeSet::new();

@@ -85,30 +87,29 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
        add_words.clear();
    }

    let mut word_docids_sorter = create_sorter(
        grenad::SortAlgorithm::Unstable,
        merge_deladd_cbo_roaring_bitmaps,
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        indexer.max_nb_chunks,
        max_memory.map(|x| x / 3),
    );

    let mut exact_word_docids_sorter = create_sorter(
        grenad::SortAlgorithm::Unstable,
        merge_deladd_cbo_roaring_bitmaps,
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        indexer.max_nb_chunks,
        max_memory.map(|x| x / 3),
    );

    let mut word_fid_docids_writer = create_writer(
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        tempfile::tempfile()?,
    );

    let mut word_docids_writer = create_writer(
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        tempfile::tempfile()?,
    );

    let mut exact_word_docids_writer = create_writer(
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        tempfile::tempfile()?,
    );

    let mut word: Option<String> = None;
    let mut deletions = RoaringBitmap::new();
    let mut additions = RoaringBitmap::new();
    let mut exact_deletions = RoaringBitmap::new();
    let mut exact_additions = RoaringBitmap::new();
    let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
    // TODO: replace sorters by writers by accumulating values into a buffer before inserting them.
    while let Some((key, value)) = iter.next()? {

@@ -117,20 +118,69 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
            word_fid_docids_writer.insert(key, value)?;
        }

        let (word, fid) = StrBEU16Codec::bytes_decode(key)
        let (w, fid) = StrBEU16Codec::bytes_decode(key)
            .map_err(|_| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;

        // every words contained in an attribute set to exact must be pushed in the exact_words list.
        if exact_attributes.contains(&fid) {
            exact_word_docids_sorter.insert(word.as_bytes(), value)?;
        if let Some(word) = word {
            if word.as_str() != w {
                docids_into_writers(&word, &deletions, &additions, &mut word_docids_writer);
                docids_into_writers(
                    &word,
                    &exact_deletions,
                    &exact_additions,
                    &mut exact_word_docids_writer,
                );
                let word = Some(w.to_string());
                // clear buffers
                deletions.clear();
                additions.clear();
                exact_deletions.clear();
                exact_additions.clear();
            }
        } else {
            word_docids_sorter.insert(word.as_bytes(), value)?;
            let word = Some(w.to_string());
        }

        // merge all deletions
        let obkv = KvReaderDelAdd::new(value);
        if let Some(value) = obkv.get(DelAdd::Deletion) {
            let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid);
            let docids = CboRoaringBitmapCodec::bytes_decode(value).map_err(|_| {
                SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) }
            })?;
            if delete_from_exact {
                exact_deletions |= docids;
            } else {
                deletions |= docids
            }
        }
        // merge all additions
        if let Some(value) = obkv.get(DelAdd::Addition) {
            let add_in_exact = settings_diff.new.exact_attributes.contains(&fid);
            let docids = CboRoaringBitmapCodec::bytes_decode(value).map_err(|_| {
                SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) }
            })?;
            if add_in_exact {
                exact_additions |= docids;
            } else {
                additions |= docids
            }
        }
    }

    if let Some(word) = word {
        docids_into_writers(&word, &deletions, &additions, &mut word_docids_writer);
        docids_into_writers(
            &word,
            &exact_deletions,
            &exact_additions,
            &mut exact_word_docids_writer,
        );
    }

    Ok((
        sorter_into_reader(word_docids_sorter, indexer)?,
        sorter_into_reader(exact_word_docids_sorter, indexer)?,
        writer_into_reader(word_docids_writer)?,
        writer_into_reader(exact_word_docids_writer)?,
        writer_into_reader(word_fid_docids_writer)?,
    ))
}

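The new loop is a classic group-by-key pass over a sorted stream: keys arrive ordered by `(word, field_id)`, docids are accumulated into regular and exact deletion/addition bitmaps, and everything is flushed through `docids_into_writers` whenever the word changes, plus once after the loop. A std-only sketch of that accumulate-then-flush shape (`BTreeSet` standing in for `RoaringBitmap`, made-up rows):

```rust
use std::collections::BTreeSet;

/// Flush the docids accumulated for one word (stand-in for `docids_into_writers`).
fn flush(word: &str, deletions: &BTreeSet<u32>, additions: &BTreeSet<u32>) {
    if deletions == additions {
        return; // nothing actually changed for this word
    }
    println!("{word}: del={deletions:?} add={additions:?}");
}

fn main() {
    // Rows arrive sorted by (word, field id), like the sorter's merge iterator,
    // so all entries for one word are adjacent and can be accumulated then flushed.
    let rows: Vec<(&str, u16, Vec<u32>, Vec<u32>)> = vec![
        ("dark", 0, vec![1], vec![]),    // (word, fid, deleted docids, added docids)
        ("dark", 1, vec![], vec![2, 3]),
        ("knight", 0, vec![4], vec![4]), // same docids on both sides: flushed as a no-op
    ];

    let mut current: Option<String> = None;
    let mut deletions = BTreeSet::new();
    let mut additions = BTreeSet::new();

    for (word, _fid, del, add) in &rows {
        if current.as_deref() != Some(*word) {
            if let Some(previous) = current.take() {
                flush(&previous, &deletions, &additions);
                deletions.clear();
                additions.clear();
            }
            current = Some(word.to_string());
        }
        deletions.extend(del.iter().copied());
        additions.extend(add.iter().copied());
    }
    if let Some(previous) = current {
        flush(&previous, &deletions, &additions); // final flush after the loop
    }
}
```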
@@ -178,3 +228,45 @@ fn words_into_sorter(

    Ok(())
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
fn docids_into_writers<W>(
    word: &str,
    deletions: &RoaringBitmap,
    additions: &RoaringBitmap,
    writer: &mut grenad::Writer<W>,
) -> Result<()>
where
    W: std::io::Write,
{
    if deletions == additions {
        // if the same value is deleted and added, do nothing.
        return Ok(());
    }

    // Write each value in the same KvDelAdd before inserting it in the final writer.
    let mut obkv = KvWriterDelAdd::memory();
    // deletions:
    if !deletions.is_empty() && !deletions.is_subset(additions) {
        obkv.insert(
            DelAdd::Deletion,
            CboRoaringBitmapCodec::bytes_encode(deletions).map_err(|_| {
                SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
            })?,
        );
    }
    // additions:
    if !additions.is_empty() {
        obkv.insert(
            DelAdd::Addition,
            CboRoaringBitmapCodec::bytes_encode(additions).map_err(|_| {
                SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
            })?,
        );
    }

    // insert everything in the same writer.
    writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?;

    Ok(())
}

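`docids_into_writers` also encodes two shortcuts: when the deleted and added docids are identical the word is skipped entirely, and a `Deletion` entry is only written when some deleted docids are not re-added. A sketch of just that decision logic on plain sets (hypothetical `plan_entry` helper, no grenad writer):

```rust
use std::collections::BTreeSet;

/// Decide what would be written for one word: an optional Deletion side and an
/// optional Addition side. Hypothetical helper mirroring `docids_into_writers`.
fn plan_entry(
    deletions: &BTreeSet<u32>,
    additions: &BTreeSet<u32>,
) -> (Option<BTreeSet<u32>>, Option<BTreeSet<u32>>) {
    if deletions == additions {
        return (None, None); // deleted and re-added identically: skip the word entirely
    }
    let del = (!deletions.is_empty() && !deletions.is_subset(additions))
        .then(|| deletions.clone());
    let add = (!additions.is_empty()).then(|| additions.clone());
    (del, add)
}

fn main() {
    let set = |v: &[u32]| v.iter().copied().collect::<BTreeSet<_>>();
    assert_eq!(plan_entry(&set(&[1, 2]), &set(&[1, 2])), (None, None));
    // deletions are a subset of the additions: only the Addition side is kept
    assert_eq!(plan_entry(&set(&[1]), &set(&[1, 2])), (None, Some(set(&[1, 2]))));
    // otherwise both sides are written
    assert_eq!(
        plan_entry(&set(&[3]), &set(&[1, 2])),
        (Some(set(&[3])), Some(set(&[1, 2])))
    );
}
```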
@@ -13,6 +13,7 @@ use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::proximity::{index_proximity, MAX_DISTANCE};
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{DocumentId, Result};

/// Extracts the best proximity between pairs of words and the documents ids where this pair appear.

@@ -23,6 +24,7 @@ use crate::{DocumentId, Result};
pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
    docid_word_positions: grenad::Reader<R>,
    indexer: GrenadParameters,
    _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
    puffin::profile_function!();

@@ -11,6 +11,7 @@ use super::helpers::{
use crate::error::SerializationError;
use crate::index::db_name::DOCID_WORD_POSITIONS;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::update::MergeFn;
use crate::{bucketed_position, DocumentId, Result};

@@ -22,6 +23,7 @@ use crate::{bucketed_position, DocumentId, Result};
pub fn extract_word_position_docids<R: io::Read + io::Seek>(
    docid_word_positions: grenad::Reader<R>,
    indexer: GrenadParameters,
    _settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
    puffin::profile_function!();

@@ -31,8 +31,8 @@ use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk};
use crate::proximity::ProximityPrecision;
use crate::vector::EmbeddingConfigs;
use crate::{FieldId, FieldsIdsMap, Result};
use crate::update::settings::InnerIndexSettingsDiff;
use crate::{FieldId, Result};

/// Extract data for each databases from obkv documents in parallel.
/// Send data in grenad file over provided Sender.

@@ -43,18 +43,10 @@ pub(crate) fn data_from_obkv_documents(
    flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
    indexer: GrenadParameters,
    lmdb_writer_sx: Sender<Result<TypedChunk>>,
    searchable_fields: Option<HashSet<FieldId>>,
    faceted_fields: HashSet<FieldId>,
    primary_key_id: FieldId,
    geo_fields_ids: Option<(FieldId, FieldId)>,
    field_id_map: FieldsIdsMap,
    stop_words: Option<fst::Set<Vec<u8>>>,
    allowed_separators: Option<&[&str]>,
    dictionary: Option<&[&str]>,
    settings_diff: &InnerIndexSettingsDiff,
    max_positions_per_attributes: Option<u32>,
    exact_attributes: HashSet<FieldId>,
    proximity_precision: ProximityPrecision,
    embedders: EmbeddingConfigs,
) -> Result<()> {
    puffin::profile_function!();

@@ -67,8 +59,7 @@ pub(crate) fn data_from_obkv_documents(
                        original_documents_chunk,
                        indexer,
                        lmdb_writer_sx.clone(),
                        field_id_map.clone(),
                        embedders.clone(),
                        settings_diff,
                    )
                })
                .collect::<Result<()>>()

@@ -81,13 +72,9 @@ pub(crate) fn data_from_obkv_documents(
                        flattened_obkv_chunks,
                        indexer,
                        lmdb_writer_sx.clone(),
                        &searchable_fields,
                        &faceted_fields,
                        primary_key_id,
                        geo_fields_ids,
                        &stop_words,
                        &allowed_separators,
                        &dictionary,
                        settings_diff,
                        max_positions_per_attributes,
                    )
                })

@@ -100,13 +87,12 @@ pub(crate) fn data_from_obkv_documents(
                        run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
                            docid_word_positions_chunk.clone(),
                            indexer,
                            settings_diff,
                            lmdb_writer_sx.clone(),
                            extract_fid_word_count_docids,
                            TypedChunk::FieldIdWordCountDocids,
                            "field-id-wordcount-docids",
                        );

                        let exact_attributes = exact_attributes.clone();
                        run_extraction_task::<
                            _,
                            _,

@@ -118,10 +104,9 @@ pub(crate) fn data_from_obkv_documents(
                        >(
                            docid_word_positions_chunk.clone(),
                            indexer,
                            settings_diff,
                            lmdb_writer_sx.clone(),
                            move |doc_word_pos, indexer| {
                                extract_word_docids(doc_word_pos, indexer, &exact_attributes)
                            },
                            extract_word_docids,
                            |(
                                word_docids_reader,
                                exact_word_docids_reader,

@@ -139,6 +124,7 @@ pub(crate) fn data_from_obkv_documents(
                        run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
                            docid_word_positions_chunk.clone(),
                            indexer,
                            settings_diff,
                            lmdb_writer_sx.clone(),
                            extract_word_position_docids,
                            TypedChunk::WordPositionDocids,

@@ -152,6 +138,7 @@ pub(crate) fn data_from_obkv_documents(
                        >(
                            fid_docid_facet_strings_chunk.clone(),
                            indexer,
                            settings_diff,
                            lmdb_writer_sx.clone(),
                            extract_facet_string_docids,
                            TypedChunk::FieldIdFacetStringDocids,

@@ -161,22 +148,22 @@ pub(crate) fn data_from_obkv_documents(
                        run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
                            fid_docid_facet_numbers_chunk.clone(),
                            indexer,
                            settings_diff,
                            lmdb_writer_sx.clone(),
                            extract_facet_number_docids,
                            TypedChunk::FieldIdFacetNumberDocids,
                            "field-id-facet-number-docids",
                        );

                        if proximity_precision == ProximityPrecision::ByWord {
                            run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
                                docid_word_positions_chunk.clone(),
                                indexer,
                                lmdb_writer_sx.clone(),
                                extract_word_pair_proximity_docids,
                                TypedChunk::WordPairProximityDocids,
                                "word-pair-proximity-docids",
                            );
                        }
                        run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
                            docid_word_positions_chunk.clone(),
                            indexer,
                            settings_diff,
                            lmdb_writer_sx.clone(),
                            extract_word_pair_proximity_docids,
                            TypedChunk::WordPairProximityDocids,
                            "word-pair-proximity-docids",
                        );
                    }

                    Ok(())

@@ -195,12 +182,17 @@ pub(crate) fn data_from_obkv_documents(
fn run_extraction_task<FE, FS, M>(
    chunk: grenad::Reader<CursorClonableMmap>,
    indexer: GrenadParameters,
    settings_diff: &InnerIndexSettingsDiff,
    lmdb_writer_sx: Sender<Result<TypedChunk>>,
    extract_fn: FE,
    serialize_fn: FS,
    name: &'static str,
) where
    FE: Fn(grenad::Reader<CursorClonableMmap>, GrenadParameters) -> Result<M>
    FE: Fn(
            grenad::Reader<CursorClonableMmap>,
            GrenadParameters,
            &InnerIndexSettingsDiff,
        ) -> Result<M>
        + Sync
        + Send
        + 'static,

@@ -213,7 +205,7 @@ fn run_extraction_task<FE, FS, M>(
        let child_span = tracing::trace_span!(target: "indexing::extract::details", parent: &current_span, "extract_multiple_chunks");
        let _entered = child_span.enter();
        puffin::profile_scope!("extract_multiple_chunks", name);
        match extract_fn(chunk, indexer) {
        match extract_fn(chunk, indexer, settings_diff) {
            Ok(chunk) => {
                let _ = lmdb_writer_sx.send(Ok(serialize_fn(chunk)));
            }

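The widened `FE` bound is what lets the call sites above drop their capturing closures: every extractor now has the same three-argument shape, so plain function items such as `extract_word_docids` can be passed directly. A stripped-down sketch of that generic runner (made-up `GrenadParams` and `SettingsDiff` stand-ins, not the real milli types):

```rust
/// Minimal stand-ins for the indexer parameters and the settings diff.
struct GrenadParams {
    max_memory: usize,
}
struct SettingsDiff;

/// Stripped-down shape of `run_extraction_task`: generic over the extraction
/// function, which now also receives the settings diff.
fn run_extraction_task<FE, M>(
    chunk: Vec<u8>,
    indexer: &GrenadParams,
    diff: &SettingsDiff,
    extract_fn: FE,
) -> M
where
    FE: Fn(Vec<u8>, &GrenadParams, &SettingsDiff) -> M,
{
    extract_fn(chunk, indexer, diff)
}

/// A plain function item already satisfies the bound, so no capturing closure is needed.
fn count_bytes(chunk: Vec<u8>, _indexer: &GrenadParams, _diff: &SettingsDiff) -> usize {
    chunk.len()
}

fn main() {
    let indexer = GrenadParams { max_memory: 1 << 20 };
    let n = run_extraction_task(vec![1, 2, 3], &indexer, &SettingsDiff, count_bytes);
    assert_eq!(n, 3);
    println!("max_memory budget: {}", indexer.max_memory);
}
```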
@@ -230,8 +222,7 @@ fn send_original_documents_data(
    original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
    indexer: GrenadParameters,
    lmdb_writer_sx: Sender<Result<TypedChunk>>,
    field_id_map: FieldsIdsMap,
    embedders: EmbeddingConfigs,
    settings_diff: &InnerIndexSettingsDiff,
) -> Result<()> {
    let original_documents_chunk =
        original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;

@@ -306,13 +297,9 @@ fn send_and_extract_flattened_documents_data(
    flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
    indexer: GrenadParameters,
    lmdb_writer_sx: Sender<Result<TypedChunk>>,
    searchable_fields: &Option<HashSet<FieldId>>,
    faceted_fields: &HashSet<FieldId>,
    primary_key_id: FieldId,
    geo_fields_ids: Option<(FieldId, FieldId)>,
    stop_words: &Option<fst::Set<Vec<u8>>>,
    allowed_separators: &Option<&[&str]>,
    dictionary: &Option<&[&str]>,
    settings_diff: &InnerIndexSettingsDiff,
    max_positions_per_attributes: Option<u32>,
) -> Result<(
    grenad::Reader<CursorClonableMmap>,

@@ -341,10 +328,7 @@ fn send_and_extract_flattened_documents_data(
                    extract_docid_word_positions(
                        flattened_documents_chunk.clone(),
                        indexer,
                        searchable_fields,
                        stop_words.as_ref(),
                        *allowed_separators,
                        *dictionary,
                        settings_diff,
                        max_positions_per_attributes,
                    )?;

@@ -367,7 +351,7 @@ fn send_and_extract_flattened_documents_data(
                } = extract_fid_docid_facet_values(
                    flattened_documents_chunk.clone(),
                    indexer,
                    faceted_fields,
                    settings_diff,
                    geo_fields_ids,
                )?;

@@ -253,27 +253,12 @@ where
            let number_of_documents = self.index.number_of_documents(self.wtxn)?;
            return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents });
        }
        let output = self
        let mut output = self
            .transform
            .take()
            .expect("Invalid document addition state")
            .output_from_sorter(self.wtxn, &self.progress)?;

        let new_facets = output.compute_real_facets(self.wtxn, self.index)?;
        self.index.put_faceted_fields(self.wtxn, &new_facets)?;

        // in case new fields were introduced we're going to recreate the searchable fields.
        if let Some(faceted_fields) = self.index.user_defined_searchable_fields(self.wtxn)? {
            // we can't keep references on the faceted fields while we update the index thus we need to own it.
            let faceted_fields: Vec<String> =
                faceted_fields.into_iter().map(str::to_string).collect();
            self.index.put_all_searchable_fields_from_fields_ids_map(
                self.wtxn,
                &faceted_fields.iter().map(String::as_ref).collect::<Vec<_>>(),
                &output.fields_ids_map,
            )?;
        }

        let indexed_documents = output.documents_count as u64;
        let number_of_documents = self.execute_raw(output)?;

@@ -296,16 +281,17 @@ where

        let TransformOutput {
            primary_key,
            fields_ids_map,
            settings_diff,
            field_distribution,
            documents_count,
            original_documents,
            flattened_documents,
        } = output;

        // The fields_ids_map is put back to the store now so the rest of the transaction sees an
        // up to date field map.
        self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
        // update the internal facet and searchable list,
        // because they might have changed due to the nested documents flattening.
        settings_diff.new.recompute_facets(self.wtxn, self.index)?;
        settings_diff.new.recompute_searchables(self.wtxn, self.index)?;

        let backup_pool;
        let pool = match self.indexer_config.thread_pool {

@@ -333,7 +319,7 @@ where
        ) = crossbeam_channel::unbounded();

        // get the primary key field id
        let primary_key_id = fields_ids_map.id(&primary_key).unwrap();
        let primary_key_id = output.settings_diff.new.fields_ids_map.id(&primary_key).unwrap();

        // get searchable fields for word databases
        let searchable_fields =

@@ -400,8 +386,6 @@ where

        let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;

        let cloned_embedder = self.embedders.clone();

        let mut final_documents_ids = RoaringBitmap::new();
        let mut databases_seen = 0;
        let mut word_position_docids = None;

@@ -410,7 +394,6 @@ where
        let mut exact_word_docids = None;
        let mut chunk_accumulator = ChunkAccumulator::default();
        let mut dimension = HashMap::new();
        let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap());

        let current_span = tracing::Span::current();

@@ -428,10 +411,6 @@ where
                let flattened_chunk_iter =
                    grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);

                let separators: Option<Vec<_>> =
                    separators.as_ref().map(|x| x.iter().map(String::as_str).collect());
                let dictionary: Option<Vec<_>> =
                    dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect());
                let result = original_chunk_iter.and_then(|original_chunk| {
                    let flattened_chunk = flattened_chunk_iter?;
                    // extract all databases from the chunked obkv douments

@@ -440,18 +419,10 @@ where
                        flattened_chunk,
                        pool_params,
                        lmdb_writer_sx.clone(),
                        searchable_fields,
                        faceted_fields,
                        primary_key_id,
                        geo_fields_ids,
                        field_id_map,
                        stop_words,
                        separators.as_deref(),
                        dictionary.as_deref(),
                        &settings_diff,
                        max_positions_per_attributes,
                        exact_attributes,
                        proximity_precision,
                        cloned_embedder,
                    )
                });

@@ -21,14 +21,17 @@ use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
use crate::index::{db_name, main_key};
use crate::update::del_add::{into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd};
use crate::update::del_add::{
    del_add_from_two_obkvs, into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd,
};
use crate::update::index_documents::GrenadParameters;
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result};

pub struct TransformOutput {
    pub primary_key: String,
    pub fields_ids_map: FieldsIdsMap,
    pub settings_diff: InnerIndexSettingsDiff,
    pub field_distribution: FieldDistribution,
    pub documents_count: usize,
    pub original_documents: File,

@@ -282,7 +285,9 @@ impl<'a, 'i> Transform<'a, 'i> {
 | 
			
		||||
                    self.original_sorter
 | 
			
		||||
                        .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
 | 
			
		||||
                    let base_obkv = KvReader::new(base_obkv);
 | 
			
		||||
                    if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? {
 | 
			
		||||
                    if let Some(flattened_obkv) =
 | 
			
		||||
                        Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)?
 | 
			
		||||
                    {
 | 
			
		||||
                        // we recreate our buffer with the flattened documents
 | 
			
		||||
                        document_sorter_value_buffer.clear();
 | 
			
		||||
                        document_sorter_value_buffer.push(Operation::Addition as u8);
@@ -315,7 +320,9 @@ impl<'a, 'i> Transform<'a, 'i> {
                    .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;

                let flattened_obkv = KvReader::new(&obkv_buffer);
                if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
                if let Some(obkv) =
                    Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)?
                {
                    document_sorter_value_buffer.clear();
                    document_sorter_value_buffer.push(Operation::Addition as u8);
                    into_del_add_obkv(
@@ -524,7 +531,9 @@ impl<'a, 'i> Transform<'a, 'i> {

        // flatten it and push it as to delete in the flattened_sorter
        let flattened_obkv = KvReader::new(base_obkv);
        if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
        if let Some(obkv) =
            Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)?
        {
            // we recreate our buffer with the flattened documents
            document_sorter_value_buffer.clear();
            document_sorter_value_buffer.push(Operation::Deletion as u8);
@@ -541,8 +550,15 @@ impl<'a, 'i> Transform<'a, 'i> {

    // Flatten a document from the fields ids map contained in self and insert the new
    // created fields. Returns `None` if the document doesn't need to be flattened.
    #[tracing::instrument(level = "trace", skip(self, obkv), target = "indexing::transform")]
    fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> {
    #[tracing::instrument(
        level = "trace",
        skip(obkv, fields_ids_map),
        target = "indexing::transform"
    )]
    fn flatten_from_fields_ids_map(
        obkv: &KvReader<FieldId>,
        fields_ids_map: &mut FieldsIdsMap,
    ) -> Result<Option<Vec<u8>>> {
        if obkv
            .iter()
            .all(|(_, value)| !json_depth_checker::should_flatten_from_unchecked_slice(value))
@@ -563,7 +579,7 @@ impl<'a, 'i> Transform<'a, 'i> {
        // all the raw values get inserted directly in the `key_value` vec.
        for (key, value) in obkv.iter() {
            if json_depth_checker::should_flatten_from_unchecked_slice(value) {
                let key = self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
                let key = fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
                    field_id: key,
                    process: "Flatten from fields ids map.",
                })?;
@@ -581,7 +597,7 @@ impl<'a, 'i> Transform<'a, 'i> {
        // Once we have the flattened version we insert all the new generated fields_ids
        // (if any) in the fields ids map and serialize the value.
        for (key, value) in flattened.into_iter() {
            let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
            let fid = fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
            let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
            key_value.push((fid, value.into()));
        }
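The hunks above turn `flatten_from_fields_ids_map` from a `&mut self` method into an associated function that takes the obkv and the `FieldsIdsMap` it may extend. That is what lets the new `rebind_existing_document` further down reuse the same helper against both the old and the new map carried by the settings diff. A minimal, self-contained sketch of that call pattern, using simplified stand-in types rather than the milli ones:

    use std::collections::HashMap;

    // Simplified stand-in for milli's FieldsIdsMap; behavior is illustrative only.
    #[derive(Default)]
    struct FieldsIdsMap {
        ids: HashMap<String, u16>,
    }

    impl FieldsIdsMap {
        fn insert(&mut self, name: &str) -> u16 {
            let next = self.ids.len() as u16;
            *self.ids.entry(name.to_string()).or_insert(next)
        }
    }

    // An associated function (no `&mut self`) can be pointed at whichever map the
    // caller owns: the old map, the new map, or `self.fields_ids_map`.
    fn flatten(doc: &[u8], fields_ids_map: &mut FieldsIdsMap) -> Option<Vec<u8>> {
        // Pretend flattening discovered a nested field and registered it in the map.
        fields_ids_map.insert("address.city");
        if doc.is_empty() {
            None
        } else {
            Some(doc.to_vec())
        }
    }

    fn main() {
        let mut old_map = FieldsIdsMap::default();
        let mut new_map = FieldsIdsMap::default();
        let old_doc = vec![1u8, 2, 3];
        let new_doc = vec![4u8, 5, 6];

        // Mirrors `rebind_existing_document`: the same helper runs on both document
        // versions, each growing its own fields ids map.
        let _old_flat = flatten(&old_doc, &mut old_map).unwrap_or_else(|| old_doc.clone());
        let _new_flat = flatten(&new_doc, &mut new_map).unwrap_or_else(|| new_doc.clone());
    }

Inside `Transform` itself, the updated call sites simply pass `&mut self.fields_ids_map` explicitly, as the hunks above show.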
@@ -792,9 +808,18 @@ impl<'a, 'i> Transform<'a, 'i> {
            fst_new_external_documents_ids_builder.insert(key, value)
        })?;

        let old_inner_settings = InnerIndexSettings::from_index(&self.index, wtxn)?;
        let mut new_inner_settings = old_inner_settings.clone();
        new_inner_settings.fields_ids_map = self.fields_ids_map;
        let settings_diff = InnerIndexSettingsDiff {
            old: old_inner_settings,
            new: new_inner_settings,
            embedding_configs_updated: true,
        };

        Ok(TransformOutput {
            primary_key,
            fields_ids_map: self.fields_ids_map,
            settings_diff,
            field_distribution,
            documents_count: self.documents_count,
            original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
@@ -804,6 +829,38 @@ impl<'a, 'i> Transform<'a, 'i> {
        })
    }

    fn rebind_existing_document(
        old_obkv: KvReader<FieldId>,
        settings_diff: &InnerIndexSettingsDiff,
        original_obkv_buffer: &mut Vec<u8>,
        flattened_obkv_buffer: &mut Vec<u8>,
    ) -> Result<()> {
        let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone();
        let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone();
        let mut obkv_writer = KvWriter::<_, FieldId>::memory();
        // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
        for (id, name) in new_fields_ids_map.iter() {
            if let Some(val) = old_fields_ids_map.id(name).and_then(|id| old_obkv.get(id)) {
                obkv_writer.insert(id, val)?;
            }
        }
        let new_obkv = KvReader::<FieldId>::new(&obkv_writer.into_inner()?);

        // take the non-flattened version if flatten_from_fields_ids_map returns None.
        let old_flattened = Self::flatten_from_fields_ids_map(&old_obkv, &mut old_fields_ids_map)?
            .map_or_else(|| old_obkv, |bytes| KvReader::<FieldId>::new(&bytes));
        let new_flattened = Self::flatten_from_fields_ids_map(&new_obkv, &mut new_fields_ids_map)?
            .map_or_else(|| new_obkv, |bytes| KvReader::<FieldId>::new(&bytes));

        original_obkv_buffer.clear();
        flattened_obkv_buffer.clear();

        del_add_from_two_obkvs(&old_obkv, &new_obkv, original_obkv_buffer)?;
        del_add_from_two_obkvs(&old_flattened, &new_flattened, flattened_obkv_buffer)?;

        Ok(())
    }

    /// Clear all databases. Returns a `TransformOutput` with a file that contains the documents
    /// of the index with the attributes reordered accordingly to the `FieldsIdsMap` given as argument.
    ///
@@ -811,8 +868,7 @@ impl<'a, 'i> Transform<'a, 'i> {
    pub fn prepare_for_documents_reindexing(
        self,
        wtxn: &mut heed::RwTxn<'i>,
        old_fields_ids_map: FieldsIdsMap,
        mut new_fields_ids_map: FieldsIdsMap,
        settings_diff: InnerIndexSettingsDiff,
    ) -> Result<TransformOutput> {
        // There already has been a document addition, the primary key should be set by now.
        let primary_key = self
@@ -848,78 +904,27 @@ impl<'a, 'i> Transform<'a, 'i> {
            self.indexer_settings.max_memory.map(|mem| mem / 2),
        );

        let mut obkv_buffer = Vec::new();
        let mut original_obkv_buffer = Vec::new();
        let mut flattened_obkv_buffer = Vec::new();
        let mut document_sorter_key_buffer = Vec::new();
        let mut document_sorter_value_buffer = Vec::new();
        for result in self.index.external_documents_ids().iter(wtxn)? {
            let (external_id, docid) = result?;
            let obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
            let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
                InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
            )?;

            obkv_buffer.clear();
            let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer);

            // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
            for (id, name) in new_fields_ids_map.iter() {
                if let Some(val) = old_fields_ids_map.id(name).and_then(|id| obkv.get(id)) {
                    obkv_writer.insert(id, val)?;
                }
            }

            let buffer = obkv_writer.into_inner()?;
            Self::rebind_existing_document(
                old_obkv,
                &settings_diff,
                &mut original_obkv_buffer,
                &mut flattened_obkv_buffer,
            )?;

            document_sorter_key_buffer.clear();
            document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
            document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
            document_sorter_value_buffer.clear();
            into_del_add_obkv(
                KvReaderU16::new(buffer),
                DelAddOperation::DeletionAndAddition,
                &mut document_sorter_value_buffer,
            )?;
            original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;

            // Once we have the document. We're going to flatten it
            // and insert it in the flattened sorter.
            let mut doc = serde_json::Map::new();

            let reader = obkv::KvReader::new(buffer);
            for (k, v) in reader.iter() {
                let key = new_fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId {
                    field_id: k,
                    process: "Accessing field distribution in transform.",
                })?;
                let value = serde_json::from_slice::<serde_json::Value>(v)
                    .map_err(InternalError::SerdeJson)?;
                doc.insert(key.to_string(), value);
            }

            let flattened = flatten_serde_json::flatten(&doc);

            // Once we have the flattened version we can convert it back to obkv and
            // insert all the new generated fields_ids (if any) in the fields ids map.
            let mut buffer: Vec<u8> = Vec::new();
            let mut writer = KvWriter::new(&mut buffer);
            let mut flattened: Vec<_> = flattened.into_iter().collect();
            // we reorder the field to get all the known field first
            flattened.sort_unstable_by_key(|(key, _)| {
                new_fields_ids_map.id(key).unwrap_or(FieldId::MAX)
            });

            for (key, value) in flattened {
                let fid =
                    new_fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
                let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
                writer.insert(fid, &value)?;
            }
            document_sorter_value_buffer.clear();
            into_del_add_obkv(
                KvReaderU16::new(&buffer),
                DelAddOperation::DeletionAndAddition,
                &mut document_sorter_value_buffer,
            )?;
            flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
            original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
            flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
        }

        let grenad_params = GrenadParameters {
@@ -934,19 +939,14 @@ impl<'a, 'i> Transform<'a, 'i> {

        let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?;

        let output = TransformOutput {
        Ok(TransformOutput {
            primary_key,
            fields_ids_map: new_fields_ids_map,
            field_distribution,
            settings_diff,
            documents_count,
            original_documents: original_documents.into_inner().into_inner(),
            flattened_documents: flattened_documents.into_inner().into_inner(),
        };

        let new_facets = output.compute_real_facets(wtxn, self.index)?;
        self.index.put_faceted_fields(wtxn, &new_facets)?;

        Ok(output)
        })
    }
}

@@ -961,20 +961,6 @@ fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
    vec.into_iter().map(|_| unreachable!()).collect()
}

impl TransformOutput {
    // find and insert the new field ids
    pub fn compute_real_facets(&self, rtxn: &RoTxn, index: &Index) -> Result<HashSet<String>> {
        let user_defined_facets = index.user_defined_faceted_fields(rtxn)?;

        Ok(self
            .fields_ids_map
            .names()
            .filter(|&field| crate::is_faceted(field, &user_defined_facets))
            .map(|field| field.to_string())
            .collect())
    }
}

#[cfg(test)]
mod test {
    use super::*;

@@ -385,14 +385,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {

    #[tracing::instrument(
        level = "trace",
        skip(self, progress_callback, should_abort, old_fields_ids_map),
        skip(self, progress_callback, should_abort, settings_diff),
        target = "indexing::documents"
    )]
    fn reindex<FP, FA>(
        &mut self,
        progress_callback: &FP,
        should_abort: &FA,
        old_fields_ids_map: FieldsIdsMap,
        settings_diff: InnerIndexSettingsDiff,
    ) -> Result<()>
    where
        FP: Fn(UpdateIndexingStep) + Sync,
@@ -416,14 +416,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
        )?;

        // We clear the databases and remap the documents fields based on the new `FieldsIdsMap`.
        let output = transform.prepare_for_documents_reindexing(
            self.wtxn,
            old_fields_ids_map,
            fields_ids_map,
        )?;

        let embedder_configs = self.index.embedding_configs(self.wtxn)?;
        let embedders = self.embedders(embedder_configs)?;
        let output = transform.prepare_for_documents_reindexing(self.wtxn, settings_diff)?;

        // We index the generated `TransformOutput` which must contain
        // all the documents with fields in the newly defined searchable order.
@@ -436,32 +429,11 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
            &should_abort,
        )?;

        let indexing_builder = indexing_builder.with_embedders(embedders);
        indexing_builder.execute_raw(output)?;

        Ok(())
    }

    fn embedders(
        &self,
        embedding_configs: Vec<(String, EmbeddingConfig)>,
    ) -> Result<EmbeddingConfigs> {
        let res: Result<_> = embedding_configs
            .into_iter()
            .map(|(name, EmbeddingConfig { embedder_options, prompt })| {
                let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);

                let embedder = Arc::new(
                    Embedder::new(embedder_options.clone())
                        .map_err(crate::vector::Error::from)
                        .map_err(crate::Error::from)?,
                );
                Ok((name, (embedder, prompt)))
            })
            .collect();
        res.map(EmbeddingConfigs::new)
    }

    fn update_displayed(&mut self) -> Result<bool> {
        match self.displayed_fields {
            Setting::Set(ref fields) => {
@@ -1067,7 +1039,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
        self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;

        let old_inner_settings = InnerIndexSettings::from_index(&self.index, &self.wtxn)?;
        let old_fields_ids_map = self.index.fields_ids_map(self.wtxn)?;

        // never trigger re-indexing
        self.update_displayed()?;
@@ -1109,47 +1080,19 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
        };

        if inner_settings_diff.any_reindexing_needed() {
            self.reindex(&progress_callback, &should_abort, old_fields_ids_map)?;
            self.reindex(&progress_callback, &should_abort, inner_settings_diff)?;
        }

        Ok(())
    }

    fn update_faceted(
        &self,
        existing_fields: HashSet<String>,
        old_faceted_fields: HashSet<String>,
    ) -> Result<bool> {
        if existing_fields.iter().any(|field| field.contains('.')) {
            return Ok(true);
        }

        if old_faceted_fields.iter().any(|field| field.contains('.')) {
            return Ok(true);
        }

        // If there is new faceted fields we indicate that we must reindex as we must
        // index new fields as facets. It means that the distinct attribute,
        // an Asc/Desc criterion or a filtered attribute as be added or removed.
        let new_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;

        if new_faceted_fields.iter().any(|field| field.contains('.')) {
            return Ok(true);
        }

        let faceted_updated =
            (&existing_fields - &old_faceted_fields) != (&existing_fields - &new_faceted_fields);

        Ok(faceted_updated)
    }
}

pub(crate) struct InnerIndexSettingsDiff {
    old: InnerIndexSettings,
    new: InnerIndexSettings,
    pub old: InnerIndexSettings,
    pub new: InnerIndexSettings,

    // TODO: compare directly the embedders.
    embedding_configs_updated: bool,
    pub embedding_configs_updated: bool,
}

impl InnerIndexSettingsDiff {
@@ -1167,7 +1110,7 @@ impl InnerIndexSettingsDiff {
                != self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
            || self.old.allowed_separators != self.new.allowed_separators
            || self.old.dictionary != self.new.dictionary
            || self.old.searchable_fields != self.new.searchable_fields
            || self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields
            || self.old.exact_attributes != self.new.exact_attributes
            || self.old.proximity_precision != self.new.proximity_precision
    }
@@ -1207,33 +1150,38 @@ impl InnerIndexSettingsDiff {
    }
}

#[derive(Clone, Debug)]
#[derive(Clone)]
pub(crate) struct InnerIndexSettings {
    stop_words: Option<fst::Set<Vec<u8>>>,
    allowed_separators: Option<BTreeSet<String>>,
    dictionary: Option<BTreeSet<String>>,
    fields_ids_map: FieldsIdsMap,
    faceted_fields: HashSet<FieldId>,
    searchable_fields: Option<BTreeSet<FieldId>>,
    exact_attributes: HashSet<FieldId>,
    proximity_precision: ProximityPrecision,
    embedding_configs: Vec<(String, crate::vector::EmbeddingConfig)>,
    existing_fields: HashSet<String>,
    pub stop_words: Option<fst::Set<Vec<u8>>>,
    pub allowed_separators: Option<BTreeSet<String>>,
    pub dictionary: Option<BTreeSet<String>>,
    pub fields_ids_map: FieldsIdsMap,
    pub user_defined_faceted_fields: HashSet<String>,
    pub user_defined_searchable_fields: Option<Vec<String>>,
    pub faceted_fields_ids: HashSet<FieldId>,
    pub searchable_fields_ids: Option<Vec<FieldId>>,
    pub exact_attributes: HashSet<FieldId>,
    pub proximity_precision: ProximityPrecision,
    pub embedding_configs: EmbeddingConfigs,
    pub existing_fields: HashSet<String>,
}

impl InnerIndexSettings {
    fn from_index(index: &Index, rtxn: &heed::RoTxn) -> Result<Self> {
    pub fn from_index(index: &Index, rtxn: &heed::RoTxn) -> Result<Self> {
        let stop_words = index.stop_words(rtxn)?;
        let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap());
        let allowed_separators = index.allowed_separators(rtxn)?;
        let dictionary = index.dictionary(rtxn)?;
        let fields_ids_map = index.fields_ids_map(rtxn)?;
        let searchable_fields = index.searchable_fields_ids(rtxn)?;
        let searchable_fields = searchable_fields.map(|sf| sf.into_iter().collect());
        let faceted_fields = index.faceted_fields_ids(rtxn)?;
        let user_defined_searchable_fields = index.user_defined_searchable_fields(rtxn)?;
        let user_defined_searchable_fields =
            user_defined_searchable_fields.map(|sf| sf.into_iter().map(String::from).collect());
        let user_defined_faceted_fields = index.user_defined_faceted_fields(rtxn)?;
        let searchable_fields_ids = index.searchable_fields_ids(rtxn)?;
        let faceted_fields_ids = index.faceted_fields_ids(rtxn)?;
        let exact_attributes = index.exact_attributes_ids(rtxn)?;
        let proximity_precision = index.proximity_precision(rtxn)?.unwrap_or_default();
        let embedding_configs = index.embedding_configs(rtxn)?;
        let embedding_configs = embedders(index.embedding_configs(rtxn)?)?;
        let existing_fields: HashSet<_> = index
            .field_distribution(rtxn)?
            .into_iter()
@@ -1245,14 +1193,65 @@ impl InnerIndexSettings {
            allowed_separators,
            dictionary,
            fields_ids_map,
            faceted_fields,
            searchable_fields,
            user_defined_faceted_fields,
            user_defined_searchable_fields,
            faceted_fields_ids,
            searchable_fields_ids,
            exact_attributes,
            proximity_precision,
            embedding_configs,
            existing_fields,
        })
    }

    // find and insert the new field ids
    pub fn recompute_facets(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> {
        let new_facets = self
            .fields_ids_map
            .names()
            .filter(|&field| crate::is_faceted(field, &self.user_defined_faceted_fields))
            .map(|field| field.to_string())
            .collect();
        index.put_faceted_fields(wtxn, &new_facets)?;

        self.faceted_fields_ids = index.faceted_fields_ids(wtxn)?;
        Ok(())
    }

    // find and insert the new field ids
    pub fn recompute_searchables(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> {
        // in case new fields were introduced we're going to recreate the searchable fields.
        if let Some(searchable_fields) = self.user_defined_searchable_fields.as_ref() {
            let searchable_fields =
                searchable_fields.iter().map(String::as_ref).collect::<Vec<_>>();
            index.put_all_searchable_fields_from_fields_ids_map(
                wtxn,
                &searchable_fields,
                &self.fields_ids_map,
            )?;
            let searchable_fields_ids = index.searchable_fields_ids(wtxn)?;
            self.searchable_fields_ids = searchable_fields_ids;
        }

        Ok(())
    }
}
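Taken together, the settings-side changes replace the `compute_real_facets`/`update_faceted` plumbing removed above with two `InnerIndexSettings` snapshots compared through an `InnerIndexSettingsDiff`. A minimal, self-contained sketch of that snapshot-and-diff pattern; the struct, its field set, and the two `reindex_*` helpers below are simplified stand-ins, and only the old/new pairing and the `any_reindexing_needed` gate mirror the diff above:

    use std::collections::HashSet;

    // Stand-in for InnerIndexSettings: the real struct also carries stop words,
    // separators, dictionary, exact attributes, proximity precision and embedders.
    #[derive(Clone, PartialEq)]
    struct Settings {
        user_defined_searchable_fields: Option<Vec<String>>,
        user_defined_faceted_fields: HashSet<String>,
    }

    struct SettingsDiff {
        old: Settings,
        new: Settings,
    }

    impl SettingsDiff {
        fn reindex_searchable(&self) -> bool {
            self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields
        }

        fn reindex_facets(&self) -> bool {
            self.old.user_defined_faceted_fields != self.new.user_defined_faceted_fields
        }

        fn any_reindexing_needed(&self) -> bool {
            self.reindex_searchable() || self.reindex_facets()
        }
    }

    fn main() {
        // Snapshot the settings, apply the updates, then snapshot again.
        let old = Settings {
            user_defined_searchable_fields: None,
            user_defined_faceted_fields: HashSet::new(),
        };
        let mut new = old.clone();
        new.user_defined_faceted_fields.insert("genre".to_string());

        let diff = SettingsDiff { old, new };
        if diff.any_reindexing_needed() {
            // In milli this is the point where `reindex(..., settings_diff)` runs.
            println!("reindexing needed");
        }
    }

The real diff compares many more properties, as the `@@ -1167` hunk above shows, but the decision flow is the same: build both snapshots, diff them, and only reindex when the diff requires it.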

fn embedders(embedding_configs: Vec<(String, EmbeddingConfig)>) -> Result<EmbeddingConfigs> {
    let res: Result<_> = embedding_configs
        .into_iter()
        .map(|(name, EmbeddingConfig { embedder_options, prompt })| {
            let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);

            let embedder = Arc::new(
                Embedder::new(embedder_options.clone())
                    .map_err(crate::vector::Error::from)
                    .map_err(crate::Error::from)?,
            );
            Ok((name, (embedder, prompt)))
        })
        .collect();
    res.map(EmbeddingConfigs::new)
}

fn validate_prompt(