Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-10-30 23:46:28 +00:00
	Merge #4621
4621: Bring back changes from v1.8.0 into main r=curquiza a=curquiza

Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: meili-bors[bot] <89034592+meili-bors[bot]@users.noreply.github.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
		| @@ -17,7 +17,7 @@ bincode = "1.3.3" | ||||
| bstr = "1.9.0" | ||||
| bytemuck = { version = "1.14.0", features = ["extern_crate_alloc"] } | ||||
| byteorder = "1.5.0" | ||||
| charabia = { version = "0.8.8", default-features = false } | ||||
| charabia = { version = "0.8.10", default-features = false } | ||||
| concat-arrays = "0.1.2" | ||||
| crossbeam-channel = "0.5.11" | ||||
| deserr = "0.6.1" | ||||
| @@ -115,6 +115,7 @@ lmdb-posix-sem = ["heed/posix-sem"] | ||||
|  | ||||
| # allow chinese specialized tokenization | ||||
| chinese = ["charabia/chinese"] | ||||
| chinese-pinyin = ["chinese", "charabia/chinese-normalization-pinyin"] | ||||
|  | ||||
| # allow hebrew specialized tokenization | ||||
| hebrew = ["charabia/hebrew"] | ||||
| @@ -135,7 +136,11 @@ greek = ["charabia/greek"] | ||||
| # allow khmer specialized tokenization | ||||
| khmer = ["charabia/khmer"] | ||||
|  | ||||
| # allow vietnamese specialized tokenization | ||||
| vietnamese = ["charabia/vietnamese"] | ||||
|  | ||||
| # force swedish character recomposition | ||||
| swedish-recomposition = ["charabia/swedish-recomposition"] | ||||
|  | ||||
| # allow CUDA support, see <https://github.com/meilisearch/meilisearch/issues/4306> | ||||
| cuda = ["candle-core/cuda"] | ||||
|   | ||||
| @@ -9,6 +9,7 @@ use serde_json::Value; | ||||
| use thiserror::Error; | ||||
|  | ||||
| use crate::documents::{self, DocumentsBatchCursorError}; | ||||
| use crate::thread_pool_no_abort::PanicCatched; | ||||
| use crate::{CriterionError, DocumentId, FieldId, Object, SortError}; | ||||
|  | ||||
| pub fn is_reserved_keyword(keyword: &str) -> bool { | ||||
| @@ -39,17 +40,19 @@ pub enum InternalError { | ||||
|     Fst(#[from] fst::Error), | ||||
|     #[error(transparent)] | ||||
|     DocumentsError(#[from] documents::Error), | ||||
|     #[error("Invalid compression type have been specified to grenad.")] | ||||
|     #[error("Invalid compression type have been specified to grenad")] | ||||
|     GrenadInvalidCompressionType, | ||||
|     #[error("Invalid grenad file with an invalid version format.")] | ||||
|     #[error("Invalid grenad file with an invalid version format")] | ||||
|     GrenadInvalidFormatVersion, | ||||
|     #[error("Invalid merge while processing {process}.")] | ||||
|     #[error("Invalid merge while processing {process}")] | ||||
|     IndexingMergingKeys { process: &'static str }, | ||||
|     #[error("{}", HeedError::InvalidDatabaseTyping)] | ||||
|     InvalidDatabaseTyping, | ||||
|     #[error(transparent)] | ||||
|     RayonThreadPool(#[from] ThreadPoolBuildError), | ||||
|     #[error(transparent)] | ||||
|     PanicInThreadPool(#[from] PanicCatched), | ||||
|     #[error(transparent)] | ||||
|     SerdeJson(#[from] serde_json::Error), | ||||
|     #[error(transparent)] | ||||
|     Serialization(#[from] SerializationError), | ||||
| @@ -57,9 +60,9 @@ pub enum InternalError { | ||||
|     Store(#[from] MdbError), | ||||
|     #[error(transparent)] | ||||
|     Utf8(#[from] str::Utf8Error), | ||||
|     #[error("An indexation process was explicitly aborted.")] | ||||
|     #[error("An indexation process was explicitly aborted")] | ||||
|     AbortedIndexation, | ||||
|     #[error("The matching words list contains at least one invalid member.")] | ||||
|     #[error("The matching words list contains at least one invalid member")] | ||||
|     InvalidMatchingWords, | ||||
|     #[error(transparent)] | ||||
|     ArroyError(#[from] arroy::Error), | ||||
|   | ||||
| @@ -678,6 +678,23 @@ impl Index { | ||||
|             .get(rtxn, main_key::USER_DEFINED_SEARCHABLE_FIELDS_KEY) | ||||
|     } | ||||
|  | ||||
|     /// Identical to `user_defined_searchable_fields`, but returns ids instead. | ||||
|     pub fn user_defined_searchable_fields_ids(&self, rtxn: &RoTxn) -> Result<Option<Vec<FieldId>>> { | ||||
|         match self.user_defined_searchable_fields(rtxn)? { | ||||
|             Some(fields) => { | ||||
|                 let fields_ids_map = self.fields_ids_map(rtxn)?; | ||||
|                 let mut fields_ids = Vec::new(); | ||||
|                 for name in fields { | ||||
|                     if let Some(field_id) = fields_ids_map.id(name) { | ||||
|                         fields_ids.push(field_id); | ||||
|                     } | ||||
|                 } | ||||
|                 Ok(Some(fields_ids)) | ||||
|             } | ||||
|             None => Ok(None), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /* filterable fields */ | ||||
|  | ||||
|     /// Writes the filterable fields names in the database. | ||||
| @@ -824,11 +841,11 @@ impl Index { | ||||
|  | ||||
|     /// Identical to `user_defined_faceted_fields`, but returns ids instead. | ||||
|     pub fn user_defined_faceted_fields_ids(&self, rtxn: &RoTxn) -> Result<HashSet<FieldId>> { | ||||
|         let fields = self.faceted_fields(rtxn)?; | ||||
|         let fields = self.user_defined_faceted_fields(rtxn)?; | ||||
|         let fields_ids_map = self.fields_ids_map(rtxn)?; | ||||
|  | ||||
|         let mut fields_ids = HashSet::new(); | ||||
|         for name in fields.into_iter() { | ||||
|         for name in fields { | ||||
|             if let Some(field_id) = fields_ids_map.id(&name) { | ||||
|                 fields_ids.insert(field_id); | ||||
|             } | ||||
|   | ||||
| @@ -21,6 +21,7 @@ pub mod prompt; | ||||
| pub mod proximity; | ||||
| pub mod score_details; | ||||
| mod search; | ||||
| mod thread_pool_no_abort; | ||||
| pub mod update; | ||||
| pub mod vector; | ||||
|  | ||||
| @@ -42,6 +43,7 @@ pub use search::new::{ | ||||
|     SearchLogger, VisualSearchLogger, | ||||
| }; | ||||
| use serde_json::Value; | ||||
| pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; | ||||
| pub use {charabia as tokenizer, heed}; | ||||
|  | ||||
| pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError}; | ||||
| @@ -128,7 +130,7 @@ impl fmt::Debug for TimeBudget { | ||||
|  | ||||
| impl Default for TimeBudget { | ||||
|     fn default() -> Self { | ||||
|         Self::new(std::time::Duration::from_millis(150)) | ||||
|         Self::new(std::time::Duration::from_millis(1500)) | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -97,6 +97,7 @@ impl<'a> FacetDistribution<'a> { | ||||
|     ) -> heed::Result<()> { | ||||
|         match facet_type { | ||||
|             FacetType::Number => { | ||||
|                 let mut lexicographic_distribution = BTreeMap::new(); | ||||
|                 let mut key_buffer: Vec<_> = field_id.to_be_bytes().to_vec(); | ||||
|  | ||||
|                 let distribution_prelength = distribution.len(); | ||||
| @@ -111,14 +112,17 @@ impl<'a> FacetDistribution<'a> { | ||||
|  | ||||
|                     for result in iter { | ||||
|                         let ((_, _, value), ()) = result?; | ||||
|                         *distribution.entry(value.to_string()).or_insert(0) += 1; | ||||
|                         *lexicographic_distribution.entry(value.to_string()).or_insert(0) += 1; | ||||
|  | ||||
|                         if distribution.len() - distribution_prelength == self.max_values_per_facet | ||||
|                         if lexicographic_distribution.len() - distribution_prelength | ||||
|                             == self.max_values_per_facet | ||||
|                         { | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 distribution.extend(lexicographic_distribution); | ||||
|             } | ||||
|             FacetType::String => { | ||||
|                 let mut normalized_distribution = BTreeMap::new(); | ||||
|   | ||||
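The hunk above switches number-facet counting to an accumulate-then-extend pattern: counts are first gathered in a temporary `BTreeMap` and only then merged into the caller's distribution. Below is a simplified, standalone sketch of that pattern; it omits the `distribution_prelength` bookkeeping and the grenad iteration of the real code, and the function name is hypothetical.

    use std::collections::BTreeMap;

    // Gather counts into a temporary map, capped at `max_values` distinct
    // entries, then merge them into the caller's distribution in one step.
    fn merge_number_counts(
        distribution: &mut BTreeMap<String, u64>,
        values: &[f64],
        max_values: usize,
    ) {
        let mut lexicographic_distribution = BTreeMap::new();
        for value in values {
            *lexicographic_distribution.entry(value.to_string()).or_insert(0) += 1;
            if lexicographic_distribution.len() == max_values {
                break;
            }
        }
        distribution.extend(lexicographic_distribution);
    }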
							
								
								
									
milli/src/thread_pool_no_abort.rs (new file, 69 lines)
							| @@ -0,0 +1,69 @@ | ||||
| use std::sync::atomic::{AtomicBool, Ordering}; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use rayon::{ThreadPool, ThreadPoolBuilder}; | ||||
| use thiserror::Error; | ||||
|  | ||||
| /// A rayon ThreadPool wrapper that can catch panics in the pool | ||||
| /// and modifies the install function accordingly. | ||||
| #[derive(Debug)] | ||||
| pub struct ThreadPoolNoAbort { | ||||
|     thread_pool: ThreadPool, | ||||
|     /// Set to true if the thread pool catched a panic. | ||||
|     pool_catched_panic: Arc<AtomicBool>, | ||||
| } | ||||
|  | ||||
| impl ThreadPoolNoAbort { | ||||
|     pub fn install<OP, R>(&self, op: OP) -> Result<R, PanicCatched> | ||||
|     where | ||||
|         OP: FnOnce() -> R + Send, | ||||
|         R: Send, | ||||
|     { | ||||
|         let output = self.thread_pool.install(op); | ||||
|         // While reseting the pool panic catcher we return an error if we catched one. | ||||
|         if self.pool_catched_panic.swap(false, Ordering::SeqCst) { | ||||
|             Err(PanicCatched) | ||||
|         } else { | ||||
|             Ok(output) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn current_num_threads(&self) -> usize { | ||||
|         self.thread_pool.current_num_threads() | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Error, Debug)] | ||||
| #[error("A panic occured. Read the logs to find more information about it")] | ||||
| pub struct PanicCatched; | ||||
|  | ||||
| #[derive(Default)] | ||||
| pub struct ThreadPoolNoAbortBuilder(ThreadPoolBuilder); | ||||
|  | ||||
| impl ThreadPoolNoAbortBuilder { | ||||
|     pub fn new() -> ThreadPoolNoAbortBuilder { | ||||
|         ThreadPoolNoAbortBuilder::default() | ||||
|     } | ||||
|  | ||||
|     pub fn thread_name<F>(mut self, closure: F) -> Self | ||||
|     where | ||||
|         F: FnMut(usize) -> String + 'static, | ||||
|     { | ||||
|         self.0 = self.0.thread_name(closure); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolNoAbortBuilder { | ||||
|         self.0 = self.0.num_threads(num_threads); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub fn build(mut self) -> Result<ThreadPoolNoAbort, rayon::ThreadPoolBuildError> { | ||||
|         let pool_catched_panic = Arc::new(AtomicBool::new(false)); | ||||
|         self.0 = self.0.panic_handler({ | ||||
|             let catched_panic = pool_catched_panic.clone(); | ||||
|             move |_result| catched_panic.store(true, Ordering::SeqCst) | ||||
|         }); | ||||
|         Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic }) | ||||
|     } | ||||
| } | ||||
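A minimal usage sketch of the new wrapper, assuming only the API defined in the file above and the re-export added in the `lib.rs` hunk earlier (the thread name and count are illustrative): a panic caught by the pool surfaces from `install` as a `PanicCatched` error rather than silently poisoning later calls, since the flag is reset on every `install`.

    use milli::ThreadPoolNoAbortBuilder;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Build a pool the same way the embedding request pool is built later in this PR.
        let pool = ThreadPoolNoAbortBuilder::new()
            .thread_name(|index| format!("no-abort-worker-{index}"))
            .num_threads(4)
            .build()?;

        // The closure runs inside the pool; if any worker panicked while it ran,
        // the panic flag is swapped back to false and Err(PanicCatched) is returned.
        let sum: u32 = pool.install(|| (0..10u32).sum())?;
        assert_eq!(sum, 45);
        Ok(())
    }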
| @@ -71,8 +71,8 @@ pub enum DelAddOperation { | ||||
| /// putting each deletion obkv's keys under an DelAdd::Deletion | ||||
| /// and putting each addition obkv's keys under an DelAdd::Addition | ||||
| pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>( | ||||
|     deletion: obkv::KvReader<K>, | ||||
|     addition: obkv::KvReader<K>, | ||||
|     deletion: &obkv::KvReader<K>, | ||||
|     addition: &obkv::KvReader<K>, | ||||
|     buffer: &mut Vec<u8>, | ||||
| ) -> Result<(), std::io::Error> { | ||||
|     use itertools::merge_join_by; | ||||
|   | ||||
| @@ -1,4 +1,4 @@ | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::collections::HashMap; | ||||
| use std::convert::TryInto; | ||||
| use std::fs::File; | ||||
| use std::io::BufReader; | ||||
| @@ -12,6 +12,7 @@ use serde_json::Value; | ||||
| use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters}; | ||||
| use crate::error::{InternalError, SerializationError}; | ||||
| use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd}; | ||||
| use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; | ||||
| use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH}; | ||||
|  | ||||
| pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>; | ||||
| @@ -25,10 +26,7 @@ pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, R | ||||
| pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|     obkv_documents: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     searchable_fields: &Option<HashSet<FieldId>>, | ||||
|     stop_words: Option<&fst::Set<Vec<u8>>>, | ||||
|     allowed_separators: Option<&[&str]>, | ||||
|     dictionary: Option<&[&str]>, | ||||
|     settings_diff: &InnerIndexSettingsDiff, | ||||
|     max_positions_per_attributes: Option<u32>, | ||||
| ) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> { | ||||
|     puffin::profile_function!(); | ||||
| @@ -36,6 +34,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|     let max_positions_per_attributes = max_positions_per_attributes | ||||
|         .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); | ||||
|     let max_memory = indexer.max_memory_by_thread(); | ||||
|     let force_reindexing = settings_diff.reindex_searchable(); | ||||
|  | ||||
|     // initialize destination values. | ||||
|     let mut documents_ids = RoaringBitmap::new(); | ||||
| @@ -56,8 +55,37 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|     let mut value_buffer = Vec::new(); | ||||
|  | ||||
|     // initialize tokenizer. | ||||
|     let mut builder = tokenizer_builder(stop_words, allowed_separators, dictionary, None); | ||||
|     let tokenizer = builder.build(); | ||||
|     let old_stop_words = settings_diff.old.stop_words.as_ref(); | ||||
|     let old_separators: Option<Vec<_>> = settings_diff | ||||
|         .old | ||||
|         .allowed_separators | ||||
|         .as_ref() | ||||
|         .map(|s| s.iter().map(String::as_str).collect()); | ||||
|     let old_dictionary: Option<Vec<_>> = | ||||
|         settings_diff.old.dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); | ||||
|     let mut del_builder = tokenizer_builder( | ||||
|         old_stop_words, | ||||
|         old_separators.as_deref(), | ||||
|         old_dictionary.as_deref(), | ||||
|         None, | ||||
|     ); | ||||
|     let del_tokenizer = del_builder.build(); | ||||
|  | ||||
|     let new_stop_words = settings_diff.new.stop_words.as_ref(); | ||||
|     let new_separators: Option<Vec<_>> = settings_diff | ||||
|         .new | ||||
|         .allowed_separators | ||||
|         .as_ref() | ||||
|         .map(|s| s.iter().map(String::as_str).collect()); | ||||
|     let new_dictionary: Option<Vec<_>> = | ||||
|         settings_diff.new.dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); | ||||
|     let mut add_builder = tokenizer_builder( | ||||
|         new_stop_words, | ||||
|         new_separators.as_deref(), | ||||
|         new_dictionary.as_deref(), | ||||
|         None, | ||||
|     ); | ||||
|     let add_tokenizer = add_builder.build(); | ||||
|  | ||||
|     // iterate over documents. | ||||
|     let mut cursor = obkv_documents.into_cursor()?; | ||||
| @@ -69,7 +97,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|         let obkv = KvReader::<FieldId>::new(value); | ||||
|  | ||||
|         // if the searchable fields didn't change, skip the searchable indexing for this document. | ||||
|         if !searchable_fields_changed(&KvReader::<FieldId>::new(value), searchable_fields) { | ||||
|         if !force_reindexing && !searchable_fields_changed(&obkv, settings_diff) { | ||||
|             continue; | ||||
|         } | ||||
|  | ||||
| @@ -85,11 +113,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|                 // deletions | ||||
|                 lang_safe_tokens_from_document( | ||||
|                     &obkv, | ||||
|                     searchable_fields, | ||||
|                     &tokenizer, | ||||
|                     stop_words, | ||||
|                     allowed_separators, | ||||
|                     dictionary, | ||||
|                     &settings_diff.old, | ||||
|                     &del_tokenizer, | ||||
|                     max_positions_per_attributes, | ||||
|                     DelAdd::Deletion, | ||||
|                     &mut del_buffers, | ||||
| @@ -99,11 +124,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|                 // additions | ||||
|                 lang_safe_tokens_from_document( | ||||
|                     &obkv, | ||||
|                     searchable_fields, | ||||
|                     &tokenizer, | ||||
|                     stop_words, | ||||
|                     allowed_separators, | ||||
|                     dictionary, | ||||
|                     &settings_diff.new, | ||||
|                     &add_tokenizer, | ||||
|                     max_positions_per_attributes, | ||||
|                     DelAdd::Addition, | ||||
|                     &mut add_buffers, | ||||
| @@ -118,8 +140,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
|         // transforming two KV<FieldId, KV<u16, String>> into one KV<FieldId, KV<DelAdd, KV<u16, String>>> | ||||
|         value_buffer.clear(); | ||||
|         del_add_from_two_obkvs( | ||||
|             KvReader::<FieldId>::new(del_obkv), | ||||
|             KvReader::<FieldId>::new(add_obkv), | ||||
|             &KvReader::<FieldId>::new(del_obkv), | ||||
|             &KvReader::<FieldId>::new(add_obkv), | ||||
|             &mut value_buffer, | ||||
|         )?; | ||||
|  | ||||
| @@ -160,8 +182,9 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>( | ||||
| /// Check if any searchable fields of a document changed. | ||||
| fn searchable_fields_changed( | ||||
|     obkv: &KvReader<FieldId>, | ||||
|     searchable_fields: &Option<HashSet<FieldId>>, | ||||
|     settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> bool { | ||||
|     let searchable_fields = &settings_diff.new.searchable_fields_ids; | ||||
|     for (field_id, field_bytes) in obkv.iter() { | ||||
|         if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) { | ||||
|             let del_add = KvReaderDelAdd::new(field_bytes); | ||||
| @@ -206,14 +229,10 @@ fn tokenizer_builder<'a>( | ||||
|  | ||||
| /// Extract words mapped with their positions of a document, | ||||
| /// ensuring no Language detection mistakes was made. | ||||
| #[allow(clippy::too_many_arguments)] // FIXME: consider grouping arguments in a struct | ||||
| fn lang_safe_tokens_from_document<'a>( | ||||
|     obkv: &KvReader<FieldId>, | ||||
|     searchable_fields: &Option<HashSet<FieldId>>, | ||||
|     settings: &InnerIndexSettings, | ||||
|     tokenizer: &Tokenizer, | ||||
|     stop_words: Option<&fst::Set<Vec<u8>>>, | ||||
|     allowed_separators: Option<&[&str]>, | ||||
|     dictionary: Option<&[&str]>, | ||||
|     max_positions_per_attributes: u32, | ||||
|     del_add: DelAdd, | ||||
|     buffers: &'a mut Buffers, | ||||
| @@ -222,7 +241,7 @@ fn lang_safe_tokens_from_document<'a>( | ||||
|  | ||||
|     tokens_from_document( | ||||
|         obkv, | ||||
|         searchable_fields, | ||||
|         &settings.searchable_fields_ids, | ||||
|         tokenizer, | ||||
|         max_positions_per_attributes, | ||||
|         del_add, | ||||
| @@ -246,12 +265,15 @@ fn lang_safe_tokens_from_document<'a>( | ||||
|         // then we don't rerun the extraction. | ||||
|         if !script_language.is_empty() { | ||||
|             // build a new temporary tokenizer including the allow list. | ||||
|             let mut builder = tokenizer_builder( | ||||
|                 stop_words, | ||||
|                 allowed_separators, | ||||
|                 dictionary, | ||||
|                 Some(&script_language), | ||||
|             ); | ||||
|             let stop_words = settings.stop_words.as_ref(); | ||||
|             let separators: Option<Vec<_>> = settings | ||||
|                 .allowed_separators | ||||
|                 .as_ref() | ||||
|                 .map(|s| s.iter().map(String::as_str).collect()); | ||||
|             let dictionary: Option<Vec<_>> = | ||||
|                 settings.dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); | ||||
|             let mut builder = | ||||
|                 tokenizer_builder(stop_words, separators.as_deref(), dictionary.as_deref(), None); | ||||
|             let tokenizer = builder.build(); | ||||
|  | ||||
|             script_language_word_count.clear(); | ||||
| @@ -259,7 +281,7 @@ fn lang_safe_tokens_from_document<'a>( | ||||
|             // rerun the extraction. | ||||
|             tokens_from_document( | ||||
|                 obkv, | ||||
|                 searchable_fields, | ||||
|                 &settings.searchable_fields_ids, | ||||
|                 &tokenizer, | ||||
|                 max_positions_per_attributes, | ||||
|                 del_add, | ||||
| @@ -276,7 +298,7 @@ fn lang_safe_tokens_from_document<'a>( | ||||
| /// Extract words mapped with their positions of a document. | ||||
| fn tokens_from_document<'a>( | ||||
|     obkv: &KvReader<FieldId>, | ||||
|     searchable_fields: &Option<HashSet<FieldId>>, | ||||
|     searchable_fields: &Option<Vec<FieldId>>, | ||||
|     tokenizer: &Tokenizer, | ||||
|     max_positions_per_attributes: u32, | ||||
|     del_add: DelAdd, | ||||
|   | ||||
| @@ -10,6 +10,7 @@ use crate::heed_codec::facet::{ | ||||
|     FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec, | ||||
| }; | ||||
| use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::Result; | ||||
|  | ||||
| /// Extracts the facet number and the documents ids where this facet number appear. | ||||
| @@ -20,6 +21,7 @@ use crate::Result; | ||||
| pub fn extract_facet_number_docids<R: io::Read + io::Seek>( | ||||
|     fid_docid_facet_number: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     _settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> Result<grenad::Reader<BufReader<File>>> { | ||||
|     puffin::profile_function!(); | ||||
|  | ||||
|   | ||||
| @@ -15,6 +15,7 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::index_documents::helpers::{ | ||||
|     merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps, | ||||
| }; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; | ||||
|  | ||||
| /// Extracts the facet string and the documents ids where this facet string appear. | ||||
| @@ -25,6 +26,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH}; | ||||
| pub fn extract_facet_string_docids<R: io::Read + io::Seek>( | ||||
|     docid_fid_facet_string: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     _settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> { | ||||
|     puffin::profile_function!(); | ||||
|  | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| use std::borrow::Cow; | ||||
| use std::collections::{BTreeMap, HashSet}; | ||||
| use std::collections::BTreeMap; | ||||
| use std::convert::TryInto; | ||||
| use std::fs::File; | ||||
| use std::io::{self, BufReader}; | ||||
| @@ -20,6 +20,7 @@ use crate::error::InternalError; | ||||
| use crate::facet::value_encoding::f64_into_bytes; | ||||
| use crate::update::del_add::{DelAdd, KvWriterDelAdd}; | ||||
| use crate::update::index_documents::{create_writer, writer_into_reader}; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::{CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, MAX_FACET_VALUE_LENGTH}; | ||||
|  | ||||
| /// The length of the elements that are always in the buffer when inserting new values. | ||||
| @@ -43,7 +44,7 @@ pub struct ExtractedFacetValues { | ||||
| pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>( | ||||
|     obkv_documents: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     faceted_fields: &HashSet<FieldId>, | ||||
|     settings_diff: &InnerIndexSettingsDiff, | ||||
|     geo_fields_ids: Option<(FieldId, FieldId)>, | ||||
| ) -> Result<ExtractedFacetValues> { | ||||
|     puffin::profile_function!(); | ||||
| @@ -82,7 +83,9 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>( | ||||
|         let obkv = obkv::KvReader::new(value); | ||||
|  | ||||
|         for (field_id, field_bytes) in obkv.iter() { | ||||
|             if faceted_fields.contains(&field_id) { | ||||
|             let delete_faceted = settings_diff.old.faceted_fields_ids.contains(&field_id); | ||||
|             let add_faceted = settings_diff.new.faceted_fields_ids.contains(&field_id); | ||||
|             if delete_faceted || add_faceted { | ||||
|                 numbers_key_buffer.clear(); | ||||
|                 strings_key_buffer.clear(); | ||||
|  | ||||
| @@ -99,11 +102,12 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>( | ||||
|                 strings_key_buffer.extend_from_slice(docid_bytes); | ||||
|  | ||||
|                 let del_add_obkv = obkv::KvReader::new(field_bytes); | ||||
|                 let del_value = match del_add_obkv.get(DelAdd::Deletion) { | ||||
|                 let del_value = match del_add_obkv.get(DelAdd::Deletion).filter(|_| delete_faceted) | ||||
|                 { | ||||
|                     Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?), | ||||
|                     None => None, | ||||
|                 }; | ||||
|                 let add_value = match del_add_obkv.get(DelAdd::Addition) { | ||||
|                 let add_value = match del_add_obkv.get(DelAdd::Addition).filter(|_| add_faceted) { | ||||
|                     Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?), | ||||
|                     None => None, | ||||
|                 }; | ||||
|   | ||||
| @@ -10,6 +10,7 @@ use super::helpers::{ | ||||
| use crate::error::SerializationError; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::Result; | ||||
|  | ||||
| const MAX_COUNTED_WORDS: usize = 30; | ||||
| @@ -23,6 +24,7 @@ const MAX_COUNTED_WORDS: usize = 30; | ||||
| pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     _settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> Result<grenad::Reader<BufReader<File>>> { | ||||
|     puffin::profile_function!(); | ||||
|  | ||||
|   | ||||
| @@ -17,8 +17,9 @@ use crate::error::UserError; | ||||
| use crate::prompt::Prompt; | ||||
| use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::index_documents::helpers::try_split_at; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::vector::Embedder; | ||||
| use crate::{DocumentId, FieldsIdsMap, InternalError, Result, VectorOrArrayOfVectors}; | ||||
| use crate::{DocumentId, InternalError, Result, ThreadPoolNoAbort, VectorOrArrayOfVectors}; | ||||
|  | ||||
| /// The length of the elements that are always in the buffer when inserting new values. | ||||
| const TRUNCATE_SIZE: usize = size_of::<DocumentId>(); | ||||
| @@ -71,12 +72,15 @@ impl VectorStateDelta { | ||||
| pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|     obkv_documents: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     field_id_map: &FieldsIdsMap, | ||||
|     settings_diff: &InnerIndexSettingsDiff, | ||||
|     prompt: &Prompt, | ||||
|     embedder_name: &str, | ||||
| ) -> Result<ExtractedVectorPoints> { | ||||
|     puffin::profile_function!(); | ||||
|  | ||||
|     let old_fields_ids_map = &settings_diff.old.fields_ids_map; | ||||
|     let new_fields_ids_map = &settings_diff.new.fields_ids_map; | ||||
|  | ||||
|     // (docid, _index) -> KvWriterDelAdd -> Vector | ||||
|     let mut manual_vectors_writer = create_writer( | ||||
|         indexer.chunk_compression_type, | ||||
| @@ -98,8 +102,6 @@ pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|         tempfile::tempfile()?, | ||||
|     ); | ||||
|  | ||||
|     let vectors_fid = field_id_map.id("_vectors"); | ||||
|  | ||||
|     let mut key_buffer = Vec::new(); | ||||
|     let mut cursor = obkv_documents.into_cursor()?; | ||||
|     while let Some((key, value)) = cursor.move_on_next()? { | ||||
| @@ -116,15 +118,29 @@ pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|         // lazily get it when needed | ||||
|         let document_id = || -> Value { from_utf8(external_id_bytes).unwrap().into() }; | ||||
|  | ||||
|         let vectors_field = vectors_fid | ||||
|             .and_then(|vectors_fid| obkv.get(vectors_fid)) | ||||
|             .map(KvReaderDelAdd::new) | ||||
|             .map(|obkv| to_vector_maps(obkv, document_id)) | ||||
|             .transpose()?; | ||||
|         // the vector field id may have changed | ||||
|         let old_vectors_fid = old_fields_ids_map.id("_vectors"); | ||||
|         // filter the old vector fid if the settings has been changed forcing reindexing. | ||||
|         let old_vectors_fid = old_vectors_fid.filter(|_| !settings_diff.reindex_vectors()); | ||||
|  | ||||
|         let (del_map, add_map) = vectors_field.unzip(); | ||||
|         let del_map = del_map.flatten(); | ||||
|         let add_map = add_map.flatten(); | ||||
|         let new_vectors_fid = new_fields_ids_map.id("_vectors"); | ||||
|         let vectors_field = { | ||||
|             let del = old_vectors_fid | ||||
|                 .and_then(|vectors_fid| obkv.get(vectors_fid)) | ||||
|                 .map(KvReaderDelAdd::new) | ||||
|                 .map(|obkv| to_vector_map(obkv, DelAdd::Deletion, &document_id)) | ||||
|                 .transpose()? | ||||
|                 .flatten(); | ||||
|             let add = new_vectors_fid | ||||
|                 .and_then(|vectors_fid| obkv.get(vectors_fid)) | ||||
|                 .map(KvReaderDelAdd::new) | ||||
|                 .map(|obkv| to_vector_map(obkv, DelAdd::Addition, &document_id)) | ||||
|                 .transpose()? | ||||
|                 .flatten(); | ||||
|             (del, add) | ||||
|         }; | ||||
|  | ||||
|         let (del_map, add_map) = vectors_field; | ||||
|  | ||||
|         let del_value = del_map.and_then(|mut map| map.remove(embedder_name)); | ||||
|         let add_value = add_map.and_then(|mut map| map.remove(embedder_name)); | ||||
| @@ -155,7 +171,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|                     VectorStateDelta::NowGenerated(prompt.render( | ||||
|                         obkv, | ||||
|                         DelAdd::Addition, | ||||
|                         field_id_map, | ||||
|                         new_fields_ids_map, | ||||
|                     )?) | ||||
|                 } else { | ||||
|                     VectorStateDelta::NowRemoved | ||||
| @@ -182,10 +198,16 @@ pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|  | ||||
|                 if document_is_kept { | ||||
|                     // Don't give up if the old prompt was failing | ||||
|                     let old_prompt = | ||||
|                         prompt.render(obkv, DelAdd::Deletion, field_id_map).unwrap_or_default(); | ||||
|                     let new_prompt = prompt.render(obkv, DelAdd::Addition, field_id_map)?; | ||||
|                     if old_prompt != new_prompt { | ||||
|                     let old_prompt = Some(prompt) | ||||
|                         // TODO: this filter works because we erase the vec database when a embedding setting changes. | ||||
|                         // When vector pipeline will be optimized, this should be removed. | ||||
|                         .filter(|_| !settings_diff.reindex_vectors()) | ||||
|                         .map(|p| { | ||||
|                             p.render(obkv, DelAdd::Deletion, old_fields_ids_map).unwrap_or_default() | ||||
|                         }); | ||||
|                     let new_prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?; | ||||
|                     if old_prompt.as_ref() != Some(&new_prompt) { | ||||
|                         let old_prompt = old_prompt.unwrap_or_default(); | ||||
|                         tracing::trace!( | ||||
|                             "🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}" | ||||
|                         ); | ||||
| @@ -207,6 +229,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|             &mut manual_vectors_writer, | ||||
|             &mut key_buffer, | ||||
|             delta, | ||||
|             settings_diff, | ||||
|         )?; | ||||
|     } | ||||
|  | ||||
| @@ -220,15 +243,6 @@ pub fn extract_vector_points<R: io::Read + io::Seek>( | ||||
|     }) | ||||
| } | ||||
|  | ||||
| fn to_vector_maps( | ||||
|     obkv: KvReaderDelAdd, | ||||
|     document_id: impl Fn() -> Value, | ||||
| ) -> Result<(Option<serde_json::Map<String, Value>>, Option<serde_json::Map<String, Value>>)> { | ||||
|     let del = to_vector_map(obkv, DelAdd::Deletion, &document_id)?; | ||||
|     let add = to_vector_map(obkv, DelAdd::Addition, &document_id)?; | ||||
|     Ok((del, add)) | ||||
| } | ||||
|  | ||||
| fn to_vector_map( | ||||
|     obkv: KvReaderDelAdd, | ||||
|     side: DelAdd, | ||||
| @@ -256,10 +270,15 @@ fn push_vectors_diff( | ||||
|     manual_vectors_writer: &mut Writer<BufWriter<File>>, | ||||
|     key_buffer: &mut Vec<u8>, | ||||
|     delta: VectorStateDelta, | ||||
|     settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> Result<()> { | ||||
|     puffin::profile_function!(); | ||||
|     let (must_remove, prompt, (mut del_vectors, mut add_vectors)) = delta.into_values(); | ||||
|     if must_remove { | ||||
|     if must_remove | ||||
|     // TODO: the below condition works because we erase the vec database when a embedding setting changes. | ||||
|     // When vector pipeline will be optimized, this should be removed. | ||||
|     && !settings_diff.reindex_vectors() | ||||
|     { | ||||
|         key_buffer.truncate(TRUNCATE_SIZE); | ||||
|         remove_vectors_writer.insert(&key_buffer, [])?; | ||||
|     } | ||||
| @@ -287,12 +306,16 @@ fn push_vectors_diff( | ||||
|         match eob { | ||||
|             EitherOrBoth::Both(_, _) => (), // no need to touch anything | ||||
|             EitherOrBoth::Left(vector) => { | ||||
|                 // We insert only the Del part of the Obkv to inform | ||||
|                 // that we only want to remove all those vectors. | ||||
|                 let mut obkv = KvWriterDelAdd::memory(); | ||||
|                 obkv.insert(DelAdd::Deletion, cast_slice(&vector))?; | ||||
|                 let bytes = obkv.into_inner()?; | ||||
|                 manual_vectors_writer.insert(&key_buffer, bytes)?; | ||||
|                 // TODO: the below condition works because we erase the vec database when a embedding setting changes. | ||||
|                 // When vector pipeline will be optimized, this should be removed. | ||||
|                 if !settings_diff.reindex_vectors() { | ||||
|                     // We insert only the Del part of the Obkv to inform | ||||
|                     // that we only want to remove all those vectors. | ||||
|                     let mut obkv = KvWriterDelAdd::memory(); | ||||
|                     obkv.insert(DelAdd::Deletion, cast_slice(&vector))?; | ||||
|                     let bytes = obkv.into_inner()?; | ||||
|                     manual_vectors_writer.insert(&key_buffer, bytes)?; | ||||
|                 } | ||||
|             } | ||||
|             EitherOrBoth::Right(vector) => { | ||||
|                 // We insert only the Add part of the Obkv to inform | ||||
| @@ -339,7 +362,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>( | ||||
|     prompt_reader: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     embedder: Arc<Embedder>, | ||||
|     request_threads: &rayon::ThreadPool, | ||||
|     request_threads: &ThreadPoolNoAbort, | ||||
| ) -> Result<grenad::Reader<BufReader<File>>> { | ||||
|     puffin::profile_function!(); | ||||
|     let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism | ||||
|   | ||||
| @@ -1,20 +1,23 @@ | ||||
| use std::collections::{BTreeSet, HashSet}; | ||||
| use std::collections::BTreeSet; | ||||
| use std::fs::File; | ||||
| use std::io::{self, BufReader}; | ||||
|  | ||||
| use heed::BytesDecode; | ||||
| use heed::{BytesDecode, BytesEncode}; | ||||
| use obkv::KvReaderU16; | ||||
| use roaring::RoaringBitmap; | ||||
|  | ||||
| use super::helpers::{ | ||||
|     create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader, | ||||
|     try_split_array_at, writer_into_reader, GrenadParameters, | ||||
|     create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at, | ||||
|     writer_into_reader, GrenadParameters, | ||||
| }; | ||||
| use crate::error::SerializationError; | ||||
| use crate::heed_codec::StrBEU16Codec; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::index_documents::helpers::sorter_into_reader; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::update::MergeFn; | ||||
| use crate::{DocumentId, FieldId, Result}; | ||||
| use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result}; | ||||
|  | ||||
| /// Extracts the word and the documents ids where this word appear. | ||||
| /// | ||||
| @@ -27,7 +30,7 @@ use crate::{DocumentId, FieldId, Result}; | ||||
| pub fn extract_word_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     exact_attributes: &HashSet<FieldId>, | ||||
|     settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> Result<( | ||||
|     grenad::Reader<BufReader<File>>, | ||||
|     grenad::Reader<BufReader<File>>, | ||||
| @@ -43,7 +46,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>( | ||||
|         indexer.chunk_compression_type, | ||||
|         indexer.chunk_compression_level, | ||||
|         indexer.max_nb_chunks, | ||||
|         max_memory.map(|x| x / 3), | ||||
|         max_memory.map(|m| m / 3), | ||||
|     ); | ||||
|     let mut key_buffer = Vec::new(); | ||||
|     let mut del_words = BTreeSet::new(); | ||||
| @@ -85,13 +88,19 @@ pub fn extract_word_docids<R: io::Read + io::Seek>( | ||||
|         add_words.clear(); | ||||
|     } | ||||
|  | ||||
|     let mut word_fid_docids_writer = create_writer( | ||||
|         indexer.chunk_compression_type, | ||||
|         indexer.chunk_compression_level, | ||||
|         tempfile::tempfile()?, | ||||
|     ); | ||||
|  | ||||
|     let mut word_docids_sorter = create_sorter( | ||||
|         grenad::SortAlgorithm::Unstable, | ||||
|         merge_deladd_cbo_roaring_bitmaps, | ||||
|         indexer.chunk_compression_type, | ||||
|         indexer.chunk_compression_level, | ||||
|         indexer.max_nb_chunks, | ||||
|         max_memory.map(|x| x / 3), | ||||
|         max_memory.map(|m| m / 3), | ||||
|     ); | ||||
|  | ||||
|     let mut exact_word_docids_sorter = create_sorter( | ||||
| @@ -100,31 +109,45 @@ pub fn extract_word_docids<R: io::Read + io::Seek>( | ||||
|         indexer.chunk_compression_type, | ||||
|         indexer.chunk_compression_level, | ||||
|         indexer.max_nb_chunks, | ||||
|         max_memory.map(|x| x / 3), | ||||
|     ); | ||||
|  | ||||
|     let mut word_fid_docids_writer = create_writer( | ||||
|         indexer.chunk_compression_type, | ||||
|         indexer.chunk_compression_level, | ||||
|         tempfile::tempfile()?, | ||||
|         max_memory.map(|m| m / 3), | ||||
|     ); | ||||
|  | ||||
|     let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?; | ||||
|     // TODO: replace sorters by writers by accumulating values into a buffer before inserting them. | ||||
|     let mut buffer = Vec::new(); | ||||
|     // NOTE: replacing sorters by bitmap merging is less efficient, so, use sorters. | ||||
|     while let Some((key, value)) = iter.next()? { | ||||
|         // only keep the value if their is a change to apply in the DB. | ||||
|         if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) { | ||||
|             word_fid_docids_writer.insert(key, value)?; | ||||
|         } | ||||
|  | ||||
|         let (word, fid) = StrBEU16Codec::bytes_decode(key) | ||||
|         let (w, fid) = StrBEU16Codec::bytes_decode(key) | ||||
|             .map_err(|_| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; | ||||
|  | ||||
|         // every words contained in an attribute set to exact must be pushed in the exact_words list. | ||||
|         if exact_attributes.contains(&fid) { | ||||
|             exact_word_docids_sorter.insert(word.as_bytes(), value)?; | ||||
|         } else { | ||||
|             word_docids_sorter.insert(word.as_bytes(), value)?; | ||||
|         // merge all deletions | ||||
|         let obkv = KvReaderDelAdd::new(value); | ||||
|         if let Some(value) = obkv.get(DelAdd::Deletion) { | ||||
|             let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid); | ||||
|             buffer.clear(); | ||||
|             let mut obkv = KvWriterDelAdd::new(&mut buffer); | ||||
|             obkv.insert(DelAdd::Deletion, value)?; | ||||
|             if delete_from_exact { | ||||
|                 exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; | ||||
|             } else { | ||||
|                 word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; | ||||
|             } | ||||
|         } | ||||
|         // merge all additions | ||||
|         if let Some(value) = obkv.get(DelAdd::Addition) { | ||||
|             let add_in_exact = settings_diff.new.exact_attributes.contains(&fid); | ||||
|             buffer.clear(); | ||||
|             let mut obkv = KvWriterDelAdd::new(&mut buffer); | ||||
|             obkv.insert(DelAdd::Addition, value)?; | ||||
|             if add_in_exact { | ||||
|                 exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; | ||||
|             } else { | ||||
|                 word_docids_sorter.insert(w, obkv.into_inner().unwrap())?; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -178,3 +201,45 @@ fn words_into_sorter( | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] | ||||
| fn docids_into_writers<W>( | ||||
|     word: &str, | ||||
|     deletions: &RoaringBitmap, | ||||
|     additions: &RoaringBitmap, | ||||
|     writer: &mut grenad::Writer<W>, | ||||
| ) -> Result<()> | ||||
| where | ||||
|     W: std::io::Write, | ||||
| { | ||||
|     if deletions == additions { | ||||
|         // if the same value is deleted and added, do nothing. | ||||
|         return Ok(()); | ||||
|     } | ||||
|  | ||||
|     // Write each value in the same KvDelAdd before inserting it in the final writer. | ||||
|     let mut obkv = KvWriterDelAdd::memory(); | ||||
|     // deletions: | ||||
|     if !deletions.is_empty() && !deletions.is_subset(additions) { | ||||
|         obkv.insert( | ||||
|             DelAdd::Deletion, | ||||
|             CboRoaringBitmapCodec::bytes_encode(deletions).map_err(|_| { | ||||
|                 SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) } | ||||
|             })?, | ||||
|         )?; | ||||
|     } | ||||
|     // additions: | ||||
|     if !additions.is_empty() { | ||||
|         obkv.insert( | ||||
|             DelAdd::Addition, | ||||
|             CboRoaringBitmapCodec::bytes_encode(additions).map_err(|_| { | ||||
|                 SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) } | ||||
|             })?, | ||||
|         )?; | ||||
|     } | ||||
|  | ||||
|     // insert everything in the same writer. | ||||
|     writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?; | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|   | ||||
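A small self-contained sketch (hypothetical document ids) of the skip conditions in `docids_into_writers` above: identical bitmaps write nothing, and a deletion set that is a subset of the additions gets no `DelAdd::Deletion` entry, presumably because every deleted id is re-added anyway.

    use roaring::RoaringBitmap;

    fn main() {
        let deletions: RoaringBitmap = [1u32, 2, 3].into_iter().collect();
        let additions: RoaringBitmap = [1u32, 2, 3, 4].into_iter().collect();

        // Not equal, so the helper would not early-return.
        assert_ne!(deletions, additions);
        // Deletions are a subset of the additions: the Deletion key would be skipped.
        assert!(deletions.is_subset(&additions));
        // Additions are non-empty: they would be written under the Addition key.
        assert!(!additions.is_empty());
    }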
| @@ -11,8 +11,9 @@ use super::helpers::{ | ||||
| }; | ||||
| use crate::error::SerializationError; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::proximity::{index_proximity, MAX_DISTANCE}; | ||||
| use crate::proximity::{index_proximity, ProximityPrecision, MAX_DISTANCE}; | ||||
| use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::{DocumentId, Result}; | ||||
|  | ||||
| /// Extracts the best proximity between pairs of words and the documents ids where this pair appear. | ||||
| @@ -23,8 +24,21 @@ use crate::{DocumentId, Result}; | ||||
| pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> Result<grenad::Reader<BufReader<File>>> { | ||||
|     puffin::profile_function!(); | ||||
|     let any_deletion = settings_diff.old.proximity_precision == ProximityPrecision::ByWord; | ||||
|     let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord; | ||||
|  | ||||
|     // early return if the data shouldn't be deleted nor created. | ||||
|     if !any_deletion && !any_addition { | ||||
|         let writer = create_writer( | ||||
|             indexer.chunk_compression_type, | ||||
|             indexer.chunk_compression_level, | ||||
|             tempfile::tempfile()?, | ||||
|         ); | ||||
|         return writer_into_reader(writer); | ||||
|     } | ||||
|  | ||||
|     let max_memory = indexer.max_memory_by_thread(); | ||||
|  | ||||
| @@ -77,6 +91,10 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>( | ||||
|  | ||||
|         let (del, add): (Result<_>, Result<_>) = rayon::join( | ||||
|             || { | ||||
|                 if !any_deletion { | ||||
|                     return Ok(()); | ||||
|                 } | ||||
|  | ||||
|                 // deletions | ||||
|                 if let Some(deletion) = KvReaderDelAdd::new(value).get(DelAdd::Deletion) { | ||||
|                     for (position, word) in KvReaderU16::new(deletion).iter() { | ||||
| @@ -106,6 +124,10 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>( | ||||
|                 Ok(()) | ||||
|             }, | ||||
|             || { | ||||
|                 if !any_addition { | ||||
|                     return Ok(()); | ||||
|                 } | ||||
|  | ||||
|                 // additions | ||||
|                 if let Some(addition) = KvReaderDelAdd::new(value).get(DelAdd::Addition) { | ||||
|                     for (position, word) in KvReaderU16::new(addition).iter() { | ||||
|   | ||||
| @@ -11,6 +11,7 @@ use super::helpers::{ | ||||
| use crate::error::SerializationError; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::update::MergeFn; | ||||
| use crate::{bucketed_position, DocumentId, Result}; | ||||
|  | ||||
| @@ -22,6 +23,7 @@ use crate::{bucketed_position, DocumentId, Result}; | ||||
| pub fn extract_word_position_docids<R: io::Read + io::Seek>( | ||||
|     docid_word_positions: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     _settings_diff: &InnerIndexSettingsDiff, | ||||
| ) -> Result<grenad::Reader<BufReader<File>>> { | ||||
|     puffin::profile_function!(); | ||||
|  | ||||
|   | ||||
| @@ -9,9 +9,9 @@ mod extract_word_docids; | ||||
| mod extract_word_pair_proximity_docids; | ||||
| mod extract_word_position_docids; | ||||
|  | ||||
| use std::collections::HashSet; | ||||
| use std::fs::File; | ||||
| use std::io::BufReader; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use crossbeam_channel::Sender; | ||||
| use rayon::prelude::*; | ||||
| @@ -30,9 +30,8 @@ use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids | ||||
| use self::extract_word_position_docids::extract_word_position_docids; | ||||
| use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; | ||||
| use super::{helpers, TypedChunk}; | ||||
| use crate::proximity::ProximityPrecision; | ||||
| use crate::vector::EmbeddingConfigs; | ||||
| use crate::{FieldId, FieldsIdsMap, Result}; | ||||
| use crate::update::settings::InnerIndexSettingsDiff; | ||||
| use crate::{FieldId, Result, ThreadPoolNoAbortBuilder}; | ||||
|  | ||||
| /// Extract data for each databases from obkv documents in parallel. | ||||
| /// Send data in grenad file over provided Sender. | ||||
| @@ -43,18 +42,10 @@ pub(crate) fn data_from_obkv_documents( | ||||
|     flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send, | ||||
|     indexer: GrenadParameters, | ||||
|     lmdb_writer_sx: Sender<Result<TypedChunk>>, | ||||
|     searchable_fields: Option<HashSet<FieldId>>, | ||||
|     faceted_fields: HashSet<FieldId>, | ||||
|     primary_key_id: FieldId, | ||||
|     geo_fields_ids: Option<(FieldId, FieldId)>, | ||||
|     field_id_map: FieldsIdsMap, | ||||
|     stop_words: Option<fst::Set<Vec<u8>>>, | ||||
|     allowed_separators: Option<&[&str]>, | ||||
|     dictionary: Option<&[&str]>, | ||||
|     settings_diff: Arc<InnerIndexSettingsDiff>, | ||||
|     max_positions_per_attributes: Option<u32>, | ||||
|     exact_attributes: HashSet<FieldId>, | ||||
|     proximity_precision: ProximityPrecision, | ||||
|     embedders: EmbeddingConfigs, | ||||
| ) -> Result<()> { | ||||
|     puffin::profile_function!(); | ||||
|  | ||||
| @@ -67,8 +58,7 @@ pub(crate) fn data_from_obkv_documents( | ||||
|                         original_documents_chunk, | ||||
|                         indexer, | ||||
|                         lmdb_writer_sx.clone(), | ||||
|                         field_id_map.clone(), | ||||
|                         embedders.clone(), | ||||
|                         settings_diff.clone(), | ||||
|                     ) | ||||
|                 }) | ||||
|                 .collect::<Result<()>>() | ||||
| @@ -81,13 +71,9 @@ pub(crate) fn data_from_obkv_documents( | ||||
|                         flattened_obkv_chunks, | ||||
|                         indexer, | ||||
|                         lmdb_writer_sx.clone(), | ||||
|                         &searchable_fields, | ||||
|                         &faceted_fields, | ||||
|                         primary_key_id, | ||||
|                         geo_fields_ids, | ||||
|                         &stop_words, | ||||
|                         &allowed_separators, | ||||
|                         &dictionary, | ||||
|                         settings_diff.clone(), | ||||
|                         max_positions_per_attributes, | ||||
|                     ) | ||||
|                 }) | ||||
| @@ -100,13 +86,12 @@ pub(crate) fn data_from_obkv_documents( | ||||
|                         run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>( | ||||
|                             docid_word_positions_chunk.clone(), | ||||
|                             indexer, | ||||
|                             settings_diff.clone(), | ||||
|                             lmdb_writer_sx.clone(), | ||||
|                             extract_fid_word_count_docids, | ||||
|                             TypedChunk::FieldIdWordCountDocids, | ||||
|                             "field-id-wordcount-docids", | ||||
|                         ); | ||||
|  | ||||
|                         let exact_attributes = exact_attributes.clone(); | ||||
|                         run_extraction_task::< | ||||
|                             _, | ||||
|                             _, | ||||
| @@ -118,10 +103,9 @@ pub(crate) fn data_from_obkv_documents( | ||||
|                         >( | ||||
|                             docid_word_positions_chunk.clone(), | ||||
|                             indexer, | ||||
|                             settings_diff.clone(), | ||||
|                             lmdb_writer_sx.clone(), | ||||
|                             move |doc_word_pos, indexer| { | ||||
|                                 extract_word_docids(doc_word_pos, indexer, &exact_attributes) | ||||
|                             }, | ||||
|                             extract_word_docids, | ||||
|                             |( | ||||
|                                 word_docids_reader, | ||||
|                                 exact_word_docids_reader, | ||||
| @@ -139,6 +123,7 @@ pub(crate) fn data_from_obkv_documents( | ||||
|                         run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>( | ||||
|                             docid_word_positions_chunk.clone(), | ||||
|                             indexer, | ||||
|                             settings_diff.clone(), | ||||
|                             lmdb_writer_sx.clone(), | ||||
|                             extract_word_position_docids, | ||||
|                             TypedChunk::WordPositionDocids, | ||||
| @@ -152,6 +137,7 @@ pub(crate) fn data_from_obkv_documents( | ||||
|                         >( | ||||
|                             fid_docid_facet_strings_chunk.clone(), | ||||
|                             indexer, | ||||
|                             settings_diff.clone(), | ||||
|                             lmdb_writer_sx.clone(), | ||||
|                             extract_facet_string_docids, | ||||
|                             TypedChunk::FieldIdFacetStringDocids, | ||||
| @@ -161,22 +147,22 @@ pub(crate) fn data_from_obkv_documents( | ||||
|                         run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>( | ||||
|                             fid_docid_facet_numbers_chunk.clone(), | ||||
|                             indexer, | ||||
|                             settings_diff.clone(), | ||||
|                             lmdb_writer_sx.clone(), | ||||
|                             extract_facet_number_docids, | ||||
|                             TypedChunk::FieldIdFacetNumberDocids, | ||||
|                             "field-id-facet-number-docids", | ||||
|                         ); | ||||
|  | ||||
|                         if proximity_precision == ProximityPrecision::ByWord { | ||||
|                             run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>( | ||||
|                                 docid_word_positions_chunk.clone(), | ||||
|                                 indexer, | ||||
|                                 lmdb_writer_sx.clone(), | ||||
|                                 extract_word_pair_proximity_docids, | ||||
|                                 TypedChunk::WordPairProximityDocids, | ||||
|                                 "word-pair-proximity-docids", | ||||
|                             ); | ||||
|                         } | ||||
|                         run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>( | ||||
|                             docid_word_positions_chunk.clone(), | ||||
|                             indexer, | ||||
|                             settings_diff.clone(), | ||||
|                             lmdb_writer_sx.clone(), | ||||
|                             extract_word_pair_proximity_docids, | ||||
|                             TypedChunk::WordPairProximityDocids, | ||||
|                             "word-pair-proximity-docids", | ||||
|                         ); | ||||
|                     } | ||||
|  | ||||
|                     Ok(()) | ||||
| @@ -195,12 +181,17 @@ pub(crate) fn data_from_obkv_documents( | ||||
| fn run_extraction_task<FE, FS, M>( | ||||
|     chunk: grenad::Reader<CursorClonableMmap>, | ||||
|     indexer: GrenadParameters, | ||||
|     settings_diff: Arc<InnerIndexSettingsDiff>, | ||||
|     lmdb_writer_sx: Sender<Result<TypedChunk>>, | ||||
|     extract_fn: FE, | ||||
|     serialize_fn: FS, | ||||
|     name: &'static str, | ||||
| ) where | ||||
|     FE: Fn(grenad::Reader<CursorClonableMmap>, GrenadParameters) -> Result<M> | ||||
|     FE: Fn( | ||||
|             grenad::Reader<CursorClonableMmap>, | ||||
|             GrenadParameters, | ||||
|             &InnerIndexSettingsDiff, | ||||
|         ) -> Result<M> | ||||
|         + Sync | ||||
|         + Send | ||||
|         + 'static, | ||||
| @@ -213,7 +204,7 @@ fn run_extraction_task<FE, FS, M>( | ||||
|         let child_span = tracing::trace_span!(target: "indexing::extract::details", parent: &current_span, "extract_multiple_chunks"); | ||||
|         let _entered = child_span.enter(); | ||||
|         puffin::profile_scope!("extract_multiple_chunks", name); | ||||
|         match extract_fn(chunk, indexer) { | ||||
|         match extract_fn(chunk, indexer, &settings_diff) { | ||||
|             Ok(chunk) => { | ||||
|                 let _ = lmdb_writer_sx.send(Ok(serialize_fn(chunk))); | ||||
|             } | ||||
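
Editor's note: the hunks above thread the new `Arc<InnerIndexSettingsDiff>` through every extraction task and widen the `FE` bound so each extractor receives the diff by reference. Below is a rough, hedged illustration of that pattern using only the standard library; `SettingsDiff`, `extract_fn`, and the channel payload are stand-ins, not the real milli types.

use std::sync::{mpsc, Arc};
use std::thread;

struct SettingsDiff {
    reindex_searchable: bool,
}

// Each task owns a clone of the shared, read-only diff and reports back
// through a channel, mirroring the lmdb_writer_sx usage above.
fn run_extraction_task<F, T>(
    chunk: Vec<u8>,
    settings_diff: Arc<SettingsDiff>,
    sender: mpsc::Sender<Result<T, String>>,
    extract_fn: F,
) where
    F: FnOnce(Vec<u8>, &SettingsDiff) -> Result<T, String> + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(move || {
        let result = extract_fn(chunk, &settings_diff);
        let _ = sender.send(result);
    });
}

fn main() {
    let diff = Arc::new(SettingsDiff { reindex_searchable: true });
    let (tx, rx) = mpsc::channel();
    run_extraction_task(vec![1, 2, 3], diff, tx, |chunk, diff| {
        Ok(if diff.reindex_searchable { chunk.len() } else { 0 })
    });
    assert_eq!(rx.recv().unwrap(), Ok(3));
}
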
| @@ -230,8 +221,7 @@ fn send_original_documents_data( | ||||
|     original_documents_chunk: Result<grenad::Reader<BufReader<File>>>, | ||||
|     indexer: GrenadParameters, | ||||
|     lmdb_writer_sx: Sender<Result<TypedChunk>>, | ||||
|     field_id_map: FieldsIdsMap, | ||||
|     embedders: EmbeddingConfigs, | ||||
|     settings_diff: Arc<InnerIndexSettingsDiff>, | ||||
| ) -> Result<()> { | ||||
|     let original_documents_chunk = | ||||
|         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; | ||||
| @@ -239,55 +229,58 @@ fn send_original_documents_data( | ||||
|     let documents_chunk_cloned = original_documents_chunk.clone(); | ||||
|     let lmdb_writer_sx_cloned = lmdb_writer_sx.clone(); | ||||
|  | ||||
|     let request_threads = rayon::ThreadPoolBuilder::new() | ||||
|     let request_threads = ThreadPoolNoAbortBuilder::new() | ||||
|         .num_threads(crate::vector::REQUEST_PARALLELISM) | ||||
|         .thread_name(|index| format!("embedding-request-{index}")) | ||||
|         .build()?; | ||||
|  | ||||
|     rayon::spawn(move || { | ||||
|         for (name, (embedder, prompt)) in embedders { | ||||
|             let result = extract_vector_points( | ||||
|                 documents_chunk_cloned.clone(), | ||||
|                 indexer, | ||||
|                 &field_id_map, | ||||
|                 &prompt, | ||||
|                 &name, | ||||
|             ); | ||||
|             match result { | ||||
|                 Ok(ExtractedVectorPoints { manual_vectors, remove_vectors, prompts }) => { | ||||
|                     let embeddings = match extract_embeddings( | ||||
|                         prompts, | ||||
|                         indexer, | ||||
|                         embedder.clone(), | ||||
|                         &request_threads, | ||||
|                     ) { | ||||
|                         Ok(results) => Some(results), | ||||
|                         Err(error) => { | ||||
|                             let _ = lmdb_writer_sx_cloned.send(Err(error)); | ||||
|                             None | ||||
|                         } | ||||
|                     }; | ||||
|     if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() { | ||||
|         let settings_diff = settings_diff.clone(); | ||||
|         rayon::spawn(move || { | ||||
|             for (name, (embedder, prompt)) in settings_diff.new.embedding_configs.clone() { | ||||
|                 let result = extract_vector_points( | ||||
|                     documents_chunk_cloned.clone(), | ||||
|                     indexer, | ||||
|                     &settings_diff, | ||||
|                     &prompt, | ||||
|                     &name, | ||||
|                 ); | ||||
|                 match result { | ||||
|                     Ok(ExtractedVectorPoints { manual_vectors, remove_vectors, prompts }) => { | ||||
|                         let embeddings = match extract_embeddings( | ||||
|                             prompts, | ||||
|                             indexer, | ||||
|                             embedder.clone(), | ||||
|                             &request_threads, | ||||
|                         ) { | ||||
|                             Ok(results) => Some(results), | ||||
|                             Err(error) => { | ||||
|                                 let _ = lmdb_writer_sx_cloned.send(Err(error)); | ||||
|                                 None | ||||
|                             } | ||||
|                         }; | ||||
|  | ||||
|                     if !(remove_vectors.is_empty() | ||||
|                         && manual_vectors.is_empty() | ||||
|                         && embeddings.as_ref().map_or(true, |e| e.is_empty())) | ||||
|                     { | ||||
|                         let _ = lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints { | ||||
|                             remove_vectors, | ||||
|                             embeddings, | ||||
|                             expected_dimension: embedder.dimensions(), | ||||
|                             manual_vectors, | ||||
|                             embedder_name: name, | ||||
|                         })); | ||||
|                         if !(remove_vectors.is_empty() | ||||
|                             && manual_vectors.is_empty() | ||||
|                             && embeddings.as_ref().map_or(true, |e| e.is_empty())) | ||||
|                         { | ||||
|                             let _ = lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints { | ||||
|                                 remove_vectors, | ||||
|                                 embeddings, | ||||
|                                 expected_dimension: embedder.dimensions(), | ||||
|                                 manual_vectors, | ||||
|                                 embedder_name: name, | ||||
|                             })); | ||||
|                         } | ||||
|                     } | ||||
|  | ||||
|                     Err(error) => { | ||||
|                         let _ = lmdb_writer_sx_cloned.send(Err(error)); | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 Err(error) => { | ||||
|                     let _ = lmdb_writer_sx_cloned.send(Err(error)); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     }); | ||||
|         }); | ||||
|     } | ||||
|  | ||||
|     // TODO: create a custom internal error | ||||
|     let _ = lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk))); | ||||
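
Editor's note: a hedged reading of the gate above is that embeddings are only extracted when the settings diff reports that the embedders changed, or when this is a document update rather than a settings-only update. A minimal sketch of that decision follows; the field and method names are assumptions for illustration, not the exact milli API.

struct SettingsDiff {
    embedding_configs_updated: bool,
    settings_update_only: bool,
}

impl SettingsDiff {
    fn reindex_vectors(&self) -> bool {
        self.embedding_configs_updated
    }

    // Extract vectors when documents changed, or when the embedder settings did.
    fn should_extract_vectors(&self) -> bool {
        self.reindex_vectors() || !self.settings_update_only
    }
}

fn main() {
    let document_update =
        SettingsDiff { embedding_configs_updated: false, settings_update_only: false };
    let untouched_embedders =
        SettingsDiff { embedding_configs_updated: false, settings_update_only: true };
    assert!(document_update.should_extract_vectors());
    assert!(!untouched_embedders.should_extract_vectors());
}
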
| @@ -306,13 +299,9 @@ fn send_and_extract_flattened_documents_data( | ||||
|     flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>, | ||||
|     indexer: GrenadParameters, | ||||
|     lmdb_writer_sx: Sender<Result<TypedChunk>>, | ||||
|     searchable_fields: &Option<HashSet<FieldId>>, | ||||
|     faceted_fields: &HashSet<FieldId>, | ||||
|     primary_key_id: FieldId, | ||||
|     geo_fields_ids: Option<(FieldId, FieldId)>, | ||||
|     stop_words: &Option<fst::Set<Vec<u8>>>, | ||||
|     allowed_separators: &Option<&[&str]>, | ||||
|     dictionary: &Option<&[&str]>, | ||||
|     settings_diff: Arc<InnerIndexSettingsDiff>, | ||||
|     max_positions_per_attributes: Option<u32>, | ||||
| ) -> Result<( | ||||
|     grenad::Reader<CursorClonableMmap>, | ||||
| @@ -341,10 +330,7 @@ fn send_and_extract_flattened_documents_data( | ||||
|                     extract_docid_word_positions( | ||||
|                         flattened_documents_chunk.clone(), | ||||
|                         indexer, | ||||
|                         searchable_fields, | ||||
|                         stop_words.as_ref(), | ||||
|                         *allowed_separators, | ||||
|                         *dictionary, | ||||
|                         &settings_diff, | ||||
|                         max_positions_per_attributes, | ||||
|                     )?; | ||||
|  | ||||
| @@ -367,7 +353,7 @@ fn send_and_extract_flattened_documents_data( | ||||
|                 } = extract_fid_docid_facet_values( | ||||
|                     flattened_documents_chunk.clone(), | ||||
|                     indexer, | ||||
|                     faceted_fields, | ||||
|                     &settings_diff, | ||||
|                     geo_fields_ids, | ||||
|                 )?; | ||||
|  | ||||
|   | ||||
| @@ -6,9 +6,9 @@ mod typed_chunk; | ||||
|  | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::io::{Read, Seek}; | ||||
| use std::iter::FromIterator; | ||||
| use std::num::NonZeroU32; | ||||
| use std::result::Result as StdResult; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use crossbeam_channel::{Receiver, Sender}; | ||||
| use grenad::{Merger, MergerBuilder}; | ||||
| @@ -33,6 +33,7 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; | ||||
| pub use self::transform::{Transform, TransformOutput}; | ||||
| use crate::documents::{obkv_to_object, DocumentsBatchReader}; | ||||
| use crate::error::{Error, InternalError, UserError}; | ||||
| use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; | ||||
| pub use crate::update::index_documents::helpers::CursorClonableMmap; | ||||
| use crate::update::{ | ||||
|     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, | ||||
| @@ -259,21 +260,6 @@ where | ||||
|             .expect("Invalid document addition state") | ||||
|             .output_from_sorter(self.wtxn, &self.progress)?; | ||||
|  | ||||
|         let new_facets = output.compute_real_facets(self.wtxn, self.index)?; | ||||
|         self.index.put_faceted_fields(self.wtxn, &new_facets)?; | ||||
|  | ||||
|         // in case new fields were introduced we're going to recreate the searchable fields. | ||||
|         if let Some(faceted_fields) = self.index.user_defined_searchable_fields(self.wtxn)? { | ||||
|             // we can't keep references on the faceted fields while we update the index thus we need to own it. | ||||
|             let faceted_fields: Vec<String> = | ||||
|                 faceted_fields.into_iter().map(str::to_string).collect(); | ||||
|             self.index.put_all_searchable_fields_from_fields_ids_map( | ||||
|                 self.wtxn, | ||||
|                 &faceted_fields.iter().map(String::as_ref).collect::<Vec<_>>(), | ||||
|                 &output.fields_ids_map, | ||||
|             )?; | ||||
|         } | ||||
|  | ||||
|         let indexed_documents = output.documents_count as u64; | ||||
|         let number_of_documents = self.execute_raw(output)?; | ||||
|  | ||||
| @@ -296,32 +282,35 @@ where | ||||
|  | ||||
|         let TransformOutput { | ||||
|             primary_key, | ||||
|             fields_ids_map, | ||||
|             mut settings_diff, | ||||
|             field_distribution, | ||||
|             documents_count, | ||||
|             original_documents, | ||||
|             flattened_documents, | ||||
|         } = output; | ||||
|  | ||||
|         // The fields_ids_map is put back to the store now so the rest of the transaction sees an | ||||
|         // up to date field map. | ||||
|         self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?; | ||||
|         // update the internal facet and searchable lists, | ||||
|         // because they might have changed due to the nested documents flattening. | ||||
|         settings_diff.new.recompute_facets(self.wtxn, self.index)?; | ||||
|         settings_diff.new.recompute_searchables(self.wtxn, self.index)?; | ||||
|  | ||||
|         let settings_diff = Arc::new(settings_diff); | ||||
|  | ||||
|         let backup_pool; | ||||
|         let pool = match self.indexer_config.thread_pool { | ||||
|             Some(ref pool) => pool, | ||||
|             #[cfg(not(test))] | ||||
|             None => { | ||||
|                 // We initialize a bakcup pool with the default | ||||
|                 // We initialize a backup pool with the default | ||||
|                 // settings if none have already been set. | ||||
|                 backup_pool = rayon::ThreadPoolBuilder::new().build()?; | ||||
|                 &backup_pool | ||||
|             } | ||||
|             #[cfg(test)] | ||||
|             None => { | ||||
|                 // We initialize a bakcup pool with the default | ||||
|                 // settings if none have already been set. | ||||
|                 backup_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?; | ||||
|                 #[allow(unused_mut)] | ||||
|                 let mut pool_builder = ThreadPoolNoAbortBuilder::new(); | ||||
|  | ||||
|                 #[cfg(test)] | ||||
|                 { | ||||
|                     pool_builder = pool_builder.num_threads(1); | ||||
|                 } | ||||
|  | ||||
|                 backup_pool = pool_builder.build()?; | ||||
|                 &backup_pool | ||||
|             } | ||||
|         }; | ||||
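
Editor's note: the replacement above builds the fallback pool once and forces a single thread only under `#[cfg(test)]`. A sketch of the same cfg-gated builder pattern with plain rayon (assumed as a dependency; the real code goes through the internal `ThreadPoolNoAbortBuilder`, which wraps it):

// `#[allow(unused_mut)]` is needed because the `mut` is only exercised when the
// test-only block below is compiled in.
fn build_backup_pool() -> Result<rayon::ThreadPool, rayon::ThreadPoolBuildError> {
    #[allow(unused_mut)]
    let mut builder = rayon::ThreadPoolBuilder::new();

    #[cfg(test)]
    {
        // Keep tests deterministic and cheap: one worker thread.
        builder = builder.num_threads(1);
    }

    builder.build()
}

fn main() {
    let pool = build_backup_pool().expect("failed to build the backup pool");
    pool.install(|| println!("workers: {}", rayon::current_num_threads()));
}
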
| @@ -333,13 +322,8 @@ where | ||||
|         ) = crossbeam_channel::unbounded(); | ||||
|  | ||||
|         // get the primary key field id | ||||
|         let primary_key_id = fields_ids_map.id(&primary_key).unwrap(); | ||||
|         let primary_key_id = settings_diff.new.fields_ids_map.id(&primary_key).unwrap(); | ||||
|  | ||||
|         // get searchable fields for word databases | ||||
|         let searchable_fields = | ||||
|             self.index.searchable_fields_ids(self.wtxn)?.map(HashSet::from_iter); | ||||
|         // get filterable fields for facet databases | ||||
|         let faceted_fields = self.index.faceted_fields_ids(self.wtxn)?; | ||||
|         // get the fid of the `_geo.lat` and `_geo.lng` fields. | ||||
|         let mut field_id_map = self.index.fields_ids_map(self.wtxn)?; | ||||
|  | ||||
| @@ -362,12 +346,6 @@ where | ||||
|             None => None, | ||||
|         }; | ||||
|  | ||||
|         let stop_words = self.index.stop_words(self.wtxn)?; | ||||
|         let separators = self.index.allowed_separators(self.wtxn)?; | ||||
|         let dictionary = self.index.dictionary(self.wtxn)?; | ||||
|         let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?; | ||||
|         let proximity_precision = self.index.proximity_precision(self.wtxn)?.unwrap_or_default(); | ||||
|  | ||||
|         let pool_params = GrenadParameters { | ||||
|             chunk_compression_type: self.indexer_config.chunk_compression_type, | ||||
|             chunk_compression_level: self.indexer_config.chunk_compression_level, | ||||
| @@ -400,8 +378,6 @@ where | ||||
|  | ||||
|         let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; | ||||
|  | ||||
|         let cloned_embedder = self.embedders.clone(); | ||||
|  | ||||
|         let mut final_documents_ids = RoaringBitmap::new(); | ||||
|         let mut databases_seen = 0; | ||||
|         let mut word_position_docids = None; | ||||
| @@ -410,7 +386,6 @@ where | ||||
|         let mut exact_word_docids = None; | ||||
|         let mut chunk_accumulator = ChunkAccumulator::default(); | ||||
|         let mut dimension = HashMap::new(); | ||||
|         let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap()); | ||||
|  | ||||
|         let current_span = tracing::Span::current(); | ||||
|  | ||||
| @@ -428,10 +403,6 @@ where | ||||
|                 let flattened_chunk_iter = | ||||
|                     grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size); | ||||
|  | ||||
|                 let separators: Option<Vec<_>> = | ||||
|                     separators.as_ref().map(|x| x.iter().map(String::as_str).collect()); | ||||
|                 let dictionary: Option<Vec<_>> = | ||||
|                     dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect()); | ||||
|                 let result = original_chunk_iter.and_then(|original_chunk| { | ||||
|                     let flattened_chunk = flattened_chunk_iter?; | ||||
|                         // extract all databases from the chunked obkv documents | ||||
| @@ -440,18 +411,10 @@ where | ||||
|                         flattened_chunk, | ||||
|                         pool_params, | ||||
|                         lmdb_writer_sx.clone(), | ||||
|                         searchable_fields, | ||||
|                         faceted_fields, | ||||
|                         primary_key_id, | ||||
|                         geo_fields_ids, | ||||
|                         field_id_map, | ||||
|                         stop_words, | ||||
|                         separators.as_deref(), | ||||
|                         dictionary.as_deref(), | ||||
|                         settings_diff.clone(), | ||||
|                         max_positions_per_attributes, | ||||
|                         exact_attributes, | ||||
|                         proximity_precision, | ||||
|                         cloned_embedder, | ||||
|                     ) | ||||
|                 }); | ||||
|  | ||||
| @@ -571,7 +534,7 @@ where | ||||
|             } | ||||
|  | ||||
|             Ok(()) | ||||
|         })?; | ||||
|         }).map_err(InternalError::from)??; | ||||
|  | ||||
|         // We write the field distribution into the main database | ||||
|         self.index.put_field_distribution(self.wtxn, &field_distribution)?; | ||||
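
Editor's note: the `.map_err(InternalError::from)??` above is worth unpacking. With the no-abort pool, `install` returns a nested `Result`: the outer error means a worker panicked, the inner one is whatever the closure itself returned. A small std-only sketch of that shape, with illustrative error types rather than milli's:

#[derive(Debug, PartialEq)]
enum Error {
    PanicInThreadPool,
    Indexing(String),
}

// Stand-in for ThreadPoolNoAbort::install: the outer Result reports a panic in
// a worker, the inner value is whatever the closure produced.
fn install<T>(op: impl FnOnce() -> T) -> Result<T, Error> {
    Ok(op())
}

fn index_documents(fail: bool) -> Result<(), Error> {
    install(|| {
        if fail {
            Err(Error::Indexing("disk full".into()))
        } else {
            Ok(())
        }
    })??; // first `?`: the pool panicked; second `?`: the indexing itself failed
    Ok(())
}

fn main() {
    assert_eq!(index_documents(false), Ok(()));
    assert_eq!(index_documents(true), Err(Error::Indexing("disk full".into())));
}
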
| @@ -600,7 +563,8 @@ where | ||||
|                     writer.build(wtxn, &mut rng, None)?; | ||||
|                 } | ||||
|                 Result::Ok(()) | ||||
|             })?; | ||||
|             }) | ||||
|             .map_err(InternalError::from)??; | ||||
|         } | ||||
|  | ||||
|         self.execute_prefix_databases( | ||||
|   | ||||
| @@ -1,12 +1,11 @@ | ||||
| use std::borrow::Cow; | ||||
| use std::collections::btree_map::Entry as BEntry; | ||||
| use std::collections::hash_map::Entry as HEntry; | ||||
| use std::collections::{HashMap, HashSet}; | ||||
| use std::collections::HashMap; | ||||
| use std::fs::File; | ||||
| use std::io::{Read, Seek}; | ||||
|  | ||||
| use fxhash::FxHashMap; | ||||
| use heed::RoTxn; | ||||
| use itertools::Itertools; | ||||
| use obkv::{KvReader, KvReaderU16, KvWriter}; | ||||
| use roaring::RoaringBitmap; | ||||
| @@ -21,14 +20,17 @@ use super::{IndexDocumentsMethod, IndexerConfig}; | ||||
| use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader}; | ||||
| use crate::error::{Error, InternalError, UserError}; | ||||
| use crate::index::{db_name, main_key}; | ||||
| use crate::update::del_add::{into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd}; | ||||
| use crate::update::del_add::{ | ||||
|     del_add_from_two_obkvs, into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd, | ||||
| }; | ||||
| use crate::update::index_documents::GrenadParameters; | ||||
| use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep}; | ||||
| use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff}; | ||||
| use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; | ||||
| use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result}; | ||||
|  | ||||
| pub struct TransformOutput { | ||||
|     pub primary_key: String, | ||||
|     pub fields_ids_map: FieldsIdsMap, | ||||
|     pub settings_diff: InnerIndexSettingsDiff, | ||||
|     pub field_distribution: FieldDistribution, | ||||
|     pub documents_count: usize, | ||||
|     pub original_documents: File, | ||||
| @@ -282,7 +284,9 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|                     self.original_sorter | ||||
|                         .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; | ||||
|                     let base_obkv = KvReader::new(base_obkv); | ||||
|                     if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? { | ||||
|                     if let Some(flattened_obkv) = | ||||
|                         Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)? | ||||
|                     { | ||||
|                         // we recreate our buffer with the flattened documents | ||||
|                         document_sorter_value_buffer.clear(); | ||||
|                         document_sorter_value_buffer.push(Operation::Addition as u8); | ||||
| @@ -315,7 +319,9 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|                     .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; | ||||
|  | ||||
|                 let flattened_obkv = KvReader::new(&obkv_buffer); | ||||
|                 if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { | ||||
|                 if let Some(obkv) = | ||||
|                     Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)? | ||||
|                 { | ||||
|                     document_sorter_value_buffer.clear(); | ||||
|                     document_sorter_value_buffer.push(Operation::Addition as u8); | ||||
|                     into_del_add_obkv( | ||||
| @@ -524,7 +530,9 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|  | ||||
|         // flatten it and push it as to delete in the flattened_sorter | ||||
|         let flattened_obkv = KvReader::new(base_obkv); | ||||
|         if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? { | ||||
|         if let Some(obkv) = | ||||
|             Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)? | ||||
|         { | ||||
|             // we recreate our buffer with the flattened documents | ||||
|             document_sorter_value_buffer.clear(); | ||||
|             document_sorter_value_buffer.push(Operation::Deletion as u8); | ||||
| @@ -541,8 +549,15 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|  | ||||
|     // Flatten a document from the given fields ids map and insert the newly | ||||
|     // created fields. Returns `None` if the document doesn't need to be flattened. | ||||
|     #[tracing::instrument(level = "trace", skip(self, obkv), target = "indexing::transform")] | ||||
|     fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> { | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip(obkv, fields_ids_map), | ||||
|         target = "indexing::transform" | ||||
|     )] | ||||
|     fn flatten_from_fields_ids_map( | ||||
|         obkv: &KvReader<FieldId>, | ||||
|         fields_ids_map: &mut FieldsIdsMap, | ||||
|     ) -> Result<Option<Vec<u8>>> { | ||||
|         if obkv | ||||
|             .iter() | ||||
|             .all(|(_, value)| !json_depth_checker::should_flatten_from_unchecked_slice(value)) | ||||
| @@ -563,7 +578,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|         // all the raw values get inserted directly in the `key_value` vec. | ||||
|         for (key, value) in obkv.iter() { | ||||
|             if json_depth_checker::should_flatten_from_unchecked_slice(value) { | ||||
|                 let key = self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { | ||||
|                 let key = fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId { | ||||
|                     field_id: key, | ||||
|                     process: "Flatten from fields ids map.", | ||||
|                 })?; | ||||
| @@ -581,7 +596,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|         // Once we have the flattened version we insert all the new generated fields_ids | ||||
|         // (if any) in the fields ids map and serialize the value. | ||||
|         for (key, value) in flattened.into_iter() { | ||||
|             let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; | ||||
|             let fid = fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; | ||||
|             let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; | ||||
|             key_value.push((fid, value.into())); | ||||
|         } | ||||
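
Editor's note: turning `flatten_from_fields_ids_map` from a `&mut self` method into an associated function that takes the map explicitly lets callers pass any `FieldsIdsMap` (the old or the new one) and keeps the mutable borrow narrowed to that single field. A toy illustration of the borrow-checker motivation, with stand-in types:

struct FieldsIdsMap {
    names: Vec<String>,
}

impl FieldsIdsMap {
    fn insert(&mut self, name: &str) -> u16 {
        self.names.push(name.to_string());
        (self.names.len() - 1) as u16
    }
}

struct Transform {
    fields_ids_map: FieldsIdsMap,
    documents_seen: usize,
}

impl Transform {
    // As an associated function, only the map is borrowed mutably, so the
    // caller can keep using other fields of `self`, or hand in a completely
    // different map (e.g. the old settings' map).
    fn flatten(doc: &str, fields_ids_map: &mut FieldsIdsMap) -> Vec<(u16, String)> {
        vec![(fields_ids_map.insert("nested.key"), doc.to_string())]
    }
}

fn main() {
    let mut transform =
        Transform { fields_ids_map: FieldsIdsMap { names: Vec::new() }, documents_seen: 0 };
    let flattened = Transform::flatten("{\"nested\":{\"key\":1}}", &mut transform.fields_ids_map);
    transform.documents_seen += 1; // still usable: only one field was borrowed
    assert_eq!(flattened[0].0, 0);
    assert_eq!(transform.documents_seen, 1);
}
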
| @@ -792,9 +807,19 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|             fst_new_external_documents_ids_builder.insert(key, value) | ||||
|         })?; | ||||
|  | ||||
|         let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?; | ||||
|         let mut new_inner_settings = old_inner_settings.clone(); | ||||
|         new_inner_settings.fields_ids_map = self.fields_ids_map; | ||||
|         let settings_diff = InnerIndexSettingsDiff { | ||||
|             old: old_inner_settings, | ||||
|             new: new_inner_settings, | ||||
|             embedding_configs_updated: false, | ||||
|             settings_update_only: false, | ||||
|         }; | ||||
|  | ||||
|         Ok(TransformOutput { | ||||
|             primary_key, | ||||
|             fields_ids_map: self.fields_ids_map, | ||||
|             settings_diff, | ||||
|             field_distribution, | ||||
|             documents_count: self.documents_count, | ||||
|             original_documents: original_documents.into_inner().map_err(|err| err.into_error())?, | ||||
| @@ -804,6 +829,44 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     /// Rebind the field_ids of the provided document to their values | ||||
|     /// based on the field_ids_maps difference between the old and the new settings, | ||||
|     /// then fill the provided buffers with delta documents using KvWriterDelAdd. | ||||
|     fn rebind_existing_document( | ||||
|         old_obkv: KvReader<FieldId>, | ||||
|         settings_diff: &InnerIndexSettingsDiff, | ||||
|         original_obkv_buffer: &mut Vec<u8>, | ||||
|         flattened_obkv_buffer: &mut Vec<u8>, | ||||
|     ) -> Result<()> { | ||||
|         let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone(); | ||||
|         let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone(); | ||||
|         let mut obkv_writer = KvWriter::<_, FieldId>::memory(); | ||||
|         // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv. | ||||
|         for (id, name) in new_fields_ids_map.iter() { | ||||
|             if let Some(val) = old_fields_ids_map.id(name).and_then(|id| old_obkv.get(id)) { | ||||
|                 obkv_writer.insert(id, val)?; | ||||
|             } | ||||
|         } | ||||
|         let data = obkv_writer.into_inner()?; | ||||
|         let new_obkv = KvReader::<FieldId>::new(&data); | ||||
|  | ||||
|         // take the non-flattened version if flatten_from_fields_ids_map returns None. | ||||
|         let old_flattened = Self::flatten_from_fields_ids_map(&old_obkv, &mut old_fields_ids_map)?; | ||||
|         let old_flattened = | ||||
|             old_flattened.as_deref().map_or_else(|| old_obkv, KvReader::<FieldId>::new); | ||||
|         let new_flattened = Self::flatten_from_fields_ids_map(&new_obkv, &mut new_fields_ids_map)?; | ||||
|         let new_flattened = | ||||
|             new_flattened.as_deref().map_or_else(|| new_obkv, KvReader::<FieldId>::new); | ||||
|  | ||||
|         original_obkv_buffer.clear(); | ||||
|         flattened_obkv_buffer.clear(); | ||||
|  | ||||
|         del_add_from_two_obkvs(&old_obkv, &new_obkv, original_obkv_buffer)?; | ||||
|         del_add_from_two_obkvs(&old_flattened, &new_flattened, flattened_obkv_buffer)?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
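
Editor's note: conceptually, `rebind_existing_document` walks the new id space in order and pulls each field's value from the old document through its old id, then flattens both versions and emits del/add deltas. A plain-std sketch of the id-rebinding part, with ordinary maps standing in for `FieldsIdsMap` and obkv:

use std::collections::BTreeMap;

fn rebind(
    old_ids: &BTreeMap<String, u16>,  // field name -> old field id
    new_ids: &BTreeMap<u16, String>,  // new field id -> field name (ordered)
    old_doc: &BTreeMap<u16, Vec<u8>>, // old field id -> raw value
) -> BTreeMap<u16, Vec<u8>> {
    let mut new_doc = BTreeMap::new();
    // Iterate the *new* id space in order and fetch values through the old ids.
    for (&new_id, name) in new_ids {
        if let Some(value) = old_ids.get(name).and_then(|old_id| old_doc.get(old_id)) {
            new_doc.insert(new_id, value.clone());
        }
    }
    new_doc
}

fn main() {
    let old_ids = BTreeMap::from([("title".to_string(), 0u16), ("body".to_string(), 1)]);
    let new_ids = BTreeMap::from([(0u16, "body".to_string()), (1, "title".to_string())]);
    let old_doc = BTreeMap::from([(0u16, b"hello".to_vec()), (1, b"world".to_vec())]);
    let rebound = rebind(&old_ids, &new_ids, &old_doc);
    assert_eq!(rebound.get(&0), Some(&b"world".to_vec())); // "body" now has id 0
}
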
|  | ||||
|     /// Clear all databases. Returns a `TransformOutput` with a file that contains the documents | ||||
|     /// of the index with the attributes reordered accordingly to the `FieldsIdsMap` given as argument. | ||||
|     /// | ||||
| @@ -811,8 +874,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|     pub fn prepare_for_documents_reindexing( | ||||
|         self, | ||||
|         wtxn: &mut heed::RwTxn<'i>, | ||||
|         old_fields_ids_map: FieldsIdsMap, | ||||
|         mut new_fields_ids_map: FieldsIdsMap, | ||||
|         settings_diff: InnerIndexSettingsDiff, | ||||
|     ) -> Result<TransformOutput> { | ||||
|         // There already has been a document addition, the primary key should be set by now. | ||||
|         let primary_key = self | ||||
| @@ -848,78 +910,27 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|             self.indexer_settings.max_memory.map(|mem| mem / 2), | ||||
|         ); | ||||
|  | ||||
|         let mut obkv_buffer = Vec::new(); | ||||
|         let mut original_obkv_buffer = Vec::new(); | ||||
|         let mut flattened_obkv_buffer = Vec::new(); | ||||
|         let mut document_sorter_key_buffer = Vec::new(); | ||||
|         let mut document_sorter_value_buffer = Vec::new(); | ||||
|         for result in self.index.external_documents_ids().iter(wtxn)? { | ||||
|             let (external_id, docid) = result?; | ||||
|             let obkv = self.index.documents.get(wtxn, &docid)?.ok_or( | ||||
|             let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or( | ||||
|                 InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, | ||||
|             )?; | ||||
|  | ||||
|             obkv_buffer.clear(); | ||||
|             let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer); | ||||
|  | ||||
|             // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv. | ||||
|             for (id, name) in new_fields_ids_map.iter() { | ||||
|                 if let Some(val) = old_fields_ids_map.id(name).and_then(|id| obkv.get(id)) { | ||||
|                     obkv_writer.insert(id, val)?; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             let buffer = obkv_writer.into_inner()?; | ||||
|             Self::rebind_existing_document( | ||||
|                 old_obkv, | ||||
|                 &settings_diff, | ||||
|                 &mut original_obkv_buffer, | ||||
|                 &mut flattened_obkv_buffer, | ||||
|             )?; | ||||
|  | ||||
|             document_sorter_key_buffer.clear(); | ||||
|             document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes()); | ||||
|             document_sorter_key_buffer.extend_from_slice(external_id.as_bytes()); | ||||
|             document_sorter_value_buffer.clear(); | ||||
|             into_del_add_obkv( | ||||
|                 KvReaderU16::new(buffer), | ||||
|                 DelAddOperation::Addition, | ||||
|                 &mut document_sorter_value_buffer, | ||||
|             )?; | ||||
|             original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; | ||||
|  | ||||
|             // Once we have the document. We're going to flatten it | ||||
|             // and insert it in the flattened sorter. | ||||
|             let mut doc = serde_json::Map::new(); | ||||
|  | ||||
|             let reader = obkv::KvReader::new(buffer); | ||||
|             for (k, v) in reader.iter() { | ||||
|                 let key = new_fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId { | ||||
|                     field_id: k, | ||||
|                     process: "Accessing field distribution in transform.", | ||||
|                 })?; | ||||
|                 let value = serde_json::from_slice::<serde_json::Value>(v) | ||||
|                     .map_err(InternalError::SerdeJson)?; | ||||
|                 doc.insert(key.to_string(), value); | ||||
|             } | ||||
|  | ||||
|             let flattened = flatten_serde_json::flatten(&doc); | ||||
|  | ||||
|             // Once we have the flattened version we can convert it back to obkv and | ||||
|             // insert all the new generated fields_ids (if any) in the fields ids map. | ||||
|             let mut buffer: Vec<u8> = Vec::new(); | ||||
|             let mut writer = KvWriter::new(&mut buffer); | ||||
|             let mut flattened: Vec<_> = flattened.into_iter().collect(); | ||||
|             // we reorder the field to get all the known field first | ||||
|             flattened.sort_unstable_by_key(|(key, _)| { | ||||
|                 new_fields_ids_map.id(key).unwrap_or(FieldId::MAX) | ||||
|             }); | ||||
|  | ||||
|             for (key, value) in flattened { | ||||
|                 let fid = | ||||
|                     new_fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?; | ||||
|                 let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?; | ||||
|                 writer.insert(fid, &value)?; | ||||
|             } | ||||
|             document_sorter_value_buffer.clear(); | ||||
|             into_del_add_obkv( | ||||
|                 KvReaderU16::new(&buffer), | ||||
|                 DelAddOperation::Addition, | ||||
|                 &mut document_sorter_value_buffer, | ||||
|             )?; | ||||
|             flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?; | ||||
|             original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?; | ||||
|             flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?; | ||||
|         } | ||||
|  | ||||
|         let grenad_params = GrenadParameters { | ||||
| @@ -934,22 +945,14 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|  | ||||
|         let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?; | ||||
|  | ||||
|         let output = TransformOutput { | ||||
|         Ok(TransformOutput { | ||||
|             primary_key, | ||||
|             fields_ids_map: new_fields_ids_map, | ||||
|             field_distribution, | ||||
|             settings_diff, | ||||
|             documents_count, | ||||
|             original_documents: original_documents.into_inner().into_inner(), | ||||
|             flattened_documents: flattened_documents.into_inner().into_inner(), | ||||
|         }; | ||||
|  | ||||
|         let new_facets = output.compute_real_facets(wtxn, self.index)?; | ||||
|         self.index.put_faceted_fields(wtxn, &new_facets)?; | ||||
|  | ||||
|         // We clear the full database (words-fst, documents ids and documents content). | ||||
|         ClearDocuments::new(wtxn, self.index).execute()?; | ||||
|  | ||||
|         Ok(output) | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -964,20 +967,6 @@ fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> { | ||||
|     vec.into_iter().map(|_| unreachable!()).collect() | ||||
| } | ||||
|  | ||||
| impl TransformOutput { | ||||
|     // find and insert the new field ids | ||||
|     pub fn compute_real_facets(&self, rtxn: &RoTxn, index: &Index) -> Result<HashSet<String>> { | ||||
|         let user_defined_facets = index.user_defined_faceted_fields(rtxn)?; | ||||
|  | ||||
|         Ok(self | ||||
|             .fields_ids_map | ||||
|             .names() | ||||
|             .filter(|&field| crate::is_faceted(field, &user_defined_facets)) | ||||
|             .map(|field| field.to_string()) | ||||
|             .collect()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use super::*; | ||||
|   | ||||
| @@ -1,5 +1,6 @@ | ||||
| use grenad::CompressionType; | ||||
| use rayon::ThreadPool; | ||||
|  | ||||
| use crate::thread_pool_no_abort::ThreadPoolNoAbort; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct IndexerConfig { | ||||
| @@ -9,7 +10,7 @@ pub struct IndexerConfig { | ||||
|     pub max_memory: Option<usize>, | ||||
|     pub chunk_compression_type: CompressionType, | ||||
|     pub chunk_compression_level: Option<u32>, | ||||
|     pub thread_pool: Option<ThreadPool>, | ||||
|     pub thread_pool: Option<ThreadPoolNoAbort>, | ||||
|     pub max_positions_per_attributes: Option<u32>, | ||||
|     pub skip_index_budget: bool, | ||||
| } | ||||
|   | ||||
| @@ -20,7 +20,7 @@ use crate::update::index_documents::IndexDocumentsMethod; | ||||
| use crate::update::{IndexDocuments, UpdateIndexingStep}; | ||||
| use crate::vector::settings::{check_set, check_unset, EmbedderSource, EmbeddingSettings}; | ||||
| use crate::vector::{Embedder, EmbeddingConfig, EmbeddingConfigs}; | ||||
| use crate::{FieldsIdsMap, Index, Result}; | ||||
| use crate::{FieldId, FieldsIdsMap, Index, Result}; | ||||
|  | ||||
| #[derive(Debug, Clone, PartialEq, Eq, Copy)] | ||||
| pub enum Setting<T> { | ||||
| @@ -385,14 +385,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|  | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip(self, progress_callback, should_abort, old_fields_ids_map), | ||||
|         skip(self, progress_callback, should_abort, settings_diff), | ||||
|         target = "indexing::documents" | ||||
|     )] | ||||
|     fn reindex<FP, FA>( | ||||
|         &mut self, | ||||
|         progress_callback: &FP, | ||||
|         should_abort: &FA, | ||||
|         old_fields_ids_map: FieldsIdsMap, | ||||
|         settings_diff: InnerIndexSettingsDiff, | ||||
|     ) -> Result<()> | ||||
|     where | ||||
|         FP: Fn(UpdateIndexingStep) + Sync, | ||||
| @@ -400,7 +400,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|     { | ||||
|         puffin::profile_function!(); | ||||
|  | ||||
|         let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; | ||||
|         // if the settings are set before any document update, we don't need to do anything, and | ||||
|         // will set the primary key during the first document addition. | ||||
|         if self.index.number_of_documents(self.wtxn)? == 0 { | ||||
| @@ -416,14 +415,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|         )?; | ||||
|  | ||||
|         // We clear the databases and remap the documents fields based on the new `FieldsIdsMap`. | ||||
|         let output = transform.prepare_for_documents_reindexing( | ||||
|             self.wtxn, | ||||
|             old_fields_ids_map, | ||||
|             fields_ids_map, | ||||
|         )?; | ||||
|  | ||||
|         let embedder_configs = self.index.embedding_configs(self.wtxn)?; | ||||
|         let embedders = self.embedders(embedder_configs)?; | ||||
|         let output = transform.prepare_for_documents_reindexing(self.wtxn, settings_diff)?; | ||||
|  | ||||
|         // We index the generated `TransformOutput` which must contain | ||||
|         // all the documents with fields in the newly defined searchable order. | ||||
| @@ -436,32 +428,11 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|             &should_abort, | ||||
|         )?; | ||||
|  | ||||
|         let indexing_builder = indexing_builder.with_embedders(embedders); | ||||
|         indexing_builder.execute_raw(output)?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn embedders( | ||||
|         &self, | ||||
|         embedding_configs: Vec<(String, EmbeddingConfig)>, | ||||
|     ) -> Result<EmbeddingConfigs> { | ||||
|         let res: Result<_> = embedding_configs | ||||
|             .into_iter() | ||||
|             .map(|(name, EmbeddingConfig { embedder_options, prompt })| { | ||||
|                 let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?); | ||||
|  | ||||
|                 let embedder = Arc::new( | ||||
|                     Embedder::new(embedder_options.clone()) | ||||
|                         .map_err(crate::vector::Error::from) | ||||
|                         .map_err(crate::Error::from)?, | ||||
|                 ); | ||||
|                 Ok((name, (embedder, prompt))) | ||||
|             }) | ||||
|             .collect(); | ||||
|         res.map(EmbeddingConfigs::new) | ||||
|     } | ||||
|  | ||||
|     fn update_displayed(&mut self) -> Result<bool> { | ||||
|         match self.displayed_fields { | ||||
|             Setting::Set(ref fields) => { | ||||
| @@ -1038,6 +1009,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|             } | ||||
|             Setting::NotSet => false, | ||||
|         }; | ||||
|  | ||||
|         // if any changes force a reindexing | ||||
|         // clear the vector database. | ||||
|         if update { | ||||
|             self.index.vector_arroy.clear(self.wtxn)?; | ||||
|         } | ||||
|  | ||||
|         Ok(update) | ||||
|     } | ||||
|  | ||||
| @@ -1066,20 +1044,10 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|     { | ||||
|         self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?; | ||||
|  | ||||
|         // Note: this MUST be before `update_sortable` so that we can get the old value to compare with the updated value afterwards | ||||
|  | ||||
|         let existing_fields: HashSet<_> = self | ||||
|             .index | ||||
|             .field_distribution(self.wtxn)? | ||||
|             .into_iter() | ||||
|             .filter_map(|(field, count)| (count != 0).then_some(field)) | ||||
|             .collect(); | ||||
|         let old_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?; | ||||
|         let old_fields_ids_map = self.index.fields_ids_map(self.wtxn)?; | ||||
|         let old_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?; | ||||
|  | ||||
|         // never trigger re-indexing | ||||
|         self.update_displayed()?; | ||||
|         self.update_filterable()?; | ||||
|         self.update_sortable()?; | ||||
|         self.update_distinct_field()?; | ||||
|         self.update_criteria()?; | ||||
|         self.update_primary_key()?; | ||||
| @@ -1089,16 +1057,19 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|         self.update_max_values_per_facet()?; | ||||
|         self.update_sort_facet_values_by()?; | ||||
|         self.update_pagination_max_total_hits()?; | ||||
|         self.update_search_cutoff()?; | ||||
|  | ||||
|         let faceted_updated = self.update_faceted(existing_fields, old_faceted_fields)?; | ||||
|         let stop_words_updated = self.update_stop_words()?; | ||||
|         let non_separator_tokens_updated = self.update_non_separator_tokens()?; | ||||
|         let separator_tokens_updated = self.update_separator_tokens()?; | ||||
|         let dictionary_updated = self.update_dictionary()?; | ||||
|         let synonyms_updated = self.update_synonyms()?; | ||||
|         let searchable_updated = self.update_searchable()?; | ||||
|         let exact_attributes_updated = self.update_exact_attributes()?; | ||||
|         let proximity_precision = self.update_proximity_precision()?; | ||||
|         // could trigger re-indexing | ||||
|         self.update_filterable()?; | ||||
|         self.update_sortable()?; | ||||
|         self.update_stop_words()?; | ||||
|         self.update_non_separator_tokens()?; | ||||
|         self.update_separator_tokens()?; | ||||
|         self.update_dictionary()?; | ||||
|         self.update_synonyms()?; | ||||
|         self.update_searchable()?; | ||||
|         self.update_exact_attributes()?; | ||||
|         self.update_proximity_precision()?; | ||||
|         // TODO: very rough approximation of the needs for reindexing where any change will result in | ||||
|         // a full reindexing. | ||||
|         // What can be done instead: | ||||
| @@ -1107,53 +1078,193 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { | ||||
|         // 3. Keep the old vectors but reattempt indexing on a prompt change: only actually changed prompt will need embedding + storage | ||||
|         let embedding_configs_updated = self.update_embedding_configs()?; | ||||
|  | ||||
|         // never trigger re-indexing | ||||
|         self.update_search_cutoff()?; | ||||
|         let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?; | ||||
|         let inner_settings_diff = InnerIndexSettingsDiff { | ||||
|             old: old_inner_settings, | ||||
|             new: new_inner_settings, | ||||
|             embedding_configs_updated, | ||||
|             settings_update_only: true, | ||||
|         }; | ||||
|  | ||||
|         if stop_words_updated | ||||
|             || non_separator_tokens_updated | ||||
|             || separator_tokens_updated | ||||
|             || dictionary_updated | ||||
|             || faceted_updated | ||||
|             || synonyms_updated | ||||
|             || searchable_updated | ||||
|             || exact_attributes_updated | ||||
|             || proximity_precision | ||||
|             || embedding_configs_updated | ||||
|         { | ||||
|             self.reindex(&progress_callback, &should_abort, old_fields_ids_map)?; | ||||
|         if inner_settings_diff.any_reindexing_needed() { | ||||
|             self.reindex(&progress_callback, &should_abort, inner_settings_diff)?; | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
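
Editor's note: the new flow above snapshots the inner settings before and after the individual `update_*` calls, builds a diff from the two snapshots, and only reindexes when the diff says it is needed. A compressed, hedged sketch of that workflow; the snapshot fields are invented for illustration.

#[derive(Clone, PartialEq)]
struct InnerSettings {
    stop_words: Vec<String>,
    filterable_fields: Vec<String>,
}

struct Diff {
    old: InnerSettings,
    new: InnerSettings,
}

impl Diff {
    fn any_reindexing_needed(&self) -> bool {
        // The real diff asks finer-grained questions (searchable, facets,
        // vectors); a plain inequality stands in for all of them here.
        self.old != self.new
    }
}

fn main() {
    let old = InnerSettings {
        stop_words: vec!["the".into()],
        filterable_fields: vec!["genre".into()],
    };

    // ... apply the individual setting updates to the index ...
    let mut new = old.clone();
    new.filterable_fields.push("release_date".into());

    let diff = Diff { old, new };
    if diff.any_reindexing_needed() {
        println!("reindexing documents");
    }
}
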
|  | ||||
|     fn update_faceted( | ||||
|         &self, | ||||
|         existing_fields: HashSet<String>, | ||||
|         old_faceted_fields: HashSet<String>, | ||||
|     ) -> Result<bool> { | ||||
| pub struct InnerIndexSettingsDiff { | ||||
|     pub(crate) old: InnerIndexSettings, | ||||
|     pub(crate) new: InnerIndexSettings, | ||||
|  | ||||
|     // TODO: compare the embedders directly. | ||||
|     pub(crate) embedding_configs_updated: bool, | ||||
|  | ||||
|     pub(crate) settings_update_only: bool, | ||||
| } | ||||
|  | ||||
| impl InnerIndexSettingsDiff { | ||||
|     pub fn any_reindexing_needed(&self) -> bool { | ||||
|         self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors() | ||||
|     } | ||||
|  | ||||
|     pub fn reindex_searchable(&self) -> bool { | ||||
|         self.old | ||||
|             .fields_ids_map | ||||
|             .iter() | ||||
|             .zip(self.new.fields_ids_map.iter()) | ||||
|             .any(|(old, new)| old != new) | ||||
|             || self.old.stop_words.as_ref().map(|set| set.as_fst().as_bytes()) | ||||
|                 != self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes()) | ||||
|             || self.old.allowed_separators != self.new.allowed_separators | ||||
|             || self.old.dictionary != self.new.dictionary | ||||
|             || self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields | ||||
|             || self.old.exact_attributes != self.new.exact_attributes | ||||
|             || self.old.proximity_precision != self.new.proximity_precision | ||||
|     } | ||||
|  | ||||
|     pub fn reindex_facets(&self) -> bool { | ||||
|         let existing_fields = &self.new.existing_fields; | ||||
|         if existing_fields.iter().any(|field| field.contains('.')) { | ||||
|             return Ok(true); | ||||
|             return true; | ||||
|         } | ||||
|  | ||||
|         let old_faceted_fields = &self.old.user_defined_faceted_fields; | ||||
|         if old_faceted_fields.iter().any(|field| field.contains('.')) { | ||||
|             return Ok(true); | ||||
|             return true; | ||||
|         } | ||||
|  | ||||
|         // If there are new faceted fields, we indicate that we must reindex, as we must | ||||
|         // index new fields as facets. It means that the distinct attribute, | ||||
|         // an Asc/Desc criterion or a filtered attribute has been added or removed. | ||||
|         let new_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?; | ||||
|  | ||||
|         let new_faceted_fields = &self.new.user_defined_faceted_fields; | ||||
|         if new_faceted_fields.iter().any(|field| field.contains('.')) { | ||||
|             return Ok(true); | ||||
|             return true; | ||||
|         } | ||||
|  | ||||
|         let faceted_updated = | ||||
|             (&existing_fields - &old_faceted_fields) != (&existing_fields - &new_faceted_fields); | ||||
|             (existing_fields - old_faceted_fields) != (existing_fields - new_faceted_fields); | ||||
|  | ||||
|         Ok(faceted_updated) | ||||
|         self.old | ||||
|             .fields_ids_map | ||||
|             .iter() | ||||
|             .zip(self.new.fields_ids_map.iter()) | ||||
|             .any(|(old, new)| old != new) | ||||
|             || faceted_updated | ||||
|     } | ||||
|  | ||||
|     pub fn reindex_vectors(&self) -> bool { | ||||
|         self.embedding_configs_updated | ||||
|     } | ||||
|  | ||||
|     pub fn settings_update_only(&self) -> bool { | ||||
|         self.settings_update_only | ||||
|     } | ||||
| } | ||||
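
Editor's note: one detail of the `fields_ids_map` comparisons above is that `zip` only walks the ids both maps have in common, so entries appended at the end of the newer map are not visited by that particular check. A tiny demonstration of that zip behaviour:

fn main() {
    let old = vec![(0u16, "title"), (1, "body")];
    let new = vec![(0u16, "title"), (1, "body"), (2, "tags")];

    // `zip` stops at the shorter side: the freshly appended (2, "tags") entry
    // is never compared here.
    let changed = old.iter().zip(new.iter()).any(|(a, b)| a != b);
    assert!(!changed);
}
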
|  | ||||
| #[derive(Clone)] | ||||
| pub(crate) struct InnerIndexSettings { | ||||
|     pub stop_words: Option<fst::Set<Vec<u8>>>, | ||||
|     pub allowed_separators: Option<BTreeSet<String>>, | ||||
|     pub dictionary: Option<BTreeSet<String>>, | ||||
|     pub fields_ids_map: FieldsIdsMap, | ||||
|     pub user_defined_faceted_fields: HashSet<String>, | ||||
|     pub user_defined_searchable_fields: Option<Vec<String>>, | ||||
|     pub faceted_fields_ids: HashSet<FieldId>, | ||||
|     pub searchable_fields_ids: Option<Vec<FieldId>>, | ||||
|     pub exact_attributes: HashSet<FieldId>, | ||||
|     pub proximity_precision: ProximityPrecision, | ||||
|     pub embedding_configs: EmbeddingConfigs, | ||||
|     pub existing_fields: HashSet<String>, | ||||
| } | ||||
|  | ||||
| impl InnerIndexSettings { | ||||
|     pub fn from_index(index: &Index, rtxn: &heed::RoTxn) -> Result<Self> { | ||||
|         let stop_words = index.stop_words(rtxn)?; | ||||
|         let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap()); | ||||
|         let allowed_separators = index.allowed_separators(rtxn)?; | ||||
|         let dictionary = index.dictionary(rtxn)?; | ||||
|         let fields_ids_map = index.fields_ids_map(rtxn)?; | ||||
|         let user_defined_searchable_fields = index.user_defined_searchable_fields(rtxn)?; | ||||
|         let user_defined_searchable_fields = | ||||
|             user_defined_searchable_fields.map(|sf| sf.into_iter().map(String::from).collect()); | ||||
|         let user_defined_faceted_fields = index.user_defined_faceted_fields(rtxn)?; | ||||
|         let searchable_fields_ids = index.searchable_fields_ids(rtxn)?; | ||||
|         let faceted_fields_ids = index.faceted_fields_ids(rtxn)?; | ||||
|         let exact_attributes = index.exact_attributes_ids(rtxn)?; | ||||
|         let proximity_precision = index.proximity_precision(rtxn)?.unwrap_or_default(); | ||||
|         let embedding_configs = embedders(index.embedding_configs(rtxn)?)?; | ||||
|         let existing_fields: HashSet<_> = index | ||||
|             .field_distribution(rtxn)? | ||||
|             .into_iter() | ||||
|             .filter_map(|(field, count)| (count != 0).then_some(field)) | ||||
|             .collect(); | ||||
|  | ||||
|         Ok(Self { | ||||
|             stop_words, | ||||
|             allowed_separators, | ||||
|             dictionary, | ||||
|             fields_ids_map, | ||||
|             user_defined_faceted_fields, | ||||
|             user_defined_searchable_fields, | ||||
|             faceted_fields_ids, | ||||
|             searchable_fields_ids, | ||||
|             exact_attributes, | ||||
|             proximity_precision, | ||||
|             embedding_configs, | ||||
|             existing_fields, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|     // find and insert the new field ids | ||||
|     pub fn recompute_facets(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> { | ||||
|         let new_facets = self | ||||
|             .fields_ids_map | ||||
|             .names() | ||||
|             .filter(|&field| crate::is_faceted(field, &self.user_defined_faceted_fields)) | ||||
|             .map(|field| field.to_string()) | ||||
|             .collect(); | ||||
|         index.put_faceted_fields(wtxn, &new_facets)?; | ||||
|  | ||||
|         self.faceted_fields_ids = index.faceted_fields_ids(wtxn)?; | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     // find and insert the new field ids | ||||
|     pub fn recompute_searchables(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> { | ||||
|         // in case new fields were introduced we're going to recreate the searchable fields. | ||||
|         if let Some(searchable_fields) = self.user_defined_searchable_fields.as_ref() { | ||||
|             let searchable_fields = | ||||
|                 searchable_fields.iter().map(String::as_ref).collect::<Vec<_>>(); | ||||
|             index.put_all_searchable_fields_from_fields_ids_map( | ||||
|                 wtxn, | ||||
|                 &searchable_fields, | ||||
|                 &self.fields_ids_map, | ||||
|             )?; | ||||
|             let searchable_fields_ids = index.searchable_fields_ids(wtxn)?; | ||||
|             self.searchable_fields_ids = searchable_fields_ids; | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn embedders(embedding_configs: Vec<(String, EmbeddingConfig)>) -> Result<EmbeddingConfigs> { | ||||
|     let res: Result<_> = embedding_configs | ||||
|         .into_iter() | ||||
|         .map(|(name, EmbeddingConfig { embedder_options, prompt })| { | ||||
|             let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?); | ||||
|  | ||||
|             let embedder = Arc::new( | ||||
|                 Embedder::new(embedder_options.clone()) | ||||
|                     .map_err(crate::vector::Error::from) | ||||
|                     .map_err(crate::Error::from)?, | ||||
|             ); | ||||
|             Ok((name, (embedder, prompt))) | ||||
|         }) | ||||
|         .collect(); | ||||
|     res.map(EmbeddingConfigs::new) | ||||
| } | ||||
|  | ||||
| fn validate_prompt( | ||||
| @@ -1643,6 +1754,70 @@ mod tests { | ||||
|             .unwrap() | ||||
|             .count(); | ||||
|         assert_eq!(count, 4); | ||||
|  | ||||
|         // Set the filterable fields to be the age and the name. | ||||
|         index | ||||
|             .update_settings(|settings| { | ||||
|                 settings.set_filterable_fields(hashset! { S("age"),  S("name") }); | ||||
|             }) | ||||
|             .unwrap(); | ||||
|  | ||||
|         // Check that the filterable fields are correctly set. | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         let fields_ids = index.filterable_fields(&rtxn).unwrap(); | ||||
|         assert_eq!(fields_ids, hashset! { S("age"),  S("name") }); | ||||
|  | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         // Only count the field_id 0 and level 0 facet values. | ||||
|         let count = index | ||||
|             .facet_id_f64_docids | ||||
|             .remap_key_type::<Bytes>() | ||||
|             .prefix_iter(&rtxn, &[0, 1, 0]) | ||||
|             .unwrap() | ||||
|             .count(); | ||||
|         assert_eq!(count, 4); | ||||
|  | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         // Only count the field_id 0 and level 0 facet values. | ||||
|         let count = index | ||||
|             .facet_id_string_docids | ||||
|             .remap_key_type::<Bytes>() | ||||
|             .prefix_iter(&rtxn, &[0, 0]) | ||||
|             .unwrap() | ||||
|             .count(); | ||||
|         assert_eq!(count, 5); | ||||
|  | ||||
|         // Remove the age from the filterable fields. | ||||
|         index | ||||
|             .update_settings(|settings| { | ||||
|                 settings.set_filterable_fields(hashset! { S("name") }); | ||||
|             }) | ||||
|             .unwrap(); | ||||
|  | ||||
|         // Check that the filterable fields are correctly set. | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         let fields_ids = index.filterable_fields(&rtxn).unwrap(); | ||||
|         assert_eq!(fields_ids, hashset! { S("name") }); | ||||
|  | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         // Only count the field_id 0 and level 0 facet values. | ||||
|         let count = index | ||||
|             .facet_id_f64_docids | ||||
|             .remap_key_type::<Bytes>() | ||||
|             .prefix_iter(&rtxn, &[0, 1, 0]) | ||||
|             .unwrap() | ||||
|             .count(); | ||||
|         assert_eq!(count, 0); | ||||
|  | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         // Only count the field_id 0 and level 0 facet values. | ||||
|         let count = index | ||||
|             .facet_id_string_docids | ||||
|             .remap_key_type::<Bytes>() | ||||
|             .prefix_iter(&rtxn, &[0, 0]) | ||||
|             .unwrap() | ||||
|             .count(); | ||||
|         assert_eq!(count, 5); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|   | ||||
| @@ -3,6 +3,7 @@ use std::path::PathBuf; | ||||
| use hf_hub::api::sync::ApiError; | ||||
|  | ||||
| use crate::error::FaultSource; | ||||
| use crate::PanicCatched; | ||||
|  | ||||
| #[derive(Debug, thiserror::Error)] | ||||
| #[error("Error while generating embeddings: {inner}")] | ||||
| @@ -80,6 +81,8 @@ pub enum EmbedErrorKind { | ||||
|     OpenAiUnexpectedDimension(usize, usize), | ||||
|     #[error("no embedding was produced")] | ||||
|     MissingEmbedding, | ||||
|     #[error(transparent)] | ||||
|     PanicInThreadPool(#[from] PanicCatched), | ||||
| } | ||||
|  | ||||
| impl EmbedError { | ||||
|   | ||||
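The new `PanicInThreadPool` variant uses thiserror's `#[error(transparent)]` together with `#[from]`, so a `PanicCatched` can be lifted into `EmbedErrorKind` with a plain `?` or `.into()`. A standalone sketch of that mechanism with placeholder types (`Panicked` and `TopLevelError` are made-up names, not the real milli types):

// A leaf error that can be converted into the top-level error transparently.
#[derive(Debug, thiserror::Error)]
#[error("a worker thread panicked")]
struct Panicked;

#[derive(Debug, thiserror::Error)]
enum TopLevelError {
    // Display and source are forwarded to the wrapped error.
    #[error(transparent)]
    Panic(#[from] Panicked),
}

fn run() -> Result<(), TopLevelError> {
    let failed: Result<(), Panicked> = Err(Panicked);
    // `?` uses the `From<Panicked> for TopLevelError` impl generated by `#[from]`.
    failed?;
    Ok(())
}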
| @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; | ||||
|  | ||||
| use self::error::{EmbedError, NewEmbedderError}; | ||||
| use crate::prompt::{Prompt, PromptData}; | ||||
| use crate::ThreadPoolNoAbort; | ||||
|  | ||||
| pub mod error; | ||||
| pub mod hf; | ||||
| @@ -254,7 +255,7 @@ impl Embedder { | ||||
|     pub fn embed_chunks( | ||||
|         &self, | ||||
|         text_chunks: Vec<Vec<String>>, | ||||
|         threads: &rayon::ThreadPool, | ||||
|         threads: &ThreadPoolNoAbort, | ||||
|     ) -> std::result::Result<Vec<Vec<Embeddings<f32>>>, EmbedError> { | ||||
|         match self { | ||||
|             Embedder::HuggingFace(embedder) => embedder.embed_chunks(text_chunks), | ||||
|   | ||||
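For context, `ThreadPoolNoAbort` is milli's wrapper around a rayon pool whose `install` reports worker panics as an error value instead of letting them take down the whole indexing process. The following is a hedged, self-contained sketch of that idea with made-up names (`NoAbortPool`, `PanicCaught`); it is not the actual `crate::thread_pool_no_abort` implementation, whose exact API should be checked in the source:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// A worker thread panicked somewhere inside the pool.
#[derive(Debug, thiserror::Error)]
#[error("a thread panicked while processing a batch")]
pub struct PanicCaught;

// Sketch of a "no abort" pool: rayon's panic handler only records that a
// panic happened, and `install` turns that flag into an error value.
pub struct NoAbortPool {
    pool: rayon::ThreadPool,
    panicked: Arc<AtomicBool>,
}

impl NoAbortPool {
    pub fn new(num_threads: usize) -> Result<Self, rayon::ThreadPoolBuildError> {
        let panicked = Arc::new(AtomicBool::new(false));
        let flag = panicked.clone();
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .panic_handler(move |_payload| flag.store(true, Ordering::SeqCst))
            .build()?;
        Ok(Self { pool, panicked })
    }

    pub fn install<OP, R>(&self, op: OP) -> Result<R, PanicCaught>
    where
        OP: FnOnce() -> R + Send,
        R: Send,
    {
        let output = self.pool.install(op);
        // Reset the flag and surface any recorded panic as a plain error.
        if self.panicked.swap(false, Ordering::SeqCst) {
            Err(PanicCaught)
        } else {
            Ok(output)
        }
    }
}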
| @@ -3,6 +3,8 @@ use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _}; | ||||
| use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind}; | ||||
| use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; | ||||
| use super::{DistributionShift, Embeddings}; | ||||
| use crate::error::FaultSource; | ||||
| use crate::ThreadPoolNoAbort; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct Embedder { | ||||
| @@ -71,11 +73,16 @@ impl Embedder { | ||||
|     pub fn embed_chunks( | ||||
|         &self, | ||||
|         text_chunks: Vec<Vec<String>>, | ||||
|         threads: &rayon::ThreadPool, | ||||
|         threads: &ThreadPoolNoAbort, | ||||
|     ) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> { | ||||
|         threads.install(move || { | ||||
|             text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect() | ||||
|         }) | ||||
|         threads | ||||
|             .install(move || { | ||||
|                 text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect() | ||||
|             }) | ||||
|             .map_err(|error| EmbedError { | ||||
|                 kind: EmbedErrorKind::PanicInThreadPool(error), | ||||
|                 fault: FaultSource::Bug, | ||||
|             })? | ||||
|     } | ||||
|  | ||||
|     pub fn chunk_count_hint(&self) -> usize { | ||||
|   | ||||
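Because `ThreadPoolNoAbort::install` itself returns a `Result`, the code above now has two error layers: the pool-level `PanicCatched` and the per-chunk `EmbedError` produced inside the closure. The `map_err(...)` followed by the trailing `?` collapses them into the single `EmbedError` return type. A minimal, self-contained sketch of the same flattening, with toy types (`String`/`&str` stand in for the real errors):

// The outer Result comes from the pool (did a worker panic?), the inner one
// from the embedding work itself; `map_err` converts the outer error and `?`
// leaves the inner Result as the function's return value.
fn flatten(
    outer: Result<Result<Vec<f32>, String>, &'static str>,
) -> Result<Vec<f32>, String> {
    outer.map_err(|panic| format!("panic in thread pool: {panic}"))?
}

The same shape is repeated in the other embedder backends changed below.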
| @@ -4,7 +4,9 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator as _}; | ||||
| use super::error::{EmbedError, NewEmbedderError}; | ||||
| use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; | ||||
| use super::{DistributionShift, Embeddings}; | ||||
| use crate::error::FaultSource; | ||||
| use crate::vector::error::EmbedErrorKind; | ||||
| use crate::ThreadPoolNoAbort; | ||||
|  | ||||
| #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] | ||||
| pub struct EmbedderOptions { | ||||
| @@ -241,11 +243,16 @@ impl Embedder { | ||||
|     pub fn embed_chunks( | ||||
|         &self, | ||||
|         text_chunks: Vec<Vec<String>>, | ||||
|         threads: &rayon::ThreadPool, | ||||
|         threads: &ThreadPoolNoAbort, | ||||
|     ) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> { | ||||
|         threads.install(move || { | ||||
|             text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect() | ||||
|         }) | ||||
|         threads | ||||
|             .install(move || { | ||||
|                 text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect() | ||||
|             }) | ||||
|             .map_err(|error| EmbedError { | ||||
|                 kind: EmbedErrorKind::PanicInThreadPool(error), | ||||
|                 fault: FaultSource::Bug, | ||||
|             })? | ||||
|     } | ||||
|  | ||||
|     pub fn chunk_count_hint(&self) -> usize { | ||||
|   | ||||
| @@ -2,9 +2,12 @@ use deserr::Deserr; | ||||
| use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _}; | ||||
| use serde::{Deserialize, Serialize}; | ||||
|  | ||||
| use super::error::EmbedErrorKind; | ||||
| use super::{ | ||||
|     DistributionShift, EmbedError, Embedding, Embeddings, NewEmbedderError, REQUEST_PARALLELISM, | ||||
| }; | ||||
| use crate::error::FaultSource; | ||||
| use crate::ThreadPoolNoAbort; | ||||
|  | ||||
| // retrying in case of failure | ||||
|  | ||||
| @@ -158,11 +161,16 @@ impl Embedder { | ||||
|     pub fn embed_chunks( | ||||
|         &self, | ||||
|         text_chunks: Vec<Vec<String>>, | ||||
|         threads: &rayon::ThreadPool, | ||||
|         threads: &ThreadPoolNoAbort, | ||||
|     ) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> { | ||||
|         threads.install(move || { | ||||
|             text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect() | ||||
|         }) | ||||
|         threads | ||||
|             .install(move || { | ||||
|                 text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect() | ||||
|             }) | ||||
|             .map_err(|error| EmbedError { | ||||
|                 kind: EmbedErrorKind::PanicInThreadPool(error), | ||||
|                 fault: FaultSource::Bug, | ||||
|             })? | ||||
|     } | ||||
|  | ||||
|     pub fn chunk_count_hint(&self) -> usize { | ||||
|   | ||||
| @@ -301,10 +301,14 @@ impl From<EmbeddingConfig> for EmbeddingSettings { | ||||
|     fn from(value: EmbeddingConfig) -> Self { | ||||
|         let EmbeddingConfig { embedder_options, prompt } = value; | ||||
|         match embedder_options { | ||||
|             super::EmbedderOptions::HuggingFace(options) => Self { | ||||
|             super::EmbedderOptions::HuggingFace(super::hf::EmbedderOptions { | ||||
|                 model, | ||||
|                 revision, | ||||
|                 distribution, | ||||
|             }) => Self { | ||||
|                 source: Setting::Set(EmbedderSource::HuggingFace), | ||||
|                 model: Setting::Set(options.model), | ||||
|                 revision: options.revision.map(Setting::Set).unwrap_or_default(), | ||||
|                 model: Setting::Set(model), | ||||
|                 revision: revision.map(Setting::Set).unwrap_or_default(), | ||||
|                 api_key: Setting::NotSet, | ||||
|                 dimensions: Setting::NotSet, | ||||
|                 document_template: Setting::Set(prompt.template), | ||||
| @@ -314,14 +318,19 @@ impl From<EmbeddingConfig> for EmbeddingSettings { | ||||
|                 path_to_embeddings: Setting::NotSet, | ||||
|                 embedding_object: Setting::NotSet, | ||||
|                 input_type: Setting::NotSet, | ||||
|                 distribution: options.distribution.map(Setting::Set).unwrap_or_default(), | ||||
|                 distribution: distribution.map(Setting::Set).unwrap_or_default(), | ||||
|             }, | ||||
|             super::EmbedderOptions::OpenAi(options) => Self { | ||||
|             super::EmbedderOptions::OpenAi(super::openai::EmbedderOptions { | ||||
|                 api_key, | ||||
|                 embedding_model, | ||||
|                 dimensions, | ||||
|                 distribution, | ||||
|             }) => Self { | ||||
|                 source: Setting::Set(EmbedderSource::OpenAi), | ||||
|                 model: Setting::Set(options.embedding_model.name().to_owned()), | ||||
|                 model: Setting::Set(embedding_model.name().to_owned()), | ||||
|                 revision: Setting::NotSet, | ||||
|                 api_key: options.api_key.map(Setting::Set).unwrap_or_default(), | ||||
|                 dimensions: options.dimensions.map(Setting::Set).unwrap_or_default(), | ||||
|                 api_key: api_key.map(Setting::Set).unwrap_or_default(), | ||||
|                 dimensions: dimensions.map(Setting::Set).unwrap_or_default(), | ||||
|                 document_template: Setting::Set(prompt.template), | ||||
|                 url: Setting::NotSet, | ||||
|                 query: Setting::NotSet, | ||||
| @@ -329,29 +338,37 @@ impl From<EmbeddingConfig> for EmbeddingSettings { | ||||
|                 path_to_embeddings: Setting::NotSet, | ||||
|                 embedding_object: Setting::NotSet, | ||||
|                 input_type: Setting::NotSet, | ||||
|                 distribution: options.distribution.map(Setting::Set).unwrap_or_default(), | ||||
|                 distribution: distribution.map(Setting::Set).unwrap_or_default(), | ||||
|             }, | ||||
|             super::EmbedderOptions::Ollama(options) => Self { | ||||
|             super::EmbedderOptions::Ollama(super::ollama::EmbedderOptions { | ||||
|                 embedding_model, | ||||
|                 url, | ||||
|                 api_key, | ||||
|                 distribution, | ||||
|             }) => Self { | ||||
|                 source: Setting::Set(EmbedderSource::Ollama), | ||||
|                 model: Setting::Set(options.embedding_model.to_owned()), | ||||
|                 model: Setting::Set(embedding_model), | ||||
|                 revision: Setting::NotSet, | ||||
|                 api_key: Setting::NotSet, | ||||
|                 api_key: api_key.map(Setting::Set).unwrap_or_default(), | ||||
|                 dimensions: Setting::NotSet, | ||||
|                 document_template: Setting::Set(prompt.template), | ||||
|                 url: Setting::NotSet, | ||||
|                 url: url.map(Setting::Set).unwrap_or_default(), | ||||
|                 query: Setting::NotSet, | ||||
|                 input_field: Setting::NotSet, | ||||
|                 path_to_embeddings: Setting::NotSet, | ||||
|                 embedding_object: Setting::NotSet, | ||||
|                 input_type: Setting::NotSet, | ||||
|                 distribution: options.distribution.map(Setting::Set).unwrap_or_default(), | ||||
|                 distribution: distribution.map(Setting::Set).unwrap_or_default(), | ||||
|             }, | ||||
|             super::EmbedderOptions::UserProvided(options) => Self { | ||||
|             super::EmbedderOptions::UserProvided(super::manual::EmbedderOptions { | ||||
|                 dimensions, | ||||
|                 distribution, | ||||
|             }) => Self { | ||||
|                 source: Setting::Set(EmbedderSource::UserProvided), | ||||
|                 model: Setting::NotSet, | ||||
|                 revision: Setting::NotSet, | ||||
|                 api_key: Setting::NotSet, | ||||
|                 dimensions: Setting::Set(options.dimensions), | ||||
|                 dimensions: Setting::Set(dimensions), | ||||
|                 document_template: Setting::NotSet, | ||||
|                 url: Setting::NotSet, | ||||
|                 query: Setting::NotSet, | ||||
| @@ -359,7 +376,7 @@ impl From<EmbeddingConfig> for EmbeddingSettings { | ||||
|                 path_to_embeddings: Setting::NotSet, | ||||
|                 embedding_object: Setting::NotSet, | ||||
|                 input_type: Setting::NotSet, | ||||
|                 distribution: options.distribution.map(Setting::Set).unwrap_or_default(), | ||||
|                 distribution: distribution.map(Setting::Set).unwrap_or_default(), | ||||
|             }, | ||||
|             super::EmbedderOptions::Rest(super::rest::EmbedderOptions { | ||||
|                 api_key, | ||||
|   | ||||
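The rewrite above swaps `options.field` accesses for exhaustive struct destructuring of each `EmbedderOptions` variant, so adding a new option field becomes a compile-time error at this conversion until it is mapped to a setting; it also fixes the Ollama arm, which previously left `api_key` and `url` as `NotSet` instead of mapping the stored values. A toy sketch of the pattern, with hypothetical `Options`/`Settings` types:

// Hypothetical types; only the destructuring pattern itself mirrors the hunk.
struct Options {
    model: String,
    dimensions: Option<usize>,
}

struct Settings {
    model: String,
    dimensions: Option<usize>,
}

impl From<Options> for Settings {
    fn from(value: Options) -> Self {
        // Exhaustive destructuring: adding a field to `Options` makes this
        // pattern fail to compile until the new field is handled here too.
        let Options { model, dimensions } = value;
        Settings { model, dimensions }
    }
}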