Rework the QueryBuilder to make it easier to construct and use

This commit is contained in:
Clément Renault
2019-10-17 14:45:21 +02:00
parent 0ff73039e5
commit d941c512db
3 changed files with 334 additions and 293 deletions

View File

@@ -268,9 +268,10 @@ fn crop_text(
fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<dyn Error>> { fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<dyn Error>> {
let env = &database.env; let env = &database.env;
let index = database.open_index(INDEX_NAME).expect("Could not find index"); let index = database.open_index(INDEX_NAME).expect("Could not find index");
let reader = env.read_txn().unwrap();
let reader = env.read_txn().unwrap();
let schema = index.main.schema(&reader)?; let schema = index.main.schema(&reader)?;
reader.abort();
let schema = schema.ok_or(meilidb_core::Error::SchemaMissing)?; let schema = schema.ok_or(meilidb_core::Error::SchemaMissing)?;
let fields = command.displayed_fields.iter().map(String::as_str); let fields = command.displayed_fields.iter().map(String::as_str);
@@ -285,35 +286,32 @@ fn search_command(command: SearchCommand, database: Database) -> Result<(), Box<
Ok(query) => { Ok(query) => {
let start_total = Instant::now(); let start_total = Instant::now();
let builder = index.query_builder(); let reader = env.read_txn().unwrap();
let builder = if let Some(timeout) = command.fetch_timeout_ms { let ref_index = &index;
builder.with_fetch_timeout(Duration::from_millis(timeout)) let ref_reader = &reader;
} else {
builder
};
let documents = match command.filter { let mut builder = index.query_builder();
Some(ref filter) => { if let Some(timeout) = command.fetch_timeout_ms {
let filter = filter.as_str(); builder.with_fetch_timeout(Duration::from_millis(timeout));
let (positive, filter) = if filter.chars().next() == Some('!') { }
(false, &filter[1..])
} else {
(true, filter)
};
let attr = schema.attribute(&filter).expect("Could not find filtered attribute"); if let Some(ref filter) = command.filter {
let filter = filter.as_str();
let (positive, filter) = if filter.chars().next() == Some('!') {
(false, &filter[1..])
} else {
(true, filter)
};
let builder = builder.with_filter(|document_id| { let attr = schema.attribute(&filter).expect("Could not find filtered attribute");
let string: String = index.document_attribute(&reader, document_id, attr).unwrap().unwrap();
(string == "true") == positive
});
builder.query(&reader, &query, 0..command.number_results)? builder.with_filter(move |document_id| {
}, let string: String = ref_index.document_attribute(ref_reader, document_id, attr).unwrap().unwrap();
None => { (string == "true") == positive
builder.query(&reader, &query, 0..command.number_results)? });
} }
};
let documents = builder.query(ref_reader, &query, 0..command.number_results)?;
let mut retrieve_duration = Duration::default(); let mut retrieve_duration = Duration::default();

View File

@@ -1,5 +1,4 @@
use hashbrown::HashMap; use hashbrown::HashMap;
use std::hash::Hash;
use std::mem; use std::mem;
use std::ops::Range; use std::ops::Range;
use std::rc::Rc; use std::rc::Rc;
@@ -15,10 +14,11 @@ use crate::raw_document::{RawDocument, raw_documents_from};
use crate::{Document, DocumentId, Highlight, TmpMatch, criterion::Criteria}; use crate::{Document, DocumentId, Highlight, TmpMatch, criterion::Criteria};
use crate::{store, MResult, reordered_attrs::ReorderedAttrs}; use crate::{store, MResult, reordered_attrs::ReorderedAttrs};
pub struct QueryBuilder<'c, FI = fn(DocumentId) -> bool> { pub struct QueryBuilder<'c, 'f, 'd> {
criteria: Criteria<'c>, criteria: Criteria<'c>,
searchable_attrs: Option<ReorderedAttrs>, searchable_attrs: Option<ReorderedAttrs>,
filter: Option<FI>, filter: Option<Box<dyn Fn(DocumentId) -> bool + 'f>>,
distinct: Option<(Box<dyn Fn(DocumentId) -> Option<u64> + 'd>, usize)>,
timeout: Option<Duration>, timeout: Option<Duration>,
main_store: store::Main, main_store: store::Main,
postings_lists_store: store::PostingsLists, postings_lists_store: store::PostingsLists,
@@ -204,13 +204,13 @@ fn fetch_raw_documents(
Ok(raw_documents_from(matches, highlights, fields_counts)) Ok(raw_documents_from(matches, highlights, fields_counts))
} }
impl<'c> QueryBuilder<'c> { impl<'c, 'f, 'd> QueryBuilder<'c, 'f, 'd> {
pub fn new( pub fn new(
main: store::Main, main: store::Main,
postings_lists: store::PostingsLists, postings_lists: store::PostingsLists,
documents_fields_counts: store::DocumentsFieldsCounts, documents_fields_counts: store::DocumentsFieldsCounts,
synonyms: store::Synonyms, synonyms: store::Synonyms,
) -> QueryBuilder<'c> ) -> QueryBuilder<'c, 'f, 'd>
{ {
QueryBuilder::with_criteria( QueryBuilder::with_criteria(
main, main,
@@ -227,12 +227,13 @@ impl<'c> QueryBuilder<'c> {
documents_fields_counts: store::DocumentsFieldsCounts, documents_fields_counts: store::DocumentsFieldsCounts,
synonyms: store::Synonyms, synonyms: store::Synonyms,
criteria: Criteria<'c>, criteria: Criteria<'c>,
) -> QueryBuilder<'c> ) -> QueryBuilder<'c, 'f, 'd>
{ {
QueryBuilder { QueryBuilder {
criteria, criteria,
searchable_attrs: None, searchable_attrs: None,
filter: None, filter: None,
distinct: None,
timeout: None, timeout: None,
main_store: main, main_store: main,
postings_lists_store: postings_lists, postings_lists_store: postings_lists,
@@ -242,40 +243,28 @@ impl<'c> QueryBuilder<'c> {
} }
} }
impl<'c, FI> QueryBuilder<'c, FI> { impl<'c, 'f, 'd> QueryBuilder<'c, 'f, 'd> {
pub fn with_filter<F>(self, function: F) -> QueryBuilder<'c, F> pub fn with_filter<F>(&mut self, function: F)
where F: Fn(DocumentId) -> bool, where F: Fn(DocumentId) -> bool + 'f,
{ {
QueryBuilder { self.filter = Some(Box::new(function))
criteria: self.criteria,
searchable_attrs: self.searchable_attrs,
filter: Some(function),
timeout: self.timeout,
main_store: self.main_store,
postings_lists_store: self.postings_lists_store,
documents_fields_counts_store: self.documents_fields_counts_store,
synonyms_store: self.synonyms_store,
}
} }
pub fn with_fetch_timeout(self, timeout: Duration) -> QueryBuilder<'c, FI> { pub fn with_fetch_timeout(&mut self, timeout: Duration) {
QueryBuilder { timeout: Some(timeout), ..self } self.timeout = Some(timeout)
} }
pub fn with_distinct<F, K>(self, function: F, size: usize) -> DistinctQueryBuilder<'c, FI, F> pub fn with_distinct<F, K>(&mut self, function: F, size: usize)
where F: Fn(DocumentId) -> Option<K>, where F: Fn(DocumentId) -> Option<u64> + 'd,
K: Hash + Eq,
{ {
DistinctQueryBuilder { inner: self, function, size } self.distinct = Some((Box::new(function), size))
} }
pub fn add_searchable_attribute(&mut self, attribute: u16) { pub fn add_searchable_attribute(&mut self, attribute: u16) {
let reorders = self.searchable_attrs.get_or_insert_with(ReorderedAttrs::new); let reorders = self.searchable_attrs.get_or_insert_with(ReorderedAttrs::new);
reorders.insert_attribute(attribute); reorders.insert_attribute(attribute);
} }
}
impl<FI> QueryBuilder<'_, FI> where FI: Fn(DocumentId) -> bool {
pub fn query( pub fn query(
self, self,
reader: &zlmdb::RoTxn, reader: &zlmdb::RoTxn,
@@ -283,286 +272,336 @@ impl<FI> QueryBuilder<'_, FI> where FI: Fn(DocumentId) -> bool {
range: Range<usize>, range: Range<usize>,
) -> MResult<Vec<Document>> ) -> MResult<Vec<Document>>
{ {
// We delegate the filter work to the distinct query builder, match self.distinct {
// specifying a distinct rule that has no effect. Some((distinct, distinct_size)) => {
if self.filter.is_some() { raw_query_with_distinct(
let builder = self.with_distinct(|_| None as Option<()>, 1); reader,
return builder.query(reader, query, range); query,
} range,
self.filter,
let start_processing = Instant::now(); distinct,
let mut raw_documents_processed = Vec::with_capacity(range.len()); distinct_size,
self.timeout,
let (automaton_producer, query_enhancer) = AutomatonProducer::new( self.criteria,
reader, self.searchable_attrs,
query, self.main_store,
self.main_store, self.postings_lists_store,
self.synonyms_store, self.documents_fields_counts_store,
)?; self.synonyms_store,
)
let mut automaton_producer = automaton_producer.into_iter(); },
let mut automatons = Vec::new(); None => {
raw_query(
// aggregate automatons groups by groups after time reader,
while let Some(auts) = automaton_producer.next() { query,
automatons.extend(auts); range,
self.filter,
// we must retrieve the documents associated self.timeout,
// with the current automatons self.criteria,
let mut raw_documents = fetch_raw_documents( self.searchable_attrs,
reader, self.main_store,
&automatons, self.postings_lists_store,
&query_enhancer, self.documents_fields_counts_store,
self.searchable_attrs.as_ref(), self.synonyms_store,
&self.main_store, )
&self.postings_lists_store,
&self.documents_fields_counts_store,
)?;
// stop processing when time is running out
if let Some(timeout) = self.timeout {
if !raw_documents_processed.is_empty() && start_processing.elapsed() > timeout {
break
}
}
let mut groups = vec![raw_documents.as_mut_slice()];
'criteria: for criterion in self.criteria.as_ref() {
let tmp_groups = mem::replace(&mut groups, Vec::new());
let mut documents_seen = 0;
for group in tmp_groups {
// if this group does not overlap with the requested range,
// push it without sorting and splitting it
if documents_seen + group.len() < range.start {
documents_seen += group.len();
groups.push(group);
continue;
}
group.sort_unstable_by(|a, b| criterion.evaluate(a, b));
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) {
documents_seen += group.len();
groups.push(group);
// we have sort enough documents if the last document sorted is after
// the end of the requested range, we can continue to the next criterion
if documents_seen >= range.end { continue 'criteria }
}
}
}
// once we classified the documents related to the current
// automatons we save that as the next valid result
let iter = raw_documents.into_iter().skip(range.start).take(range.len());
raw_documents_processed.clear();
raw_documents_processed.extend(iter);
// stop processing when time is running out
if let Some(timeout) = self.timeout {
if start_processing.elapsed() > timeout { break }
} }
} }
// make real documents now that we know
// those must be returned
let documents = raw_documents_processed
.into_iter()
.map(|d| Document::from_raw(d))
.collect();
Ok(documents)
} }
} }
pub struct DistinctQueryBuilder<'c, FI, FD> { fn raw_query<'c, FI>(
inner: QueryBuilder<'c, FI>, reader: &zlmdb::RoTxn,
function: FD,
size: usize,
}
impl<'c, FI, FD> DistinctQueryBuilder<'c, FI, FD> { query: &str,
pub fn with_filter<F>(self, function: F) -> DistinctQueryBuilder<'c, F, FD> range: Range<usize>,
where F: Fn(DocumentId) -> bool,
{
DistinctQueryBuilder {
inner: self.inner.with_filter(function),
function: self.function,
size: self.size,
}
}
pub fn with_fetch_timeout(self, timeout: Duration) -> DistinctQueryBuilder<'c, FI, FD> { filter: Option<FI>,
DistinctQueryBuilder { timeout: Option<Duration>,
inner: self.inner.with_fetch_timeout(timeout),
function: self.function,
size: self.size,
}
}
pub fn add_searchable_attribute(&mut self, attribute: u16) { criteria: Criteria<'c>,
self.inner.add_searchable_attribute(attribute); searchable_attrs: Option<ReorderedAttrs>,
}
}
impl<'c, FI, FD, K> DistinctQueryBuilder<'c, FI, FD> main_store: store::Main,
postings_lists_store: store::PostingsLists,
documents_fields_counts_store: store::DocumentsFieldsCounts,
synonyms_store: store::Synonyms,
) -> MResult<Vec<Document>>
where FI: Fn(DocumentId) -> bool, where FI: Fn(DocumentId) -> bool,
FD: Fn(DocumentId) -> Option<K>,
K: Hash + Eq,
{ {
pub fn query( // We delegate the filter work to the distinct query builder,
self, // specifying a distinct rule that has no effect.
reader: &zlmdb::RoTxn, if filter.is_some() {
query: &str, let distinct = |_| None;
range: Range<usize>, let distinct_size = 1;
) -> MResult<Vec<Document>> return raw_query_with_distinct(
{
let start_processing = Instant::now();
let mut raw_documents_processed = Vec::new();
let (automaton_producer, query_enhancer) = AutomatonProducer::new(
reader, reader,
query, query,
self.inner.main_store, range,
self.inner.synonyms_store, filter,
distinct,
distinct_size,
timeout,
criteria,
searchable_attrs,
main_store,
postings_lists_store,
documents_fields_counts_store,
synonyms_store,
)
}
let start_processing = Instant::now();
let mut raw_documents_processed = Vec::with_capacity(range.len());
let (automaton_producer, query_enhancer) = AutomatonProducer::new(
reader,
query,
main_store,
synonyms_store,
)?;
let mut automaton_producer = automaton_producer.into_iter();
let mut automatons = Vec::new();
// aggregate automatons groups by groups after time
while let Some(auts) = automaton_producer.next() {
automatons.extend(auts);
// we must retrieve the documents associated
// with the current automatons
let mut raw_documents = fetch_raw_documents(
reader,
&automatons,
&query_enhancer,
searchable_attrs.as_ref(),
&main_store,
&postings_lists_store,
&documents_fields_counts_store,
)?; )?;
let mut automaton_producer = automaton_producer.into_iter(); // stop processing when time is running out
let mut automatons = Vec::new(); if let Some(timeout) = timeout {
if !raw_documents_processed.is_empty() && start_processing.elapsed() > timeout {
break
}
}
// aggregate automatons groups by groups after time let mut groups = vec![raw_documents.as_mut_slice()];
while let Some(auts) = automaton_producer.next() {
automatons.extend(auts);
// we must retrieve the documents associated 'criteria: for criterion in criteria.as_ref() {
// with the current automatons let tmp_groups = mem::replace(&mut groups, Vec::new());
let mut raw_documents = fetch_raw_documents( let mut documents_seen = 0;
reader,
&automatons,
&query_enhancer,
self.inner.searchable_attrs.as_ref(),
&self.inner.main_store,
&self.inner.postings_lists_store,
&self.inner.documents_fields_counts_store,
)?;
// stop processing when time is running out for group in tmp_groups {
if let Some(timeout) = self.inner.timeout { // if this group does not overlap with the requested range,
if !raw_documents_processed.is_empty() && start_processing.elapsed() > timeout { // push it without sorting and splitting it
break if documents_seen + group.len() < range.start {
documents_seen += group.len();
groups.push(group);
continue;
}
group.sort_unstable_by(|a, b| criterion.evaluate(a, b));
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) {
documents_seen += group.len();
groups.push(group);
// we have sort enough documents if the last document sorted is after
// the end of the requested range, we can continue to the next criterion
if documents_seen >= range.end { continue 'criteria }
} }
} }
}
let mut groups = vec![raw_documents.as_mut_slice()]; // once we classified the documents related to the current
let mut key_cache = HashMap::new(); // automatons we save that as the next valid result
let iter = raw_documents.into_iter().skip(range.start).take(range.len());
raw_documents_processed.clear();
raw_documents_processed.extend(iter);
let mut filter_map = HashMap::new(); // stop processing when time is running out
// these two variables informs on the current distinct map and if let Some(timeout) = timeout {
// on the raw offset of the start of the group where the if start_processing.elapsed() > timeout { break }
// range.start bound is located according to the distinct function }
let mut distinct_map = DistinctMap::new(self.size); }
let mut distinct_raw_offset = 0;
'criteria: for criterion in self.inner.criteria.as_ref() { // make real documents now that we know
let tmp_groups = mem::replace(&mut groups, Vec::new()); // those must be returned
let mut buf_distinct = BufferedDistinctMap::new(&mut distinct_map); let documents = raw_documents_processed
let mut documents_seen = 0; .into_iter()
.map(|d| Document::from_raw(d))
.collect();
for group in tmp_groups { Ok(documents)
// if this group does not overlap with the requested range, }
// push it without sorting and splitting it
if documents_seen + group.len() < distinct_raw_offset {
documents_seen += group.len();
groups.push(group);
continue;
}
group.sort_unstable_by(|a, b| criterion.evaluate(a, b)); fn raw_query_with_distinct<'c, FI, FD>(
reader: &zlmdb::RoTxn,
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) { query: &str,
// we must compute the real distinguished len of this sub-group range: Range<usize>,
for document in group.iter() {
let filter_accepted = match &self.inner.filter { filter: Option<FI>,
Some(filter) => {
let entry = filter_map.entry(document.id); distinct: FD,
*entry.or_insert_with(|| (filter)(document.id)) distinct_size: usize,
}, timeout: Option<Duration>,
None => true,
criteria: Criteria<'c>,
searchable_attrs: Option<ReorderedAttrs>,
main_store: store::Main,
postings_lists_store: store::PostingsLists,
documents_fields_counts_store: store::DocumentsFieldsCounts,
synonyms_store: store::Synonyms,
) -> MResult<Vec<Document>>
where FI: Fn(DocumentId) -> bool,
FD: Fn(DocumentId) -> Option<u64>,
{
let start_processing = Instant::now();
let mut raw_documents_processed = Vec::new();
let (automaton_producer, query_enhancer) = AutomatonProducer::new(
reader,
query,
main_store,
synonyms_store,
)?;
let mut automaton_producer = automaton_producer.into_iter();
let mut automatons = Vec::new();
// aggregate automatons groups by groups after time
while let Some(auts) = automaton_producer.next() {
automatons.extend(auts);
// we must retrieve the documents associated
// with the current automatons
let mut raw_documents = fetch_raw_documents(
reader,
&automatons,
&query_enhancer,
searchable_attrs.as_ref(),
&main_store,
&postings_lists_store,
&documents_fields_counts_store,
)?;
// stop processing when time is running out
if let Some(timeout) = timeout {
if !raw_documents_processed.is_empty() && start_processing.elapsed() > timeout {
break
}
}
let mut groups = vec![raw_documents.as_mut_slice()];
let mut key_cache = HashMap::new();
let mut filter_map = HashMap::new();
// these two variables informs on the current distinct map and
// on the raw offset of the start of the group where the
// range.start bound is located according to the distinct function
let mut distinct_map = DistinctMap::new(distinct_size);
let mut distinct_raw_offset = 0;
'criteria: for criterion in criteria.as_ref() {
let tmp_groups = mem::replace(&mut groups, Vec::new());
let mut buf_distinct = BufferedDistinctMap::new(&mut distinct_map);
let mut documents_seen = 0;
for group in tmp_groups {
// if this group does not overlap with the requested range,
// push it without sorting and splitting it
if documents_seen + group.len() < distinct_raw_offset {
documents_seen += group.len();
groups.push(group);
continue;
}
group.sort_unstable_by(|a, b| criterion.evaluate(a, b));
for group in group.binary_group_by_mut(|a, b| criterion.eq(a, b)) {
// we must compute the real distinguished len of this sub-group
for document in group.iter() {
let filter_accepted = match &filter {
Some(filter) => {
let entry = filter_map.entry(document.id);
*entry.or_insert_with(|| (filter)(document.id))
},
None => true,
};
if filter_accepted {
let entry = key_cache.entry(document.id);
let key = entry.or_insert_with(|| (distinct)(document.id).map(Rc::new));
match key.clone() {
Some(key) => buf_distinct.register(key),
None => buf_distinct.register_without_key(),
}; };
if filter_accepted {
let entry = key_cache.entry(document.id);
let key = entry.or_insert_with(|| (self.function)(document.id).map(Rc::new));
match key.clone() {
Some(key) => buf_distinct.register(key),
None => buf_distinct.register_without_key(),
};
}
// the requested range end is reached: stop computing distinct
if buf_distinct.len() >= range.end { break }
} }
documents_seen += group.len(); // the requested range end is reached: stop computing distinct
groups.push(group); if buf_distinct.len() >= range.end { break }
// if this sub-group does not overlap with the requested range
// we must update the distinct map and its start index
if buf_distinct.len() < range.start {
buf_distinct.transfert_to_internal();
distinct_raw_offset = documents_seen;
}
// we have sort enough documents if the last document sorted is after
// the end of the requested range, we can continue to the next criterion
if buf_distinct.len() >= range.end { continue 'criteria }
} }
documents_seen += group.len();
groups.push(group);
// if this sub-group does not overlap with the requested range
// we must update the distinct map and its start index
if buf_distinct.len() < range.start {
buf_distinct.transfert_to_internal();
distinct_raw_offset = documents_seen;
}
// we have sort enough documents if the last document sorted is after
// the end of the requested range, we can continue to the next criterion
if buf_distinct.len() >= range.end { continue 'criteria }
} }
} }
}
// once we classified the documents related to the current // once we classified the documents related to the current
// automatons we save that as the next valid result // automatons we save that as the next valid result
let mut seen = BufferedDistinctMap::new(&mut distinct_map); let mut seen = BufferedDistinctMap::new(&mut distinct_map);
raw_documents_processed.clear(); raw_documents_processed.clear();
for document in raw_documents.into_iter().skip(distinct_raw_offset) { for document in raw_documents.into_iter().skip(distinct_raw_offset) {
let filter_accepted = match &self.inner.filter { let filter_accepted = match &filter {
Some(_) => filter_map.remove(&document.id).unwrap(), Some(_) => filter_map.remove(&document.id).unwrap(),
None => true, None => true,
};
if filter_accepted {
let key = key_cache.remove(&document.id).unwrap();
let distinct_accepted = match key {
Some(key) => seen.register(key),
None => seen.register_without_key(),
}; };
if filter_accepted { if distinct_accepted && seen.len() > range.start {
let key = key_cache.remove(&document.id).unwrap(); raw_documents_processed.push(document);
let distinct_accepted = match key { if raw_documents_processed.len() == range.len() { break }
Some(key) => seen.register(key),
None => seen.register_without_key(),
};
if distinct_accepted && seen.len() > range.start {
raw_documents_processed.push(document);
if raw_documents_processed.len() == range.len() { break }
}
} }
} }
// stop processing when time is running out
if let Some(timeout) = self.inner.timeout {
if start_processing.elapsed() > timeout { break }
}
} }
// make real documents now that we know // stop processing when time is running out
// those must be returned if let Some(timeout) = timeout {
let documents = raw_documents_processed if start_processing.elapsed() > timeout { break }
.into_iter() }
.map(|d| Document::from_raw(d))
.collect();
Ok(documents)
} }
// make real documents now that we know
// those must be returned
let documents = raw_documents_processed
.into_iter()
.map(|d| Document::from_raw(d))
.collect();
Ok(documents)
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -202,7 +202,11 @@ impl Index {
) )
} }
pub fn query_builder_with_criteria<'c>(&self, criteria: Criteria<'c>) -> QueryBuilder<'c> { pub fn query_builder_with_criteria<'c, 'f, 'd>(
&self,
criteria: Criteria<'c>,
) -> QueryBuilder<'c, 'f, 'd>
{
QueryBuilder::with_criteria( QueryBuilder::with_criteria(
self.main, self.main,
self.postings_lists, self.postings_lists,