Support filtering the documents to edit with lua

This commit is contained in:
Clément Renault
2024-05-08 15:53:40 +02:00
parent 1702b5cf44
commit ba85959642
5 changed files with 153 additions and 17 deletions

View File

@@ -1410,8 +1410,55 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::DocumentEdition { .. } => {
todo!()
IndexOperation::DocumentEdition { mut task, .. } => {
let (filter, edition_code) =
if let KindWithContent::DocumentEdition { filter_expr, edition_code, .. } =
&task.kind
{
(filter_expr, edition_code)
} else {
unreachable!()
};
let edited_documents = edit_documents_by_function(
index_wtxn,
filter,
edition_code,
self.index_mapper.indexer_config(),
self.must_stop_processing.clone(),
index,
);
let (original_filter, edition_code) =
if let Some(Details::DocumentEdition {
original_filter, edition_code, ..
}) = task.details
{
(original_filter, edition_code)
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!();
};
match edited_documents {
Ok(edited_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentEdition {
original_filter,
edition_code,
edited_documents: Some(edited_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentEdition {
original_filter,
edition_code,
edited_documents: Some(0),
});
task.error = Some(e.into());
}
}
Ok(vec![task])
}
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
@@ -1701,3 +1748,45 @@ fn delete_document_by_filter<'a>(
0
})
}
fn edit_documents_by_function<'a>(
wtxn: &mut RwTxn<'a>,
filter: &serde_json::Value,
code: &str,
indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing,
index: &'a Index,
) -> Result<u64> {
let filter = Filter::from_json(filter)?;
Ok(if let Some(filter) = filter {
let candidates = filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
}
e => e.into(),
})?;
let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder = milli::update::IndexDocuments::new(
wtxn,
index,
indexer_config,
config,
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
)?;
todo!("edit documents with the code and reinsert them in the builder")
// let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?;
// builder = new_builder;
// let _ = builder.execute()?;
// count
} else {
0
})
}

View File

@@ -180,8 +180,9 @@ fn snapshot_details(d: &Details) -> String {
Details::DocumentEdition {
edited_documents,
edition_code,
original_filter,
} => {
format!("{{ edited_documents: {edited_documents:?}, edition_code: {edition_code:?} }}")
format!("{{ edited_documents: {edited_documents:?}, edition_code: {edition_code:?}, original_filter: {original_filter:?} }}")
}
Details::SettingsUpdate { settings } => {
format!("{{ settings: {settings:?} }}")

View File

@@ -90,11 +90,14 @@ impl From<Details> for DetailsView {
..DetailsView::default()
}
}
Details::DocumentEdition { edited_documents, edition_code } => DetailsView {
Details::DocumentEdition { edited_documents, original_filter, edition_code } => {
DetailsView {
edited_documents: Some(edited_documents),
original_filter: Some(Some(original_filter)),
edition_code: Some(edition_code),
..DetailsView::default()
},
}
}
Details::SettingsUpdate { mut settings } => {
settings.hide_secrets();
DetailsView { settings: Some(settings), ..DetailsView::default() }

View File

@@ -98,6 +98,7 @@ pub enum KindWithContent {
},
DocumentEdition {
index_uid: String,
filter_expr: serde_json::Value,
edition_code: String,
},
DocumentDeletion {
@@ -210,9 +211,10 @@ impl KindWithContent {
indexed_documents: None,
})
}
KindWithContent::DocumentEdition { edition_code, .. } => {
KindWithContent::DocumentEdition { index_uid: _, edition_code, filter_expr } => {
Some(Details::DocumentEdition {
edited_documents: None,
original_filter: filter_expr.to_string(),
edition_code: edition_code.clone(),
})
}
@@ -264,9 +266,10 @@ impl KindWithContent {
indexed_documents: Some(0),
})
}
KindWithContent::DocumentEdition { edition_code, .. } => {
KindWithContent::DocumentEdition { index_uid: _, filter_expr, edition_code } => {
Some(Details::DocumentEdition {
edited_documents: Some(0),
original_filter: filter_expr.to_string(),
edition_code: edition_code.clone(),
})
}
@@ -321,12 +324,7 @@ impl From<&KindWithContent> for Option<Details> {
indexed_documents: None,
})
}
KindWithContent::DocumentEdition { edition_code, .. } => {
Some(Details::DocumentEdition {
edited_documents: None,
edition_code: edition_code.clone(),
})
}
KindWithContent::DocumentEdition { .. } => None,
KindWithContent::DocumentDeletion { .. } => None,
KindWithContent::DocumentDeletionByFilter { .. } => None,
KindWithContent::DocumentClear { .. } => None,
@@ -527,7 +525,7 @@ impl std::error::Error for ParseTaskKindError {}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Details {
DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option<u64> },
DocumentEdition { edited_documents: Option<u64>, edition_code: String },
DocumentEdition { edited_documents: Option<u64>, original_filter: String, edition_code: String },
SettingsUpdate { settings: Box<Settings<Unchecked>> },
IndexInfo { primary_key: Option<String> },
DocumentDeletion { provided_ids: usize, deleted_documents: Option<u64> },

View File

@@ -82,6 +82,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
web::resource("/delete-batch").route(web::post().to(SeqHandler(delete_documents_batch))),
)
.service(web::resource("/delete").route(web::post().to(SeqHandler(delete_documents_by_filter))))
.service(web::resource("/edit").route(web::post().to(SeqHandler(edit_documents_by_function))))
.service(web::resource("/fetch").route(web::post().to(SeqHandler(documents_by_query_post))))
.service(
web::resource("/{document_id}")
@@ -574,6 +575,50 @@ pub async fn delete_documents_by_filter(
Ok(HttpResponse::Accepted().json(task))
}
#[derive(Debug, Deserr)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct DocumentEditionByFunction {
#[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)]
filter: Value,
#[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)]
function: String,
}
pub async fn edit_documents_by_function(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
body: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
_analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Edit documents by function");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let DocumentEditionByFunction { filter, function } = body.into_inner();
// analytics.delete_documents(DocumentDeletionKind::PerFilter, &req);
// we ensure the filter is well formed before enqueuing it
|| -> Result<_, ResponseError> {
Ok(crate::search::parse_filter(&filter)?.ok_or(MeilisearchHttpError::EmptyFilter)?)
}()
// and whatever was the error, the error code should always be an InvalidDocumentFilter
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
let task =
KindWithContent::DocumentEdition { index_uid, filter_expr: filter, edition_code: function };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete documents by filter");
Ok(HttpResponse::Accepted().json(task))
}
pub async fn clear_all_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,