Compare commits

...

8 Commits

14 changed files with 240 additions and 15 deletions

View File

@ -305,6 +305,7 @@ pub(crate) mod test {
localized_attributes: Setting::NotSet,
facet_search: Setting::NotSet,
prefix_search: Setting::NotSet,
execute_after_update: Setting::NotSet,
_kind: std::marker::PhantomData,
};
settings.check()

View File

@ -397,6 +397,7 @@ impl<T> From<v5::Settings<T>> for v6::Settings<v6::Unchecked> {
search_cutoff_ms: v6::Setting::NotSet,
facet_search: v6::Setting::NotSet,
prefix_search: v6::Setting::NotSet,
execute_after_update: v6::Setting::NotSet,
_kind: std::marker::PhantomData,
}
}

View File

@ -1,4 +1,5 @@
use std::fmt;
use std::io::ErrorKind;
use meilisearch_types::heed::RoTxn;
use meilisearch_types::milli::update::IndexDocumentsMethod;
@ -535,7 +536,11 @@ impl IndexScheduler {
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))?;
if let Some(uuid) = task.content_uuid() {
let content_size = self.queue.file_store.compute_size(uuid)?;
let content_size = match self.queue.file_store.compute_size(uuid) {
Ok(content_size) => content_size,
Err(file_store::Error::IoError(err)) if err.kind() == ErrorKind::NotFound => 0,
Err(otherwise) => return Err(otherwise.into()),
};
total_size = total_size.saturating_add(content_size);
}

View File

@ -312,6 +312,7 @@ InvalidSettingsDisplayedAttributes , InvalidRequest , BAD_REQUEST ;
InvalidSettingsDistinctAttribute , InvalidRequest , BAD_REQUEST ;
InvalidSettingsProximityPrecision , InvalidRequest , BAD_REQUEST ;
InvalidSettingsFacetSearch , InvalidRequest , BAD_REQUEST ;
InvalidSettingsexecuteAfterUpdate , InvalidRequest , BAD_REQUEST ;
InvalidSettingsPrefixSearch , InvalidRequest , BAD_REQUEST ;
InvalidSettingsFaceting , InvalidRequest , BAD_REQUEST ;
InvalidSettingsFilterableAttributes , InvalidRequest , BAD_REQUEST ;

View File

