Compare commits

...

7 Commits

12 changed files with 454 additions and 17 deletions

73
Cargo.lock generated
View File

@ -262,6 +262,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"const-random",
"getrandom",
"once_cell",
"version_check",
@ -1049,6 +1050,26 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@ -3515,6 +3536,7 @@ dependencies = [
"rand",
"rand_pcg",
"rayon",
"rhai",
"roaring",
"rstar",
"serde",
@ -4422,6 +4444,35 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086"
[[package]]
name = "rhai"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a7d88770120601ba1e548bb6bc2a05019e54ff01b51479e38e64ec3b59d4759"
dependencies = [
"ahash",
"bitflags 2.5.0",
"instant",
"num-traits",
"once_cell",
"rhai_codegen",
"serde",
"smallvec",
"smartstring",
"thin-vec",
]
[[package]]
name = "rhai_codegen"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59aecf17969c04b9c0c5d21f6bc9da9fec9dd4980e64d1871443a476589d8c86"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.60",
]
[[package]]
name = "ring"
version = "0.17.8"
@ -4835,6 +4886,9 @@ name = "smallvec"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e"
dependencies = [
"serde",
]
[[package]]
name = "smartstring"
@ -4843,6 +4897,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29"
dependencies = [
"autocfg",
"serde",
"static_assertions",
"version_check",
]
@ -5089,6 +5144,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "thin-vec"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a38c90d48152c236a3ab59271da4f4ae63d678c5d7ad6b7714d7cb9760be5e4b"
dependencies = [
"serde",
]
[[package]]
name = "thiserror"
version = "1.0.58"
@ -5167,6 +5231,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"

View File

@ -166,6 +166,7 @@ impl From<KindWithContent> for KindDump {
documents_count,
allow_index_creation,
},
KindWithContent::DocumentEdition { .. } => todo!(),
KindWithContent::DocumentDeletion { documents_ids, .. } => {
KindDump::DocumentDeletion { documents_ids }
}

View File

@ -24,6 +24,7 @@ enum AutobatchKind {
allow_index_creation: bool,
primary_key: Option<String>,
},
DocumentEdition,
DocumentDeletion,
DocumentDeletionByFilter,
DocumentClear,
@ -63,6 +64,7 @@ impl From<KindWithContent> for AutobatchKind {
primary_key,
..
} => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key },
KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition,
KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion,
KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear,
KindWithContent::DocumentDeletionByFilter { .. } => {
@ -98,6 +100,9 @@ pub enum BatchKind {
primary_key: Option<String>,
operation_ids: Vec<TaskId>,
},
DocumentEdition {
id: TaskId,
},
DocumentDeletion {
deletion_ids: Vec<TaskId>,
},
@ -199,6 +204,7 @@ impl BatchKind {
}),
allow_index_creation,
),
K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false),
K::DocumentDeletion => {
(Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false)
}
@ -222,7 +228,7 @@ impl BatchKind {
match (self, kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this),
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::DocumentDeletionByFilter) => Break(this),
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break(this)
@ -519,6 +525,7 @@ impl BatchKind {
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexSwap { .. }
| BatchKind::DocumentEdition { .. }
| BatchKind::DocumentDeletionByFilter { .. },
_,
) => {

View File

@ -103,6 +103,10 @@ pub(crate) enum IndexOperation {
operations: Vec<DocumentOperation>,
tasks: Vec<Task>,
},
DocumentEdition {
index_uid: String,
task: Task,
},
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
@ -161,7 +165,8 @@ impl Batch {
| IndexOperation::DocumentClear { tasks, .. } => {
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
}
IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
IndexOperation::DocumentEdition { task, .. }
| IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
IndexOperation::SettingsAndDocumentOperation {
@ -225,6 +230,7 @@ impl IndexOperation {
pub fn index_uid(&self) -> &str {
match self {
IndexOperation::DocumentOperation { index_uid, .. }
| IndexOperation::DocumentEdition { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. }
@ -240,6 +246,9 @@ impl fmt::Display for IndexOperation {
IndexOperation::DocumentOperation { .. } => {
f.write_str("IndexOperation::DocumentOperation")
}
IndexOperation::DocumentEdition { .. } => {
f.write_str("IndexOperation::DocumentEdition")
}
IndexOperation::IndexDocumentDeletionByFilter { .. } => {
f.write_str("IndexOperation::IndexDocumentDeletionByFilter")
}
@ -292,6 +301,21 @@ impl IndexScheduler {
_ => unreachable!(),
}
}
BatchKind::DocumentEdition { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind {
KindWithContent::DocumentEdition { index_uid, .. } => {
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentEdition {
index_uid: index_uid.clone(),
task,
},
must_create_index: false,
}))
}
_ => unreachable!(),
}
}
BatchKind::DocumentOperation { method, operation_ids, .. } => {
let tasks = self.get_existing_tasks(rtxn, operation_ids)?;
let primary_key = tasks
@ -1334,6 +1358,56 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::DocumentEdition { mut task, .. } => {
let (filter, edition_code) =
if let KindWithContent::DocumentEdition { filter_expr, edition_code, .. } =
&task.kind
{
(filter_expr, edition_code)
} else {
unreachable!()
};
let edited_documents = edit_documents_by_function(
index_wtxn,
filter,
edition_code,
self.index_mapper.indexer_config(),
self.must_stop_processing.clone(),
index,
);
let (original_filter, edition_code) =
if let Some(Details::DocumentEdition {
original_filter, edition_code, ..
}) = task.details
{
(original_filter, edition_code)
} else {
// In the case of a `documentEdition` the details MUST be set
unreachable!();
};
match edited_documents {
Ok(edited_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentEdition {
original_filter,
edition_code,
edited_documents: Some(edited_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentEdition {
original_filter,
edition_code,
edited_documents: Some(0),
});
task.error = Some(e.into());
}
}
Ok(vec![task])
}
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
@ -1622,3 +1696,43 @@ fn delete_document_by_filter<'a>(
0
})
}
fn edit_documents_by_function<'a>(
wtxn: &mut RwTxn<'a>,
filter: &Option<serde_json::Value>,
code: &str,
indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing,
index: &'a Index,
) -> Result<u64> {
let candidates = match filter.as_ref().map(Filter::from_json) {
Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
}
e => e.into(),
})?,
None | Some(Ok(None)) => index.documents_ids(wtxn)?,
Some(Err(e)) => return Err(e.into()),
};
let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder = milli::update::IndexDocuments::new(
wtxn,
index,
indexer_config,
config,
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
)?;
let (new_builder, count) = builder.edit_documents(&candidates, code)?;
builder = new_builder;
let _ = builder.execute()?;
Ok(count.unwrap())
}

