mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-24 04:26:27 +00:00
Support user-provided context for documents edition
This commit is contained in:
@@ -34,7 +34,7 @@ use meilisearch_types::milli::update::{
|
||||
use meilisearch_types::milli::vector::parsed_vectors::{
|
||||
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
|
||||
};
|
||||
use meilisearch_types::milli::{self, Filter};
|
||||
use meilisearch_types::milli::{self, Filter, Object};
|
||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
||||
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
||||
@@ -1411,37 +1411,43 @@ impl IndexScheduler {
|
||||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::DocumentEdition { mut task, .. } => {
|
||||
let (filter, function) =
|
||||
if let KindWithContent::DocumentEdition { filter_expr, function, .. } =
|
||||
&task.kind
|
||||
let (filter, context, function) =
|
||||
if let KindWithContent::DocumentEdition {
|
||||
filter_expr, context, function, ..
|
||||
} = &task.kind
|
||||
{
|
||||
(filter_expr, function)
|
||||
(filter_expr, context, function)
|
||||
} else {
|
||||
unreachable!()
|
||||
};
|
||||
let edited_documents = edit_documents_by_function(
|
||||
index_wtxn,
|
||||
filter,
|
||||
context.clone(),
|
||||
function,
|
||||
self.index_mapper.indexer_config(),
|
||||
self.must_stop_processing.clone(),
|
||||
index,
|
||||
);
|
||||
let (original_filter, function) =
|
||||
if let Some(Details::DocumentEdition { original_filter, function, .. }) =
|
||||
task.details
|
||||
{
|
||||
(original_filter, function)
|
||||
} else {
|
||||
// In the case of a `documentDeleteByFilter` the details MUST be set
|
||||
unreachable!();
|
||||
};
|
||||
let (original_filter, context, function) = if let Some(Details::DocumentEdition {
|
||||
original_filter,
|
||||
context,
|
||||
function,
|
||||
..
|
||||
}) = task.details
|
||||
{
|
||||
(original_filter, context, function)
|
||||
} else {
|
||||
// In the case of a `documentDeleteByFilter` the details MUST be set
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
match edited_documents {
|
||||
Ok(edited_documents) => {
|
||||
task.status = Status::Succeeded;
|
||||
task.details = Some(Details::DocumentEdition {
|
||||
original_filter,
|
||||
context,
|
||||
function,
|
||||
edited_documents: Some(edited_documents),
|
||||
});
|
||||
@@ -1450,6 +1456,7 @@ impl IndexScheduler {
|
||||
task.status = Status::Failed;
|
||||
task.details = Some(Details::DocumentEdition {
|
||||
original_filter,
|
||||
context,
|
||||
function,
|
||||
edited_documents: Some(0),
|
||||
});
|
||||
@@ -1751,6 +1758,7 @@ fn delete_document_by_filter<'a>(
|
||||
fn edit_documents_by_function<'a>(
|
||||
wtxn: &mut RwTxn<'a>,
|
||||
filter: &Option<serde_json::Value>,
|
||||
context: Option<Object>,
|
||||
code: &str,
|
||||
indexer_config: &IndexerConfig,
|
||||
must_stop_processing: MustStopProcessing,
|
||||
@@ -1781,7 +1789,7 @@ fn edit_documents_by_function<'a>(
|
||||
|| must_stop_processing.get(),
|
||||
)?;
|
||||
|
||||
let (new_builder, count) = builder.edit_documents(&candidates, code)?;
|
||||
let (new_builder, count) = builder.edit_documents(&candidates, context, code)?;
|
||||
builder = new_builder;
|
||||
|
||||
let _ = builder.execute()?;
|
||||
|
@@ -179,10 +179,11 @@ fn snapshot_details(d: &Details) -> String {
|
||||
}
|
||||
Details::DocumentEdition {
|
||||
edited_documents,
|
||||
function,
|
||||
original_filter,
|
||||
context,
|
||||
function,
|
||||
} => {
|
||||
format!("{{ edited_documents: {edited_documents:?}, function: {function:?}, original_filter: {original_filter:?} }}")
|
||||
format!("{{ edited_documents: {edited_documents:?}, context: {context:?}, function: {function:?}, original_filter: {original_filter:?} }}")
|
||||
}
|
||||
Details::SettingsUpdate { settings } => {
|
||||
format!("{{ settings: {settings:?} }}")
|
||||
|
@@ -1,3 +1,4 @@
|
||||
use milli::Object;
|
||||
use serde::Serialize;
|
||||
use time::{Duration, OffsetDateTime};
|
||||
|
||||
@@ -72,6 +73,8 @@ pub struct DetailsView {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub dump_uid: Option<Option<String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub context: Option<Option<Object>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub function: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(flatten)]
|
||||
@@ -90,10 +93,11 @@ impl From<Details> for DetailsView {
|
||||
..DetailsView::default()
|
||||
}
|
||||
}
|
||||
Details::DocumentEdition { edited_documents, original_filter, function } => {
|
||||
Details::DocumentEdition { edited_documents, original_filter, context, function } => {
|
||||
DetailsView {
|
||||
edited_documents: Some(edited_documents),
|
||||
original_filter: Some(original_filter),
|
||||
context: Some(context),
|
||||
function: Some(function),
|
||||
..DetailsView::default()
|
||||
}
|
||||
|
@@ -5,6 +5,7 @@ use std::str::FromStr;
|
||||
|
||||
use enum_iterator::Sequence;
|
||||
use milli::update::IndexDocumentsMethod;
|
||||
use milli::Object;
|
||||
use roaring::RoaringBitmap;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use time::{Duration, OffsetDateTime};
|
||||
@@ -99,6 +100,7 @@ pub enum KindWithContent {
|
||||
DocumentEdition {
|
||||
index_uid: String,
|
||||
filter_expr: Option<serde_json::Value>,
|
||||
context: Option<milli::Object>,
|
||||
function: String,
|
||||
},
|
||||
DocumentDeletion {
|
||||
@@ -211,10 +213,11 @@ impl KindWithContent {
|
||||
indexed_documents: None,
|
||||
})
|
||||
}
|
||||
KindWithContent::DocumentEdition { index_uid: _, function, filter_expr } => {
|
||||
KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => {
|
||||
Some(Details::DocumentEdition {
|
||||
edited_documents: None,
|
||||
original_filter: filter_expr.as_ref().map(|v| v.to_string()),
|
||||
context: context.clone(),
|
||||
function: function.clone(),
|
||||
})
|
||||
}
|
||||
@@ -266,10 +269,11 @@ impl KindWithContent {
|
||||
indexed_documents: Some(0),
|
||||
})
|
||||
}
|
||||
KindWithContent::DocumentEdition { index_uid: _, filter_expr, function } => {
|
||||
KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => {
|
||||
Some(Details::DocumentEdition {
|
||||
edited_documents: Some(0),
|
||||
original_filter: filter_expr.as_ref().map(|v| v.to_string()),
|
||||
context: context.clone(),
|
||||
function: function.clone(),
|
||||
})
|
||||
}
|
||||
@@ -531,6 +535,7 @@ pub enum Details {
|
||||
DocumentEdition {
|
||||
edited_documents: Option<u64>,
|
||||
original_filter: Option<String>,
|
||||
context: Option<Object>,
|
||||
function: String,
|
||||
},
|
||||
SettingsUpdate {
|
||||
|
@@ -580,6 +580,8 @@ pub async fn delete_documents_by_filter(
|
||||
pub struct DocumentEditionByFunction {
|
||||
#[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)]
|
||||
filter: Option<Value>,
|
||||
#[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)]
|
||||
context: Option<Value>,
|
||||
#[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)]
|
||||
function: String,
|
||||
}
|
||||
@@ -595,7 +597,7 @@ pub async fn edit_documents_by_function(
|
||||
debug!(parameters = ?body, "Edit documents by function");
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let index_uid = index_uid.into_inner();
|
||||
let DocumentEditionByFunction { filter, function } = body.into_inner();
|
||||
let DocumentEditionByFunction { filter, context, function } = body.into_inner();
|
||||
|
||||
// analytics.delete_documents(DocumentDeletionKind::PerFilter, &req);
|
||||
|
||||
@@ -612,8 +614,15 @@ pub async fn edit_documents_by_function(
|
||||
// and whatever was the error, the error code should always be an InvalidDocumentFilter
|
||||
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
||||
}
|
||||
let task =
|
||||
KindWithContent::DocumentEdition { index_uid, filter_expr: filter, function: function };
|
||||
let task = KindWithContent::DocumentEdition {
|
||||
index_uid,
|
||||
filter_expr: filter,
|
||||
context: context.map(|v| match v {
|
||||
serde_json::Value::Object(m) => m,
|
||||
_ => panic!("The context must be an Object"),
|
||||
}),
|
||||
function,
|
||||
};
|
||||
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
|
@@ -16,7 +16,7 @@ use grenad::{Merger, MergerBuilder};
|
||||
use heed::types::Str;
|
||||
use heed::Database;
|
||||
use rand::SeedableRng;
|
||||
use rhai::{Dynamic, Engine, Scope};
|
||||
use rhai::{Dynamic, Engine, OptimizationLevel, Scope};
|
||||
use roaring::RoaringBitmap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use slice_group_by::GroupBy;
|
||||
@@ -177,6 +177,7 @@ where
|
||||
pub fn edit_documents(
|
||||
self,
|
||||
documents: &RoaringBitmap,
|
||||
context: Option<Object>,
|
||||
code: &str,
|
||||
) -> Result<(Self, StdResult<u64, UserError>)> {
|
||||
// Early return when there is no document to add
|
||||
@@ -218,6 +219,7 @@ where
|
||||
}
|
||||
|
||||
let mut engine = Engine::new();
|
||||
engine.set_optimization_level(OptimizationLevel::Full);
|
||||
//It is an arbitrary value. We need to let users define this in the settings.
|
||||
engine.set_max_operations(1_000_000);
|
||||
|
||||
@@ -227,6 +229,11 @@ where
|
||||
let primary_key_id = fields_ids_map.id(primary_key).unwrap();
|
||||
let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?;
|
||||
|
||||
let context: Dynamic = match context {
|
||||
Some(context) => serde_json::from_value(context.into()).unwrap(),
|
||||
None => Dynamic::from(()),
|
||||
};
|
||||
|
||||
for docid in documents {
|
||||
let (document, document_object, document_id) =
|
||||
match self.index.documents.get(self.wtxn, &docid)? {
|
||||
@@ -242,6 +249,7 @@ where
|
||||
};
|
||||
|
||||
let mut scope = Scope::new();
|
||||
scope.push_constant_dynamic("context", context.clone());
|
||||
scope.push("doc", document);
|
||||
let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast).unwrap();
|
||||
let new_document = scope.remove("doc").unwrap();
|
||||
|
Reference in New Issue
Block a user