restore settings updates

This commit is contained in:
mpostma
2021-09-24 14:55:57 +02:00
parent dfce44fa3b
commit c32012c44a
8 changed files with 264 additions and 280 deletions

View File

@@ -1,10 +1,7 @@
use crate::index::Index;
use milli::update::UpdateBuilder;
use milli::CompressionType;
use rayon::ThreadPool;
use crate::index_controller::updates::RegisterUpdate;
use crate::index_controller::updates::status::{Failed, Processed, Processing};
use crate::options::IndexerOpts;
pub struct UpdateHandler {
@@ -49,24 +46,4 @@ impl UpdateHandler {
update_builder.chunk_compression_type(self.chunk_compression_type);
update_builder
}
pub fn handle_update(
&self,
index: &Index,
meta: Processing,
) -> Result<Processed, Failed> {
let update_id = meta.id();
let update_builder = self.update_builder(update_id);
let result = match meta.meta() {
RegisterUpdate::DocumentAddition { primary_key, content_uuid, method } => {
index.update_documents(*method, *content_uuid, update_builder, primary_key.as_deref())
}
};
match result {
Ok(result) => Ok(meta.process(result)),
Err(e) => Err(meta.fail(e)),
}
}
}

View File

@@ -8,6 +8,7 @@ use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder};
use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;
use crate::RegisterUpdate;
use crate::index_controller::updates::status::{Failed, Processed, Processing, UpdateResult};
use super::{Index, IndexMeta};
@@ -164,7 +165,27 @@ pub struct Facets {
impl Index {
pub fn handle_update(&self, update: Processing) -> std::result::Result<Processed, Failed> {
self.update_handler.handle_update(self, update)
let update_id = update.id();
let update_builder = self.update_handler.update_builder(update_id);
let result = (|| {
let mut txn = self.write_txn()?;
let result = match update.meta() {
RegisterUpdate::DocumentAddition { primary_key, content_uuid, method } => {
self.update_documents(&mut txn, *method, *content_uuid, update_builder, primary_key.as_deref())
}
RegisterUpdate::Settings(settings) => {
let settings = settings.clone().check();
self.update_settings(&mut txn, &settings, update_builder)
},
};
txn.commit()?;
result
})();
match result {
Ok(result) => Ok(update.process(result)),
Err(e) => Err(update.fail(e)),
}
}
pub fn update_primary_key(&self, primary_key: Option<String>) -> Result<IndexMeta> {
@@ -188,21 +209,7 @@ impl Index {
}
}
pub fn update_documents(
&self,
method: IndexDocumentsMethod,
content_uuid: Uuid,
update_builder: UpdateBuilder,
primary_key: Option<&str>,
) -> Result<UpdateResult> {
let mut txn = self.write_txn()?;
let result = self.update_documents_txn(&mut txn, method, content_uuid, update_builder, primary_key)?;
txn.commit()?;
Ok(result)
}
pub fn update_documents_txn<'a, 'b>(
fn update_documents<'a, 'b>(
&'a self,
txn: &mut heed::RwTxn<'a, 'b>,
method: IndexDocumentsMethod,
@@ -246,86 +253,75 @@ impl Index {
//.map_err(Into::into)
//}
//pub fn update_settings_txn<'a, 'b>(
//&'a self,
//txn: &mut heed::RwTxn<'a, 'b>,
//settings: &Settings<Checked>,
//update_builder: UpdateBuilder,
//) -> Result<UpdateResult> {
//// We must use the write transaction of the update here.
//let mut builder = update_builder.settings(txn, self);
fn update_settings<'a, 'b>(
&'a self,
txn: &mut heed::RwTxn<'a, 'b>,
settings: &Settings<Checked>,
update_builder: UpdateBuilder,
) -> Result<UpdateResult> {
// We must use the write transaction of the update here.
let mut builder = update_builder.settings(txn, self);
//match settings.searchable_attributes {
//Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
//Setting::Reset => builder.reset_searchable_fields(),
//Setting::NotSet => (),
//}
match settings.searchable_attributes {
Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (),
}
//match settings.displayed_attributes {
//Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
//Setting::Reset => builder.reset_displayed_fields(),
//Setting::NotSet => (),
//}
match settings.displayed_attributes {
Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (),
}
//match settings.filterable_attributes {
//Setting::Set(ref facets) => {
//builder.set_filterable_fields(facets.clone().into_iter().collect())
//}
//Setting::Reset => builder.reset_filterable_fields(),
//Setting::NotSet => (),
//}
match settings.filterable_attributes {
Setting::Set(ref facets) => {
builder.set_filterable_fields(facets.clone().into_iter().collect())
}
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (),
}
//match settings.sortable_attributes {
//Setting::Set(ref fields) => {
//builder.set_sortable_fields(fields.iter().cloned().collect())
//}
//Setting::Reset => builder.reset_sortable_fields(),
//Setting::NotSet => (),
//}
match settings.sortable_attributes {
Setting::Set(ref fields) => {
builder.set_sortable_fields(fields.iter().cloned().collect())
}
Setting::Reset => builder.reset_sortable_fields(),
Setting::NotSet => (),
}
//match settings.ranking_rules {
//Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
//Setting::Reset => builder.reset_criteria(),
//Setting::NotSet => (),
//}
match settings.ranking_rules {
Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (),
}
//match settings.stop_words {
//Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
//Setting::Reset => builder.reset_stop_words(),
//Setting::NotSet => (),
//}
match settings.stop_words {
Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (),
}
//match settings.synonyms {
//Setting::Set(ref synonyms) => {
//builder.set_synonyms(synonyms.clone().into_iter().collect())
//}
//Setting::Reset => builder.reset_synonyms(),
//Setting::NotSet => (),
//}
match settings.synonyms {
Setting::Set(ref synonyms) => {
builder.set_synonyms(synonyms.clone().into_iter().collect())
}
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (),
}
//match settings.distinct_attribute {
//Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
//Setting::Reset => builder.reset_distinct_field(),
//Setting::NotSet => (),
//}
match settings.distinct_attribute {
Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
Setting::Reset => builder.reset_distinct_field(),
Setting::NotSet => (),
}
//builder.execute(|indexing_step, update_id| {
//debug!("update {}: {:?}", update_id, indexing_step)
//})?;
builder.execute(|indexing_step, update_id| {
debug!("update {}: {:?}", update_id, indexing_step)
})?;
//Ok(UpdateResult::Other)
//}
//pub fn update_settings(
//&self,
//settings: &Settings<Checked>,
//update_builder: UpdateBuilder,
//) -> Result<UpdateResult> {
//let mut txn = self.write_txn()?;
//let result = self.update_settings_txn(&mut txn, settings, update_builder)?;
//txn.commit()?;
//Ok(result)
//}
Ok(UpdateResult::Other)
}
//pub fn delete_documents(
//&self,

View File

@@ -18,7 +18,7 @@ use dump_actor::DumpActorHandle;
pub use dump_actor::{DumpInfo, DumpStatus};
use snapshot::load_snapshot;
use crate::index::{Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
use crate::index::{Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked};
use crate::index_controller::index_resolver::create_index_resolver;
use crate::options::IndexerOpts;
use error::Result;
@@ -95,6 +95,7 @@ pub struct Stats {
#[derive(derivative::Derivative)]
#[derivative(Debug)]
pub enum Update {
Settings(Settings<Unchecked>),
DocumentAddition {
#[derivative(Debug="ignore")]
payload: Payload,
@@ -242,8 +243,8 @@ impl IndexController {
IndexControllerBuilder::default()
}
pub async fn register_update(&self, uid: &str, update: Update) -> Result<UpdateStatus> {
match self.index_resolver.get_uuid(uid.to_string()).await {
pub async fn register_update(&self, uid: String, update: Update) -> Result<UpdateStatus> {
match self.index_resolver.get_uuid(uid).await {
Ok(uuid) => {
let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?;
Ok(update_result)

View File

@@ -24,6 +24,7 @@ use uuid::Uuid;
use self::error::{Result, UpdateLoopError};
pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo};
use crate::index::{Settings, Unchecked};
use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus;
@@ -53,6 +54,7 @@ pub enum RegisterUpdate {
method: IndexDocumentsMethod,
content_uuid: Uuid,
},
Settings(Settings<Unchecked>),
}
/// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`.
@@ -207,6 +209,7 @@ impl UpdateLoop {
content_uuid,
}
}
Update::Settings(settings) => RegisterUpdate::Settings(settings),
};
let store = self.store.clone();