Support deleting documents with functions

This commit is contained in:
Clément Renault
2024-05-10 23:26:55 +02:00
parent 400e6b93ce
commit 33fa17bf12
5 changed files with 47 additions and 19 deletions

View File

@@ -1420,7 +1420,7 @@ impl IndexScheduler {
} else { } else {
unreachable!() unreachable!()
}; };
let edited_documents = edit_documents_by_function( let result_count = edit_documents_by_function(
index_wtxn, index_wtxn,
filter, filter,
context.clone(), context.clone(),
@@ -1442,13 +1442,14 @@ impl IndexScheduler {
unreachable!(); unreachable!();
}; };
match edited_documents { match result_count {
Ok(edited_documents) => { Ok((deleted_documents, edited_documents)) => {
task.status = Status::Succeeded; task.status = Status::Succeeded;
task.details = Some(Details::DocumentEdition { task.details = Some(Details::DocumentEdition {
original_filter, original_filter,
context, context,
function, function,
deleted_documents: Some(deleted_documents),
edited_documents: Some(edited_documents), edited_documents: Some(edited_documents),
}); });
} }
@@ -1458,6 +1459,7 @@ impl IndexScheduler {
original_filter, original_filter,
context, context,
function, function,
deleted_documents: Some(0),
edited_documents: Some(0), edited_documents: Some(0),
}); });
task.error = Some(e.into()); task.error = Some(e.into());
@@ -1763,7 +1765,7 @@ fn edit_documents_by_function<'a>(
indexer_config: &IndexerConfig, indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing, must_stop_processing: MustStopProcessing,
index: &'a Index, index: &'a Index,
) -> Result<u64> { ) -> Result<(u64, u64)> {
let candidates = match filter.as_ref().map(Filter::from_json) { let candidates = match filter.as_ref().map(Filter::from_json) {
Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err { Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {

View File

@@ -178,12 +178,15 @@ fn snapshot_details(d: &Details) -> String {
format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents:?} }}") format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents:?} }}")
} }
Details::DocumentEdition { Details::DocumentEdition {
deleted_documents,
edited_documents, edited_documents,
original_filter, original_filter,
context, context,
function, function,
} => { } => {
format!("{{ edited_documents: {edited_documents:?}, context: {context:?}, function: {function:?}, original_filter: {original_filter:?} }}") format!(
"{{ deleted_documents: {deleted_documents:?}, edited_documents: {edited_documents:?}, context: {context:?}, function: {function:?}, original_filter: {original_filter:?} }}"
)
} }
Details::SettingsUpdate { settings } => { Details::SettingsUpdate { settings } => {
format!("{{ settings: {settings:?} }}") format!("{{ settings: {settings:?} }}")

View File

@@ -93,15 +93,20 @@ impl From<Details> for DetailsView {
..DetailsView::default() ..DetailsView::default()
} }
} }
Details::DocumentEdition { edited_documents, original_filter, context, function } => { Details::DocumentEdition {
DetailsView { deleted_documents,
edited_documents,
original_filter,
context,
function,
} => DetailsView {
deleted_documents: Some(deleted_documents),
edited_documents: Some(edited_documents), edited_documents: Some(edited_documents),
original_filter: Some(original_filter), original_filter: Some(original_filter),
context: Some(context), context: Some(context),
function: Some(function), function: Some(function),
..DetailsView::default() ..DetailsView::default()
} },
}
Details::SettingsUpdate { mut settings } => { Details::SettingsUpdate { mut settings } => {
settings.hide_secrets(); settings.hide_secrets();
DetailsView { settings: Some(settings), ..DetailsView::default() } DetailsView { settings: Some(settings), ..DetailsView::default() }

View File

@@ -215,6 +215,7 @@ impl KindWithContent {
} }
KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => { KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => {
Some(Details::DocumentEdition { Some(Details::DocumentEdition {
deleted_documents: None,
edited_documents: None, edited_documents: None,
original_filter: filter_expr.as_ref().map(|v| v.to_string()), original_filter: filter_expr.as_ref().map(|v| v.to_string()),
context: context.clone(), context: context.clone(),
@@ -271,6 +272,7 @@ impl KindWithContent {
} }
KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => { KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => {
Some(Details::DocumentEdition { Some(Details::DocumentEdition {
deleted_documents: Some(0),
edited_documents: Some(0), edited_documents: Some(0),
original_filter: filter_expr.as_ref().map(|v| v.to_string()), original_filter: filter_expr.as_ref().map(|v| v.to_string()),
context: context.clone(), context: context.clone(),
@@ -533,6 +535,7 @@ pub enum Details {
indexed_documents: Option<u64>, indexed_documents: Option<u64>,
}, },
DocumentEdition { DocumentEdition {
deleted_documents: Option<u64>,
edited_documents: Option<u64>, edited_documents: Option<u64>,
original_filter: Option<String>, original_filter: Option<String>,
context: Option<Object>, context: Option<Object>,

View File

@@ -179,10 +179,10 @@ where
documents: &RoaringBitmap, documents: &RoaringBitmap,
context: Option<Object>, context: Option<Object>,
code: &str, code: &str,
) -> Result<(Self, StdResult<u64, UserError>)> { ) -> Result<(Self, StdResult<(u64, u64), UserError>)> {
// Early return when there is no document to add // Early return when there is no document to add
if documents.is_empty() { if documents.is_empty() {
return Ok((self, Ok(0))); return Ok((self, Ok((0, 0))));
} }
/// Transform every field of a raw obkv store into a Rhai Map. /// Transform every field of a raw obkv store into a Rhai Map.
@@ -228,6 +228,7 @@ where
let primary_key = self.index.primary_key(self.wtxn)?.unwrap(); let primary_key = self.index.primary_key(self.wtxn)?.unwrap();
let primary_key_id = fields_ids_map.id(primary_key).unwrap(); let primary_key_id = fields_ids_map.id(primary_key).unwrap();
let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?; let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?;
let mut documents_to_remove = RoaringBitmap::new();
let context: Dynamic = match context { let context: Dynamic = match context {
Some(context) => serde_json::from_value(context.into()).unwrap(), Some(context) => serde_json::from_value(context.into()).unwrap(),
@@ -252,8 +253,19 @@ where
scope.push_constant_dynamic("context", context.clone()); scope.push_constant_dynamic("context", context.clone());
scope.push("doc", document); scope.push("doc", document);
let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast).unwrap(); let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast).unwrap();
let new_document = scope.remove("doc").unwrap(); let new_document = match scope.remove::<Dynamic>("doc") {
let new_document = rhaimap_to_object(new_document); // If the "doc" variable has been removed from the scope
// or set to (), we effectively delete the document.
Some(doc) if doc.is_unit() => {
documents_to_remove.push(docid);
continue;
}
None => unreachable!(),
Some(document) => match document.try_cast() {
Some(document) => rhaimap_to_object(document),
None => panic!("Why is \"doc\" no longer a Map?"),
},
};
if document_object != new_document { if document_object != new_document {
assert_eq!( assert_eq!(
@@ -268,7 +280,10 @@ where
let file = documents_batch_builder.into_inner()?; let file = documents_batch_builder.into_inner()?;
let reader = DocumentsBatchReader::from_reader(file)?; let reader = DocumentsBatchReader::from_reader(file)?;
self.add_documents(reader) let (this, removed) = self.remove_documents_from_db_no_batch(&documents_to_remove)?;
let (this, result) = this.add_documents(reader)?;
Ok((this, result.map(|added| (removed, added))))
} }
pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self {