mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-18 20:30:47 +00:00
Compare commits
8 Commits
latest
...
post-updat
Author | SHA1 | Date | |
---|---|---|---|
dfca4b6219 | |||
8d9eb2a7c4 | |||
44586e089d | |||
c8b7822d0d | |||
446b9c142c | |||
e755e25847 | |||
81419935f2 | |||
51acd7a381 |
@ -305,6 +305,7 @@ pub(crate) mod test {
|
|||||||
localized_attributes: Setting::NotSet,
|
localized_attributes: Setting::NotSet,
|
||||||
facet_search: Setting::NotSet,
|
facet_search: Setting::NotSet,
|
||||||
prefix_search: Setting::NotSet,
|
prefix_search: Setting::NotSet,
|
||||||
|
execute_after_update: Setting::NotSet,
|
||||||
_kind: std::marker::PhantomData,
|
_kind: std::marker::PhantomData,
|
||||||
};
|
};
|
||||||
settings.check()
|
settings.check()
|
||||||
|
@ -397,6 +397,7 @@ impl<T> From<v5::Settings<T>> for v6::Settings<v6::Unchecked> {
|
|||||||
search_cutoff_ms: v6::Setting::NotSet,
|
search_cutoff_ms: v6::Setting::NotSet,
|
||||||
facet_search: v6::Setting::NotSet,
|
facet_search: v6::Setting::NotSet,
|
||||||
prefix_search: v6::Setting::NotSet,
|
prefix_search: v6::Setting::NotSet,
|
||||||
|
execute_after_update: v6::Setting::NotSet,
|
||||||
_kind: std::marker::PhantomData,
|
_kind: std::marker::PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::io::ErrorKind;
|
||||||
|
|
||||||
use meilisearch_types::heed::RoTxn;
|
use meilisearch_types::heed::RoTxn;
|
||||||
use meilisearch_types::milli::update::IndexDocumentsMethod;
|
use meilisearch_types::milli::update::IndexDocumentsMethod;
|
||||||
@ -535,7 +536,11 @@ impl IndexScheduler {
|
|||||||
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))?;
|
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))?;
|
||||||
|
|
||||||
if let Some(uuid) = task.content_uuid() {
|
if let Some(uuid) = task.content_uuid() {
|
||||||
let content_size = self.queue.file_store.compute_size(uuid)?;
|
let content_size = match self.queue.file_store.compute_size(uuid) {
|
||||||
|
Ok(content_size) => content_size,
|
||||||
|
Err(file_store::Error::IoError(err)) if err.kind() == ErrorKind::NotFound => 0,
|
||||||
|
Err(otherwise) => return Err(otherwise.into()),
|
||||||
|
};
|
||||||
total_size = total_size.saturating_add(content_size);
|
total_size = total_size.saturating_add(content_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,6 +312,7 @@ InvalidSettingsDisplayedAttributes , InvalidRequest , BAD_REQUEST ;
|
|||||||
InvalidSettingsDistinctAttribute , InvalidRequest , BAD_REQUEST ;
|
InvalidSettingsDistinctAttribute , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidSettingsProximityPrecision , InvalidRequest , BAD_REQUEST ;
|
InvalidSettingsProximityPrecision , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidSettingsFacetSearch , InvalidRequest , BAD_REQUEST ;
|
InvalidSettingsFacetSearch , InvalidRequest , BAD_REQUEST ;
|
||||||
|
InvalidSettingsexecuteAfterUpdate , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidSettingsPrefixSearch , InvalidRequest , BAD_REQUEST ;
|
InvalidSettingsPrefixSearch , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidSettingsFaceting , InvalidRequest , BAD_REQUEST ;
|
InvalidSettingsFaceting , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidSettingsFilterableAttributes , InvalidRequest , BAD_REQUEST ;
|
InvalidSettingsFilterableAttributes , InvalidRequest , BAD_REQUEST ;
|
||||||
|
@ -289,6 +289,12 @@ pub struct Settings<T> {
|
|||||||
#[schema(value_type = Option<PrefixSearchSettings>, example = json!("Hemlo"))]
|
#[schema(value_type = Option<PrefixSearchSettings>, example = json!("Hemlo"))]
|
||||||
pub prefix_search: Setting<PrefixSearchSettings>,
|
pub prefix_search: Setting<PrefixSearchSettings>,
|
||||||
|
|
||||||
|
/// Function to execute after an update
|
||||||
|
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
|
||||||
|
#[deserr(default, error = DeserrJsonError<InvalidSettingsexecuteAfterUpdate>)]
|
||||||
|
#[schema(value_type = Option<String>, example = json!("doc.likes += 1"))]
|
||||||
|
pub execute_after_update: Setting<String>,
|
||||||
|
|
||||||
#[serde(skip)]
|
#[serde(skip)]
|
||||||
#[deserr(skip)]
|
#[deserr(skip)]
|
||||||
pub _kind: PhantomData<T>,
|
pub _kind: PhantomData<T>,
|
||||||
@ -354,6 +360,7 @@ impl Settings<Checked> {
|
|||||||
localized_attributes: Setting::Reset,
|
localized_attributes: Setting::Reset,
|
||||||
facet_search: Setting::Reset,
|
facet_search: Setting::Reset,
|
||||||
prefix_search: Setting::Reset,
|
prefix_search: Setting::Reset,
|
||||||
|
execute_after_update: Setting::Reset,
|
||||||
_kind: PhantomData,
|
_kind: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -380,6 +387,7 @@ impl Settings<Checked> {
|
|||||||
localized_attributes: localized_attributes_rules,
|
localized_attributes: localized_attributes_rules,
|
||||||
facet_search,
|
facet_search,
|
||||||
prefix_search,
|
prefix_search,
|
||||||
|
execute_after_update,
|
||||||
_kind,
|
_kind,
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
@ -404,6 +412,7 @@ impl Settings<Checked> {
|
|||||||
localized_attributes: localized_attributes_rules,
|
localized_attributes: localized_attributes_rules,
|
||||||
facet_search,
|
facet_search,
|
||||||
prefix_search,
|
prefix_search,
|
||||||
|
execute_after_update,
|
||||||
_kind: PhantomData,
|
_kind: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -454,6 +463,7 @@ impl Settings<Unchecked> {
|
|||||||
localized_attributes: self.localized_attributes,
|
localized_attributes: self.localized_attributes,
|
||||||
facet_search: self.facet_search,
|
facet_search: self.facet_search,
|
||||||
prefix_search: self.prefix_search,
|
prefix_search: self.prefix_search,
|
||||||
|
execute_after_update: self.execute_after_update,
|
||||||
_kind: PhantomData,
|
_kind: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -530,6 +540,10 @@ impl Settings<Unchecked> {
|
|||||||
},
|
},
|
||||||
prefix_search: other.prefix_search.or(self.prefix_search),
|
prefix_search: other.prefix_search.or(self.prefix_search),
|
||||||
facet_search: other.facet_search.or(self.facet_search),
|
facet_search: other.facet_search.or(self.facet_search),
|
||||||
|
execute_after_update: other
|
||||||
|
.execute_after_update
|
||||||
|
.clone()
|
||||||
|
.or(self.execute_after_update.clone()),
|
||||||
_kind: PhantomData,
|
_kind: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -568,6 +582,7 @@ pub fn apply_settings_to_builder(
|
|||||||
localized_attributes: localized_attributes_rules,
|
localized_attributes: localized_attributes_rules,
|
||||||
facet_search,
|
facet_search,
|
||||||
prefix_search,
|
prefix_search,
|
||||||
|
execute_after_update,
|
||||||
_kind,
|
_kind,
|
||||||
} = settings;
|
} = settings;
|
||||||
|
|
||||||
@ -772,6 +787,14 @@ pub fn apply_settings_to_builder(
|
|||||||
Setting::Reset => builder.reset_facet_search(),
|
Setting::Reset => builder.reset_facet_search(),
|
||||||
Setting::NotSet => (),
|
Setting::NotSet => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match execute_after_update {
|
||||||
|
Setting::Set(execute_after_update) => {
|
||||||
|
builder.set_execute_after_update(execute_after_update.clone())
|
||||||
|
}
|
||||||
|
Setting::Reset => builder.reset_execute_after_update(),
|
||||||
|
Setting::NotSet => (),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum SecretPolicy {
|
pub enum SecretPolicy {
|
||||||
@ -867,14 +890,11 @@ pub fn settings(
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
let embedders = Setting::Set(embedders);
|
let embedders = Setting::Set(embedders);
|
||||||
|
|
||||||
let search_cutoff_ms = index.search_cutoff(rtxn)?;
|
let search_cutoff_ms = index.search_cutoff(rtxn)?;
|
||||||
|
|
||||||
let localized_attributes_rules = index.localized_attributes_rules(rtxn)?;
|
let localized_attributes_rules = index.localized_attributes_rules(rtxn)?;
|
||||||
|
|
||||||
let prefix_search = index.prefix_search(rtxn)?.map(PrefixSearchSettings::from);
|
let prefix_search = index.prefix_search(rtxn)?.map(PrefixSearchSettings::from);
|
||||||
|
|
||||||
let facet_search = index.facet_search(rtxn)?;
|
let facet_search = index.facet_search(rtxn)?;
|
||||||
|
let execute_after_update = index.execute_after_update(rtxn)?;
|
||||||
|
|
||||||
let mut settings = Settings {
|
let mut settings = Settings {
|
||||||
displayed_attributes: match displayed_attributes {
|
displayed_attributes: match displayed_attributes {
|
||||||
@ -914,6 +934,10 @@ pub fn settings(
|
|||||||
},
|
},
|
||||||
prefix_search: Setting::Set(prefix_search.unwrap_or_default()),
|
prefix_search: Setting::Set(prefix_search.unwrap_or_default()),
|
||||||
facet_search: Setting::Set(facet_search),
|
facet_search: Setting::Set(facet_search),
|
||||||
|
execute_after_update: match execute_after_update {
|
||||||
|
Some(function) => Setting::Set(function.to_string()),
|
||||||
|
None => Setting::NotSet,
|
||||||
|
},
|
||||||
_kind: PhantomData,
|
_kind: PhantomData,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1141,6 +1165,7 @@ pub(crate) mod test {
|
|||||||
search_cutoff_ms: Setting::NotSet,
|
search_cutoff_ms: Setting::NotSet,
|
||||||
facet_search: Setting::NotSet,
|
facet_search: Setting::NotSet,
|
||||||
prefix_search: Setting::NotSet,
|
prefix_search: Setting::NotSet,
|
||||||
|
execute_after_update: Setting::NotSet,
|
||||||
_kind: PhantomData::<Unchecked>,
|
_kind: PhantomData::<Unchecked>,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1172,6 +1197,7 @@ pub(crate) mod test {
|
|||||||
search_cutoff_ms: Setting::NotSet,
|
search_cutoff_ms: Setting::NotSet,
|
||||||
facet_search: Setting::NotSet,
|
facet_search: Setting::NotSet,
|
||||||
prefix_search: Setting::NotSet,
|
prefix_search: Setting::NotSet,
|
||||||
|
execute_after_update: Setting::NotSet,
|
||||||
_kind: PhantomData::<Unchecked>,
|
_kind: PhantomData::<Unchecked>,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -497,6 +497,17 @@ make_setting_routes!(
|
|||||||
camelcase_attr: "facetSearch",
|
camelcase_attr: "facetSearch",
|
||||||
analytics: FacetSearchAnalytics
|
analytics: FacetSearchAnalytics
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
route: "/execute-after-update",
|
||||||
|
update_verb: put,
|
||||||
|
value_type: String,
|
||||||
|
err_type: meilisearch_types::deserr::DeserrJsonError<
|
||||||
|
meilisearch_types::error::deserr_codes::InvalidSettingsexecuteAfterUpdate,
|
||||||
|
>,
|
||||||
|
attr: execute_after_update,
|
||||||
|
camelcase_attr: "executeAfterUpdate",
|
||||||
|
analytics: ExecuteAfterUpdateAnalytics
|
||||||
|
},
|
||||||
{
|
{
|
||||||
route: "/prefix-search",
|
route: "/prefix-search",
|
||||||
update_verb: put,
|
update_verb: put,
|
||||||
@ -596,6 +607,9 @@ pub async fn update_all(
|
|||||||
new_settings.non_separator_tokens.as_ref().set(),
|
new_settings.non_separator_tokens.as_ref().set(),
|
||||||
),
|
),
|
||||||
facet_search: FacetSearchAnalytics::new(new_settings.facet_search.as_ref().set()),
|
facet_search: FacetSearchAnalytics::new(new_settings.facet_search.as_ref().set()),
|
||||||
|
execute_after_update: ExecuteAfterUpdateAnalytics::new(
|
||||||
|
new_settings.execute_after_update.as_ref().set(),
|
||||||
|
),
|
||||||
prefix_search: PrefixSearchAnalytics::new(new_settings.prefix_search.as_ref().set()),
|
prefix_search: PrefixSearchAnalytics::new(new_settings.prefix_search.as_ref().set()),
|
||||||
},
|
},
|
||||||
&req,
|
&req,
|
||||||
|
@ -39,6 +39,7 @@ pub struct SettingsAnalytics {
|
|||||||
pub non_separator_tokens: NonSeparatorTokensAnalytics,
|
pub non_separator_tokens: NonSeparatorTokensAnalytics,
|
||||||
pub facet_search: FacetSearchAnalytics,
|
pub facet_search: FacetSearchAnalytics,
|
||||||
pub prefix_search: PrefixSearchAnalytics,
|
pub prefix_search: PrefixSearchAnalytics,
|
||||||
|
pub execute_after_update: ExecuteAfterUpdateAnalytics,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Aggregate for SettingsAnalytics {
|
impl Aggregate for SettingsAnalytics {
|
||||||
@ -194,6 +195,9 @@ impl Aggregate for SettingsAnalytics {
|
|||||||
set: new.facet_search.set | self.facet_search.set,
|
set: new.facet_search.set | self.facet_search.set,
|
||||||
value: new.facet_search.value.or(self.facet_search.value),
|
value: new.facet_search.value.or(self.facet_search.value),
|
||||||
},
|
},
|
||||||
|
execute_after_update: ExecuteAfterUpdateAnalytics {
|
||||||
|
set: new.execute_after_update.set | self.execute_after_update.set,
|
||||||
|
},
|
||||||
prefix_search: PrefixSearchAnalytics {
|
prefix_search: PrefixSearchAnalytics {
|
||||||
set: new.prefix_search.set | self.prefix_search.set,
|
set: new.prefix_search.set | self.prefix_search.set,
|
||||||
value: new.prefix_search.value.or(self.prefix_search.value),
|
value: new.prefix_search.value.or(self.prefix_search.value),
|
||||||
@ -659,6 +663,21 @@ impl FacetSearchAnalytics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Default)]
|
||||||
|
pub struct ExecuteAfterUpdateAnalytics {
|
||||||
|
pub set: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ExecuteAfterUpdateAnalytics {
|
||||||
|
pub fn new(distinct: Option<&String>) -> Self {
|
||||||
|
Self { set: distinct.is_some() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_settings(self) -> SettingsAnalytics {
|
||||||
|
SettingsAnalytics { execute_after_update: self, ..Default::default() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Default)]
|
#[derive(Serialize, Default)]
|
||||||
pub struct PrefixSearchAnalytics {
|
pub struct PrefixSearchAnalytics {
|
||||||
pub set: bool,
|
pub set: bool,
|
||||||
|
@ -76,6 +76,7 @@ pub mod main_key {
|
|||||||
pub const SEARCH_CUTOFF: &str = "search_cutoff";
|
pub const SEARCH_CUTOFF: &str = "search_cutoff";
|
||||||
pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules";
|
pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules";
|
||||||
pub const FACET_SEARCH: &str = "facet_search";
|
pub const FACET_SEARCH: &str = "facet_search";
|
||||||
|
pub const EXECUTE_AFTER_UPDATE: &str = "execute-after-update";
|
||||||
pub const PREFIX_SEARCH: &str = "prefix_search";
|
pub const PREFIX_SEARCH: &str = "prefix_search";
|
||||||
pub const DOCUMENTS_STATS: &str = "documents_stats";
|
pub const DOCUMENTS_STATS: &str = "documents_stats";
|
||||||
}
|
}
|
||||||
@ -1623,6 +1624,22 @@ impl Index {
|
|||||||
self.main.remap_key_type::<Str>().delete(txn, main_key::FACET_SEARCH)
|
self.main.remap_key_type::<Str>().delete(txn, main_key::FACET_SEARCH)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn execute_after_update<'t>(&self, txn: &'t RoTxn<'_>) -> heed::Result<Option<&'t str>> {
|
||||||
|
self.main.remap_types::<Str, Str>().get(txn, main_key::EXECUTE_AFTER_UPDATE)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn put_execute_after_update(
|
||||||
|
&self,
|
||||||
|
txn: &mut RwTxn<'_>,
|
||||||
|
val: &str,
|
||||||
|
) -> heed::Result<()> {
|
||||||
|
self.main.remap_types::<Str, Str>().put(txn, main_key::EXECUTE_AFTER_UPDATE, &val)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn delete_execute_after_update(&self, txn: &mut RwTxn<'_>) -> heed::Result<bool> {
|
||||||
|
self.main.remap_key_type::<Str>().delete(txn, main_key::EXECUTE_AFTER_UPDATE)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn localized_attributes_rules(
|
pub fn localized_attributes_rules(
|
||||||
&self,
|
&self,
|
||||||
rtxn: &RoTxn<'_>,
|
rtxn: &RoTxn<'_>,
|
||||||
|
@ -406,6 +406,71 @@ impl<'doc> Versions<'doc> {
|
|||||||
Ok(Some(Self::single(data)))
|
Ok(Some(Self::single(data)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn multiple_with_edits(
|
||||||
|
doc: Option<rhai::Map>,
|
||||||
|
mut versions: impl Iterator<Item = Result<RawMap<'doc, FxBuildHasher>>>,
|
||||||
|
engine: &rhai::Engine,
|
||||||
|
edit_function: &rhai::AST,
|
||||||
|
doc_alloc: &'doc bumpalo::Bump,
|
||||||
|
) -> Result<Option<Option<Self>>> {
|
||||||
|
let Some(data) = versions.next() else { return Ok(None) };
|
||||||
|
|
||||||
|
let mut doc = doc.unwrap_or_default();
|
||||||
|
let mut data = data?;
|
||||||
|
for version in versions {
|
||||||
|
let version = version?;
|
||||||
|
for (field, value) in version {
|
||||||
|
data.insert(field, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut scope = rhai::Scope::new();
|
||||||
|
data.iter().for_each(|(k, v)| {
|
||||||
|
doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
|
||||||
|
});
|
||||||
|
scope.push("doc", doc.clone());
|
||||||
|
|
||||||
|
let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
|
||||||
|
data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
|
||||||
|
match scope.get_value::<rhai::Map>("doc") {
|
||||||
|
Some(map) => {
|
||||||
|
for (key, value) in map {
|
||||||
|
let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
|
||||||
|
serde_json::to_writer(&mut vec, &value).unwrap();
|
||||||
|
let key = doc_alloc.alloc_str(key.as_str());
|
||||||
|
let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
|
||||||
|
data.insert(key, raw_value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// In case the deletes the document and it's not the last change
|
||||||
|
// we simply set the document to an empty one and await the next change.
|
||||||
|
None => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We must also run the code after the last change
|
||||||
|
let mut scope = rhai::Scope::new();
|
||||||
|
data.iter().for_each(|(k, v)| {
|
||||||
|
doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
|
||||||
|
});
|
||||||
|
scope.push("doc", doc);
|
||||||
|
|
||||||
|
let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
|
||||||
|
data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
|
||||||
|
match scope.get_value::<rhai::Map>("doc") {
|
||||||
|
Some(map) => {
|
||||||
|
for (key, value) in map {
|
||||||
|
let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
|
||||||
|
serde_json::to_writer(&mut vec, &value).unwrap();
|
||||||
|
let key = doc_alloc.alloc_str(key.as_str());
|
||||||
|
let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
|
||||||
|
data.insert(key, raw_value);
|
||||||
|
}
|
||||||
|
Ok(Some(Some(Self::single(data))))
|
||||||
|
}
|
||||||
|
None => Ok(Some(None)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
|
pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
|
||||||
Self { data: version }
|
Self { data: version }
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@ use super::guess_primary_key::retrieve_or_guess_primary_key;
|
|||||||
use crate::documents::PrimaryKey;
|
use crate::documents::PrimaryKey;
|
||||||
use crate::progress::{AtomicPayloadStep, Progress};
|
use crate::progress::{AtomicPayloadStep, Progress};
|
||||||
use crate::update::new::document::Versions;
|
use crate::update::new::document::Versions;
|
||||||
|
use crate::update::new::indexer::update_by_function::obkv_to_rhaimap;
|
||||||
use crate::update::new::steps::IndexingStep;
|
use crate::update::new::steps::IndexingStep;
|
||||||
use crate::update::new::thread_local::MostlySend;
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::{Deletion, Insertion, Update};
|
use crate::update::new::{Deletion, Insertion, Update};
|
||||||
@ -157,7 +158,16 @@ impl<'pl> DocumentOperation<'pl> {
|
|||||||
.sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
|
.sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
|
||||||
|
|
||||||
let docids_version_offsets = docids_version_offsets.into_bump_slice();
|
let docids_version_offsets = docids_version_offsets.into_bump_slice();
|
||||||
Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
|
let engine = rhai::Engine::new();
|
||||||
|
// Make sure to correctly setup the engine and remove all settings
|
||||||
|
let ast = index.execute_after_update(rtxn)?.map(|f| engine.compile(f).unwrap());
|
||||||
|
let fidmap = index.fields_ids_map(rtxn)?;
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
DocumentOperationChanges { docids_version_offsets, engine, ast, fidmap },
|
||||||
|
operations_stats,
|
||||||
|
primary_key,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -418,7 +428,15 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
|
|||||||
'pl: 'doc,
|
'pl: 'doc,
|
||||||
{
|
{
|
||||||
let (external_doc, payload_operations) = item;
|
let (external_doc, payload_operations) = item;
|
||||||
payload_operations.merge(external_doc, &context.doc_alloc)
|
payload_operations.merge(
|
||||||
|
&context.rtxn,
|
||||||
|
context.index,
|
||||||
|
&self.fidmap,
|
||||||
|
&self.engine,
|
||||||
|
self.ast.as_ref(),
|
||||||
|
external_doc,
|
||||||
|
&context.doc_alloc,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn len(&self) -> usize {
|
fn len(&self) -> usize {
|
||||||
@ -427,6 +445,9 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct DocumentOperationChanges<'pl> {
|
pub struct DocumentOperationChanges<'pl> {
|
||||||
|
engine: rhai::Engine,
|
||||||
|
ast: Option<rhai::AST>,
|
||||||
|
fidmap: FieldsIdsMap,
|
||||||
docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
|
docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -489,10 +510,14 @@ impl<'pl> PayloadOperations<'pl> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns only the most recent version of a document based on the updates from the payloads.
|
/// Returns only the most recent version of a document based on the updates from the payloads.
|
||||||
///
|
#[allow(clippy::too_many_arguments)]
|
||||||
/// This function is only meant to be used when doing a replacement and not an update.
|
|
||||||
fn merge<'doc>(
|
fn merge<'doc>(
|
||||||
&self,
|
&self,
|
||||||
|
rtxn: &heed::RoTxn,
|
||||||
|
index: &Index,
|
||||||
|
fidmap: &FieldsIdsMap,
|
||||||
|
engine: &rhai::Engine,
|
||||||
|
ast: Option<&rhai::AST>,
|
||||||
external_doc: &'doc str,
|
external_doc: &'doc str,
|
||||||
doc_alloc: &'doc Bump,
|
doc_alloc: &'doc Bump,
|
||||||
) -> Result<Option<DocumentChange<'doc>>>
|
) -> Result<Option<DocumentChange<'doc>>>
|
||||||
@ -556,9 +581,34 @@ impl<'pl> PayloadOperations<'pl> {
|
|||||||
Ok(document)
|
Ok(document)
|
||||||
});
|
});
|
||||||
|
|
||||||
let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
|
let versions = match ast {
|
||||||
|
Some(ast) => {
|
||||||
|
let doc = index
|
||||||
|
.documents
|
||||||
|
.get(rtxn, &self.docid)?
|
||||||
|
.map(|obkv| obkv_to_rhaimap(obkv, fidmap))
|
||||||
|
.transpose()?;
|
||||||
|
match Versions::multiple_with_edits(doc, versions, engine, ast, doc_alloc)?
|
||||||
|
{
|
||||||
|
Some(Some(versions)) => Some(versions),
|
||||||
|
Some(None) if self.is_new => return Ok(None),
|
||||||
|
Some(None) => {
|
||||||
|
return Ok(Some(DocumentChange::Deletion(Deletion::create(
|
||||||
|
self.docid,
|
||||||
|
external_doc,
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => Versions::multiple(versions)?,
|
||||||
|
};
|
||||||
|
|
||||||
if self.is_new {
|
let Some(versions) = versions else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
if self.is_new || ast.is_some() {
|
||||||
Ok(Some(DocumentChange::Insertion(Insertion::create(
|
Ok(Some(DocumentChange::Insertion(Insertion::create(
|
||||||
self.docid,
|
self.docid,
|
||||||
external_doc,
|
external_doc,
|
||||||
|
@ -189,7 +189,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
|
pub fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
|
||||||
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
|
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
|
||||||
let map: Result<rhai::Map> = all_keys
|
let map: Result<rhai::Map> = all_keys
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
use std::io::{BufReader, BufWriter, Read, Seek, Write};
|
use std::io::{BufReader, BufWriter, Read, Seek, Write};
|
||||||
|
use std::num::NonZeroUsize;
|
||||||
|
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
@ -217,7 +218,7 @@ impl WordPrefixIntegerDocids {
|
|||||||
index.push(PrefixIntegerEntry {
|
index.push(PrefixIntegerEntry {
|
||||||
prefix,
|
prefix,
|
||||||
pos,
|
pos,
|
||||||
serialized_length: Some(buffer.len()),
|
serialized_length: NonZeroUsize::new(buffer.len()),
|
||||||
});
|
});
|
||||||
file.write_all(buffer)?;
|
file.write_all(buffer)?;
|
||||||
}
|
}
|
||||||
@ -243,7 +244,7 @@ impl WordPrefixIntegerDocids {
|
|||||||
key_buffer.extend_from_slice(&pos.to_be_bytes());
|
key_buffer.extend_from_slice(&pos.to_be_bytes());
|
||||||
match serialized_length {
|
match serialized_length {
|
||||||
Some(serialized_length) => {
|
Some(serialized_length) => {
|
||||||
buffer.resize(serialized_length, 0);
|
buffer.resize(serialized_length.get(), 0);
|
||||||
file.read_exact(&mut buffer)?;
|
file.read_exact(&mut buffer)?;
|
||||||
self.prefix_database.remap_data_type::<Bytes>().put(
|
self.prefix_database.remap_data_type::<Bytes>().put(
|
||||||
wtxn,
|
wtxn,
|
||||||
@ -266,7 +267,7 @@ impl WordPrefixIntegerDocids {
|
|||||||
struct PrefixIntegerEntry<'a> {
|
struct PrefixIntegerEntry<'a> {
|
||||||
prefix: &'a str,
|
prefix: &'a str,
|
||||||
pos: u16,
|
pos: u16,
|
||||||
serialized_length: Option<usize>,
|
serialized_length: Option<NonZeroUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO doc
|
/// TODO doc
|
||||||
|
@ -183,6 +183,7 @@ pub struct Settings<'a, 't, 'i> {
|
|||||||
localized_attributes_rules: Setting<Vec<LocalizedAttributesRule>>,
|
localized_attributes_rules: Setting<Vec<LocalizedAttributesRule>>,
|
||||||
prefix_search: Setting<PrefixSearch>,
|
prefix_search: Setting<PrefixSearch>,
|
||||||
facet_search: Setting<bool>,
|
facet_search: Setting<bool>,
|
||||||
|
execute_after_update: Setting<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
||||||
@ -220,6 +221,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||||||
localized_attributes_rules: Setting::NotSet,
|
localized_attributes_rules: Setting::NotSet,
|
||||||
prefix_search: Setting::NotSet,
|
prefix_search: Setting::NotSet,
|
||||||
facet_search: Setting::NotSet,
|
facet_search: Setting::NotSet,
|
||||||
|
execute_after_update: Setting::NotSet,
|
||||||
indexer_config,
|
indexer_config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -442,6 +444,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||||||
self.facet_search = Setting::Reset;
|
self.facet_search = Setting::Reset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn set_execute_after_update(&mut self, value: String) {
|
||||||
|
self.execute_after_update = Setting::Set(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reset_execute_after_update(&mut self) {
|
||||||
|
self.execute_after_update = Setting::Reset;
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
level = "trace"
|
level = "trace"
|
||||||
skip(self, progress_callback, should_abort, settings_diff),
|
skip(self, progress_callback, should_abort, settings_diff),
|
||||||
@ -994,6 +1004,18 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||||||
Ok(changed)
|
Ok(changed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update_execute_after_update(&mut self) -> Result<()> {
|
||||||
|
match self.execute_after_update.as_ref() {
|
||||||
|
Setting::Set(new) => {
|
||||||
|
self.index.put_execute_after_update(self.wtxn, &new).map_err(Into::into)
|
||||||
|
}
|
||||||
|
Setting::Reset => {
|
||||||
|
self.index.delete_execute_after_update(self.wtxn).map(drop).map_err(Into::into)
|
||||||
|
}
|
||||||
|
Setting::NotSet => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn update_embedding_configs(&mut self) -> Result<BTreeMap<String, EmbedderAction>> {
|
fn update_embedding_configs(&mut self) -> Result<BTreeMap<String, EmbedderAction>> {
|
||||||
match std::mem::take(&mut self.embedder_settings) {
|
match std::mem::take(&mut self.embedder_settings) {
|
||||||
Setting::Set(configs) => self.update_embedding_configs_set(configs),
|
Setting::Set(configs) => self.update_embedding_configs_set(configs),
|
||||||
@ -1245,6 +1267,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||||||
self.update_proximity_precision()?;
|
self.update_proximity_precision()?;
|
||||||
self.update_prefix_search()?;
|
self.update_prefix_search()?;
|
||||||
self.update_facet_search()?;
|
self.update_facet_search()?;
|
||||||
|
self.update_execute_after_update()?;
|
||||||
self.update_localized_attributes_rules()?;
|
self.update_localized_attributes_rules()?;
|
||||||
|
|
||||||
let embedding_config_updates = self.update_embedding_configs()?;
|
let embedding_config_updates = self.update_embedding_configs()?;
|
||||||
|
@ -896,6 +896,7 @@ fn test_correct_settings_init() {
|
|||||||
localized_attributes_rules,
|
localized_attributes_rules,
|
||||||
prefix_search,
|
prefix_search,
|
||||||
facet_search,
|
facet_search,
|
||||||
|
execute_after_update,
|
||||||
} = settings;
|
} = settings;
|
||||||
assert!(matches!(searchable_fields, Setting::NotSet));
|
assert!(matches!(searchable_fields, Setting::NotSet));
|
||||||
assert!(matches!(displayed_fields, Setting::NotSet));
|
assert!(matches!(displayed_fields, Setting::NotSet));
|
||||||
@ -923,6 +924,7 @@ fn test_correct_settings_init() {
|
|||||||
assert!(matches!(localized_attributes_rules, Setting::NotSet));
|
assert!(matches!(localized_attributes_rules, Setting::NotSet));
|
||||||
assert!(matches!(prefix_search, Setting::NotSet));
|
assert!(matches!(prefix_search, Setting::NotSet));
|
||||||
assert!(matches!(facet_search, Setting::NotSet));
|
assert!(matches!(facet_search, Setting::NotSet));
|
||||||
|
assert!(matches!(execute_after_update, Setting::NotSet));
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user