mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 04:56:28 +00:00 
			
		
		
		
	Reintroduce the distinct and filtering of documents
This commit is contained in:
		
							
								
								
									
										103
									
								
								meilidb-core/src/distinct_map.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										103
									
								
								meilidb-core/src/distinct_map.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,103 @@ | ||||
| use std::hash::Hash; | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| pub struct DistinctMap<K> { | ||||
|     inner: HashMap<K, usize>, | ||||
|     limit: usize, | ||||
|     len: usize, | ||||
| } | ||||
|  | ||||
| impl<K: Hash + Eq> DistinctMap<K> { | ||||
|     pub fn new(limit: usize) -> Self { | ||||
|         DistinctMap { | ||||
|             inner: HashMap::new(), | ||||
|             limit, | ||||
|             len: 0, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn len(&self) -> usize { | ||||
|         self.len | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct BufferedDistinctMap<'a, K> { | ||||
|     internal: &'a mut DistinctMap<K>, | ||||
|     inner: HashMap<K, usize>, | ||||
|     len: usize, | ||||
| } | ||||
|  | ||||
| impl<'a, K: Hash + Eq> BufferedDistinctMap<'a, K> { | ||||
|     pub fn new(internal: &'a mut DistinctMap<K>) -> BufferedDistinctMap<'a, K> { | ||||
|         BufferedDistinctMap { | ||||
|             internal, | ||||
|             inner: HashMap::new(), | ||||
|             len: 0, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn register(&mut self, key: K) -> bool { | ||||
|         let internal_seen = self.internal.inner.get(&key).unwrap_or(&0); | ||||
|         let inner_seen = self.inner.entry(key).or_insert(0); | ||||
|         let seen = *internal_seen + *inner_seen; | ||||
|  | ||||
|         if seen < self.internal.limit { | ||||
|             *inner_seen += 1; | ||||
|             self.len += 1; | ||||
|             true | ||||
|         } else { | ||||
|             false | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn register_without_key(&mut self) -> bool { | ||||
|         self.len += 1; | ||||
|         true | ||||
|     } | ||||
|  | ||||
|     pub fn transfert_to_internal(&mut self) { | ||||
|         for (k, v) in self.inner.drain() { | ||||
|             let value = self.internal.inner.entry(k).or_insert(0); | ||||
|             *value += v; | ||||
|         } | ||||
|  | ||||
|         self.internal.len += self.len; | ||||
|         self.len = 0; | ||||
|     } | ||||
|  | ||||
|     pub fn len(&self) -> usize { | ||||
|         self.internal.len() + self.len | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
|  | ||||
|     #[test] | ||||
|     fn easy_distinct_map() { | ||||
|         let mut map = DistinctMap::new(2); | ||||
|         let mut buffered = BufferedDistinctMap::new(&mut map); | ||||
|  | ||||
|         for x in &[1, 1, 1, 2, 3, 4, 5, 6, 6, 6, 6, 6] { | ||||
|             buffered.register(x); | ||||
|         } | ||||
|         buffered.transfert_to_internal(); | ||||
|         assert_eq!(map.len(), 8); | ||||
|  | ||||
|         let mut map = DistinctMap::new(2); | ||||
|         let mut buffered = BufferedDistinctMap::new(&mut map); | ||||
|         assert_eq!(buffered.register(1), true); | ||||
|         assert_eq!(buffered.register(1), true); | ||||
|         assert_eq!(buffered.register(1), false); | ||||
|         assert_eq!(buffered.register(1), false); | ||||
|  | ||||
|         assert_eq!(buffered.register(2), true); | ||||
|         assert_eq!(buffered.register(3), true); | ||||
|         assert_eq!(buffered.register(2), true); | ||||
|         assert_eq!(buffered.register(2), false); | ||||
|  | ||||
|         buffered.transfert_to_internal(); | ||||
|         assert_eq!(map.len(), 5); | ||||
|     } | ||||
| } | ||||
| @@ -3,6 +3,7 @@ | ||||
|  | ||||
| mod automaton; | ||||
| mod database; | ||||
| mod distinct_map; | ||||
| mod error; | ||||
| mod number; | ||||
| mod query_builder; | ||||
|   | ||||
| @@ -1,19 +1,24 @@ | ||||
| use std::time::{Instant, Duration}; | ||||
| use std::ops::Range; | ||||
| use hashbrown::HashMap; | ||||
| use std::hash::Hash; | ||||
| use std::mem; | ||||
| use std::ops::Range; | ||||
| use std::rc::Rc; | ||||
| use std::time::{Instant, Duration}; | ||||
|  | ||||
| use fst::{IntoStreamer, Streamer}; | ||||
| use sdset::SetBuf; | ||||
| use slice_group_by::{GroupBy, GroupByMut}; | ||||
|  | ||||
| use crate::automaton::{Automaton, AutomatonProducer, QueryEnhancer}; | ||||
| use crate::distinct_map::{DistinctMap, BufferedDistinctMap}; | ||||
| use crate::raw_document::{RawDocument, raw_documents_from}; | ||||
| use crate::{Document, DocumentId, Highlight, TmpMatch, criterion::Criteria}; | ||||
| use crate::{store, MResult, reordered_attrs::ReorderedAttrs}; | ||||
|  | ||||
| pub struct QueryBuilder<'a> { | ||||
|     criteria: Criteria<'a>, | ||||
|     searchables_attrs: Option<ReorderedAttrs>, | ||||
| pub struct QueryBuilder<'c, FI = fn(DocumentId) -> bool> { | ||||
|     criteria: Criteria<'c>, | ||||
|     searchable_attrs: Option<ReorderedAttrs>, | ||||
|     filter: Option<FI>, | ||||
|     timeout: Duration, | ||||
|     main_store: store::Main, | ||||
|     postings_lists_store: store::PostingsLists, | ||||
| @@ -185,22 +190,68 @@ fn fetch_raw_documents( | ||||
|     Ok(raw_documents_from(matches, highlights)) | ||||
| } | ||||
|  | ||||
| impl<'a> QueryBuilder<'a> { | ||||
| impl<'c> QueryBuilder<'c> { | ||||
|     pub fn new( | ||||
|         main: store::Main, | ||||
|         postings_lists: store::PostingsLists, | ||||
|         synonyms: store::Synonyms, | ||||
|     ) -> QueryBuilder<'a> { | ||||
|     ) -> QueryBuilder<'c> | ||||
|     { | ||||
|         QueryBuilder::with_criteria(main, postings_lists, synonyms, Criteria::default()) | ||||
|     } | ||||
|  | ||||
|     pub fn with_criteria( | ||||
|         main: store::Main, | ||||
|         postings_lists: store::PostingsLists, | ||||
|         synonyms: store::Synonyms, | ||||
|         criteria: Criteria<'c>, | ||||
|     ) -> QueryBuilder<'c> | ||||
|     { | ||||
|         QueryBuilder { | ||||
|             criteria: Criteria::default(), | ||||
|             searchables_attrs: None, | ||||
|             timeout: Duration::from_secs(1), | ||||
|             criteria, | ||||
|             searchable_attrs: None, | ||||
|             filter: None, | ||||
|             timeout: Duration::from_millis(30), | ||||
|             main_store: main, | ||||
|             postings_lists_store: postings_lists, | ||||
|             synonyms_store: synonyms, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'c, FI> QueryBuilder<'c, FI> { | ||||
|     pub fn with_filter<F>(self, function: F) -> QueryBuilder<'c, F> | ||||
|     where F: Fn(DocumentId) -> bool, | ||||
|     { | ||||
|         QueryBuilder { | ||||
|             criteria: self.criteria, | ||||
|             searchable_attrs: self.searchable_attrs, | ||||
|             filter: Some(function), | ||||
|             timeout: self.timeout, | ||||
|             main_store: self.main_store, | ||||
|             postings_lists_store: self.postings_lists_store, | ||||
|             synonyms_store: self.synonyms_store, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn with_fetch_timeout(self, timeout: Duration) -> QueryBuilder<'c, FI> { | ||||
|         QueryBuilder { timeout, ..self } | ||||
|     } | ||||
|  | ||||
|     pub fn with_distinct<F, K>(self, function: F, size: usize) -> DistinctQueryBuilder<'c, FI, F> | ||||
|     where F: Fn(DocumentId) -> Option<K>, | ||||
|           K: Hash + Eq, | ||||
|     { | ||||
|         DistinctQueryBuilder { inner: self, function, size } | ||||
|     } | ||||
|  | ||||
|     pub fn add_searchable_attribute(&mut self, attribute: u16) { | ||||
|         let reorders = self.searchable_attrs.get_or_insert_with(ReorderedAttrs::new); | ||||
|         reorders.insert_attribute(attribute); | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<FI> QueryBuilder<'_, FI> where FI: Fn(DocumentId) -> bool { | ||||
|     pub fn query( | ||||
|         self, | ||||
|         reader: &impl rkv::Readable, | ||||
| @@ -208,15 +259,22 @@ impl<'a> QueryBuilder<'a> { | ||||
|         range: Range<usize>, | ||||
|     ) -> MResult<Vec<Document>> | ||||
|     { | ||||
|         // We delegate the filter work to the distinct query builder, | ||||
|         // specifying a distinct rule that has no effect. | ||||
|         if self.filter.is_some() { | ||||
|             let builder = self.with_distinct(|_| None as Option<()>, 1); | ||||
|             return builder.query(reader, query, range); | ||||
|         } | ||||
|  | ||||
|         let start_processing = Instant::now(); | ||||
|         let mut raw_documents_processed = Vec::new(); | ||||
|         let mut raw_documents_processed = Vec::with_capacity(range.len()); | ||||
|  | ||||
|         let (automaton_producer, query_enhancer) = AutomatonProducer::new( | ||||
|                 reader, | ||||
|                 query, | ||||
|                 self.main_store, | ||||
|                 self.synonyms_store, | ||||
|             )?; | ||||
|             reader, | ||||
|             query, | ||||
|             self.main_store, | ||||
|             self.synonyms_store, | ||||
|         )?; | ||||
|  | ||||
|         let mut automaton_producer = automaton_producer.into_iter(); | ||||
|         let mut automatons = Vec::new(); | ||||
| @@ -231,11 +289,16 @@ impl<'a> QueryBuilder<'a> { | ||||
|                 reader, | ||||
|                 &automatons, | ||||
|                 &query_enhancer, | ||||
|                 self.searchables_attrs.as_ref(), | ||||
|                 self.searchable_attrs.as_ref(), | ||||
|                 &self.main_store, | ||||
|                 &self.postings_lists_store, | ||||
|             )?; | ||||
|  | ||||
|             // stop processing when time is running out | ||||
|             if !raw_documents_processed.is_empty() && start_processing.elapsed() > self.timeout { | ||||
|                 break | ||||
|             } | ||||
|  | ||||
|             let mut groups = vec![raw_documents.as_mut_slice()]; | ||||
|  | ||||
|             'criteria: for criterion in self.criteria.as_ref() { | ||||
| @@ -270,7 +333,7 @@ impl<'a> QueryBuilder<'a> { | ||||
|             raw_documents_processed.clear(); | ||||
|             raw_documents_processed.extend(iter); | ||||
|  | ||||
|             // stop processing after there is no time | ||||
|             // stop processing when time is running out | ||||
|             if start_processing.elapsed() > self.timeout { break } | ||||
|         } | ||||
|  | ||||
| @@ -285,6 +348,189 @@ impl<'a> QueryBuilder<'a> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct DistinctQueryBuilder<'c, FI, FD> { | ||||
|     inner: QueryBuilder<'c, FI>, | ||||
|     function: FD, | ||||
|     size: usize, | ||||
| } | ||||
|  | ||||
| impl<'c, FI, FD> DistinctQueryBuilder<'c, FI, FD> { | ||||
|     pub fn with_filter<F>(self, function: F) -> DistinctQueryBuilder<'c, F, FD> | ||||
|     where F: Fn(DocumentId) -> bool, | ||||
|     { | ||||
|         DistinctQueryBuilder { | ||||
|             inner: self.inner.with_filter(function), | ||||
|             function: self.function, | ||||
|             size: self.size, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn with_fetch_timeout(self, timeout: Duration) -> DistinctQueryBuilder<'c, FI, FD> { | ||||
|         DistinctQueryBuilder { | ||||
|             inner: self.inner.with_fetch_timeout(timeout), | ||||
|             function: self.function, | ||||
|             size: self.size, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn add_searchable_attribute(&mut self, attribute: u16) { | ||||
|         self.inner.add_searchable_attribute(attribute); | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'c, FI, FD, K> DistinctQueryBuilder<'c, FI, FD> | ||||
| where FI: Fn(DocumentId) -> bool, | ||||
|       FD: Fn(DocumentId) -> Option<K>, | ||||
|       K: Hash + Eq, | ||||
| { | ||||
|     pub fn query( | ||||
|         self, | ||||
|         reader: &impl rkv::Readable, | ||||
|         query: &str, | ||||
|         range: Range<usize>, | ||||
|     ) -> MResult<Vec<Document>> | ||||
|     { | ||||
|         let start_processing = Instant::now(); | ||||
|         let mut raw_documents_processed = Vec::new(); | ||||
|  | ||||
|         let (automaton_producer, query_enhancer) = AutomatonProducer::new( | ||||
|             reader, | ||||
|             query, | ||||
|             self.inner.main_store, | ||||
|             self.inner.synonyms_store, | ||||
|         )?; | ||||
|  | ||||
|         let mut automaton_producer = automaton_producer.into_iter(); | ||||
|         let mut automatons = Vec::new(); | ||||
|  | ||||
|         // aggregate automatons groups by groups after time | ||||
|         while let Some(auts) = automaton_producer.next() { | ||||
|             automatons.extend(auts); | ||||
|  | ||||
|             // we must retrieve the documents associated | ||||
|             // with the current automatons | ||||
|             let mut raw_documents = fetch_raw_documents( | ||||
|                 reader, | ||||
|                 &automatons, | ||||
|                 &query_enhancer, | ||||
|                 self.inner.searchable_attrs.as_ref(), | ||||
|                 &self.inner.main_store, | ||||
|                 &self.inner.postings_lists_store, | ||||
|             )?; | ||||
|  | ||||
|             // stop processing when time is running out | ||||
|             if !raw_documents_processed.is_empty() && start_processing.elapsed() > self.inner.timeout { | ||||
|                 break | ||||
|             } | ||||
|  | ||||
|             let mut groups = vec![raw_documents.as_mut_slice()]; | ||||
|             let mut key_cache = HashMap::new(); | ||||
|  | ||||
|             let mut filter_map = HashMap::new(); | ||||
|             // these two variables informs on the current distinct map and | ||||
|             // on the raw offset of the start of the group where the | ||||
|             // range.start bound is located according to the distinct function | ||||
|             let mut distinct_map = DistinctMap::new(self.size); | ||||
|             let mut distinct_raw_offset = 0; | ||||
|  | ||||
|             'criteria: for criterion in self.inner.criteria.as_ref() { | ||||
|                 let tmp_groups = mem::replace(&mut groups, Vec::new()); | ||||
|                 let mut buf_distinct = BufferedDistinctMap::new(&mut distinct_map); | ||||
|                 let mut documents_seen = 0; | ||||
|  | ||||
|                 for group in tmp_groups { | ||||
|                     // if this group does not overlap with the requested range, | ||||
|                     // push it without sorting and splitting it | ||||
|                     if documents_seen + group.len() < distinct_raw_offset { | ||||
|                         documents_seen += group.len(); | ||||
|                         groups.push(group); | ||||
|                         continue; | ||||
|                     } | ||||
|  | ||||
|                     group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); | ||||
|  | ||||
|                     for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) { | ||||
|                         // we must compute the real distinguished len of this sub-group | ||||
|                         for document in group.iter() { | ||||
|                             let filter_accepted = match &self.inner.filter { | ||||
|                                 Some(filter) => { | ||||
|                                     let entry = filter_map.entry(document.id); | ||||
|                                     *entry.or_insert_with(|| (filter)(document.id)) | ||||
|                                 }, | ||||
|                                 None => true, | ||||
|                             }; | ||||
|  | ||||
|                             if filter_accepted { | ||||
|                                 let entry = key_cache.entry(document.id); | ||||
|                                 let key = entry.or_insert_with(|| (self.function)(document.id).map(Rc::new)); | ||||
|  | ||||
|                                 match key.clone() { | ||||
|                                     Some(key) => buf_distinct.register(key), | ||||
|                                     None => buf_distinct.register_without_key(), | ||||
|                                 }; | ||||
|                             } | ||||
|  | ||||
|                             // the requested range end is reached: stop computing distinct | ||||
|                             if buf_distinct.len() >= range.end { break } | ||||
|                         } | ||||
|  | ||||
|                         documents_seen += group.len(); | ||||
|                         groups.push(group); | ||||
|  | ||||
|                         // if this sub-group does not overlap with the requested range | ||||
|                         // we must update the distinct map and its start index | ||||
|                         if buf_distinct.len() < range.start { | ||||
|                             buf_distinct.transfert_to_internal(); | ||||
|                             distinct_raw_offset = documents_seen; | ||||
|                         } | ||||
|  | ||||
|                         // we have sort enough documents if the last document sorted is after | ||||
|                         // the end of the requested range, we can continue to the next criterion | ||||
|                         if buf_distinct.len() >= range.end { continue 'criteria } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // once we classified the documents related to the current | ||||
|             // automatons we save that as the next valid result | ||||
|             let mut seen = BufferedDistinctMap::new(&mut distinct_map); | ||||
|             raw_documents_processed.clear(); | ||||
|  | ||||
|             for document in raw_documents.into_iter().skip(distinct_raw_offset) { | ||||
|                 let filter_accepted = match &self.inner.filter { | ||||
|                     Some(_) => filter_map.remove(&document.id).unwrap(), | ||||
|                     None => true, | ||||
|                 }; | ||||
|  | ||||
|                 if filter_accepted { | ||||
|                     let key = key_cache.remove(&document.id).unwrap(); | ||||
|                     let distinct_accepted = match key { | ||||
|                         Some(key) => seen.register(key), | ||||
|                         None => seen.register_without_key(), | ||||
|                     }; | ||||
|  | ||||
|                     if distinct_accepted && seen.len() > range.start { | ||||
|                         raw_documents_processed.push(document); | ||||
|                         if raw_documents_processed.len() == range.len() { break } | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // stop processing when time is running out | ||||
|             if start_processing.elapsed() > self.inner.timeout { break } | ||||
|         } | ||||
|  | ||||
|         // make real documents now that we know | ||||
|         // those must be returned | ||||
|         let documents = raw_documents_processed | ||||
|             .into_iter() | ||||
|             .map(|d| Document::from_raw(d)) | ||||
|             .collect(); | ||||
|  | ||||
|         Ok(documents) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     use super::*; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user