Compare commits

...

15 Commits

Author SHA1 Message Date
a136c62208 Merge pull request #249 from meilisearch/display-all-updates
Display enqueued along with processed updates
2019-10-31 13:53:46 +01:00
cc461b1331 Display enqueued along with processed updates 2019-10-31 12:25:52 +01:00
dbe5363672 Merge pull request #248 from meilisearch/fix-highlight-too-long
Correctly highlight when query string is too long
2019-10-30 18:19:06 +01:00
45d4361e7d Correctly highlight when query string is longer 2019-10-30 17:49:50 +01:00
b28c44cc6b Merge pull request #247 from meilisearch/bump-meilidb
Bump the meili-core/schema/tokenizer crates to 0.5.11
2019-10-30 17:48:26 +01:00
b709a7a30a Bump the meili-core/schema/tokenizer crates to 0.5.11 2019-10-30 17:40:31 +01:00
64c25bdb40 Merge pull request #246 from meilisearch/better-highlighting-area
Make the highlight system much better
2019-10-30 17:39:12 +01:00
c230f244be Make the highlight system much better 2019-10-30 17:32:29 +01:00
02af4ff113 Merge pull request #245 from meilisearch/reindex-all-documents-reduce-memory-usage
Reduce the ram consumption when re-indexing all the documents
2019-10-29 17:54:47 +01:00
4dff8a215e Reduce the ram consumption when re-indexing all the documents 2019-10-29 17:46:23 +01:00
41065305aa Merge pull request #244 from meilisearch/reintroduce-stop-words
Reintroduce stop words
2019-10-29 16:35:03 +01:00
e9dce3ce81 Add a test to ensure that the indexer support stop words 2019-10-29 16:18:06 +01:00
ff7dde7522 Make the RawIndexer support stop words 2019-10-29 16:18:06 +01:00
a226fd23c3 Introduce the stop words deletion update type 2019-10-29 16:18:06 +01:00
776673ebae Introduce the stop words addition update type 2019-10-29 15:24:09 +01:00
17 changed files with 639 additions and 90 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "meilidb-core"
version = "0.1.0"
version = "0.5.11"
authors = ["Kerollmops <clement@meilisearch.com>"]
edition = "2018"
@@ -14,8 +14,8 @@ env_logger = "0.7.0"
hashbrown = { version = "0.6.0", features = ["serde"] }
heed = "0.1.0"
log = "0.4.8"
meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" }
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" }
meilidb-schema = { path = "../meilidb-schema", version = "0.5.11" }
meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.5.11" }
once_cell = "1.2.0"
ordered-float = { version = "1.0.2", features = ["serde"] }
sdset = "0.3.3"

View File

@@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
use meilidb_core::{Database, Highlight, UpdateResult};
use meilidb_core::{Database, Highlight, ProcessedUpdateResult};
use meilidb_schema::SchemaAttr;
const INDEX_NAME: &str = "default";
@@ -97,7 +97,7 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
let start = Instant::now();
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: UpdateResult| sender.send(update.update_id).unwrap();
let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = match database.open_index(INDEX_NAME) {
Some(index) => index,
None => database.create_index(INDEX_NAME).unwrap(),

View File

@@ -11,7 +11,7 @@ use log::{debug, error};
use crate::{store, update, Index, MResult};
pub type BoxUpdateFn = Box<dyn Fn(update::UpdateResult) + Send + Sync + 'static>;
pub type BoxUpdateFn = Box<dyn Fn(update::ProcessedUpdateResult) + Send + Sync + 'static>;
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
pub struct Database {

View File

@@ -0,0 +1,134 @@
use std::cmp::min;
use std::collections::BTreeMap;
use std::ops::{Index, IndexMut};
// A simple wrapper around a Vec so the buffer stays contiguous but can be indexed like a 2D array.
struct N2Array<T> {
y_size: usize,
buf: Vec<T>,
}
impl<T: Clone> N2Array<T> {
fn new(x: usize, y: usize, value: T) -> N2Array<T> {
N2Array {
y_size: y,
buf: vec![value; x * y],
}
}
}
impl<T> Index<(usize, usize)> for N2Array<T> {
type Output = T;
#[inline]
fn index(&self, (x, y): (usize, usize)) -> &T {
&self.buf[(x * self.y_size) + y]
}
}
impl<T> IndexMut<(usize, usize)> for N2Array<T> {
#[inline]
fn index_mut(&mut self, (x, y): (usize, usize)) -> &mut T {
&mut self.buf[(x * self.y_size) + y]
}
}
pub fn prefix_damerau_levenshtein(source: &[u8], target: &[u8]) -> (u32, usize) {
let (n, m) = (source.len(), target.len());
assert!(
n <= m,
"the source string must not be longer than the target one"
);
if n == 0 {
return (m as u32, 0);
}
if m == 0 {
return (n as u32, 0);
}
if n == m && source == target {
return (0, m);
}
let inf = n + m;
let mut matrix = N2Array::new(n + 2, m + 2, 0);
matrix[(0, 0)] = inf;
for i in 0..n + 1 {
matrix[(i + 1, 0)] = inf;
matrix[(i + 1, 1)] = i;
}
for j in 0..m + 1 {
matrix[(0, j + 1)] = inf;
matrix[(1, j + 1)] = j;
}
let mut last_row = BTreeMap::new();
for (row, char_s) in source.iter().enumerate() {
let mut last_match_col = 0;
let row = row + 1;
for (col, char_t) in target.iter().enumerate() {
let col = col + 1;
let last_match_row = *last_row.get(&char_t).unwrap_or(&0);
let cost = if char_s == char_t { 0 } else { 1 };
let dist_add = matrix[(row, col + 1)] + 1;
let dist_del = matrix[(row + 1, col)] + 1;
let dist_sub = matrix[(row, col)] + cost;
let dist_trans = matrix[(last_match_row, last_match_col)]
+ (row - last_match_row - 1)
+ 1
+ (col - last_match_col - 1);
let dist = min(min(dist_add, dist_del), min(dist_sub, dist_trans));
matrix[(row + 1, col + 1)] = dist;
if cost == 0 {
last_match_col = col;
}
}
last_row.insert(char_s, row);
}
let mut minimum = (u32::max_value(), 0);
for x in n..=m {
let dist = matrix[(n + 1, x + 1)] as u32;
if dist < minimum.0 {
minimum = (dist, x)
}
}
minimum
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn matched_length() {
let query = "Levenste";
let text = "Levenshtein";
let (dist, length) = prefix_damerau_levenshtein(query.as_bytes(), text.as_bytes());
assert_eq!(dist, 1);
assert_eq!(&text[..length], "Levenshte");
}
#[test]
#[should_panic]
fn matched_length_panic() {
let query = "Levenshtein";
let text = "Levenste";
// this function will panic if the source is longer than the target
prefix_damerau_levenshtein(query.as_bytes(), text.as_bytes());
}
}
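
For orientation, a hedged usage sketch of the new helper (the second case restates the `matched_length` test above): the returned pair is the edit distance of the best prefix match and the byte length of the covered prefix in the target.

// Illustrative only; `prefix_damerau_levenshtein` is the function above.
let (dist, len) = prefix_damerau_levenshtein(b"sales", b"salesman");
assert_eq!((dist, len), (0, 5)); // exact prefix match: 5 bytes covered
let (dist, len) = prefix_damerau_levenshtein(b"Levenste", b"Levenshtein");
assert_eq!(dist, 1); // one missing "h" inside the prefix
assert_eq!(len, 9); // covers "Levenshte"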

View File

@@ -7,6 +7,7 @@ pub mod criterion;
mod database;
mod distinct_map;
mod error;
mod levenshtein;
mod number;
mod query_builder;
mod ranked_map;
@@ -23,7 +24,7 @@ pub use self::number::{Number, ParseNumberError};
pub use self::ranked_map::RankedMap;
pub use self::raw_document::RawDocument;
pub use self::store::Index;
pub use self::update::{UpdateResult, UpdateStatus, UpdateType};
pub use self::update::{EnqueuedUpdateResult, ProcessedUpdateResult, UpdateStatus, UpdateType};
use ::serde::{Deserialize, Serialize};
use zerocopy::{AsBytes, FromBytes};

View File

@@ -11,6 +11,7 @@ use slice_group_by::{GroupBy, GroupByMut};
use crate::automaton::{Automaton, AutomatonGroup, AutomatonProducer, QueryEnhancer};
use crate::distinct_map::{BufferedDistinctMap, DistinctMap};
use crate::levenshtein::prefix_damerau_levenshtein;
use crate::raw_document::{raw_documents_from, RawDocument};
use crate::{criterion::Criteria, Document, DocumentId, Highlight, TmpMatch};
use crate::{reordered_attrs::ReorderedAttrs, store, MResult};
@@ -162,6 +163,7 @@ fn fetch_raw_documents(
index,
is_exact,
query_len,
query,
..
} = automaton;
let dfa = automaton.dfa();
@@ -176,6 +178,12 @@
let distance = dfa.eval(input).to_u8();
let is_exact = *is_exact && distance == 0 && input.len() == *query_len;
let covered_area = if query.len() > input.len() {
input.len()
} else {
prefix_damerau_levenshtein(query.as_bytes(), input).1
};
let doc_indexes = match postings_lists_store.postings_list(reader, input)? {
Some(doc_indexes) => doc_indexes,
None => continue,
@@ -197,7 +205,7 @@
let highlight = Highlight {
attribute: di.attribute,
char_index: di.char_index,
char_length: u16::try_from(*query_len).unwrap_or(u16::max_value()),
char_length: u16::try_from(covered_area).unwrap_or(u16::max_value()),
};
tmp_matches.push((di.document_id, id, match_, highlight));
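
In plain terms: the highlight length is no longer the raw query length but the portion of the indexed word the query actually covers, clamped to the word itself. For the query `Levenste` matching `Levenshtein`, the highlight now spans `Levenshte` (length 9) instead of the query length of 8, and a query longer than the matched word can no longer highlight past its end.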

View File

@@ -11,6 +11,7 @@ type Word = Vec<u8>; // TODO make it be a SmallVec
pub struct RawIndexer {
word_limit: usize, // the maximum number of indexed words
stop_words: fst::Set,
words_doc_indexes: BTreeMap<Word, Vec<DocIndex>>,
docs_words: HashMap<DocumentId, Vec<Word>>,
}
@@ -21,13 +22,14 @@ pub struct Indexed {
}
impl RawIndexer {
pub fn new() -> RawIndexer {
RawIndexer::with_word_limit(1000)
pub fn new(stop_words: fst::Set) -> RawIndexer {
RawIndexer::with_word_limit(stop_words, 1000)
}
pub fn with_word_limit(limit: usize) -> RawIndexer {
pub fn with_word_limit(stop_words: fst::Set, limit: usize) -> RawIndexer {
RawIndexer {
word_limit: limit,
stop_words,
words_doc_indexes: BTreeMap::new(),
docs_words: HashMap::new(),
}
@@ -56,6 +58,7 @@ impl RawIndexer {
id,
attr,
self.word_limit,
&self.stop_words,
&mut self.words_doc_indexes,
&mut self.docs_words,
);
@@ -87,6 +90,7 @@ impl RawIndexer {
id,
attr,
self.word_limit,
&self.stop_words,
&mut self.words_doc_indexes,
&mut self.docs_words,
);
@@ -118,6 +122,7 @@ impl RawIndexer {
id,
attr,
self.word_limit,
&self.stop_words,
&mut self.words_doc_indexes,
&mut self.docs_words,
);
@@ -152,17 +157,12 @@ impl RawIndexer {
}
}
impl Default for RawIndexer {
fn default() -> Self {
Self::new()
}
}
fn index_token(
token: Token,
id: DocumentId,
attr: SchemaAttr,
word_limit: usize,
stop_words: &fst::Set,
words_doc_indexes: &mut BTreeMap<Word, Vec<DocIndex>>,
docs_words: &mut HashMap<DocumentId, Vec<Word>>,
) -> bool {
@@ -170,16 +170,18 @@ fn index_token(
return false;
}
match token_to_docindex(id, attr, token) {
Some(docindex) => {
let word = Vec::from(token.word);
words_doc_indexes
.entry(word.clone())
.or_insert_with(Vec::new)
.push(docindex);
docs_words.entry(id).or_insert_with(Vec::new).push(word);
if !stop_words.contains(&token.word) {
match token_to_docindex(id, attr, token) {
Some(docindex) => {
let word = Vec::from(token.word);
words_doc_indexes
.entry(word.clone())
.or_insert_with(Vec::new)
.push(docindex);
docs_words.entry(id).or_insert_with(Vec::new).push(word);
}
None => return false,
}
None => return false,
}
true
@@ -207,7 +209,7 @@ mod tests {
#[test]
fn strange_apostrophe() {
let mut indexer = RawIndexer::new();
let mut indexer = RawIndexer::new(fst::Set::default());
let docid = DocumentId(0);
let attr = SchemaAttr(0);
@@ -231,7 +233,7 @@
#[test]
fn strange_apostrophe_in_sequence() {
let mut indexer = RawIndexer::new();
let mut indexer = RawIndexer::new(fst::Set::default());
let docid = DocumentId(0);
let attr = SchemaAttr(0);
@@ -252,4 +254,33 @@
.get(&"l’éteindre".to_owned().into_bytes())
.is_some());
}
#[test]
fn basic_stop_words() {
let stop_words = sdset::SetBuf::from_dirty(vec!["l", "j", "ai", "de"]);
let stop_words = fst::Set::from_iter(stop_words).unwrap();
let mut indexer = RawIndexer::new(stop_words);
let docid = DocumentId(0);
let attr = SchemaAttr(0);
let text = "Zut, l’aspirateur, j’ai oublié de l’éteindre !";
indexer.index_text(docid, attr, text);
let Indexed {
words_doc_indexes, ..
} = indexer.build();
assert!(words_doc_indexes.get(&b"l"[..]).is_none());
assert!(words_doc_indexes.get(&b"aspirateur"[..]).is_some());
assert!(words_doc_indexes.get(&b"j"[..]).is_none());
assert!(words_doc_indexes.get(&b"ai"[..]).is_none());
assert!(words_doc_indexes.get(&b"de"[..]).is_none());
assert!(words_doc_indexes.get(&b"eteindre"[..]).is_some());
// with the ugly apostrophe...
assert!(words_doc_indexes
.get(&"l’éteindre".to_owned().into_bytes())
.is_some());
}
}
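
Two consequences of this change are worth spelling out: the `Default` impl is gone, so a `RawIndexer` now always takes an explicit `fst::Set` of stop words (pass `fst::Set::default()` for none), and the filtering happens at tokenization time in `index_token`. A hedged sketch mirroring the `basic_stop_words` test above, with made-up words:

// Illustrative only; `fst::Set::from_iter` needs lexicographically sorted input.
let stop_words = fst::Set::from_iter(vec!["de", "la"]).unwrap();
let mut indexer = RawIndexer::new(stop_words);
indexer.index_text(DocumentId(0), SchemaAttr(0), "la porte de la maison");
let Indexed { words_doc_indexes, .. } = indexer.build();
assert!(words_doc_indexes.get(&b"porte"[..]).is_some()); // indexed
assert!(words_doc_indexes.get(&b"la"[..]).is_none()); // stop word skipped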

View File

@@ -9,6 +9,7 @@ const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
const RANKED_MAP_KEY: &str = "ranked-map";
const SCHEMA_KEY: &str = "schema";
const SYNONYMS_KEY: &str = "synonyms";
const STOP_WORDS_KEY: &str = "stop-words";
const WORDS_KEY: &str = "words";
#[derive(Copy, Clone)]
@@ -71,6 +72,24 @@ impl Main {
}
}
pub fn put_stop_words_fst(self, writer: &mut heed::RwTxn, fst: &fst::Set) -> ZResult<()> {
let bytes = fst.as_fst().as_bytes();
self.main
.put::<Str, ByteSlice>(writer, STOP_WORDS_KEY, bytes)
}
pub fn stop_words_fst(self, reader: &heed::RoTxn) -> ZResult<Option<fst::Set>> {
match self.main.get::<Str, ByteSlice>(reader, STOP_WORDS_KEY)? {
Some(bytes) => {
let len = bytes.len();
let bytes = Arc::from(bytes);
let fst = fst::raw::Fst::from_shared_bytes(bytes, 0, len).unwrap();
Ok(Some(fst::Set::from(fst)))
}
None => Ok(None),
}
}
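
Storage note: the set is persisted as the raw bytes of its fst and rebuilt on read by wrapping the stored slice in an `Arc` and handing it to `fst::raw::Fst::from_shared_bytes`, so the set never has to be re-sorted or re-built from its words.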
pub fn put_number_of_documents<F>(self, writer: &mut heed::RwTxn, f: F) -> ZResult<u64>
where
F: Fn(u64) -> u64,

View File

@@ -187,6 +187,22 @@ impl Index {
)
}
pub fn stop_words_addition(&self) -> update::StopWordsAddition {
update::StopWordsAddition::new(
self.updates,
self.updates_results,
self.updates_notifier.clone(),
)
}
pub fn stop_words_deletion(&self) -> update::StopWordsDeletion {
update::StopWordsDeletion::new(
self.updates,
self.updates_results,
self.updates_notifier.clone(),
)
}
pub fn current_update_id(&self, reader: &heed::RoTxn) -> MResult<Option<u64>> {
match self.updates.last_update_id(reader)? {
Some((id, _)) => Ok(Some(id)),
@@ -203,17 +219,29 @@
}
pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> {
match self.updates_results.last_update_id(reader)? {
Some((last_id, _)) => {
let mut updates = Vec::with_capacity(last_id as usize + 1);
for id in 0..=last_id {
let update = self.update_status(reader, id)?;
updates.push(update);
}
Ok(updates)
let mut updates = Vec::new();
let mut last_update_result_id = 0;
// retrieve all updates results
if let Some((last_id, _)) = self.updates_results.last_update_id(reader)? {
updates.reserve(last_id as usize);
for id in 0..=last_id {
let update = self.update_status(reader, id)?;
updates.push(update);
last_update_result_id = id;
}
None => Ok(Vec::new()),
}
// retrieve all enqueued updates
if let Some((last_id, _)) = self.updates.last_update_id(reader)? {
for id in last_update_result_id + 1..last_id {
let update = self.update_status(reader, id)?;
updates.push(update);
}
}
Ok(updates)
}
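
The net effect of this rewrite: `all_updates_status` first collects every processed result from the results store, then walks the ids above the last processed one that are still sitting in the updates queue, so a single call now reports enqueued and processed updates together, which is exactly what PR #249 is about.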
pub fn query_builder(&self) -> QueryBuilder {

View File

@@ -26,9 +26,9 @@ impl Updates {
}
// TODO do not trigger deserialize if possible
pub fn contains(self, reader: &heed::RoTxn, update_id: u64) -> ZResult<bool> {
pub fn get(self, reader: &heed::RoTxn, update_id: u64) -> ZResult<Option<Update>> {
let update_id = BEU64::new(update_id);
self.updates.get(reader, &update_id).map(|v| v.is_some())
self.updates.get(reader, &update_id)
}
pub fn put_update(

View File

@@ -1,15 +1,19 @@
use super::BEU64;
use crate::update::UpdateResult;
use crate::update::ProcessedUpdateResult;
use heed::types::{OwnedType, SerdeBincode};
use heed::Result as ZResult;
#[derive(Copy, Clone)]
pub struct UpdatesResults {
pub(crate) updates_results: heed::Database<OwnedType<BEU64>, SerdeBincode<UpdateResult>>,
pub(crate) updates_results:
heed::Database<OwnedType<BEU64>, SerdeBincode<ProcessedUpdateResult>>,
}
impl UpdatesResults {
pub fn last_update_id(self, reader: &heed::RoTxn) -> ZResult<Option<(u64, UpdateResult)>> {
pub fn last_update_id(
self,
reader: &heed::RoTxn,
) -> ZResult<Option<(u64, ProcessedUpdateResult)>> {
match self.updates_results.last(reader)? {
Some((key, data)) => Ok(Some((key.get(), data))),
None => Ok(None),
@@ -20,7 +24,7 @@ impl UpdatesResults {
self,
writer: &mut heed::RwTxn,
update_id: u64,
update_result: &UpdateResult,
update_result: &ProcessedUpdateResult,
) -> ZResult<()> {
let update_id = BEU64::new(update_id);
self.updates_results.put(writer, &update_id, update_result)
@@ -30,7 +34,7 @@
self,
reader: &heed::RoTxn,
update_id: u64,
) -> ZResult<Option<UpdateResult>> {
) -> ZResult<Option<ProcessedUpdateResult>> {
let update_id = BEU64::new(update_id);
self.updates_results.get(reader, &update_id)
}

View File

@@ -87,7 +87,6 @@ pub fn apply_documents_addition(
addition: Vec<serde_json::Value>,
) -> MResult<()> {
let mut documents_additions = HashMap::new();
let mut indexer = RawIndexer::new();
let schema = match main_store.schema(writer)? {
Some(schema) => schema,
@@ -124,7 +123,14 @@ pub fn apply_documents_addition(
None => RankedMap::default(),
};
let stop_words = match main_store.stop_words_fst(writer)? {
Some(stop_words) => stop_words,
None => fst::Set::default(),
};
// 3. index the documents fields in the stores
let mut indexer = RawIndexer::new(stop_words);
for (document_id, document) in documents_additions {
let serializer = Serializer {
txn: writer,
@@ -144,7 +150,7 @@ pub fn apply_documents_addition(
main_store,
postings_lists_store,
docs_words_store,
ranked_map,
&ranked_map,
number_of_inserted_documents,
indexer,
)
@@ -173,49 +179,58 @@ pub fn reindex_all_documents(
}
// 2. remove the documents posting lists
let number_of_inserted_documents = documents_ids_to_reindex.len();
main_store.put_words_fst(writer, &fst::Set::default())?;
main_store.put_ranked_map(writer, &ranked_map)?;
main_store.put_number_of_documents(writer, |_| 0)?;
postings_lists_store.clear(writer)?;
docs_words_store.clear(writer)?;
// 3. re-index one document by one document (otherwise we make the borrow checker unhappy)
let mut indexer = RawIndexer::new();
let mut ram_store = HashMap::new();
// 3. re-index chunks of documents (otherwise we make the borrow checker unhappy)
for documents_ids in documents_ids_to_reindex.chunks(100) {
let stop_words = match main_store.stop_words_fst(writer)? {
Some(stop_words) => stop_words,
None => fst::Set::default(),
};
for document_id in documents_ids_to_reindex {
for result in documents_fields_store.document_fields(writer, document_id)? {
let (attr, bytes) = result?;
let value: serde_json::Value = serde_json::from_slice(bytes)?;
ram_store.insert((document_id, attr), value);
let number_of_inserted_documents = documents_ids.len();
let mut indexer = RawIndexer::new(stop_words);
let mut ram_store = HashMap::new();
for document_id in documents_ids {
for result in documents_fields_store.document_fields(writer, *document_id)? {
let (attr, bytes) = result?;
let value: serde_json::Value = serde_json::from_slice(bytes)?;
ram_store.insert((document_id, attr), value);
}
for ((docid, attr), value) in ram_store.drain() {
serialize_value(
writer,
attr,
schema.props(attr),
*docid,
documents_fields_store,
documents_fields_counts_store,
&mut indexer,
&mut ranked_map,
&value,
)?;
}
}
for ((docid, attr), value) in ram_store.drain() {
serialize_value(
writer,
attr,
schema.props(attr),
docid,
documents_fields_store,
documents_fields_counts_store,
&mut indexer,
&mut ranked_map,
&value,
)?;
}
// 4. write the new index in the main store
write_documents_addition_index(
writer,
main_store,
postings_lists_store,
docs_words_store,
&ranked_map,
number_of_inserted_documents,
indexer,
)?;
}
// 4. write the new index in the main store
write_documents_addition_index(
writer,
main_store,
postings_lists_store,
docs_words_store,
ranked_map,
number_of_inserted_documents,
indexer,
)
Ok(())
}
pub fn write_documents_addition_index(
@@ -223,7 +238,7 @@ pub fn write_documents_addition_index(
main_store: store::Main,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
ranked_map: RankedMap,
ranked_map: &RankedMap,
number_of_inserted_documents: usize,
indexer: RawIndexer,
) -> MResult<()> {
@@ -268,7 +283,7 @@ pub fn write_documents_addition_index(
};
main_store.put_words_fst(writer, &words)?;
main_store.put_ranked_map(writer, &ranked_map)?;
main_store.put_ranked_map(writer, ranked_map)?;
main_store.put_number_of_documents(writer, |old| old + number_of_inserted_documents as u64)?;
Ok(())

View File

@@ -3,6 +3,8 @@ mod customs_update;
mod documents_addition;
mod documents_deletion;
mod schema_update;
mod stop_words_addition;
mod stop_words_deletion;
mod synonyms_addition;
mod synonyms_deletion;
@@ -11,11 +13,13 @@ pub use self::customs_update::{apply_customs_update, push_customs_update};
pub use self::documents_addition::{apply_documents_addition, DocumentsAddition};
pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion};
pub use self::schema_update::{apply_schema_update, push_schema_update};
pub use self::stop_words_addition::{apply_stop_words_addition, StopWordsAddition};
pub use self::stop_words_deletion::{apply_stop_words_deletion, StopWordsDeletion};
pub use self::synonyms_addition::{apply_synonyms_addition, SynonymsAddition};
pub use self::synonyms_deletion::{apply_synonyms_deletion, SynonymsDeletion};
use std::cmp;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::time::{Duration, Instant};
use heed::Result as ZResult;
@@ -34,6 +38,38 @@ pub enum Update {
DocumentsDeletion(Vec<DocumentId>),
SynonymsAddition(BTreeMap<String, Vec<String>>),
SynonymsDeletion(BTreeMap<String, Option<Vec<String>>>),
StopWordsAddition(BTreeSet<String>),
StopWordsDeletion(BTreeSet<String>),
}
impl Update {
pub fn update_type(&self) -> UpdateType {
match self {
Update::ClearAll => UpdateType::ClearAll,
Update::Schema(schema) => UpdateType::Schema {
schema: schema.clone(),
},
Update::Customs(_) => UpdateType::Customs,
Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
number: addition.len(),
},
Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
number: deletion.len(),
},
Update::SynonymsAddition(addition) => UpdateType::SynonymsAddition {
number: addition.len(),
},
Update::SynonymsDeletion(deletion) => UpdateType::SynonymsDeletion {
number: deletion.len(),
},
Update::StopWordsAddition(addition) => UpdateType::StopWordsAddition {
number: addition.len(),
},
Update::StopWordsDeletion(deletion) => UpdateType::StopWordsDeletion {
number: deletion.len(),
},
}
}
}
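
This `update_type` helper is what makes the enqueued half of the report possible: `update_status` below can now attach a meaningful `UpdateType` to an update that is stored but not yet processed, instead of returning a bare `Enqueued` marker.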
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -45,6 +81,8 @@ pub enum UpdateType {
DocumentsDeletion { number: usize },
SynonymsAddition { number: usize },
SynonymsDeletion { number: usize },
StopWordsAddition { number: usize },
StopWordsDeletion { number: usize },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -53,17 +91,23 @@ pub struct DetailedDuration {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateResult {
pub struct ProcessedUpdateResult {
pub update_id: u64,
pub update_type: UpdateType,
pub result: Result<(), String>,
pub detailed_duration: DetailedDuration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueuedUpdateResult {
pub update_id: u64,
pub update_type: UpdateType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateStatus {
Enqueued,
Processed(UpdateResult),
Enqueued(EnqueuedUpdateResult),
Processed(ProcessedUpdateResult),
Unknown,
}
@@ -76,8 +120,11 @@
match updates_results_store.update_result(reader, update_id)? {
Some(result) => Ok(UpdateStatus::Processed(result)),
None => {
if updates_store.contains(reader, update_id)? {
Ok(UpdateStatus::Enqueued)
if let Some(update) = updates_store.get(reader, update_id)? {
Ok(UpdateStatus::Enqueued(EnqueuedUpdateResult {
update_id,
update_type: update.update_type(),
}))
} else {
Ok(UpdateStatus::Unknown)
}
@@ -102,7 +149,10 @@
Ok(new_update_id)
}
pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>> {
pub fn update_task(
writer: &mut heed::RwTxn,
index: store::Index,
) -> MResult<Option<ProcessedUpdateResult>> {
let (update_id, update) = match index.updates.pop_front(writer)? {
Some(value) => value,
None => return Ok(None),
@@ -210,6 +260,37 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
let result = apply_synonyms_deletion(writer, index.main, index.synonyms, synonyms);
(update_type, result, start.elapsed())
}
Update::StopWordsAddition(stop_words) => {
let start = Instant::now();
let update_type = UpdateType::StopWordsAddition {
number: stop_words.len(),
};
let result =
apply_stop_words_addition(writer, index.main, index.postings_lists, stop_words);
(update_type, result, start.elapsed())
}
Update::StopWordsDeletion(stop_words) => {
let start = Instant::now();
let update_type = UpdateType::StopWordsDeletion {
number: stop_words.len(),
};
let result = apply_stop_words_deletion(
writer,
index.main,
index.documents_fields,
index.documents_fields_counts,
index.postings_lists,
index.docs_words,
stop_words,
);
(update_type, result, start.elapsed())
}
};
@@ -220,7 +301,7 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
);
let detailed_duration = DetailedDuration { main: duration };
let status = UpdateResult {
let status = ProcessedUpdateResult {
update_id,
update_type,
result: result.map_err(|e| e.to_string()),

View File

@@ -0,0 +1,116 @@
use std::collections::BTreeSet;
use fst::{set::OpBuilder, SetBuilder};
use crate::automaton::normalize_str;
use crate::update::{next_update_id, Update};
use crate::{store, MResult};
pub struct StopWordsAddition {
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>,
stop_words: BTreeSet<String>,
}
impl StopWordsAddition {
pub fn new(
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>,
) -> StopWordsAddition {
StopWordsAddition {
updates_store,
updates_results_store,
updates_notifier,
stop_words: BTreeSet::new(),
}
}
pub fn add_stop_word<S: AsRef<str>>(&mut self, stop_word: S) {
let stop_word = normalize_str(stop_word.as_ref());
self.stop_words.insert(stop_word);
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(());
let update_id = push_stop_words_addition(
writer,
self.updates_store,
self.updates_results_store,
self.stop_words,
)?;
Ok(update_id)
}
}
pub fn push_stop_words_addition(
writer: &mut heed::RwTxn,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
addition: BTreeSet<String>,
) -> MResult<u64> {
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::StopWordsAddition(addition);
updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn apply_stop_words_addition(
writer: &mut heed::RwTxn,
main_store: store::Main,
postings_lists_store: store::PostingsLists,
addition: BTreeSet<String>,
) -> MResult<()> {
let mut stop_words_builder = SetBuilder::memory();
for word in addition {
stop_words_builder.insert(&word).unwrap();
// we remove every posting list associated with a new stop word
postings_lists_store.del_postings_list(writer, word.as_bytes())?;
}
// create the new delta stop words fst
let delta_stop_words = stop_words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
// we also need to remove all the stop words from the main fst
if let Some(word_fst) = main_store.words_fst(writer)? {
let op = OpBuilder::new()
.add(&word_fst)
.add(&delta_stop_words)
.difference();
let mut word_fst_builder = SetBuilder::memory();
word_fst_builder.extend_stream(op).unwrap();
let word_fst = word_fst_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
main_store.put_words_fst(writer, &word_fst)?;
}
// now we add all of these new stop words to the stop words fst in the main store
let stop_words_fst = main_store.stop_words_fst(writer)?.unwrap_or_default();
let op = OpBuilder::new()
.add(&stop_words_fst)
.add(&delta_stop_words)
.r#union();
let mut stop_words_builder = SetBuilder::memory();
stop_words_builder.extend_stream(op).unwrap();
let stop_words_fst = stop_words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
main_store.put_stop_words_fst(writer, &stop_words_fst)?;
Ok(())
}
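
Putting the pieces together, a hypothetical caller-side sketch (database, index, and write-transaction setup elided; the methods are the ones added in this compare):

// Illustrative only: enqueue a stop-words addition through the new Index API.
let mut addition = index.stop_words_addition();
addition.add_stop_word("the");
addition.add_stop_word("of");
let update_id = addition.finalize(&mut writer)?; // enqueues Update::StopWordsAddition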

View File

@@ -0,0 +1,112 @@
use std::collections::BTreeSet;
use fst::{set::OpBuilder, SetBuilder};
use crate::automaton::normalize_str;
use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update};
use crate::{store, MResult};
pub struct StopWordsDeletion {
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>,
stop_words: BTreeSet<String>,
}
impl StopWordsDeletion {
pub fn new(
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
updates_notifier: crossbeam_channel::Sender<()>,
) -> StopWordsDeletion {
StopWordsDeletion {
updates_store,
updates_results_store,
updates_notifier,
stop_words: BTreeSet::new(),
}
}
pub fn delete_stop_word<S: AsRef<str>>(&mut self, stop_word: S) {
let stop_word = normalize_str(stop_word.as_ref());
self.stop_words.insert(stop_word);
}
pub fn finalize(self, writer: &mut heed::RwTxn) -> MResult<u64> {
let _ = self.updates_notifier.send(());
let update_id = push_stop_words_deletion(
writer,
self.updates_store,
self.updates_results_store,
self.stop_words,
)?;
Ok(update_id)
}
}
pub fn push_stop_words_deletion(
writer: &mut heed::RwTxn,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
deletion: BTreeSet<String>,
) -> MResult<u64> {
let last_update_id = next_update_id(writer, updates_store, updates_results_store)?;
let update = Update::StopWordsDeletion(deletion);
updates_store.put_update(writer, last_update_id, &update)?;
Ok(last_update_id)
}
pub fn apply_stop_words_deletion(
writer: &mut heed::RwTxn,
main_store: store::Main,
documents_fields_store: store::DocumentsFields,
documents_fields_counts_store: store::DocumentsFieldsCounts,
postings_lists_store: store::PostingsLists,
docs_words_store: store::DocsWords,
deletion: BTreeSet<String>,
) -> MResult<()> {
let mut stop_words_builder = SetBuilder::memory();
for word in deletion {
stop_words_builder.insert(&word).unwrap();
}
// create the new delta stop words fst
let delta_stop_words = stop_words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
// now we delete all of these stop words from the main store
let stop_words_fst = main_store.stop_words_fst(writer)?.unwrap_or_default();
let op = OpBuilder::new()
.add(&stop_words_fst)
.add(&delta_stop_words)
.difference();
let mut stop_words_builder = SetBuilder::memory();
stop_words_builder.extend_stream(op).unwrap();
let stop_words_fst = stop_words_builder
.into_inner()
.and_then(fst::Set::from_bytes)
.unwrap();
main_store.put_stop_words_fst(writer, &stop_words_fst)?;
// now that we have set up the stop words,
// let's reindex everything...
reindex_all_documents(
writer,
main_store,
documents_fields_store,
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
)?;
Ok(())
}
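
Note the asymmetry with addition: adding a stop word only deletes its posting list and patches two fsts, while deleting one must fall back to `reindex_all_documents`, because the affected posting lists were dropped when the word became a stop word and can only be rebuilt from the stored documents. This is also why the chunked, lower-memory re-indexing from PR #245 matters for this feature.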

View File

@@ -1,6 +1,6 @@
[package]
name = "meilidb-schema"
version = "0.1.0"
version = "0.5.11"
authors = ["Kerollmops <renault.cle@gmail.com>"]
edition = "2018"

View File

@@ -1,6 +1,6 @@
[package]
name = "meilidb-tokenizer"
version = "0.1.0"
version = "0.5.11"
authors = ["Kerollmops <renault.cle@gmail.com>"]
edition = "2018"