View File

@ -178,6 +178,13 @@ fn snapshot_details(d: &Details) -> String {
} => {
format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents:?} }}")
}
Details::DocumentEdition {
edited_documents,
edition_code,
original_filter,
} => {
format!("{{ edited_documents: {edited_documents:?}, edition_code: {edition_code:?}, original_filter: {original_filter:?} }}")
}
Details::SettingsUpdate { settings } => {
format!("{{ settings: {settings:?} }}")
}

View File

@ -238,6 +238,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
let mut index_uids = vec![];
match &mut task.kind {
K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid),
K::DocumentEdition { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid),
K::DocumentClear { index_uid } => index_uids.push(index_uid),
@ -408,7 +409,26 @@ impl IndexScheduler {
match status {
Status::Succeeded => assert!(indexed_documents <= received_documents),
Status::Failed | Status::Canceled => assert_eq!(indexed_documents, 0),
status => panic!("DocumentAddition can't have an indexed_document set if it's {}", status),
status => panic!("DocumentAddition can't have an indexed_documents set if it's {}", status),
}
}
None => {
assert!(matches!(status, Status::Enqueued | Status::Processing))
}
}
}
Details::DocumentEdition { edited_documents, .. } => {
assert_eq!(kind.as_kind(), Kind::DocumentEdition);
match edited_documents {
Some(edited_documents) => {
assert!(matches!(
status,
Status::Succeeded | Status::Failed | Status::Canceled
));
match status {
Status::Succeeded => (),
Status::Failed | Status::Canceled => assert_eq!(edited_documents, 0),
status => panic!("DocumentEdition can't have an edited_documents set if it's {}", status),
}
}
None => {

View File

@ -54,6 +54,8 @@ pub struct DetailsView {
#[serde(skip_serializing_if = "Option::is_none")]
pub indexed_documents: Option<Option<u64>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub edited_documents: Option<Option<u64>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub primary_key: Option<Option<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub provided_ids: Option<usize>,
@ -70,6 +72,8 @@ pub struct DetailsView {
#[serde(skip_serializing_if = "Option::is_none")]
pub dump_uid: Option<Option<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub edition_code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(flatten)]
pub settings: Option<Box<Settings<Unchecked>>>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -86,6 +90,14 @@ impl From<Details> for DetailsView {
..DetailsView::default()
}
}
Details::DocumentEdition { edited_documents, original_filter, edition_code } => {
DetailsView {
edited_documents: Some(edited_documents),
original_filter: Some(original_filter),
edition_code: Some(edition_code),
..DetailsView::default()
}
}
Details::SettingsUpdate { mut settings } => {
settings.hide_secrets();
DetailsView { settings: Some(settings), ..DetailsView::default() }

View File

@ -48,6 +48,7 @@ impl Task {
| TaskDeletion { .. }
| IndexSwap { .. } => None,
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentEdition { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentDeletionByFilter { index_uid, .. }
| DocumentClear { index_uid }
@ -67,7 +68,8 @@ impl Task {
pub fn content_uuid(&self) -> Option<Uuid> {
match self.kind {
KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => Some(content_file),
KindWithContent::DocumentDeletion { .. }
KindWithContent::DocumentEdition { .. }
| KindWithContent::DocumentDeletion { .. }
| KindWithContent::DocumentDeletionByFilter { .. }
| KindWithContent::DocumentClear { .. }
| KindWithContent::SettingsUpdate { .. }
@ -94,6 +96,11 @@ pub enum KindWithContent {
documents_count: u64,
allow_index_creation: bool,
},
DocumentEdition {
index_uid: String,
filter_expr: Option<serde_json::Value>,
edition_code: String,
},
DocumentDeletion {
index_uid: String,
documents_ids: Vec<String>,
@ -150,6 +157,7 @@ impl KindWithContent {
pub fn as_kind(&self) -> Kind {
match self {
KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate,
KindWithContent::DocumentEdition { .. } => Kind::DocumentEdition,
KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion,
KindWithContent::DocumentDeletionByFilter { .. } => Kind::DocumentDeletion,
KindWithContent::DocumentClear { .. } => Kind::DocumentDeletion,
@ -174,6 +182,7 @@ impl KindWithContent {
| TaskCancelation { .. }
| TaskDeletion { .. } => vec![],
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentEdition { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentDeletionByFilter { index_uid, .. }
| DocumentClear { index_uid }
@ -202,6 +211,13 @@ impl KindWithContent {
indexed_documents: None,
})
}
KindWithContent::DocumentEdition { index_uid: _, edition_code, filter_expr } => {
Some(Details::DocumentEdition {
edited_documents: None,
original_filter: filter_expr.as_ref().map(|v| v.to_string()),
edition_code: edition_code.clone(),
})
}
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
@ -250,6 +266,13 @@ impl KindWithContent {
indexed_documents: Some(0),
})
}
KindWithContent::DocumentEdition { index_uid: _, filter_expr, edition_code } => {
Some(Details::DocumentEdition {
edited_documents: Some(0),
original_filter: filter_expr.as_ref().map(|v| v.to_string()),
edition_code: edition_code.clone(),
})
}
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
@ -301,6 +324,7 @@ impl From<&KindWithContent> for Option<Details> {
indexed_documents: None,
})
}
KindWithContent::DocumentEdition { .. } => None,
KindWithContent::DocumentDeletion { .. } => None,
KindWithContent::DocumentDeletionByFilter { .. } => None,
KindWithContent::DocumentClear { .. } => None,
@ -394,6 +418,7 @@ impl std::error::Error for ParseTaskStatusError {}
#[serde(rename_all = "camelCase")]
pub enum Kind {
DocumentAdditionOrUpdate,
DocumentEdition,
DocumentDeletion,
SettingsUpdate,
IndexCreation,
@ -410,6 +435,7 @@ impl Kind {
pub fn related_to_one_index(&self) -> bool {
match self {
Kind::DocumentAdditionOrUpdate
| Kind::DocumentEdition
| Kind::DocumentDeletion
| Kind::SettingsUpdate
| Kind::IndexCreation
@ -427,6 +453,7 @@ impl Display for Kind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Kind::DocumentAdditionOrUpdate => write!(f, "documentAdditionOrUpdate"),
Kind::DocumentEdition => write!(f, "documentEdition"),
Kind::DocumentDeletion => write!(f, "documentDeletion"),
Kind::SettingsUpdate => write!(f, "settingsUpdate"),
Kind::IndexCreation => write!(f, "indexCreation"),
@ -454,6 +481,8 @@ impl FromStr for Kind {
Ok(Kind::IndexDeletion)
} else if kind.eq_ignore_ascii_case("documentAdditionOrUpdate") {
Ok(Kind::DocumentAdditionOrUpdate)
} else if kind.eq_ignore_ascii_case("documentEdition") {
Ok(Kind::DocumentEdition)
} else if kind.eq_ignore_ascii_case("documentDeletion") {
Ok(Kind::DocumentDeletion)
} else if kind.eq_ignore_ascii_case("settingsUpdate") {
@ -495,16 +524,48 @@ impl std::error::Error for ParseTaskKindError {}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Details {
DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option<u64> },
SettingsUpdate { settings: Box<Settings<Unchecked>> },
IndexInfo { primary_key: Option<String> },
DocumentDeletion { provided_ids: usize, deleted_documents: Option<u64> },
DocumentDeletionByFilter { original_filter: String, deleted_documents: Option<u64> },
ClearAll { deleted_documents: Option<u64> },
TaskCancelation { matched_tasks: u64, canceled_tasks: Option<u64>, original_filter: String },
TaskDeletion { matched_tasks: u64, deleted_tasks: Option<u64>, original_filter: String },
Dump { dump_uid: Option<String> },
IndexSwap { swaps: Vec<IndexSwap> },
DocumentAdditionOrUpdate {
received_documents: u64,
indexed_documents: Option<u64>,
},
DocumentEdition {
edited_documents: Option<u64>,
original_filter: Option<String>,
edition_code: String,
},
SettingsUpdate {
settings: Box<Settings<Unchecked>>,
},
IndexInfo {
primary_key: Option<String>,
},
DocumentDeletion {
provided_ids: usize,
deleted_documents: Option<u64>,
},
DocumentDeletionByFilter {
original_filter: String,
deleted_documents: Option<u64>,
},
ClearAll {
deleted_documents: Option<u64>,
},
TaskCancelation {
matched_tasks: u64,
canceled_tasks: Option<u64>,
original_filter: String,
},
TaskDeletion {
matched_tasks: u64,
deleted_tasks: Option<u64>,
original_filter: String,
},
Dump {
dump_uid: Option<String>,
},
IndexSwap {
swaps: Vec<IndexSwap>,
},
}
impl Details {
@ -514,6 +575,7 @@ impl Details {
Self::DocumentAdditionOrUpdate { indexed_documents, .. } => {
*indexed_documents = Some(0)
}
Self::DocumentEdition { edited_documents, .. } => *edited_documents = Some(0),
Self::DocumentDeletion { deleted_documents, .. } => *deleted_documents = Some(0),
Self::DocumentDeletionByFilter { deleted_documents, .. } => {
*deleted_documents = Some(0)

View File

@ -81,6 +81,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
web::resource("/delete-batch").route(web::post().to(SeqHandler(delete_documents_batch))),
)
.service(web::resource("/delete").route(web::post().to(SeqHandler(delete_documents_by_filter))))
.service(web::resource("/edit").route(web::post().to(SeqHandler(edit_documents_by_function))))
.service(web::resource("/fetch").route(web::post().to(SeqHandler(documents_by_query_post))))
.service(
web::resource("/{document_id}")
@ -553,6 +554,57 @@ pub async fn delete_documents_by_filter(
Ok(HttpResponse::Accepted().json(task))
}
/// JSON body deserialized by the `edit_documents_by_function` route handler.
///
/// Keys are read in camelCase and unknown fields are rejected.
#[derive(Debug, Deserr)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct DocumentEditionByFunction {
/// Optional filter selecting the documents to edit; `None` when the key is absent.
#[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)]
filter: Option<Value>,
/// Source code of the edition function; this field is mandatory.
// NOTE(review): the error type and the missing-field error below are the ones used for
// the document *filter*, which looks like a copy-paste — confirm whether a dedicated
// error code is intended for `function`.
#[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)]
function: String,
}
pub async fn edit_documents_by_function(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
body: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
_analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Edit documents by function");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let DocumentEditionByFunction { filter, function } = body.into_inner();
// analytics.delete_documents(DocumentDeletionKind::PerFilter, &req);
let engine = milli::rhai::Engine::new();
if let Err(e) = engine.compile(&function) {
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));
}
if let Some(ref filter) = filter {
// we ensure the filter is well formed before enqueuing it
|| -> Result<_, ResponseError> {
Ok(crate::search::parse_filter(filter)?.ok_or(MeilisearchHttpError::EmptyFilter)?)
}()
// and whatever was the error, the error code should always be an InvalidDocumentFilter
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
}
let task =
KindWithContent::DocumentEdition { index_uid, filter_expr: filter, edition_code: function };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete documents by filter");
Ok(HttpResponse::Accepted().json(task))
}
pub async fn clear_all_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,

View File

@ -87,6 +87,7 @@ rand = "0.8.5"
tracing = "0.1.40"
ureq = { version = "2.9.7", features = ["json"] }
url = "2.5.0"
rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax"] }
[dev-dependencies]
mimalloc = { version = "0.1.39", default-features = false }

View File

@ -44,7 +44,7 @@ pub use search::new::{
};
use serde_json::Value;
pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
pub use {charabia as tokenizer, heed};
pub use {charabia as tokenizer, heed, rhai};
pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
pub use self::criterion::{default_criteria, Criterion, CriterionError};

View File

@ -15,6 +15,7 @@ use grenad::{Merger, MergerBuilder};
use heed::types::Str;
use heed::Database;
use rand::SeedableRng;
use rhai::{Dynamic, Engine, Scope};
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use slice_group_by::GroupBy;
@ -31,7 +32,7 @@ pub use self::helpers::{
};
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError};
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
@ -39,7 +40,7 @@ use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
};
use crate::vector::EmbeddingConfigs;
use crate::{CboRoaringBitmapCodec, Index, Result};
use crate::{all_obkv_to_json, CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result};
static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 4;
@ -173,6 +174,93 @@ where
Ok((self, Ok(indexed_documents)))
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")]
pub fn edit_documents(
self,
documents: &RoaringBitmap,
code: &str,
) -> Result<(Self, StdResult<u64, UserError>)> {
// Early return when there is no document to add
if documents.is_empty() {
return Ok((self, Ok(0)));
}
/// Transform every field of a raw obkv store into a Rhai Map.
pub fn all_obkv_to_rhaimap(
obkv: obkv::KvReaderU16,
fields_ids_map: &FieldsIdsMap,
) -> Result<rhai::Map> {
let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
all_keys
.iter()
.copied()
.flat_map(|id| obkv.get(id).map(|value| (id, value)))
.map(|(id, value)| {
let name = fields_ids_map.name(id).ok_or(
crate::error::FieldIdMapMissingEntry::FieldId {
field_id: id,
process: "allobkv_to_rhaimap",
},
)?;
let value = serde_json::from_slice(value)
.map_err(crate::error::InternalError::SerdeJson)?;
Ok((name.into(), value))
})
.collect()
}
fn rhaimap_to_object(map: rhai::Map) -> Object {
let mut output = Object::new();
for (key, value) in map {
let value = serde_json::to_value(&value).unwrap();
output.insert(key.into(), value);
}
output
}
let engine = Engine::new();
let ast = engine.compile(code).unwrap();
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
let primary_key = self.index.primary_key(self.wtxn)?.unwrap();
let primary_key_id = fields_ids_map.id(primary_key).unwrap();
let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?;
for docid in documents {
let (document, document_object, document_id) =
match self.index.documents.get(self.wtxn, &docid)? {
Some(obkv) => {
let document_id_bytes = obkv.get(primary_key_id).unwrap();
let document_id: serde_json::Value =
serde_json::from_slice(document_id_bytes).unwrap();
let document = all_obkv_to_rhaimap(obkv, &fields_ids_map)?;
let document_object = all_obkv_to_json(obkv, &fields_ids_map)?;
(document, document_object, document_id)
}
None => panic!("documents must exist"),
};
let mut scope = Scope::new();
scope.push("doc", document);
let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast).unwrap();
let new_document = scope.remove("doc").unwrap();
let new_document = rhaimap_to_object(new_document);
if document_object != new_document {
assert_eq!(
Some(&document_id),
new_document.get(primary_key),
"you cannot change the document id when editing documents"
);
documents_batch_builder.append_json_object(&new_document)?;
}
}
let file = documents_batch_builder.into_inner()?;
let reader = DocumentsBatchReader::from_reader(file)?;
self.add_documents(reader)
}
pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self {
self.embedders = embedders;
self