mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-24 20:46:27 +00:00 
			
		
		
		
	Integrate first searchable exctrator
This commit is contained in:
		| @@ -1,10 +1,12 @@ | ||||
| use std::fs::File; | ||||
|  | ||||
| use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; | ||||
| use grenad::Merger; | ||||
| use heed::types::Bytes; | ||||
|  | ||||
| use super::StdResult; | ||||
| use crate::update::new::KvReaderFieldId; | ||||
| use crate::update::MergeDeladdCboRoaringBitmaps; | ||||
| use crate::{DocumentId, Index}; | ||||
|  | ||||
| /// The capacity of the channel is currently in number of messages. | ||||
| @@ -159,7 +161,7 @@ impl DocumentSender { | ||||
| } | ||||
|  | ||||
| pub enum MergerOperation { | ||||
|     WordDocidsCursors(Vec<grenad::ReaderCursor<File>>), | ||||
|     WordDocidsMerger(Merger<File, MergeDeladdCboRoaringBitmaps>), | ||||
| } | ||||
|  | ||||
| pub struct MergerReceiver(Receiver<MergerOperation>); | ||||
| @@ -175,3 +177,16 @@ impl IntoIterator for MergerReceiver { | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct DeladdCboRoaringBitmapSender(Sender<MergerOperation>); | ||||
|  | ||||
| impl DeladdCboRoaringBitmapSender { | ||||
|     pub fn word_docids( | ||||
|         &self, | ||||
|         merger: Merger<File, MergeDeladdCboRoaringBitmaps>, | ||||
|     ) -> StdResult<(), SendError<()>> { | ||||
|         let operation = MergerOperation::WordDocidsMerger(merger); | ||||
|         match self.0.send(operation) { | ||||
|             Ok(()) => Ok(()), | ||||
|             Err(SendError(_)) => Err(SendError(())), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -2,7 +2,7 @@ use heed::RoTxn; | ||||
| use obkv::KvReader; | ||||
|  | ||||
| use crate::update::new::KvReaderFieldId; | ||||
| use crate::{DocumentId, FieldId}; | ||||
| use crate::{DocumentId, FieldId, Index}; | ||||
|  | ||||
| pub enum DocumentChange { | ||||
|     Deletion(Deletion), | ||||
| @@ -13,13 +13,13 @@ pub enum DocumentChange { | ||||
| pub struct Deletion { | ||||
|     docid: DocumentId, | ||||
|     external_docid: String,        // ? | ||||
|     current: Box<KvReaderFieldId>, | ||||
|     current: Box<KvReaderFieldId>, // ? | ||||
| } | ||||
|  | ||||
| pub struct Update { | ||||
|     docid: DocumentId, | ||||
|     external_docid: String,        // ? | ||||
|     current: Box<KvReaderFieldId>, | ||||
|     current: Box<KvReaderFieldId>, // ? | ||||
|     new: Box<KvReaderFieldId>, | ||||
| } | ||||
|  | ||||
| @@ -30,7 +30,7 @@ pub struct Insertion { | ||||
| } | ||||
|  | ||||
| impl DocumentChange { | ||||
|     fn docid(&self) -> DocumentId { | ||||
|     pub fn docid(&self) -> DocumentId { | ||||
|         match &self { | ||||
|             Self::Deletion(inner) => inner.docid(), | ||||
|             Self::Update(inner) => inner.docid(), | ||||
| @@ -48,11 +48,11 @@ impl Deletion { | ||||
|         Self { docid, external_docid, current } | ||||
|     } | ||||
|  | ||||
|     fn docid(&self) -> DocumentId { | ||||
|     pub fn docid(&self) -> DocumentId { | ||||
|         self.docid | ||||
|     } | ||||
|  | ||||
|     fn current(&self, rtxn: &RoTxn) -> &KvReader<FieldId> { | ||||
|     pub fn current(&self, rtxn: &RoTxn, index: &Index) -> &KvReader<FieldId> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
| @@ -62,11 +62,11 @@ impl Insertion { | ||||
|         Insertion { docid, external_docid, new } | ||||
|     } | ||||
|  | ||||
|     fn docid(&self) -> DocumentId { | ||||
|     pub fn docid(&self) -> DocumentId { | ||||
|         self.docid | ||||
|     } | ||||
|  | ||||
|     fn new(&self) -> &KvReader<FieldId> { | ||||
|     pub fn new(&self) -> &KvReader<FieldId> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
| @@ -81,15 +81,15 @@ impl Update { | ||||
|         Update { docid, external_docid, current, new } | ||||
|     } | ||||
|  | ||||
|     fn docid(&self) -> DocumentId { | ||||
|     pub fn docid(&self) -> DocumentId { | ||||
|         self.docid | ||||
|     } | ||||
|  | ||||
|     fn current(&self, rtxn: &RoTxn) -> &KvReader<FieldId> { | ||||
|     pub fn current(&self, rtxn: &RoTxn, index: &Index) -> &KvReader<FieldId> { | ||||
|         unimplemented!() | ||||
|     } | ||||
|  | ||||
|     fn new(&self) -> &KvReader<FieldId> { | ||||
|     pub fn new(&self) -> &KvReader<FieldId> { | ||||
|         unimplemented!() | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -2,12 +2,12 @@ use std::borrow::Cow; | ||||
| use std::num::NonZeroUsize; | ||||
| use std::{io, mem}; | ||||
|  | ||||
| use grenad2::{MergeFunction, Sorter}; | ||||
| use grenad::{MergeFunction, Sorter}; | ||||
| use lru::LruCache; | ||||
| use roaring::RoaringBitmap; | ||||
| use smallvec::SmallVec; | ||||
|  | ||||
| use crate::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
| use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct CachedSorter<MF> { | ||||
|   | ||||
| @@ -1,84 +1,180 @@ | ||||
| pub fn extract_word_docids( | ||||
| use std::fs::File; | ||||
|  | ||||
| use charabia::TokenizerBuilder; | ||||
| use grenad::Merger; | ||||
| use grenad::ReaderCursor; | ||||
| use heed::RoTxn; | ||||
| use rayon::iter::IntoParallelIterator; | ||||
| use rayon::iter::ParallelBridge; | ||||
| use rayon::iter::ParallelIterator; | ||||
|  | ||||
| use crate::update::MergeDeladdCboRoaringBitmaps; | ||||
| use crate::{ | ||||
|     update::{ | ||||
|         create_sorter, | ||||
|         new::{DocumentChange, ItemsPool}, | ||||
|         GrenadParameters, | ||||
|     }, | ||||
|     FieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE, | ||||
| }; | ||||
|  | ||||
| use super::{ | ||||
|     cache::{CachedSorter, DelAddRoaringBitmapMerger}, | ||||
|     tokenize_document::DocumentTokenizer, | ||||
| }; | ||||
|  | ||||
| pub trait SearchableExtractor { | ||||
|     fn run_extraction( | ||||
|         index: &Index, | ||||
|         fields_ids_map: &FieldsIdsMap, | ||||
|         indexer: GrenadParameters, | ||||
|         document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, | ||||
|     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { | ||||
|         let max_memory = indexer.max_memory_by_thread(); | ||||
|  | ||||
|         let rtxn = index.read_txn()?; | ||||
|         let stop_words = index.stop_words(&rtxn)?; | ||||
|         let allowed_separators = index.allowed_separators(&rtxn)?; | ||||
|         let allowed_separators: Option<Vec<_>> = | ||||
|             allowed_separators.as_ref().map(|s| s.iter().map(String::as_str).collect()); | ||||
|         let dictionary = index.dictionary(&rtxn)?; | ||||
|         let dictionary: Option<Vec<_>> = | ||||
|             dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect()); | ||||
|         let builder = tokenizer_builder( | ||||
|             stop_words.as_ref(), | ||||
|             allowed_separators.as_deref(), | ||||
|             dictionary.as_deref(), | ||||
|         ); | ||||
|         let tokenizer = builder.into_tokenizer(); | ||||
|  | ||||
|         let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn)?; | ||||
|         let localized_attributes_rules = | ||||
|             index.localized_attributes_rules(&rtxn)?.unwrap_or_default(); | ||||
|  | ||||
|         let document_tokenizer = DocumentTokenizer { | ||||
|             tokenizer: &tokenizer, | ||||
|             searchable_attributes: user_defined_searchable_fields.as_deref(), | ||||
|             localized_attributes_rules: &localized_attributes_rules, | ||||
|             max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, | ||||
|         }; | ||||
|  | ||||
|         let context_pool = ItemsPool::new(|| { | ||||
|             Ok(( | ||||
|                 index.read_txn()?, | ||||
|                 &document_tokenizer, | ||||
|                 CachedSorter::new( | ||||
|                     // TODO use a better value | ||||
|                     100.try_into().unwrap(), | ||||
|                     create_sorter( | ||||
|                         grenad::SortAlgorithm::Stable, | ||||
|                         DelAddRoaringBitmapMerger, | ||||
|                         indexer.chunk_compression_type, | ||||
|                         indexer.chunk_compression_level, | ||||
|                         indexer.max_nb_chunks, | ||||
|                         max_memory, | ||||
|                     ), | ||||
|                 ), | ||||
|             )) | ||||
|         }); | ||||
|  | ||||
|         document_changes.into_par_iter().try_for_each(|document_change| { | ||||
|             context_pool.with(|(rtxn, document_tokenizer, cached_sorter)| { | ||||
|                 Self::extract_document_change( | ||||
|                     &*rtxn, | ||||
|                     index, | ||||
|                     document_tokenizer, | ||||
|                     &fields_ids_map, | ||||
|                     cached_sorter, | ||||
|                     document_change?, | ||||
|                 ) | ||||
|             }) | ||||
|         })?; | ||||
|  | ||||
|         let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); | ||||
|         for (_rtxn, _tokenizer, cache) in context_pool.into_items() { | ||||
|             let sorter = cache.into_sorter()?; | ||||
|             let readers = sorter.into_reader_cursors()?; | ||||
|             builder.extend(readers); | ||||
|         } | ||||
|  | ||||
|         Ok(builder.build()) | ||||
|     } | ||||
|  | ||||
|     fn extract_document_change( | ||||
|         rtxn: &RoTxn, | ||||
|         index: &Index, | ||||
|         document_tokenizer: &DocumentTokenizer, | ||||
|         fields_ids_map: &FieldsIdsMap, | ||||
|         cached_sorter: &mut CachedSorter<DelAddRoaringBitmapMerger>, | ||||
|         document_change: DocumentChange, | ||||
|     _tokenizer: &Tokenizer, | ||||
|     output: &mut CachedSorter<DelAddRoaringBitmapMerger>, | ||||
| ) -> grenad::Result<(), io::Error> { | ||||
|     ) -> Result<()>; | ||||
| } | ||||
|  | ||||
| pub struct WordDocidsExtractor; | ||||
| impl SearchableExtractor for WordDocidsExtractor { | ||||
|     fn extract_document_change( | ||||
|         rtxn: &RoTxn, | ||||
|         index: &Index, | ||||
|         document_tokenizer: &DocumentTokenizer, | ||||
|         fields_ids_map: &FieldsIdsMap, | ||||
|         // TODO: DelAddRoaringBitmapMerger should be CBO | ||||
|         cached_sorter: &mut CachedSorter<DelAddRoaringBitmapMerger>, | ||||
|         document_change: DocumentChange, | ||||
|     ) -> crate::Result<()> { | ||||
|         match document_change { | ||||
|             DocumentChange::Deletion(inner) => { | ||||
|             unimplemented!() | ||||
|                 let mut token_fn = |_fid, _pos: u16, word: &str| { | ||||
|                     cached_sorter.insert_del_u32(word.as_bytes(), inner.docid()).unwrap(); | ||||
|                 }; | ||||
|                 document_tokenizer.tokenize_document( | ||||
|                     inner.current(rtxn, index), | ||||
|                     fields_ids_map, | ||||
|                     &mut token_fn, | ||||
|                 )?; | ||||
|             } | ||||
|             DocumentChange::Update(inner) => { | ||||
|             unimplemented!() | ||||
|                 let mut token_fn = |_fid, _pos, word: &str| { | ||||
|                     cached_sorter.insert_del_u32(word.as_bytes(), inner.docid()).unwrap(); | ||||
|                 }; | ||||
|                 document_tokenizer.tokenize_document( | ||||
|                     inner.current(rtxn, index), | ||||
|                     fields_ids_map, | ||||
|                     &mut token_fn, | ||||
|                 )?; | ||||
|  | ||||
|                 let mut token_fn = |_fid, _pos, word: &str| { | ||||
|                     cached_sorter.insert_add_u32(word.as_bytes(), inner.docid()).unwrap(); | ||||
|                 }; | ||||
|                 document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?; | ||||
|             } | ||||
|             DocumentChange::Insertion(inner) => { | ||||
|             unimplemented!() | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     let normalizer_options = NormalizerOption::default(); | ||||
|  | ||||
|     if let Some(previous_doc) = previous_doc { | ||||
|         for (_, v) in previous_doc.iter() { | ||||
|             // Only manage the direct JSON strings | ||||
|             // TODO manage the JSON strings correctly (escaped chars) | ||||
|             if v.first().zip(v.last()) == Some((&b'"', &b'"')) { | ||||
|                 let s = std::str::from_utf8(&v[1..v.len() - 1]).unwrap(); | ||||
|                 // for token in tokenizer.tokenize(s).filter(|t| t.is_word()) { | ||||
|                 //     let key = token.lemma().normalize(&normalizer_options); | ||||
|                 for token in s.split_whitespace() { | ||||
|                     let key = token.normalize(&normalizer_options); | ||||
|                     output.insert_del_u32(key.as_bytes(), docid)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     for (_, v) in new_doc.iter() { | ||||
|         // Only manage the direct JSON strings | ||||
|         // TODO manage the JSON strings correctly (escaped chars) | ||||
|         if v.first().zip(v.last()) == Some((&b'"', &b'"')) { | ||||
|             let s = std::str::from_utf8(&v[1..v.len() - 1]).unwrap(); | ||||
|             // for token in tokenizer.tokenize(s).filter(|t| t.is_word()) { | ||||
|             //     let key = token.lemma().normalize(&normalizer_options); | ||||
|             for token in s.split_whitespace() { | ||||
|                 let key = token.normalize(&normalizer_options); | ||||
|                 output.insert_add_u32(key.as_bytes(), docid)?; | ||||
|             } | ||||
|                 let mut token_fn = |_fid, _pos, word: &str| { | ||||
|                     cached_sorter.insert_add_u32(word.as_bytes(), inner.docid()).unwrap(); | ||||
|                 }; | ||||
|                 document_tokenizer.tokenize_document(inner.new(), fields_ids_map, &mut token_fn)?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// take an iterator on tokens and compute their relative position depending on separator kinds | ||||
| /// if it's an `Hard` separator we add an additional relative proximity of 8 between words, | ||||
| /// else we keep the standard proximity of 1 between words. | ||||
| fn process_tokens<'a>( | ||||
|     tokens: impl Iterator<Item = Token<'a>>, | ||||
| ) -> impl Iterator<Item = (usize, Token<'a>)> { | ||||
|     tokens | ||||
|         .skip_while(|token| token.is_separator()) | ||||
|         .scan((0, None), |(offset, prev_kind), mut token| { | ||||
|             match token.kind { | ||||
|                 TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => { | ||||
|                     *offset += match *prev_kind { | ||||
|                         Some(TokenKind::Separator(SeparatorKind::Hard)) => 8, | ||||
|                         Some(_) => 1, | ||||
|                         None => 0, | ||||
|                     }; | ||||
|                     *prev_kind = Some(token.kind) | ||||
| /// Factorize tokenizer building. | ||||
| fn tokenizer_builder<'a>( | ||||
|     stop_words: Option<&'a fst::Set<&'a [u8]>>, | ||||
|     allowed_separators: Option<&'a [&str]>, | ||||
|     dictionary: Option<&'a [&str]>, | ||||
| ) -> TokenizerBuilder<'a, &'a [u8]> { | ||||
|     let mut tokenizer_builder = TokenizerBuilder::new(); | ||||
|     if let Some(stop_words) = stop_words { | ||||
|         tokenizer_builder.stop_words(stop_words); | ||||
|     } | ||||
|                 TokenKind::Separator(SeparatorKind::Hard) => { | ||||
|                     *prev_kind = Some(token.kind); | ||||
|     if let Some(dictionary) = dictionary { | ||||
|         tokenizer_builder.words_dict(dictionary); | ||||
|     } | ||||
|                 TokenKind::Separator(SeparatorKind::Soft) | ||||
|                     if *prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) => | ||||
|                 { | ||||
|                     *prev_kind = Some(token.kind); | ||||
|     if let Some(separators) = allowed_separators { | ||||
|         tokenizer_builder.separators(separators); | ||||
|     } | ||||
|                 _ => token.kind = TokenKind::Unknown, | ||||
|             } | ||||
|             Some((*offset, token)) | ||||
|         }) | ||||
|         .filter(|(_, t)| t.is_word()) | ||||
|  | ||||
|     tokenizer_builder | ||||
| } | ||||
|   | ||||
| @@ -1,2 +1,6 @@ | ||||
| mod cache; | ||||
| mod extract_word_docids; | ||||
| mod tokenize_document; | ||||
|  | ||||
| pub use extract_word_docids::SearchableExtractor; | ||||
| pub use extract_word_docids::WordDocidsExtractor; | ||||
|   | ||||
| @@ -1,56 +1,71 @@ | ||||
| pub struct DocumentTokenizer { | ||||
|     tokenizer: &Tokenizer, | ||||
|     searchable_attributes: Option<&[String]>, | ||||
|     localized_attributes_rules: &[LocalizedAttributesRule], | ||||
|     max_positions_per_attributes: u32, | ||||
| use crate::{ | ||||
|     update::new::KvReaderFieldId, FieldId, FieldsIdsMap, Index, InternalError, | ||||
|     LocalizedAttributesRule, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH, | ||||
| }; | ||||
| use charabia::{SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder}; | ||||
| use heed::RoTxn; | ||||
| use serde_json::Value; | ||||
| use std::collections::HashMap; | ||||
|  | ||||
| pub struct DocumentTokenizer<'a> { | ||||
|     pub tokenizer: &'a Tokenizer<'a>, | ||||
|     pub searchable_attributes: Option<&'a [&'a str]>, | ||||
|     pub localized_attributes_rules: &'a [LocalizedAttributesRule], | ||||
|     pub max_positions_per_attributes: u32, | ||||
| } | ||||
|  | ||||
| impl DocumentTokenizer { | ||||
|     // pub fn new(tokenizer: &Tokenizer, settings: &InnerIndexSettings) -> Self { | ||||
|     //     Self { tokenizer, settings } | ||||
|     // } | ||||
|  | ||||
|     pub fn tokenize_document<'a>( | ||||
|         obkv: &KvReader<'a, FieldId>, | ||||
| impl<'a> DocumentTokenizer<'a> { | ||||
|     pub fn tokenize_document( | ||||
|         &self, | ||||
|         obkv: &KvReaderFieldId, | ||||
|         field_id_map: &FieldsIdsMap, | ||||
|         token_fn: impl Fn(FieldId, u16, &str), | ||||
|     ) { | ||||
|         let mut field_position = Hashmap::new(); | ||||
|         token_fn: &mut impl FnMut(FieldId, u16, &str), | ||||
|     ) -> Result<()> { | ||||
|         let mut field_position = HashMap::new(); | ||||
|         for (field_id, field_bytes) in obkv { | ||||
|             let field_name = field_id_map.name(field_id); | ||||
|             let Some(field_name) = field_id_map.name(field_id) else { | ||||
|                 unreachable!("field id not found in field id map"); | ||||
|             }; | ||||
|  | ||||
|             let mut tokenize_field = |name: &str, value: &Value| { | ||||
|                 let Some(field_id) = field_id_map.id(name) else { | ||||
|                     unreachable!("field name not found in field id map"); | ||||
|                 }; | ||||
|  | ||||
|                 let position = | ||||
|                     field_position.entry(field_id).and_modify(|counter| *counter += 8).or_insert(0); | ||||
|                 if *position as u32 >= self.max_positions_per_attributes { | ||||
|                     return; | ||||
|                 } | ||||
|  | ||||
|             let tokenize_field = |name, value| { | ||||
|                 let field_id = field_id_map.id(name); | ||||
|                 match value { | ||||
|                     Number(n) => { | ||||
|                     Value::Number(n) => { | ||||
|                         let token = n.to_string(); | ||||
|                         let position = field_position | ||||
|                             .entry(field_id) | ||||
|                             .and_modify(|counter| *counter += 8) | ||||
|                             .or_insert(0); | ||||
|                         if let Ok(position) = (*position).try_into() { | ||||
|                             token_fn(field_id, position, token.as_str()); | ||||
|                         } | ||||
|                     String(text) => { | ||||
|                     } | ||||
|                     Value::String(text) => { | ||||
|                         // create an iterator of token with their positions. | ||||
|                         let locales = self | ||||
|                             .localized_attributes_rules | ||||
|                             .iter() | ||||
|                             .first(|rule| rule.match_str(field_name)) | ||||
|                             .map(|rule| rule.locales(field_id)); | ||||
|                         let tokens = | ||||
|                             process_tokens(tokenizer.tokenize_with_allow_list(field, locales)) | ||||
|                                 .take_while(|(p, _)| { | ||||
|                                     (*p as u32) < self.max_positions_per_attributes | ||||
|                                 }); | ||||
|                             .find(|rule| rule.match_str(field_name)) | ||||
|                             .map(|rule| rule.locales()); | ||||
|                         let tokens = process_tokens( | ||||
|                             *position, | ||||
|                             self.tokenizer.tokenize_with_allow_list(text.as_str(), locales), | ||||
|                         ) | ||||
|                         .take_while(|(p, _)| (*p as u32) < self.max_positions_per_attributes); | ||||
|  | ||||
|                         for (index, token) in tokens { | ||||
|                             // keep a word only if it is not empty and fit in a LMDB key. | ||||
|                             let token = token.lemma().trim(); | ||||
|                             if !token.is_empty() && token.len() <= MAX_WORD_LENGTH { | ||||
|                                 let position: u16 = index | ||||
|                                     .try_into() | ||||
|                                     .map_err(|_| SerializationError::InvalidNumberSerialization)?; | ||||
|                                 writer.insert(position, token.as_bytes())?; | ||||
|                                 *position = index; | ||||
|                                 if let Ok(position) = (*position).try_into() { | ||||
|                                     token_fn(field_id, position, token); | ||||
|                                 } | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
| @@ -59,21 +74,28 @@ impl DocumentTokenizer { | ||||
|             }; | ||||
|  | ||||
|             // if the current field is searchable or contains a searchable attribute | ||||
|             if searchable_attributes.map_or(true, |attributes| { | ||||
|                 attributes.iter().any(|name| contained_in(name, field_name)) | ||||
|             if self.searchable_attributes.map_or(true, |attributes| { | ||||
|                 attributes.iter().any(|name| perm_json_p::contained_in(name, field_name)) | ||||
|             }) { | ||||
|                 // parse json. | ||||
|                 match serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)? { | ||||
|                     Value::Object(object) => { | ||||
|                         seek_leaf_values_in_object(object, selectors, &field_name, tokenize_field) | ||||
|                     } | ||||
|                     Value::Array(array) => { | ||||
|                         seek_leaf_values_in_array(array, selectors, &field_name, tokenize_field) | ||||
|                     } | ||||
|                     value => tokenize_field(&base_key, value), | ||||
|                     Value::Object(object) => perm_json_p::seek_leaf_values_in_object( | ||||
|                         &object, | ||||
|                         self.searchable_attributes.as_deref(), | ||||
|                         &field_name, | ||||
|                         &mut tokenize_field, | ||||
|                     ), | ||||
|                     Value::Array(array) => perm_json_p::seek_leaf_values_in_array( | ||||
|                         &array, | ||||
|                         self.searchable_attributes.as_deref(), | ||||
|                         &field_name, | ||||
|                         &mut tokenize_field, | ||||
|                     ), | ||||
|                     value => tokenize_field(&field_name, &value), | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -81,11 +103,12 @@ impl DocumentTokenizer { | ||||
| /// if it's an `Hard` separator we add an additional relative proximity of 8 between words, | ||||
| /// else we keep the standard proximity of 1 between words. | ||||
| fn process_tokens<'a>( | ||||
|     start_offset: usize, | ||||
|     tokens: impl Iterator<Item = Token<'a>>, | ||||
| ) -> impl Iterator<Item = (usize, Token<'a>)> { | ||||
|     tokens | ||||
|         .skip_while(|token| token.is_separator()) | ||||
|         .scan((0, None), |(offset, prev_kind), mut token| { | ||||
|         .scan((start_offset, None), |(offset, prev_kind), mut token| { | ||||
|             match token.kind { | ||||
|                 TokenKind::Word | TokenKind::StopWord if !token.lemma().is_empty() => { | ||||
|                     *offset += match *prev_kind { | ||||
| @@ -110,42 +133,45 @@ fn process_tokens<'a>( | ||||
|         .filter(|(_, t)| t.is_word()) | ||||
| } | ||||
|  | ||||
| /// Returns `true` if the `selector` match the `key`. | ||||
| /// | ||||
| /// ```text | ||||
| /// Example: | ||||
| /// `animaux`           match `animaux` | ||||
| /// `animaux.chien`     match `animaux` | ||||
| /// `animaux.chien`     match `animaux` | ||||
| /// `animaux.chien.nom` match `animaux` | ||||
| /// `animaux.chien.nom` match `animaux.chien` | ||||
| /// ----------------------------------------- | ||||
| /// `animaux`    doesn't match `animaux.chien` | ||||
| /// `animaux.`   doesn't match `animaux` | ||||
| /// `animaux.ch` doesn't match `animaux.chien` | ||||
| /// `animau`     doesn't match `animaux` | ||||
| /// ``` | ||||
| fn contained_in(selector: &str, key: &str) -> bool { | ||||
|     selector.starts_with(key) | ||||
|         && selector[key.len()..].chars().next().map(|c| c == SPLIT_SYMBOL).unwrap_or(true) | ||||
| } | ||||
|  | ||||
| /// TODO move in permissive json pointer | ||||
| mod perm_json_p { | ||||
|     use serde_json::{Map, Value}; | ||||
|     const SPLIT_SYMBOL: char = '.'; | ||||
|  | ||||
|     /// Returns `true` if the `selector` match the `key`. | ||||
|     /// | ||||
|     /// ```text | ||||
|     /// Example: | ||||
|     /// `animaux`           match `animaux` | ||||
|     /// `animaux.chien`     match `animaux` | ||||
|     /// `animaux.chien`     match `animaux` | ||||
|     /// `animaux.chien.nom` match `animaux` | ||||
|     /// `animaux.chien.nom` match `animaux.chien` | ||||
|     /// ----------------------------------------- | ||||
|     /// `animaux`    doesn't match `animaux.chien` | ||||
|     /// `animaux.`   doesn't match `animaux` | ||||
|     /// `animaux.ch` doesn't match `animaux.chien` | ||||
|     /// `animau`     doesn't match `animaux` | ||||
|     /// ``` | ||||
|     pub fn contained_in(selector: &str, key: &str) -> bool { | ||||
|         selector.starts_with(key) | ||||
|             && selector[key.len()..].chars().next().map(|c| c == SPLIT_SYMBOL).unwrap_or(true) | ||||
|     } | ||||
|  | ||||
|     pub fn seek_leaf_values<'a>( | ||||
|         value: &Map<String, Value>, | ||||
|         selectors: impl IntoIterator<Item = &'a str>, | ||||
|         seeker: impl Fn(&str, &Value), | ||||
|         seeker: &mut impl FnMut(&str, &Value), | ||||
|     ) { | ||||
|         let selectors: Vec<_> = selectors.into_iter().collect(); | ||||
|         seek_leaf_values_in_object(value, &selectors, "", &seeker); | ||||
|         seek_leaf_values_in_object(value, Some(&selectors), "", seeker); | ||||
|     } | ||||
|  | ||||
|     pub fn seek_leaf_values_in_object( | ||||
|         value: &Map<String, Value>, | ||||
|         selectors: &[&str], | ||||
|         selectors: Option<&[&str]>, | ||||
|         base_key: &str, | ||||
|         seeker: &impl Fn(&str, &Value), | ||||
|         seeker: &mut impl FnMut(&str, &Value), | ||||
|     ) { | ||||
|         for (key, value) in value.iter() { | ||||
|             let base_key = if base_key.is_empty() { | ||||
| @@ -156,8 +182,10 @@ mod perm_json_p { | ||||
|  | ||||
|             // here if the user only specified `doggo` we need to iterate in all the fields of `doggo` | ||||
|             // so we check the contained_in on both side | ||||
|             let should_continue = selectors.iter().any(|selector| { | ||||
|             let should_continue = selectors.map_or(true, |selectors| { | ||||
|                 selectors.iter().any(|selector| { | ||||
|                     contained_in(selector, &base_key) || contained_in(&base_key, selector) | ||||
|                 }) | ||||
|             }); | ||||
|  | ||||
|             if should_continue { | ||||
| @@ -175,12 +203,12 @@ mod perm_json_p { | ||||
|     } | ||||
|  | ||||
|     pub fn seek_leaf_values_in_array( | ||||
|         values: &mut [Value], | ||||
|         selectors: &[&str], | ||||
|         values: &[Value], | ||||
|         selectors: Option<&[&str]>, | ||||
|         base_key: &str, | ||||
|         seeker: &impl Fn(&str, &Value), | ||||
|         seeker: &mut impl FnMut(&str, &Value), | ||||
|     ) { | ||||
|         for value in values.iter_mut() { | ||||
|         for value in values { | ||||
|             match value { | ||||
|                 Value::Object(object) => { | ||||
|                     seek_leaf_values_in_object(object, selectors, base_key, seeker) | ||||
| @@ -193,3 +221,91 @@ mod perm_json_p { | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use super::*; | ||||
|     use charabia::TokenizerBuilder; | ||||
|     use meili_snap::snapshot; | ||||
|     use obkv::KvReader; | ||||
|     use serde_json::json; | ||||
|     #[test] | ||||
|     fn test_tokenize_document() { | ||||
|         let mut fields_ids_map = FieldsIdsMap::new(); | ||||
|  | ||||
|         let field_1 = json!({ | ||||
|                 "name": "doggo", | ||||
|                 "age": 10, | ||||
|         }); | ||||
|  | ||||
|         let field_2 = json!({ | ||||
|                 "catto": { | ||||
|                     "name": "pesti", | ||||
|                     "age": 23, | ||||
|                 } | ||||
|         }); | ||||
|  | ||||
|         let field_3 = json!(["doggo", "catto"]); | ||||
|  | ||||
|         let mut obkv = obkv::KvWriter::memory(); | ||||
|         let field_1_id = fields_ids_map.insert("doggo").unwrap(); | ||||
|         let field_1 = serde_json::to_string(&field_1).unwrap(); | ||||
|         obkv.insert(field_1_id, field_1.as_bytes()).unwrap(); | ||||
|         let field_2_id = fields_ids_map.insert("catto").unwrap(); | ||||
|         let field_2 = serde_json::to_string(&field_2).unwrap(); | ||||
|         obkv.insert(field_2_id, field_2.as_bytes()).unwrap(); | ||||
|         let field_3_id = fields_ids_map.insert("doggo.name").unwrap(); | ||||
|         let field_3 = serde_json::to_string(&field_3).unwrap(); | ||||
|         obkv.insert(field_3_id, field_3.as_bytes()).unwrap(); | ||||
|         let value = obkv.into_inner().unwrap(); | ||||
|         let obkv = KvReader::from_slice(value.as_slice()); | ||||
|  | ||||
|         fields_ids_map.insert("doggo.age"); | ||||
|         fields_ids_map.insert("catto.catto.name"); | ||||
|         fields_ids_map.insert("catto.catto.age"); | ||||
|  | ||||
|         let mut tb = TokenizerBuilder::default(); | ||||
|         let document_tokenizer = DocumentTokenizer { | ||||
|             tokenizer: &tb.build(), | ||||
|             searchable_attributes: None, | ||||
|             localized_attributes_rules: &[], | ||||
|             max_positions_per_attributes: 1000, | ||||
|         }; | ||||
|  | ||||
|         let mut words = std::collections::BTreeMap::new(); | ||||
|         document_tokenizer | ||||
|             .tokenize_document(obkv, &fields_ids_map, &mut |fid, pos, word| { | ||||
|                 words.insert([fid, pos], word.to_string()); | ||||
|             }) | ||||
|             .unwrap(); | ||||
|  | ||||
|         snapshot!(format!("{:#?}", words), @r###" | ||||
|         { | ||||
|             [ | ||||
|                 2, | ||||
|                 0, | ||||
|             ]: "doggo", | ||||
|             [ | ||||
|                 2, | ||||
|                 8, | ||||
|             ]: "doggo", | ||||
|             [ | ||||
|                 2, | ||||
|                 16, | ||||
|             ]: "catto", | ||||
|             [ | ||||
|                 3, | ||||
|                 0, | ||||
|             ]: "10", | ||||
|             [ | ||||
|                 4, | ||||
|                 0, | ||||
|             ]: "pesti", | ||||
|             [ | ||||
|                 5, | ||||
|                 0, | ||||
|             ]: "23", | ||||
|         } | ||||
|         "###); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -15,11 +15,13 @@ use super::channel::{ | ||||
|     WriterOperation, | ||||
| }; | ||||
| use super::document_change::DocumentChange; | ||||
| use super::extract::{SearchableExtractor, WordDocidsExtractor}; | ||||
| use super::merger::merge_grenad_entries; | ||||
| use super::StdResult; | ||||
| use crate::documents::{ | ||||
|     obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, PrimaryKey, DEFAULT_PRIMARY_KEY, | ||||
| }; | ||||
| use crate::update::GrenadParameters; | ||||
| use crate::{Index, Result, UserError}; | ||||
|  | ||||
| mod document_deletion; | ||||
| @@ -45,7 +47,7 @@ pub fn index<PI>( | ||||
|     wtxn: &mut RwTxn, | ||||
|     index: &Index, | ||||
|     pool: &ThreadPool, | ||||
|     _document_changes: PI, | ||||
|     document_changes: PI, | ||||
| ) -> Result<()> | ||||
| where | ||||
|     PI: IntoParallelIterator<Item = Result<DocumentChange>> + Send, | ||||
| @@ -59,10 +61,18 @@ where | ||||
|         // TODO manage the errors correctly | ||||
|         let handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, || { | ||||
|             pool.in_place_scope(|_s| { | ||||
|                 let document_changes = document_changes.into_par_iter(); | ||||
|                 // word docids | ||||
|                 // document_changes.into_par_iter().try_for_each(|_dc| Ok(()) as Result<_>) | ||||
|                 // let grenads = extractor_function(document_changes)?; | ||||
|                 // deladd_cbo_roaring_bitmap_sender.word_docids(grenads)?; | ||||
|                 let merger = WordDocidsExtractor::run_extraction( | ||||
|                     index, | ||||
|                     todo!(), | ||||
|                     /// TODO: GrenadParameters::default() should be removed in favor a passed parameter | ||||
|                     GrenadParameters::default(), | ||||
|                     document_changes.clone(), | ||||
|                 )?; | ||||
|  | ||||
|                 /// TODO: manage the errors correctly | ||||
|                 deladd_cbo_roaring_bitmap_sender.word_docids(merger).unwrap(); | ||||
|  | ||||
|                 Ok(()) as Result<_> | ||||
|             }) | ||||
|   | ||||
| @@ -20,14 +20,12 @@ pub fn merge_grenad_entries( | ||||
|  | ||||
|     for merger_operation in receiver { | ||||
|         match merger_operation { | ||||
|             MergerOperation::WordDocidsCursors(cursors) => { | ||||
|             MergerOperation::WordDocidsMerger(merger) => { | ||||
|                 let sender = sender.word_docids(); | ||||
|                 let database = index.word_docids.remap_types::<Bytes, Bytes>(); | ||||
|  | ||||
|                 let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); | ||||
|                 builder.extend(cursors); | ||||
|                 /// TODO manage the error correctly | ||||
|                 let mut merger_iter = builder.build().into_stream_merger_iter().unwrap(); | ||||
|                 let mut merger_iter = merger.into_stream_merger_iter().unwrap(); | ||||
|  | ||||
|                 // TODO manage the error correctly | ||||
|                 while let Some((key, deladd)) = merger_iter.next().unwrap() { | ||||
|   | ||||
| @@ -4,13 +4,12 @@ pub use items_pool::ItemsPool; | ||||
| use super::del_add::DelAdd; | ||||
| use crate::FieldId; | ||||
|  | ||||
| mod document_change; | ||||
| mod merger; | ||||
| // mod extract; | ||||
| mod channel; | ||||
| //mod global_fields_ids_map; | ||||
| mod document_change; | ||||
| mod extract; | ||||
| pub mod indexer; | ||||
| mod items_pool; | ||||
| mod merger; | ||||
|  | ||||
| /// TODO move them elsewhere | ||||
| pub type StdResult<T, E> = std::result::Result<T, E>; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user