@ -289,6 +289,12 @@ pub struct Settings<T> {
#[schema(value_type = Option<PrefixSearchSettings>, example = json!("Hemlo"))]
pub prefix_search: Setting<PrefixSearchSettings>,
/// Function to execute after an update
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
#[deserr(default, error = DeserrJsonError<InvalidSettingsexecuteAfterUpdate>)]
#[schema(value_type = Option<String>, example = json!("doc.likes += 1"))]
pub execute_after_update: Setting<String>,
#[serde(skip)]
#[deserr(skip)]
pub _kind: PhantomData<T>,
@ -354,6 +360,7 @@ impl Settings<Checked> {
localized_attributes: Setting::Reset,
facet_search: Setting::Reset,
prefix_search: Setting::Reset,
execute_after_update: Setting::Reset,
_kind: PhantomData,
}
}
@ -380,6 +387,7 @@ impl Settings<Checked> {
localized_attributes: localized_attributes_rules,
facet_search,
prefix_search,
execute_after_update,
_kind,
} = self;
@ -404,6 +412,7 @@ impl Settings<Checked> {
localized_attributes: localized_attributes_rules,
facet_search,
prefix_search,
execute_after_update,
_kind: PhantomData,
}
}
@ -454,6 +463,7 @@ impl Settings<Unchecked> {
localized_attributes: self.localized_attributes,
facet_search: self.facet_search,
prefix_search: self.prefix_search,
execute_after_update: self.execute_after_update,
_kind: PhantomData,
}
}
@ -530,6 +540,10 @@ impl Settings<Unchecked> {
},
prefix_search: other.prefix_search.or(self.prefix_search),
facet_search: other.facet_search.or(self.facet_search),
execute_after_update: other
.execute_after_update
.clone()
.or(self.execute_after_update.clone()),
_kind: PhantomData,
}
}
@ -568,6 +582,7 @@ pub fn apply_settings_to_builder(
localized_attributes: localized_attributes_rules,
facet_search,
prefix_search,
execute_after_update,
_kind,
} = settings;
@ -772,6 +787,14 @@ pub fn apply_settings_to_builder(
Setting::Reset => builder.reset_facet_search(),
Setting::NotSet => (),
}
match execute_after_update {
Setting::Set(execute_after_update) => {
builder.set_execute_after_update(execute_after_update.clone())
}
Setting::Reset => builder.reset_execute_after_update(),
Setting::NotSet => (),
}
}
pub enum SecretPolicy {
@ -867,14 +890,11 @@ pub fn settings(
})
.collect();
let embedders = Setting::Set(embedders);
let search_cutoff_ms = index.search_cutoff(rtxn)?;
let localized_attributes_rules = index.localized_attributes_rules(rtxn)?;
let prefix_search = index.prefix_search(rtxn)?.map(PrefixSearchSettings::from);
let facet_search = index.facet_search(rtxn)?;
let execute_after_update = index.execute_after_update(rtxn)?;
let mut settings = Settings {
displayed_attributes: match displayed_attributes {
@ -914,6 +934,10 @@ pub fn settings(
},
prefix_search: Setting::Set(prefix_search.unwrap_or_default()),
facet_search: Setting::Set(facet_search),
execute_after_update: match execute_after_update {
Some(function) => Setting::Set(function.to_string()),
None => Setting::NotSet,
},
_kind: PhantomData,
};
@ -1141,6 +1165,7 @@ pub(crate) mod test {
search_cutoff_ms: Setting::NotSet,
facet_search: Setting::NotSet,
prefix_search: Setting::NotSet,
execute_after_update: Setting::NotSet,
_kind: PhantomData::<Unchecked>,
};
@ -1172,6 +1197,7 @@ pub(crate) mod test {
search_cutoff_ms: Setting::NotSet,
facet_search: Setting::NotSet,
prefix_search: Setting::NotSet,
execute_after_update: Setting::NotSet,
_kind: PhantomData::<Unchecked>,
};

View File

@ -497,6 +497,17 @@ make_setting_routes!(
camelcase_attr: "facetSearch",
analytics: FacetSearchAnalytics
},
{
route: "/execute-after-update",
update_verb: put,
value_type: String,
err_type: meilisearch_types::deserr::DeserrJsonError<
meilisearch_types::error::deserr_codes::InvalidSettingsexecuteAfterUpdate,
>,
attr: execute_after_update,
camelcase_attr: "executeAfterUpdate",
analytics: ExecuteAfterUpdateAnalytics
},
{
route: "/prefix-search",
update_verb: put,
@ -596,6 +607,9 @@ pub async fn update_all(
new_settings.non_separator_tokens.as_ref().set(),
),
facet_search: FacetSearchAnalytics::new(new_settings.facet_search.as_ref().set()),
execute_after_update: ExecuteAfterUpdateAnalytics::new(
new_settings.execute_after_update.as_ref().set(),
),
prefix_search: PrefixSearchAnalytics::new(new_settings.prefix_search.as_ref().set()),
},
&req,

View File

@ -39,6 +39,7 @@ pub struct SettingsAnalytics {
pub non_separator_tokens: NonSeparatorTokensAnalytics,
pub facet_search: FacetSearchAnalytics,
pub prefix_search: PrefixSearchAnalytics,
pub execute_after_update: ExecuteAfterUpdateAnalytics,
}
impl Aggregate for SettingsAnalytics {
@ -194,6 +195,9 @@ impl Aggregate for SettingsAnalytics {
set: new.facet_search.set | self.facet_search.set,
value: new.facet_search.value.or(self.facet_search.value),
},
execute_after_update: ExecuteAfterUpdateAnalytics {
set: new.execute_after_update.set | self.execute_after_update.set,
},
prefix_search: PrefixSearchAnalytics {
set: new.prefix_search.set | self.prefix_search.set,
value: new.prefix_search.value.or(self.prefix_search.value),
@ -659,6 +663,21 @@ impl FacetSearchAnalytics {
}
}
#[derive(Serialize, Default)]
pub struct ExecuteAfterUpdateAnalytics {
pub set: bool,
}
impl ExecuteAfterUpdateAnalytics {
pub fn new(distinct: Option<&String>) -> Self {
Self { set: distinct.is_some() }
}
pub fn into_settings(self) -> SettingsAnalytics {
SettingsAnalytics { execute_after_update: self, ..Default::default() }
}
}
#[derive(Serialize, Default)]
pub struct PrefixSearchAnalytics {
pub set: bool,

View File

@ -76,6 +76,7 @@ pub mod main_key {
pub const SEARCH_CUTOFF: &str = "search_cutoff";
pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules";
pub const FACET_SEARCH: &str = "facet_search";
pub const EXECUTE_AFTER_UPDATE: &str = "execute-after-update";
pub const PREFIX_SEARCH: &str = "prefix_search";
pub const DOCUMENTS_STATS: &str = "documents_stats";
}
@ -1623,6 +1624,22 @@ impl Index {
self.main.remap_key_type::<Str>().delete(txn, main_key::FACET_SEARCH)
}
pub fn execute_after_update<'t>(&self, txn: &'t RoTxn<'_>) -> heed::Result<Option<&'t str>> {
self.main.remap_types::<Str, Str>().get(txn, main_key::EXECUTE_AFTER_UPDATE)
}
pub(crate) fn put_execute_after_update(
&self,
txn: &mut RwTxn<'_>,
val: &str,
) -> heed::Result<()> {
self.main.remap_types::<Str, Str>().put(txn, main_key::EXECUTE_AFTER_UPDATE, &val)
}
pub(crate) fn delete_execute_after_update(&self, txn: &mut RwTxn<'_>) -> heed::Result<bool> {
self.main.remap_key_type::<Str>().delete(txn, main_key::EXECUTE_AFTER_UPDATE)
}
pub fn localized_attributes_rules(
&self,
rtxn: &RoTxn<'_>,

View File

@ -406,6 +406,71 @@ impl<'doc> Versions<'doc> {
Ok(Some(Self::single(data)))
}
pub fn multiple_with_edits(
doc: Option<rhai::Map>,
mut versions: impl Iterator<Item = Result<RawMap<'doc, FxBuildHasher>>>,
engine: &rhai::Engine,
edit_function: &rhai::AST,
doc_alloc: &'doc bumpalo::Bump,
) -> Result<Option<Option<Self>>> {
let Some(data) = versions.next() else { return Ok(None) };
let mut doc = doc.unwrap_or_default();
let mut data = data?;
for version in versions {
let version = version?;
for (field, value) in version {
data.insert(field, value);
}
let mut scope = rhai::Scope::new();
data.iter().for_each(|(k, v)| {
doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
});
scope.push("doc", doc.clone());
let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
match scope.get_value::<rhai::Map>("doc") {
Some(map) => {
for (key, value) in map {
let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
serde_json::to_writer(&mut vec, &value).unwrap();
let key = doc_alloc.alloc_str(key.as_str());
let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
data.insert(key, raw_value);
}
}
// In case the deletes the document and it's not the last change
// we simply set the document to an empty one and await the next change.
None => (),
}
}
// We must also run the code after the last change
let mut scope = rhai::Scope::new();
data.iter().for_each(|(k, v)| {
doc.insert(k.into(), serde_json::from_str(v.get()).unwrap());
});
scope.push("doc", doc);
let _ = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, edit_function).unwrap();
data = RawMap::with_hasher_in(FxBuildHasher, doc_alloc);
match scope.get_value::<rhai::Map>("doc") {
Some(map) => {
for (key, value) in map {
let mut vec = bumpalo::collections::Vec::new_in(doc_alloc);
serde_json::to_writer(&mut vec, &value).unwrap();
let key = doc_alloc.alloc_str(key.as_str());
let raw_value = serde_json::from_slice(vec.into_bump_slice()).unwrap();
data.insert(key, raw_value);
}
Ok(Some(Some(Self::single(data))))
}
None => Ok(Some(None)),
}
}
pub fn single(version: RawMap<'doc, FxBuildHasher>) -> Self {
Self { data: version }
}

View File

@ -17,6 +17,7 @@ use super::guess_primary_key::retrieve_or_guess_primary_key;
use crate::documents::PrimaryKey;
use crate::progress::{AtomicPayloadStep, Progress};
use crate::update::new::document::Versions;
use crate::update::new::indexer::update_by_function::obkv_to_rhaimap;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, Insertion, Update};
@ -157,7 +158,16 @@ impl<'pl> DocumentOperation<'pl> {
.sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
let docids_version_offsets = docids_version_offsets.into_bump_slice();
Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
let engine = rhai::Engine::new();
// Make sure to correctly setup the engine and remove all settings
let ast = index.execute_after_update(rtxn)?.map(|f| engine.compile(f).unwrap());
let fidmap = index.fields_ids_map(rtxn)?;
Ok((
DocumentOperationChanges { docids_version_offsets, engine, ast, fidmap },
operations_stats,
primary_key,
))
}
}
@ -418,7 +428,15 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
'pl: 'doc,
{
let (external_doc, payload_operations) = item;
payload_operations.merge(external_doc, &context.doc_alloc)
payload_operations.merge(
&context.rtxn,
context.index,
&self.fidmap,
&self.engine,
self.ast.as_ref(),
external_doc,
&context.doc_alloc,
)
}
fn len(&self) -> usize {
@ -427,6 +445,9 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
}
pub struct DocumentOperationChanges<'pl> {
engine: rhai::Engine,
ast: Option<rhai::AST>,
fidmap: FieldsIdsMap,
docids_version_offsets: &'pl [(&'pl str, PayloadOperations<'pl>)],
}
@ -489,10 +510,14 @@ impl<'pl> PayloadOperations<'pl> {
}
/// Returns only the most recent version of a document based on the updates from the payloads.
///
/// This function is only meant to be used when doing a replacement and not an update.
#[allow(clippy::too_many_arguments)]
fn merge<'doc>(
&self,
rtxn: &heed::RoTxn,
index: &Index,
fidmap: &FieldsIdsMap,
engine: &rhai::Engine,
ast: Option<&rhai::AST>,
external_doc: &'doc str,
doc_alloc: &'doc Bump,
) -> Result<Option<DocumentChange<'doc>>>
@ -556,9 +581,34 @@ impl<'pl> PayloadOperations<'pl> {
Ok(document)
});
let Some(versions) = Versions::multiple(versions)? else { return Ok(None) };
let versions = match ast {
Some(ast) => {
let doc = index
.documents
.get(rtxn, &self.docid)?
.map(|obkv| obkv_to_rhaimap(obkv, fidmap))
.transpose()?;
match Versions::multiple_with_edits(doc, versions, engine, ast, doc_alloc)?
{
Some(Some(versions)) => Some(versions),
Some(None) if self.is_new => return Ok(None),
Some(None) => {
return Ok(Some(DocumentChange::Deletion(Deletion::create(
self.docid,
external_doc,
))));
}
None => None,
}
}
None => Versions::multiple(versions)?,
};
if self.is_new {
let Some(versions) = versions else {
return Ok(None);
};
if self.is_new || ast.is_some() {
Ok(Some(DocumentChange::Insertion(Insertion::create(
self.docid,
external_doc,

View File

@ -189,7 +189,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
}
}
fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
pub fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Result<rhai::Map> {
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
let map: Result<rhai::Map> = all_keys
.iter()

View File

@ -1,6 +1,7 @@
use std::cell::RefCell;
use std::collections::BTreeSet;
use std::io::{BufReader, BufWriter, Read, Seek, Write};
use std::num::NonZeroUsize;
use hashbrown::HashMap;
use heed::types::Bytes;
@ -217,7 +218,7 @@ impl WordPrefixIntegerDocids {
index.push(PrefixIntegerEntry {
prefix,
pos,
serialized_length: Some(buffer.len()),
serialized_length: NonZeroUsize::new(buffer.len()),
});
file.write_all(buffer)?;
}
@ -243,7 +244,7 @@ impl WordPrefixIntegerDocids {
key_buffer.extend_from_slice(&pos.to_be_bytes());
match serialized_length {
Some(serialized_length) => {
buffer.resize(serialized_length, 0);
buffer.resize(serialized_length.get(), 0);
file.read_exact(&mut buffer)?;
self.prefix_database.remap_data_type::<Bytes>().put(
wtxn,
@ -266,7 +267,7 @@ impl WordPrefixIntegerDocids {
struct PrefixIntegerEntry<'a> {
prefix: &'a str,
pos: u16,
serialized_length: Option<usize>,
serialized_length: Option<NonZeroUsize>,
}
/// TODO doc

View File

@ -183,6 +183,7 @@ pub struct Settings<'a, 't, 'i> {
localized_attributes_rules: Setting<Vec<LocalizedAttributesRule>>,
prefix_search: Setting<PrefixSearch>,
facet_search: Setting<bool>,
execute_after_update: Setting<String>,
}
impl<'a, 't, 'i> Settings<'a, 't, 'i> {
@ -220,6 +221,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
localized_attributes_rules: Setting::NotSet,
prefix_search: Setting::NotSet,
facet_search: Setting::NotSet,
execute_after_update: Setting::NotSet,
indexer_config,
}
}
@ -442,6 +444,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.facet_search = Setting::Reset;
}
pub fn set_execute_after_update(&mut self, value: String) {
self.execute_after_update = Setting::Set(value);
}
pub fn reset_execute_after_update(&mut self) {
self.execute_after_update = Setting::Reset;
}
#[tracing::instrument(
level = "trace"
skip(self, progress_callback, should_abort, settings_diff),
@ -994,6 +1004,18 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
Ok(changed)
}
fn update_execute_after_update(&mut self) -> Result<()> {
match self.execute_after_update.as_ref() {
Setting::Set(new) => {
self.index.put_execute_after_update(self.wtxn, &new).map_err(Into::into)
}
Setting::Reset => {
self.index.delete_execute_after_update(self.wtxn).map(drop).map_err(Into::into)
}
Setting::NotSet => Ok(()),
}
}
fn update_embedding_configs(&mut self) -> Result<BTreeMap<String, EmbedderAction>> {
match std::mem::take(&mut self.embedder_settings) {
Setting::Set(configs) => self.update_embedding_configs_set(configs),
@ -1245,6 +1267,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.update_proximity_precision()?;
self.update_prefix_search()?;
self.update_facet_search()?;
self.update_execute_after_update()?;
self.update_localized_attributes_rules()?;
let embedding_config_updates = self.update_embedding_configs()?;

View File

@ -896,6 +896,7 @@ fn test_correct_settings_init() {
localized_attributes_rules,
prefix_search,
facet_search,
execute_after_update,
} = settings;
assert!(matches!(searchable_fields, Setting::NotSet));
assert!(matches!(displayed_fields, Setting::NotSet));
@ -923,6 +924,7 @@ fn test_correct_settings_init() {
assert!(matches!(localized_attributes_rules, Setting::NotSet));
assert!(matches!(prefix_search, Setting::NotSet));
assert!(matches!(facet_search, Setting::NotSet));
assert!(matches!(execute_after_update, Setting::NotSet));
})
.unwrap();
}