introduce a new settings update system

This commit is contained in:
qdequele
2020-01-08 14:17:38 +01:00
parent 203c83bdb4
commit 2ee90a891c
18 changed files with 835 additions and 663 deletions

View File

@@ -353,10 +353,12 @@ impl Database {
#[cfg(test)]
mod tests {
use super::*;
use crate::criterion::{self, CriteriaBuilder};
use crate::update::{ProcessedUpdateResult, UpdateStatus};
use crate::settings::Settings;
use crate::{Document, DocumentId};
use serde::de::IgnoredAny;
use std::sync::mpsc;
@@ -376,23 +378,20 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description"],
"attributes_displayed": ["name", "description"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
let _update_id = index.settings_update(&mut update_writer, settings).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@@ -439,23 +438,20 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description"],
"attributes_displayed": ["name", "description"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
let _update_id = index.settings_update(&mut update_writer, settings).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@@ -501,19 +497,20 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name"],
"attributes_displayed": ["name"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
let _update_id = index.settings_update(&mut update_writer, settings).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@@ -552,23 +549,20 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description"],
"attributes_displayed": ["name", "description"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
let _update_id = index.settings_update(&mut update_writer, settings).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@@ -592,31 +586,21 @@ mod tests {
let _update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
[attributes."age"]
displayed = true
indexed = true
[attributes."sex"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description", "age", "sex"],
"attributes_displayed": ["name", "description", "age", "sex"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut writer = db.update_write_txn().unwrap();
let update_id = index.schema_update(&mut writer, schema).unwrap();
let update_id = index.settings_update(&mut writer, settings).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
@@ -670,44 +654,28 @@ mod tests {
reader.abort();
// try to introduce attributes in the middle of the schema
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
[attributes."city"]
displayed = true
indexed = true
[attributes."age"]
displayed = true
indexed = true
[attributes."sex"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description", "city", "age", "sex"],
"attributes_displayed": ["name", "description", "city", "age", "sex"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut writer = db.update_write_txn().unwrap();
let update_id = index.schema_update(&mut writer, schema).unwrap();
let update_id = index.settings_update(&mut writer, settings).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.iter().find(|id| *id == update_id);
// check if it has been accepted
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Failed { content }) if content.error.is_some());
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
}
#[test]
@@ -725,23 +693,20 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description"],
"attributes_displayed": ["name", "description"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
let _update_id = index.settings_update(&mut writer, settings).unwrap();
writer.commit().unwrap();
let mut additions = index.documents_addition();
@@ -805,26 +770,20 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."id"]
displayed = true
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description"],
"attributes_displayed": ["name", "description", "id"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
let _update_id = index.settings_update(&mut writer, settings).unwrap();
writer.commit().unwrap();
let mut additions = index.documents_addition();
@@ -947,24 +906,20 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."description"]
displayed = true
indexed = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "description"],
"attributes_displayed": ["name", "description"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
// add a schema to the index
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
let _update_id = index.settings_update(&mut writer, settings).unwrap();
writer.commit().unwrap();
// add documents to the index
@@ -1015,23 +970,21 @@ mod tests {
database.set_update_callback(Box::new(update_fn));
let schema = {
let settings = {
let data = r#"
identifier = "id"
[attributes."name"]
displayed = true
indexed = true
[attributes."release_date"]
displayed = true
ranked = true
{
"attribute_identifier": "id",
"attributes_searchable": ["name", "release_date"],
"attributes_displayed": ["name", "release_date"],
"attributes_ranked": ["release_date"]
}
"#;
toml::from_str(data).unwrap()
let settings: Settings = serde_json::from_str(data).unwrap();
settings.into()
};
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
let _update_id = index.settings_update(&mut writer, settings).unwrap();
writer.commit().unwrap();
let mut additions = index.documents_addition();

View File

@@ -16,6 +16,7 @@ mod ranked_map;
mod raw_document;
mod reordered_attrs;
mod update;
mod settings;
pub mod criterion;
pub mod raw_indexer;
pub mod serde;

View File

@@ -0,0 +1,85 @@
use std::collections::{BTreeMap, BTreeSet};
use serde::{Deserialize, Serialize};
/// User-facing settings payload, (de)serializable with serde.
///
/// Every field is optional: when converted into a [`SettingsUpdate`], a `Some`
/// value becomes `UpdateState::Update` and a `None` becomes
/// `UpdateState::Nothing` (see `From<Option<T>> for UpdateState<T>`).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Settings {
    /// Ranking rules, stored as-is in the main store when applied.
    pub ranking_rules: Option<Vec<String>>,
    /// Distinct setting, stored as-is in the main store when applied.
    pub ranking_distinct: Option<String>,
    /// Name of the identifier attribute used to generate the schema.
    pub attribute_identifier: Option<String>,
    /// Names of the attributes flagged as indexed in the generated schema.
    pub attributes_searchable: Option<Vec<String>>,
    /// Names of the attributes flagged as displayed in the generated schema.
    pub attributes_displayed: Option<Vec<String>>,
    /// Names of the attributes flagged as ranked in the generated schema.
    pub attributes_ranked: Option<Vec<String>>,
    /// Full set of stop words.
    pub stop_words: Option<BTreeSet<String>>,
    /// Synonyms, mapping a word to its alternatives.
    pub synonyms: Option<BTreeMap<String, Vec<String>>>,
}
/// Converts a settings payload into a settings update.
///
/// Each field goes through `From<Option<T>> for UpdateState<T>`: a `Some`
/// value becomes `UpdateState::Update`, a `None` becomes `UpdateState::Nothing`.
///
/// Implemented as `From` rather than `Into`, so `settings.into()` keeps working
/// through the standard blanket impl while `SettingsUpdate::from(settings)`
/// becomes available as well. The value is consumed, so no clone is needed.
impl From<Settings> for SettingsUpdate {
    fn from(settings: Settings) -> SettingsUpdate {
        SettingsUpdate {
            ranking_rules: settings.ranking_rules.into(),
            ranking_distinct: settings.ranking_distinct.into(),
            attribute_identifier: settings.attribute_identifier.into(),
            attributes_searchable: settings.attributes_searchable.into(),
            attributes_displayed: settings.attributes_displayed.into(),
            attributes_ranked: settings.attributes_ranked.into(),
            stop_words: settings.stop_words.into(),
            synonyms: settings.synonyms.into(),
        }
    }
}
/// Describes what a settings update does to a single setting.
///
/// How each variant is interpreted depends on the setting: in
/// `apply_settings_update` the ranking, stop-words and synonyms settings only
/// react to `Update` and `Clear`, while the schema attribute lists handle all
/// five variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateState<T> {
/// Replace the current value with the given one.
Update(T),
/// Append the given values to the current ones (attribute lists only).
Add(T),
/// Remove the given values from the current ones (attribute lists only).
/// NOTE(review): this is the apparent intent — confirm against `apply_settings_update`.
Delete(T),
/// Reset the setting (stored value deleted, or list/set emptied).
Clear,
/// Leave the setting untouched; `is_changed` returns `false` only for this variant.
Nothing,
}
/// Lifts an optional value into an update state: a present value means
/// "replace with this value", an absent one means "leave untouched".
impl<T> From<Option<T>> for UpdateState<T> {
    fn from(opt: Option<T>) -> UpdateState<T> {
        opt.map_or(UpdateState::Nothing, UpdateState::Update)
    }
}
impl<T> UpdateState<T> {
    /// Tells whether this state asks for any modification, i.e. whether it is
    /// anything other than `UpdateState::Nothing`.
    pub fn is_changed(&self) -> bool {
        if let UpdateState::Nothing = self {
            false
        } else {
            true
        }
    }
}
/// A settings update as stored in the update queue: one `UpdateState` per
/// setting, telling `apply_settings_update` what to do with it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SettingsUpdate {
/// Ranking rules kept in the main store.
pub ranking_rules: UpdateState<Vec<String>>,
/// Distinct setting kept in the main store.
pub ranking_distinct: UpdateState<String>,
/// Name of the identifier attribute of the schema.
pub attribute_identifier: UpdateState<String>,
/// Attributes flagged as indexed in the schema.
pub attributes_searchable: UpdateState<Vec<String>>,
/// Attributes flagged as displayed in the schema.
pub attributes_displayed: UpdateState<Vec<String>>,
/// Attributes flagged as ranked in the schema.
pub attributes_ranked: UpdateState<Vec<String>>,
/// Stop words set.
pub stop_words: UpdateState<BTreeSet<String>>,
/// Synonyms, mapping a word to its alternatives.
pub synonyms: UpdateState<BTreeMap<String, Vec<String>>>,
}
impl Default for SettingsUpdate {
fn default() -> Self {
Self {
ranking_rules: UpdateState::Nothing,
ranking_distinct: UpdateState::Nothing,
attribute_identifier: UpdateState::Nothing,
attributes_searchable: UpdateState::Nothing,
attributes_displayed: UpdateState::Nothing,
attributes_ranked: UpdateState::Nothing,
stop_words: UpdateState::Nothing,
synonyms: UpdateState::Nothing,
}
}
}

View File

@@ -4,18 +4,20 @@ use chrono::{DateTime, Utc};
use heed::types::{ByteSlice, OwnedType, SerdeBincode, Str};
use heed::Result as ZResult;
use meilisearch_schema::Schema;
use std::collections::HashMap;
use std::collections::{HashMap, BTreeMap, BTreeSet};
use std::sync::Arc;
const CREATED_AT_KEY: &str = "created-at";
const RANKING_RULES_KEY: &str = "ranking-rules-key";
const RANKING_DISTINCT_KEY: &str = "ranking-distinct-key";
const STOP_WORDS_KEY: &str = "stop-words-key";
const SYNONYMS_KEY: &str = "synonyms-key";
const CUSTOMS_KEY: &str = "customs-key";
const FIELDS_FREQUENCY_KEY: &str = "fields-frequency";
const NAME_KEY: &str = "name";
const NUMBER_OF_DOCUMENTS_KEY: &str = "number-of-documents";
const RANKED_MAP_KEY: &str = "ranked-map";
const SCHEMA_KEY: &str = "schema";
const STOP_WORDS_KEY: &str = "stop-words";
const SYNONYMS_KEY: &str = "synonyms";
const UPDATED_AT_KEY: &str = "updated-at";
const WORDS_KEY: &str = "words";
@@ -184,6 +186,54 @@ impl Main {
}
}
/// Reads the ranking rules stored under `RANKING_RULES_KEY`, decoded with `SerdeBincode`.
///
/// Returns whatever `get` yields for that key (presumably `None` when the key
/// is absent — confirm against heed).
pub fn ranking_rules<'txn>(&self, reader: &'txn heed::RoTxn<MainT>) -> ZResult<Option<Vec<String>>> {
self.main.get::<_, Str, SerdeBincode<Vec<String>>>(reader, RANKING_RULES_KEY)
}
/// Writes `value` under `RANKING_RULES_KEY`, encoded with `SerdeBincode`.
pub fn put_ranking_rules(self, writer: &mut heed::RwTxn<MainT>, value: Vec<String>) -> ZResult<()> {
self.main.put::<_, Str, SerdeBincode<Vec<String>>>(writer, RANKING_RULES_KEY, &value)
}
/// Deletes the entry stored under `RANKING_RULES_KEY`.
///
/// Returns the boolean reported by `delete` (presumably whether an entry was
/// actually removed — confirm against heed).
pub fn delete_ranking_rules(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<bool> {
self.main.delete::<_, Str>(writer, RANKING_RULES_KEY)
}
/// Reads the distinct setting stored under `RANKING_DISTINCT_KEY`, decoded with `SerdeBincode`.
///
/// Returns whatever `get` yields for that key (presumably `None` when the key
/// is absent — confirm against heed).
pub fn ranking_distinct<'txn>(&self, reader: &'txn heed::RoTxn<MainT>) -> ZResult<Option<String>> {
self.main.get::<_, Str, SerdeBincode<String>>(reader, RANKING_DISTINCT_KEY)
}
/// Writes `value` under `RANKING_DISTINCT_KEY`, encoded with `SerdeBincode`.
pub fn put_ranking_distinct(self, writer: &mut heed::RwTxn<MainT>, value: String) -> ZResult<()> {
self.main.put::<_, Str, SerdeBincode<String>>(writer, RANKING_DISTINCT_KEY, &value)
}
/// Deletes the entry stored under `RANKING_DISTINCT_KEY`.
///
/// Returns the boolean reported by `delete` (presumably whether an entry was
/// actually removed — confirm against heed).
pub fn delete_ranking_distinct(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<bool> {
self.main.delete::<_, Str>(writer, RANKING_DISTINCT_KEY)
}
/// Reads the stop words set stored under `STOP_WORDS_KEY`, decoded with `SerdeBincode`.
///
/// Returns whatever `get` yields for that key (presumably `None` when the key
/// is absent — confirm against heed).
pub fn stop_words<'txn>(&self, reader: &'txn heed::RoTxn<MainT>) -> ZResult<Option<BTreeSet<String>>> {
self.main.get::<_, Str, SerdeBincode<BTreeSet<String>>>(reader, STOP_WORDS_KEY)
}
/// Writes the stop words set under `STOP_WORDS_KEY`, encoded with `SerdeBincode`.
pub fn put_stop_words(self, writer: &mut heed::RwTxn<MainT>, value: BTreeSet<String>) -> ZResult<()> {
self.main.put::<_, Str, SerdeBincode<BTreeSet<String>>>(writer, STOP_WORDS_KEY, &value)
}
/// Deletes the entry stored under `STOP_WORDS_KEY`.
///
/// Returns the boolean reported by `delete` (presumably whether an entry was
/// actually removed — confirm against heed).
pub fn delete_stop_words(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<bool> {
self.main.delete::<_, Str>(writer, STOP_WORDS_KEY)
}
/// Reads the synonyms map stored under `SYNONYMS_KEY`, decoded with `SerdeBincode`.
///
/// Returns whatever `get` yields for that key (presumably `None` when the key
/// is absent — confirm against heed).
pub fn synonyms<'txn>(&self, reader: &'txn heed::RoTxn<MainT>) -> ZResult<Option<BTreeMap<String, Vec<String>>>> {
self.main.get::<_, Str, SerdeBincode<BTreeMap<String, Vec<String>>>>(reader, SYNONYMS_KEY)
}
/// Writes the synonyms map under `SYNONYMS_KEY`, encoded with `SerdeBincode`.
pub fn put_synonyms(self, writer: &mut heed::RwTxn<MainT>, value: BTreeMap<String, Vec<String>>) -> ZResult<()> {
self.main.put::<_, Str, SerdeBincode<BTreeMap<String, Vec<String>>>>(writer, SYNONYMS_KEY, &value)
}
/// Deletes the entry stored under `SYNONYMS_KEY`.
///
/// Returns the boolean reported by `delete` (presumably whether an entry was
/// actually removed — confirm against heed).
pub fn delete_synonyms(self, writer: &mut heed::RwTxn<MainT>) -> ZResult<bool> {
self.main.delete::<_, Str>(writer, SYNONYMS_KEY)
}
pub fn put_customs(self, writer: &mut heed::RwTxn<MainT>, customs: &[u8]) -> ZResult<()> {
self.main
.put::<_, Str, ByteSlice>(writer, CUSTOMS_KEY, customs)

View File

@@ -35,8 +35,8 @@ use serde::de::{self, Deserialize};
use zerocopy::{AsBytes, FromBytes};
use crate::criterion::Criteria;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::database::{MainT, UpdateT};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::serde::Deserializer;
use crate::{query_builder::QueryBuilder, update, DocIndex, DocumentId, Error, MResult};
@@ -240,16 +240,17 @@ impl Index {
}
}
/// Enqueues a schema update and returns the value produced by
/// `update::push_schema_update` (the id under which the update was stored).
pub fn schema_update(&self, writer: &mut heed::RwTxn<UpdateT>, schema: Schema) -> MResult<u64> {
// signal that a new update is coming; a failed send is deliberately ignored
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_schema_update(writer, self, schema)
}
/// Enqueues a customs update and returns the value produced by
/// `update::push_customs_update` (presumably the id of the enqueued update).
pub fn customs_update(&self, writer: &mut heed::RwTxn<UpdateT>, customs: Vec<u8>) -> ZResult<u64> {
// signal that a new update is coming; a failed send is deliberately ignored
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_customs_update(writer, self.updates, self.updates_results, customs)
}
/// Enqueues a settings update and returns the value produced by
/// `update::push_settings_update` (the id under which the update was stored).
pub fn settings_update(&self, writer: &mut heed::RwTxn<UpdateT>, update: SettingsUpdate) -> ZResult<u64> {
// signal that a new update is coming; a failed send is deliberately ignored
let _ = self.updates_notifier.send(UpdateEvent::NewUpdate);
update::push_settings_update(writer, self.updates, self.updates_results, update)
}
pub fn documents_addition<D>(&self) -> update::DocumentsAddition<D> {
update::DocumentsAddition::new(
self.updates,
@@ -279,22 +280,6 @@ impl Index {
update::push_clear_all(writer, self.updates, self.updates_results)
}
/// Creates an `update::SynonymsUpdate` bound to this index's updates store,
/// updates-results store and a clone of its update notifier.
pub fn synonyms_update(&self) -> update::SynonymsUpdate {
update::SynonymsUpdate::new(
self.updates,
self.updates_results,
self.updates_notifier.clone(),
)
}
/// Creates an `update::StopWordsUpdate` bound to this index's updates store,
/// updates-results store and a clone of its update notifier.
pub fn stop_words_update(&self) -> update::StopWordsUpdate {
update::StopWordsUpdate::new(
self.updates,
self.updates_results,
self.updates_notifier.clone(),
)
}
pub fn current_update_id(&self, reader: &heed::RoTxn<UpdateT>) -> MResult<Option<u64>> {
match self.updates.last_update(reader)? {
Some((id, _)) => Ok(Some(id)),

View File

@@ -2,9 +2,8 @@ mod clear_all;
mod customs_update;
mod documents_addition;
mod documents_deletion;
mod schema_update;
mod stop_words_update;
mod synonyms_update;
mod settings_update;
pub use self::clear_all::{apply_clear_all, push_clear_all};
pub use self::customs_update::{apply_customs_update, push_customs_update};
@@ -12,12 +11,10 @@ pub use self::documents_addition::{
apply_documents_addition, apply_documents_partial_addition, DocumentsAddition,
};
pub use self::documents_deletion::{apply_documents_deletion, DocumentsDeletion};
pub use self::schema_update::{apply_schema_update, push_schema_update};
pub use self::stop_words_update::{apply_stop_words_update, StopWordsUpdate};
pub use self::synonyms_update::{apply_synonyms_update, SynonymsUpdate};
pub use self::settings_update::{apply_settings_update, push_settings_update};
use std::cmp;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::HashMap;
use std::time::Instant;
use chrono::{DateTime, Utc};
@@ -29,7 +26,7 @@ use sdset::Set;
use crate::{store, DocumentId, MResult};
use crate::database::{MainT, UpdateT};
use meilisearch_schema::Schema;
use crate::settings::SettingsUpdate;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Update {
@@ -45,13 +42,6 @@ impl Update {
}
}
/// Builds a schema update, timestamped with the current UTC time.
fn schema(data: Schema) -> Update {
    let enqueued_at = Utc::now();
    let data = UpdateData::Schema(data);
    Update { data, enqueued_at }
}
fn customs(data: Vec<u8>) -> Update {
Update {
data: UpdateData::Customs(data),
@@ -80,16 +70,9 @@ impl Update {
}
}
fn synonyms_update(data: BTreeMap<String, Vec<String>>) -> Update {
fn settings(data: SettingsUpdate) -> Update {
Update {
data: UpdateData::SynonymsUpdate(data),
enqueued_at: Utc::now(),
}
}
fn stop_words_update(data: BTreeSet<String>) -> Update {
Update {
data: UpdateData::StopWordsUpdate(data),
data: UpdateData::Settings(data),
enqueued_at: Utc::now(),
}
}
@@ -98,20 +81,17 @@ impl Update {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateData {
ClearAll,
Schema(Schema),
Customs(Vec<u8>),
DocumentsAddition(Vec<HashMap<String, serde_json::Value>>),
DocumentsPartial(Vec<HashMap<String, serde_json::Value>>),
DocumentsDeletion(Vec<DocumentId>),
SynonymsUpdate(BTreeMap<String, Vec<String>>),
StopWordsUpdate(BTreeSet<String>),
Settings(SettingsUpdate)
}
impl UpdateData {
pub fn update_type(&self) -> UpdateType {
match self {
UpdateData::ClearAll => UpdateType::ClearAll,
UpdateData::Schema(_) => UpdateType::Schema,
UpdateData::Customs(_) => UpdateType::Customs,
UpdateData::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
number: addition.len(),
@@ -122,12 +102,7 @@ impl UpdateData {
UpdateData::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
number: deletion.len(),
},
UpdateData::SynonymsUpdate(addition) => UpdateType::SynonymsUpdate {
number: addition.len(),
},
UpdateData::StopWordsUpdate(update) => UpdateType::StopWordsUpdate {
number: update.len(),
},
UpdateData::Settings(update) => UpdateType::Settings(update.clone()),
}
}
}
@@ -136,13 +111,11 @@ impl UpdateData {
#[serde(tag = "name")]
pub enum UpdateType {
ClearAll,
Schema,
Customs,
DocumentsAddition { number: usize },
DocumentsPartial { number: usize },
DocumentsDeletion { number: usize },
SynonymsUpdate { number: usize },
StopWordsUpdate { number: usize },
Settings(SettingsUpdate),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -247,14 +220,6 @@ pub fn update_task<'a, 'b>(
(update_type, result, start.elapsed())
}
UpdateData::Schema(schema) => {
let start = Instant::now();
let update_type = UpdateType::Schema;
let result = apply_schema_update(writer, &schema, index);
(update_type, result, start.elapsed())
}
UpdateData::Customs(customs) => {
let start = Instant::now();
@@ -296,25 +261,16 @@ pub fn update_task<'a, 'b>(
(update_type, result, start.elapsed())
}
UpdateData::SynonymsUpdate(synonyms) => {
UpdateData::Settings(settings) => {
let start = Instant::now();
let update_type = UpdateType::SynonymsUpdate {
number: synonyms.len(),
};
let update_type = UpdateType::Settings(settings.clone());
let result = apply_synonyms_update(writer, index.main, index.synonyms, synonyms);
(update_type, result, start.elapsed())
}
UpdateData::StopWordsUpdate(stop_words) => {
let start = Instant::now();
let update_type = UpdateType::StopWordsUpdate {
number: stop_words.len(),
};
let result = apply_stop_words_deletion(writer, index, stop_words);
let result = apply_settings_update(
writer,
index,
settings,
);
(update_type, result, start.elapsed())
}

View File

@@ -1,64 +1 @@
use meilisearch_schema::{Diff, Schema};
use crate::database::{MainT, UpdateT};
use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update};
use crate::{error::UnsupportedOperation, store, MResult};
/// Validates `new_schema` against the schema currently stored (if any), stores
/// it, and reindexes every document when an indexed or ranked property changed.
///
/// # Errors
///
/// Rejects the update with an `UnsupportedOperation` when the new schema
/// changes the identifier, moves or removes an attribute, or introduces a new
/// attribute anywhere but at the end. Storage errors are propagated.
pub fn apply_schema_update(
    writer: &mut heed::RwTxn<MainT>,
    new_schema: &Schema,
    index: &store::Index,
) -> MResult<()> {
    use UnsupportedOperation::{
        CanOnlyIntroduceNewSchemaAttributesAtEnd, CannotRemoveSchemaAttribute,
        CannotReorderSchemaAttribute, CannotUpdateSchemaIdentifier,
    };

    let mut need_full_reindexing = false;

    if let Some(old_schema) = index.main.schema(writer)? {
        let attributes_count = old_schema.number_of_attributes();

        for diff in meilisearch_schema::diff(&old_schema, new_schema) {
            match diff {
                // unsupported changes: reject the whole update
                Diff::IdentChange { .. } => return Err(CannotUpdateSchemaIdentifier.into()),
                Diff::AttrMove { .. } => return Err(CannotReorderSchemaAttribute.into()),
                Diff::RemovedAttr { .. } => return Err(CannotRemoveSchemaAttribute.into()),
                Diff::NewAttr { pos, .. } => {
                    // a new attribute is only accepted at the end of the schema
                    if pos < attributes_count {
                        return Err(CanOnlyIntroduceNewSchemaAttributesAtEnd.into());
                    }
                }
                Diff::AttrPropsChange { old, new, .. } => {
                    // changing what is indexed or ranked invalidates the index
                    if new.indexed != old.indexed || new.ranked != old.ranked {
                        need_full_reindexing = true;
                    }
                }
            }
        }
    }

    index.main.put_schema(writer, new_schema)?;

    if need_full_reindexing {
        reindex_all_documents(writer, index)?;
    }

    Ok(())
}
/// Stores a schema update in the updates store under the next update id and
/// returns that id.
pub fn push_schema_update(
    writer: &mut heed::RwTxn<UpdateT>,
    index: &store::Index,
    schema: Schema,
) -> MResult<u64> {
    let update_id = next_update_id(writer, index.updates, index.updates_results)?;

    let schema_update = Update::schema(schema);
    index.updates.put_update(writer, update_id, &schema_update)?;

    Ok(update_id)
}

View File

@@ -0,0 +1,483 @@
use std::collections::{HashMap, BTreeMap, BTreeSet};
use heed::Result as ZResult;
use fst::{set::OpBuilder, SetBuilder};
use sdset::SetBuf;
use meilisearch_schema::{Schema, SchemaAttr, diff_transposition, generate_schema};
use crate::database::{MainT, UpdateT};
use crate::settings::{UpdateState, SettingsUpdate};
use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update};
use crate::{store, MResult};
/// Stores a settings update in the updates store under the next update id and
/// returns that id.
pub fn push_settings_update(
    writer: &mut heed::RwTxn<UpdateT>,
    updates_store: store::Updates,
    updates_results_store: store::UpdatesResults,
    settings: SettingsUpdate,
) -> ZResult<u64> {
    let update_id = next_update_id(writer, updates_store, updates_results_store)?;

    let settings_update = Update::settings(settings);
    updates_store.put_update(writer, update_id, &settings_update)?;

    Ok(update_id)
}
pub fn apply_settings_update(
writer: &mut heed::RwTxn<MainT>,
index: &store::Index,
settings: SettingsUpdate,
) -> MResult<()> {
let mut must_reindex = false;
let old_schema = index.main.schema(writer)?;
match settings.ranking_rules {
UpdateState::Update(v) => {
index.main.put_ranking_rules(writer, v)?;
},
UpdateState::Clear => {
index.main.delete_ranking_rules(writer)?;
},
_ => (),
}
match settings.ranking_distinct {
UpdateState::Update(v) => {
index.main.put_ranking_distinct(writer, v)?;
},
UpdateState::Clear => {
index.main.delete_ranking_distinct(writer)?;
},
_ => (),
}
let identifier = match settings.attribute_identifier.clone() {
UpdateState::Update(v) => v,
_ => {
old_schema.clone().unwrap().identifier_name().to_owned()
},
};
let attributes_searchable: Vec<String> = match settings.attributes_searchable.clone() {
UpdateState::Update(v) => v,
UpdateState::Clear => Vec::new(),
UpdateState::Nothing => {
match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_indexed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
}
},
UpdateState::Add(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_indexed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
if !old_attrs.contains(&attr) {
old_attrs.push(attr);
}
}
old_attrs
},
UpdateState::Delete(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_indexed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
old_attrs.retain(|x| *x == attr)
}
old_attrs
}
};
let attributes_displayed: Vec<String> = match settings.attributes_displayed.clone() {
UpdateState::Update(v) => v,
UpdateState::Clear => Vec::new(),
UpdateState::Nothing => {
match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_displayed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
}
},
UpdateState::Add(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_displayed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
if !old_attrs.contains(&attr) {
old_attrs.push(attr);
}
}
old_attrs
},
UpdateState::Delete(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_displayed())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
old_attrs.retain(|x| *x == attr)
}
old_attrs
}
};
let attributes_ranked: Vec<String> = match settings.attributes_ranked.clone() {
UpdateState::Update(v) => v,
UpdateState::Clear => Vec::new(),
UpdateState::Nothing => {
match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_ranked())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
}
},
UpdateState::Add(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_ranked())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
if !old_attrs.contains(&attr) {
old_attrs.push(attr);
}
}
old_attrs
},
UpdateState::Delete(attrs) => {
let mut old_attrs = match old_schema.clone() {
Some(schema) => {
schema.into_iter()
.filter(|(_, props)| props.is_ranked())
.map(|(name, _)| name)
.collect()
},
None => Vec::new(),
};
for attr in attrs {
old_attrs.retain(|x| *x == attr)
}
old_attrs
}
};
let new_schema = generate_schema(identifier, attributes_searchable, attributes_displayed, attributes_ranked);
index.main.put_schema(writer, &new_schema)?;
match settings.stop_words {
UpdateState::Update(stop_words) => {
if apply_stop_words_update(writer, index, stop_words)? {
must_reindex = true;
}
},
UpdateState::Clear => {
if apply_stop_words_update(writer, index, BTreeSet::new())? {
must_reindex = true;
}
},
_ => (),
}
match settings.synonyms {
UpdateState::Update(synonyms) => apply_synonyms_update(writer, index, synonyms)?,
UpdateState::Clear => apply_synonyms_update(writer, index, BTreeMap::new())?,
_ => (),
}
let main_store = index.main;
let documents_fields_store = index.documents_fields;
let documents_fields_counts_store = index.documents_fields_counts;
let postings_lists_store = index.postings_lists;
let docs_words_store = index.docs_words;
if settings.attribute_identifier.is_changed() ||
settings.attributes_ranked.is_changed() ||
settings.attributes_searchable.is_changed() ||
settings.attributes_displayed.is_changed()
{
if let Some(old_schema) = old_schema {
rewrite_all_documents(writer, index, &old_schema, &new_schema)?;
must_reindex = true;
}
}
if must_reindex {
reindex_all_documents(
writer,
main_store,
documents_fields_store,
documents_fields_counts_store,
postings_lists_store,
docs_words_store,
)?;
}
Ok(())
}
/// Replaces the stop words of the index with `stop_words`.
///
/// The current stop words are read from the stop-words FST; the words missing
/// from it are added, the words no longer wanted are removed, and the full new
/// set is finally written to the main store.
///
/// Returns `true` when the caller must reindex all documents, which is only
/// ever requested by the deletion step.
pub fn apply_stop_words_update(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    stop_words: BTreeSet<String>,
) -> MResult<bool> {
    let main_store = index.main;

    let current_stop_words: BTreeSet<String> = main_store
        .stop_words_fst(writer)?
        .unwrap_or_default()
        .stream()
        .into_strs()
        .unwrap()
        .into_iter()
        .collect();

    let to_remove: BTreeSet<String> = current_stop_words
        .difference(&stop_words)
        .cloned()
        .collect();
    let to_add: BTreeSet<String> = stop_words
        .difference(&current_stop_words)
        .cloned()
        .collect();

    if !to_add.is_empty() {
        apply_stop_words_addition(writer, index, to_add)?;
    }

    let must_reindex = if to_remove.is_empty() {
        false
    } else {
        apply_stop_words_deletion(writer, index, to_remove)?
    };

    main_store.put_stop_words(writer, stop_words)?;

    Ok(must_reindex)
}
/// Registers the words of `addition` as stop words.
///
/// For every new stop word the associated postings list is deleted; the words
/// are then removed from the main words FST (when one exists) and merged into
/// the stop-words FST.
fn apply_stop_words_addition(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    addition: BTreeSet<String>,
) -> MResult<()> {
    let main_store = index.main;
    let postings_lists_store = index.postings_lists;

    let mut delta_builder = SetBuilder::memory();
    for stop_word in addition {
        delta_builder.insert(&stop_word).unwrap();
        // a stop word must not keep any postings list
        postings_lists_store.del_postings_list(writer, stop_word.as_bytes())?;
    }

    // FST holding only the newly added stop words
    let delta_stop_words = delta_builder
        .into_inner()
        .and_then(fst::Set::from_bytes)
        .unwrap();

    // the new stop words must also disappear from the main words FST
    if let Some(word_fst) = main_store.words_fst(writer)? {
        let remaining_words = OpBuilder::new()
            .add(&word_fst)
            .add(&delta_stop_words)
            .difference();

        let mut words_builder = SetBuilder::memory();
        words_builder.extend_stream(remaining_words).unwrap();
        let pruned_word_fst = words_builder
            .into_inner()
            .and_then(fst::Set::from_bytes)
            .unwrap();

        main_store.put_words_fst(writer, &pruned_word_fst)?;
    }

    // finally merge the new stop words into the stored stop-words FST
    let current_stop_words = main_store.stop_words_fst(writer)?.unwrap_or_default();
    let all_stop_words = OpBuilder::new()
        .add(&current_stop_words)
        .add(&delta_stop_words)
        .r#union();

    let mut merged_builder = SetBuilder::memory();
    merged_builder.extend_stream(all_stop_words).unwrap();
    let merged_stop_words = merged_builder
        .into_inner()
        .and_then(fst::Set::from_bytes)
        .unwrap();

    main_store.put_stop_words_fst(writer, &merged_stop_words)?;

    Ok(())
}
/// Removes the words of `deletion` from the stop-words FST.
///
/// Returns `true` when the index holds at least one document, meaning the
/// caller has to reindex everything. A failure to read the number of
/// documents is treated like an empty index and yields `false`.
fn apply_stop_words_deletion(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    deletion: BTreeSet<String>,
) -> MResult<bool> {
    let main_store = index.main;

    let mut delta_builder = SetBuilder::memory();
    for stop_word in deletion {
        delta_builder.insert(&stop_word).unwrap();
    }

    // FST holding only the stop words to remove
    let delta_stop_words = delta_builder
        .into_inner()
        .and_then(fst::Set::from_bytes)
        .unwrap();

    // subtract them from the stored stop-words FST
    let current_stop_words = main_store.stop_words_fst(writer)?.unwrap_or_default();
    let kept_stop_words = OpBuilder::new()
        .add(&current_stop_words)
        .add(&delta_stop_words)
        .difference();

    let mut kept_builder = SetBuilder::memory();
    kept_builder.extend_stream(kept_stop_words).unwrap();
    let new_stop_words_fst = kept_builder
        .into_inner()
        .and_then(fst::Set::from_bytes)
        .unwrap();

    main_store.put_stop_words_fst(writer, &new_stop_words_fst)?;

    // words that stopped being stop words only need indexing when documents exist
    match main_store.number_of_documents(writer) {
        Ok(number) if number > 0 => Ok(true),
        _ => Ok(false),
    }
}
/// Replaces every stored synonym by the content of `synonyms`.
///
/// The synonyms store is cleared first. Then, for each entry of the map, the
/// alternatives are turned into an fst set and stored under the bytes of the
/// word. Finally the fst of all the words that have synonyms, and the map
/// itself, are written to the main store.
///
/// # Errors
///
/// Returns the error of the first store operation that fails.
///
/// # Panics
///
/// Panics if one of the in-memory fst sets cannot be built.
pub fn apply_synonyms_update(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    synonyms: BTreeMap<String, Vec<String>>,
) -> MResult<()> {
    let main_store = index.main;
    let synonyms_store = index.synonyms;

    let mut synonyms_builder = SetBuilder::memory();
    synonyms_store.clear(writer)?;

    // The map is only borrowed here: it is moved into `put_synonyms` at the
    // end, so there is no need to deep-clone it just to iterate over it.
    for (word, alternatives) in &synonyms {
        synonyms_builder.insert(word).unwrap();

        let alternatives = {
            // borrowed strs are enough to build the ordered set of alternatives
            let alternatives: Vec<&str> = alternatives.iter().map(String::as_str).collect();
            let alternatives = SetBuf::from_dirty(alternatives);
            let mut alternatives_builder = SetBuilder::memory();
            alternatives_builder.extend_iter(alternatives).unwrap();
            let bytes = alternatives_builder.into_inner().unwrap();
            fst::Set::from_bytes(bytes).unwrap()
        };

        synonyms_store.put_synonyms(writer, word.as_bytes(), &alternatives)?;
    }

    let synonyms_set = synonyms_builder
        .into_inner()
        .and_then(fst::Set::from_bytes)
        .unwrap();

    main_store.put_synonyms_fst(writer, &synonyms_set)?;
    main_store.put_synonyms(writer, synonyms)?;

    Ok(())
}
/// Rewrites the fields of every document after a schema change.
///
/// For each document, every stored field whose old attribute has an entry in
/// the transposition returned by `diff_transposition(old_schema, new_schema)`
/// is written back under the new attribute; the other fields are dropped.
/// Presumably the transposition maps an old attribute position to its new
/// position, or to `None` when the attribute no longer exists.
///
/// # Errors
///
/// Returns the first error raised while reading or writing the stores. A
/// field that cannot be read aborts the rewrite instead of being skipped:
/// the fields of the document are deleted right after being read, so a
/// skipped field would be lost for good.
///
/// # Panics
///
/// Panics if a stored attribute is out of the bounds of the transposition.
pub fn rewrite_all_documents(
    writer: &mut heed::RwTxn<MainT>,
    index: &store::Index,
    old_schema: &Schema,
    new_schema: &Schema,
) -> MResult<()> {
    // Collect the ids of all the documents up front, before any of them is modified
    let mut documents_ids_to_reindex = Vec::new();
    for result in index.documents_fields_counts.documents_ids(writer)? {
        let document_id = result?;
        documents_ids_to_reindex.push(document_id);
    }

    let transposition = diff_transposition(old_schema, new_schema);

    // Rewrite all documents one by one
    for id in documents_ids_to_reindex {
        let mut document: HashMap<SchemaAttr, Vec<u8>> = HashMap::new();

        // Retrieve the old document
        for item in index.documents_fields.document_fields(writer, id)? {
            // do not ignore read errors, the stored fields are deleted just below
            let item = item?;
            if let Some(pos) = transposition[(item.0).0 as usize] {
                // Save the current field under its new SchemaAttr
                document.insert(SchemaAttr::new(pos), item.1.to_vec());
            }
        }

        // Remove the current document
        index.documents_fields.del_all_document_fields(writer, id)?;

        // Rewrite the new document
        // TODO: use cursor to not do memory jump at each call
        for (key, value) in document {
            index.documents_fields.put_document_field(writer, id, key, &value)?;
        }
    }

    Ok(())
}

View File

@@ -1,218 +0,0 @@
use std::collections::BTreeSet;
use fst::{set::OpBuilder, SetBuilder};
use crate::automaton::normalize_str;
use crate::database::{MainT, UpdateT};
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::documents_addition::reindex_all_documents;
use crate::update::{next_update_id, Update};
use crate::{store, MResult};
/// Builder accumulating stop words before registering them as one update.
///
/// Words are added with `add_stop_word` and the update is written to the
/// updates store by `finalize`.
pub struct StopWordsUpdate {
/// Store in which the update is written on `finalize`.
updates_store: store::Updates,
/// Store given, along with `updates_store`, to compute the id of the update.
updates_results_store: store::UpdatesResults,
/// Emitter on which `UpdateEvent::NewUpdate` is sent on `finalize`.
updates_notifier: UpdateEventsEmitter,
/// Normalized stop words accumulated so far.
stop_words: BTreeSet<String>,
}
impl StopWordsUpdate {
    /// Creates an update builder that does not contain any stop word yet.
    pub fn new(
        updates_store: store::Updates,
        updates_results_store: store::UpdatesResults,
        updates_notifier: UpdateEventsEmitter,
    ) -> StopWordsUpdate {
        let stop_words = BTreeSet::new();

        StopWordsUpdate {
            updates_store,
            updates_results_store,
            updates_notifier,
            stop_words,
        }
    }

    /// Adds `stop_word`, once passed through `normalize_str`, to the pending stop words.
    pub fn add_stop_word<S: AsRef<str>>(&mut self, stop_word: S) {
        self.stop_words.insert(normalize_str(stop_word.as_ref()));
    }

    /// Sends the `NewUpdate` event and registers the pending stop words as an update.
    ///
    /// Returns the id given to the update. The result of the notification
    /// is deliberately ignored.
    pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
        let StopWordsUpdate {
            updates_store,
            updates_results_store,
            updates_notifier,
            stop_words,
        } = self;

        let _ = updates_notifier.send(UpdateEvent::NewUpdate);

        push_stop_words_update(writer, updates_store, updates_results_store, stop_words)
    }
}
/// Registers a stop words update in the updates store.
///
/// The id is obtained from `next_update_id` and is returned once the update
/// has been written under it.
pub fn push_stop_words_update(
    writer: &mut heed::RwTxn<UpdateT>,
    updates_store: store::Updates,
    updates_results_store: store::UpdatesResults,
    update: BTreeSet<String>,
) -> MResult<u64> {
    let update_id = next_update_id(writer, updates_store, updates_results_store)?;

    let stop_words_update = Update::stop_words_update(update);
    updates_store.put_update(writer, update_id, &stop_words_update)?;

    Ok(update_id)
}
/// Makes the stored stop words equal to `stop_words`.
///
/// The stop words currently stored in the main store are compared with the
/// given set: words that are only in the given set are added, words that are
/// only in the stored set are deleted. Nothing is done for an empty side.
///
/// # Errors
///
/// Returns the error of the first store operation that fails.
///
/// # Panics
///
/// Panics if the stored stop words cannot be read back as strings.
pub fn apply_stop_words_update(
    writer: &mut heed::RwTxn<MainT>,
    main_store: store::Main,
    documents_fields_store: store::DocumentsFields,
    documents_fields_counts_store: store::DocumentsFieldsCounts,
    postings_lists_store: store::PostingsLists,
    docs_words_store: store::DocsWords,
    stop_words: BTreeSet<String>,
) -> MResult<()> {
    let old_stop_words: BTreeSet<String> = main_store
        .stop_words_fst(writer)?
        .unwrap_or_default()
        .stream()
        .into_strs()
        .unwrap()
        .into_iter()
        .collect();

    // `difference` only borrows both sets, there is no need to clone them first
    let deletion: BTreeSet<String> = old_stop_words.difference(&stop_words).cloned().collect();
    let addition: BTreeSet<String> = stop_words.difference(&old_stop_words).cloned().collect();

    if !addition.is_empty() {
        apply_stop_words_addition(writer, main_store, postings_lists_store, addition)?;
    }

    if !deletion.is_empty() {
        apply_stop_words_deletion(
            writer,
            main_store,
            documents_fields_store,
            documents_fields_counts_store,
            postings_lists_store,
            docs_words_store,
            deletion,
        )?;
    }

    Ok(())
}
/// Registers the words of `addition` as stop words.
///
/// The postings list of each new stop word is deleted, the new stop words
/// are removed from the words fst (when there is one) and merged into the
/// stop words fst of the main store.
fn apply_stop_words_addition(
    writer: &mut heed::RwTxn<MainT>,
    main_store: store::Main,
    postings_lists_store: store::PostingsLists,
    addition: BTreeSet<String>,
) -> MResult<()> {
    let mut delta_builder = SetBuilder::memory();
    for stop_word in addition {
        delta_builder.insert(&stop_word).unwrap();
        // a stop word must not keep its postings list
        postings_lists_store.del_postings_list(writer, stop_word.as_bytes())?;
    }

    // the delta fst only contains the newly added stop words
    let delta_bytes = delta_builder.into_inner().unwrap();
    let delta_stop_words = fst::Set::from_bytes(delta_bytes).unwrap();

    // remove the new stop words from the words fst
    if let Some(current_words) = main_store.words_fst(writer)? {
        let kept_words = OpBuilder::new()
            .add(&current_words)
            .add(&delta_stop_words)
            .difference();

        let mut words_builder = SetBuilder::memory();
        words_builder.extend_stream(kept_words).unwrap();
        let words_bytes = words_builder.into_inner().unwrap();
        let words_fst = fst::Set::from_bytes(words_bytes).unwrap();

        main_store.put_words_fst(writer, &words_fst)?;
    }

    // merge the new stop words with the ones already stored
    let current_stop_words = main_store.stop_words_fst(writer)?.unwrap_or_default();
    let merged = OpBuilder::new()
        .add(&current_stop_words)
        .add(&delta_stop_words)
        .r#union();

    let mut merged_builder = SetBuilder::memory();
    merged_builder.extend_stream(merged).unwrap();
    let merged_bytes = merged_builder.into_inner().unwrap();
    let merged_stop_words = fst::Set::from_bytes(merged_bytes).unwrap();

    main_store.put_stop_words_fst(writer, &merged_stop_words)?;

    Ok(())
}
/// Removes the words of `deletion` from the stored stop words, then
/// reindexes all the documents when the index contains at least one.
///
/// An fst set is built from `deletion`, subtracted from the stop words fst
/// of the main store, and the result is written back before the reindexation.
fn apply_stop_words_deletion(
    writer: &mut heed::RwTxn<MainT>,
    main_store: store::Main,
    documents_fields_store: store::DocumentsFields,
    documents_fields_counts_store: store::DocumentsFieldsCounts,
    postings_lists_store: store::PostingsLists,
    docs_words_store: store::DocsWords,
    deletion: BTreeSet<String>,
) -> MResult<()> {
    // build the delta fst holding the stop words to remove
    let mut delta_builder = SetBuilder::memory();
    delta_builder.extend_iter(deletion).unwrap();
    let delta_bytes = delta_builder.into_inner().unwrap();
    let delta_stop_words = fst::Set::from_bytes(delta_bytes).unwrap();

    // keep only the stored stop words that are not part of the delta
    let current_stop_words = main_store.stop_words_fst(writer)?.unwrap_or_default();
    let remaining = OpBuilder::new()
        .add(&current_stop_words)
        .add(&delta_stop_words)
        .difference();

    let mut remaining_builder = SetBuilder::memory();
    remaining_builder.extend_stream(remaining).unwrap();
    let remaining_bytes = remaining_builder.into_inner().unwrap();
    let remaining_stop_words = fst::Set::from_bytes(remaining_bytes).unwrap();
    main_store.put_stop_words_fst(writer, &remaining_stop_words)?;

    // The stop words are now up to date, reindex everything.
    // NOTE(review): an error while reading the number of documents is
    // treated exactly like an empty index, nothing is reindexed.
    let has_documents = match main_store.number_of_documents(writer) {
        Ok(number) => number > 0,
        Err(_) => false,
    };

    if has_documents {
        reindex_all_documents(
            writer,
            main_store,
            documents_fields_store,
            documents_fields_counts_store,
            postings_lists_store,
            docs_words_store,
        )?;
    }

    Ok(())
}

View File

@@ -1,103 +0,0 @@
use std::collections::BTreeMap;
use fst::SetBuilder;
use sdset::SetBuf;
use crate::database::{MainT, UpdateT};
use crate::automaton::normalize_str;
use crate::database::{UpdateEvent, UpdateEventsEmitter};
use crate::update::{next_update_id, Update};
use crate::{store, MResult};
/// Builder accumulating synonyms before registering them as one update.
///
/// Synonyms are added with `add_synonym` and the update is written to the
/// updates store by `finalize`.
pub struct SynonymsUpdate {
/// Store in which the update is written on `finalize`.
updates_store: store::Updates,
/// Store given, along with `updates_store`, to compute the id of the update.
updates_results_store: store::UpdatesResults,
/// Emitter on which `UpdateEvent::NewUpdate` is sent on `finalize`.
updates_notifier: UpdateEventsEmitter,
/// Alternatives accumulated so far, keyed by the normalized synonym.
synonyms: BTreeMap<String, Vec<String>>,
}
impl SynonymsUpdate {
    /// Creates an update builder that does not contain any synonym yet.
    pub fn new(
        updates_store: store::Updates,
        updates_results_store: store::UpdatesResults,
        updates_notifier: UpdateEventsEmitter,
    ) -> SynonymsUpdate {
        let synonyms = BTreeMap::new();

        SynonymsUpdate {
            updates_store,
            updates_results_store,
            updates_notifier,
            synonyms,
        }
    }

    /// Appends `alternatives` to the alternatives already registered for `synonym`.
    ///
    /// The synonym is passed through `normalize_str`, each alternative is lowercased.
    pub fn add_synonym<S, T, I>(&mut self, synonym: S, alternatives: I)
    where
        S: AsRef<str>,
        T: AsRef<str>,
        I: IntoIterator<Item = T>,
    {
        let key = normalize_str(synonym.as_ref());
        let registered = self.synonyms.entry(key).or_default();

        for alternative in alternatives {
            registered.push(alternative.as_ref().to_lowercase());
        }
    }

    /// Sends the `NewUpdate` event and registers the pending synonyms as an update.
    ///
    /// Returns the id given to the update. The result of the notification
    /// is deliberately ignored.
    pub fn finalize(self, writer: &mut heed::RwTxn<UpdateT>) -> MResult<u64> {
        let SynonymsUpdate {
            updates_store,
            updates_results_store,
            updates_notifier,
            synonyms,
        } = self;

        let _ = updates_notifier.send(UpdateEvent::NewUpdate);

        push_synonyms_update(writer, updates_store, updates_results_store, synonyms)
    }
}
/// Registers a synonyms update in the updates store.
///
/// The id is obtained from `next_update_id` and is returned once the update
/// has been written under it.
pub fn push_synonyms_update(
    writer: &mut heed::RwTxn<UpdateT>,
    updates_store: store::Updates,
    updates_results_store: store::UpdatesResults,
    addition: BTreeMap<String, Vec<String>>,
) -> MResult<u64> {
    let update_id = next_update_id(writer, updates_store, updates_results_store)?;

    let synonyms_update = Update::synonyms_update(addition);
    updates_store.put_update(writer, update_id, &synonyms_update)?;

    Ok(update_id)
}
/// Replaces every stored synonym by the content of `addition`.
///
/// The synonyms store is cleared first. Then, for each entry of the map, the
/// alternatives are turned into an fst set and stored under the bytes of the
/// word. Finally the fst of all the words that have synonyms is written to
/// the main store.
pub fn apply_synonyms_update(
    writer: &mut heed::RwTxn<MainT>,
    main_store: store::Main,
    synonyms_store: store::Synonyms,
    addition: BTreeMap<String, Vec<String>>,
) -> MResult<()> {
    let mut words_builder = SetBuilder::memory();
    synonyms_store.clear(writer)?;

    for (word, alternatives) in addition {
        words_builder.insert(&word).unwrap();

        // build the fst of the alternatives of this word
        let ordered_alternatives = SetBuf::from_dirty(alternatives);
        let mut alternatives_builder = SetBuilder::memory();
        alternatives_builder.extend_iter(ordered_alternatives).unwrap();
        let alternatives_fst = alternatives_builder
            .into_inner()
            .and_then(fst::Set::from_bytes)
            .unwrap();

        synonyms_store.put_synonyms(writer, word.as_bytes(), &alternatives_fst)?;
    }

    let words_bytes = words_builder.into_inner().unwrap();
    let words_with_synonyms = fst::Set::from_bytes(words_bytes).unwrap();
    main_store.put_synonyms_fst(writer, &words_with_synonyms)?;

    Ok(())
}