feat: Replace the rayon::scope by always checking time

This commit is contained in:
Clément Renault
2019-09-01 18:52:26 +02:00
parent a420fbf1e8
commit c8ee21f227
2 changed files with 87 additions and 113 deletions

View File

@@ -6,13 +6,12 @@ edition = "2018"
[dependencies] [dependencies]
byteorder = "1.3.1" byteorder = "1.3.1"
crossbeam-channel = "0.3.9"
deunicode = "1.0.0" deunicode = "1.0.0"
hashbrown = "0.2.2" hashbrown = "0.2.2"
lazy_static = "1.2.0" lazy_static = "1.2.0"
log = "0.4.6" log = "0.4.6"
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
rayon = "1.0.3" rayon = "1.2.0"
sdset = "0.3.2" sdset = "0.3.2"
serde = { version = "1.0.88", features = ["derive"] } serde = { version = "1.0.88", features = ["derive"] }
slice-group-by = "0.2.6" slice-group-by = "0.2.6"

View File

@@ -2,12 +2,12 @@ use std::hash::Hash;
use std::ops::Range; use std::ops::Range;
use std::rc::Rc; use std::rc::Rc;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use std::{iter, mem, cmp, cmp::Reverse}; use std::{mem, cmp, cmp::Reverse};
use fst::{Streamer, IntoStreamer}; use fst::{Streamer, IntoStreamer};
use hashbrown::HashMap; use hashbrown::HashMap;
use levenshtein_automata::DFA; use levenshtein_automata::DFA;
use log::{info, error}; use log::info;
use meilidb_tokenizer::{is_cjk, split_query_string}; use meilidb_tokenizer::{is_cjk, split_query_string};
use rayon::slice::ParallelSliceMut; use rayon::slice::ParallelSliceMut;
use rayon::iter::{ParallelIterator, ParallelBridge}; use rayon::iter::{ParallelIterator, ParallelBridge};
@@ -246,12 +246,6 @@ fn multiword_rewrite_matches(
// for each attribute of each document // for each attribute of each document
for same_document_attribute in matches.linear_group_by_key(|(id, m)| (*id, m.attribute)) { for same_document_attribute in matches.linear_group_by_key(|(id, m)| (*id, m.attribute)) {
let elapsed = start.elapsed();
if timeout.map_or(false, |timeout| elapsed > timeout) {
info!("abort multiword rewrite after {:.2?}", elapsed);
break;
}
// padding will only be applied // padding will only be applied
// to word indices in the same attribute // to word indices in the same attribute
let mut padding = 0; let mut padding = 0;
@@ -328,6 +322,9 @@ fn multiword_rewrite_matches(
padding += biggest; padding += biggest;
} }
// check the timeout *after* having processed at least one element
if timeout.map_or(false, |timeout| start.elapsed() > timeout) { break }
} }
info!("main multiword rewrite took {:.2?}", start.elapsed()); info!("main multiword rewrite took {:.2?}", start.elapsed());
@@ -350,32 +347,25 @@ where S: Store + Sync,
let store = &self.store; let store = &self.store;
let fetch_timeout = &self.fetch_timeout; let fetch_timeout = &self.fetch_timeout;
rayon::scope(move |s| {
enum Error<E> {
SendError,
StoreError(E),
}
let mut matches = Vec::new(); let mut matches = Vec::new();
let mut highlights = Vec::new(); let mut highlights = Vec::new();
let recv_end_time = fetch_timeout.map(|d| Instant::now() + d * 75 / 100); let timeout = fetch_timeout.map(|d| d * 75 / 100);
let start = Instant::now(); let start = Instant::now();
let (sender, receiver) = crossbeam_channel::unbounded(); let results: Vec<_> = automatons
s.spawn(move |_| {
let result = automatons
.into_iter() .into_iter()
.par_bridge() .par_bridge()
.try_for_each_with((sender, store, searchables), |data, automaton| { .map_with((store, searchables), |(store, searchables), automaton| {
let (sender, store, searchables) = data;
let Automaton { index, is_exact, query_len, .. } = automaton; let Automaton { index, is_exact, query_len, .. } = automaton;
let dfa = automaton.dfa(); let dfa = automaton.dfa();
let words = store.words().map_err(Error::StoreError)?; let words = match store.words() {
let mut stream = words.search(&dfa).into_stream(); Ok(words) => words,
Err(err) => return Some(Err(err)),
};
let mut stream = words.search(&dfa).into_stream();
let mut matches = Vec::new(); let mut matches = Vec::new();
let mut highlights = Vec::new(); let mut highlights = Vec::new();
@@ -383,17 +373,16 @@ where S: Store + Sync,
let distance = dfa.eval(input).to_u8(); let distance = dfa.eval(input).to_u8();
let is_exact = is_exact && distance == 0 && input.len() == query_len; let is_exact = is_exact && distance == 0 && input.len() == query_len;
let doc_indexes = store.word_indexes(input).map_err(Error::StoreError)?; let doc_indexes = match store.word_indexes(input) {
let doc_indexes = match doc_indexes { Ok(Some(doc_indexes)) => doc_indexes,
Some(doc_indexes) => doc_indexes, Ok(None) => continue,
None => continue, Err(err) => return Some(Err(err)),
}; };
matches.reserve(doc_indexes.len()); matches.reserve(doc_indexes.len());
highlights.reserve(doc_indexes.len()); highlights.reserve(doc_indexes.len());
for di in doc_indexes.as_slice() { for di in doc_indexes.as_slice() {
let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute)); let attribute = searchables.map_or(Some(di.attribute), |r| r.get(di.attribute));
if let Some(attribute) = attribute { if let Some(attribute) = attribute {
let match_ = TmpMatch { let match_ = TmpMatch {
@@ -414,35 +403,22 @@ where S: Store + Sync,
highlights.push((di.document_id, highlight)); highlights.push((di.document_id, highlight));
} }
} }
// check the timeout *after* having processed at least one element
if timeout.map_or(false, |timeout| start.elapsed() > timeout) { break }
} }
sender.send((matches, highlights)).map_err(|_| Error::SendError) Some(Ok((matches, highlights)))
}); })
.while_some()
.collect();
if let Err(Error::StoreError(e)) = result { for result in results {
error!("{}", e); let (mut rcv_matches, mut rcv_highlights) = result?;
}
});
let iter = receiver.recv().into_iter().chain(iter::from_fn(|| {
let recv_end_time = match recv_end_time {
Some(time) => time,
None => return receiver.recv().ok(),
};
match recv_end_time.checked_duration_since(Instant::now()) {
Some(timeout) => receiver.recv_timeout(timeout).ok(),
None => None,
}
}));
for (mut rcv_matches, mut rcv_highlights) in iter {
matches.append(&mut rcv_matches); matches.append(&mut rcv_matches);
highlights.append(&mut rcv_highlights); highlights.append(&mut rcv_highlights);
} }
drop(receiver);
info!("main query all took {:.2?}", start.elapsed()); info!("main query all took {:.2?}", start.elapsed());
info!("{} total matches to rewrite", matches.len()); info!("{} total matches to rewrite", matches.len());
@@ -467,7 +443,6 @@ where S: Store + Sync,
info!("{} total documents to classify", raw_documents.len()); info!("{} total documents to classify", raw_documents.len());
Ok(raw_documents) Ok(raw_documents)
})
} }
} }