Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-12-02 02:35:36 +00:00)

Compare commits: 11 commits, prototype-… → prototype-…
| Author | SHA1 | Date |
|---|---|---|
|  | 376cfefff7 |  |
|  | 5af3c7e758 |  |
|  | 29fe2f2e64 |  |
|  | 39edf8a0d3 |  |
|  | 7106c6e203 |  |
|  | 1a07d2e780 |  |
|  | 8dff899954 |  |
|  | 133674751f |  |
|  | 9183ad47a7 |  |
|  | 6d5b79bc1f |  |
|  | 082b171d45 |  |
Cargo.lock (generated, 10 changed lines)
```diff
@@ -3536,6 +3536,7 @@ dependencies = [
  "rand",
  "rand_pcg",
  "rayon",
+ "rayon-par-bridge",
  "rhai",
  "roaring",
  "rstar",
@@ -4332,6 +4333,15 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "rayon-par-bridge"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb6a14d8f65834aca6b0fe4cbbd7a27e639cd3efb1f2a32de9942368f1991de8"
+dependencies = [
+ "rayon",
+]
+
 [[package]]
 name = "reborrow"
 version = "0.5.5"
```
```diff
@@ -31,7 +31,7 @@ use meilisearch_types::milli::heed::CompactionOption;
 use meilisearch_types::milli::update::{
     IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
 };
-use meilisearch_types::milli::{self, Filter};
+use meilisearch_types::milli::{self, Filter, Object};
 use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
 use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@@ -1359,39 +1359,45 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             IndexOperation::DocumentEdition { mut task, .. } => {
-                let (filter, edition_code) =
-                    if let KindWithContent::DocumentEdition { filter_expr, edition_code, .. } =
-                        &task.kind
+                let (filter, context, function) =
+                    if let KindWithContent::DocumentEdition {
+                        filter_expr, context, function, ..
+                    } = &task.kind
                     {
-                        (filter_expr, edition_code)
+                        (filter_expr, context, function)
                     } else {
                         unreachable!()
                     };
-                let edited_documents = edit_documents_by_function(
+                let result_count = edit_documents_by_function(
                     index_wtxn,
                     filter,
-                    edition_code,
+                    context.clone(),
+                    function,
                     self.index_mapper.indexer_config(),
                     self.must_stop_processing.clone(),
                     index,
                 );
-                let (original_filter, edition_code) =
-                    if let Some(Details::DocumentEdition {
-                        original_filter, edition_code, ..
-                    }) = task.details
-                    {
-                        (original_filter, edition_code)
-                    } else {
-                        // In the case of a `documentDeleteByFilter` the details MUST be set
-                        unreachable!();
-                    };
+                let (original_filter, context, function) = if let Some(Details::DocumentEdition {
+                    original_filter,
+                    context,
+                    function,
+                    ..
+                }) = task.details
+                {
+                    (original_filter, context, function)
+                } else {
+                    // In the case of a `documentDeleteByFilter` the details MUST be set
+                    unreachable!();
+                };
 
-                match edited_documents {
-                    Ok(edited_documents) => {
+                match result_count {
+                    Ok((deleted_documents, edited_documents)) => {
                         task.status = Status::Succeeded;
                         task.details = Some(Details::DocumentEdition {
                             original_filter,
-                            edition_code,
+                            context,
+                            function,
+                            deleted_documents: Some(deleted_documents),
                             edited_documents: Some(edited_documents),
                         });
                     }
@@ -1399,7 +1405,9 @@ impl IndexScheduler {
                         task.status = Status::Failed;
                         task.details = Some(Details::DocumentEdition {
                             original_filter,
-                            edition_code,
+                            context,
+                            function,
+                            deleted_documents: Some(0),
                             edited_documents: Some(0),
                         });
                         task.error = Some(e.into());
@@ -1700,11 +1708,12 @@ fn delete_document_by_filter<'a>(
 fn edit_documents_by_function<'a>(
     wtxn: &mut RwTxn<'a>,
     filter: &Option<serde_json::Value>,
+    context: Option<Object>,
     code: &str,
     indexer_config: &IndexerConfig,
     must_stop_processing: MustStopProcessing,
     index: &'a Index,
-) -> Result<u64> {
+) -> Result<(u64, u64)> {
     let candidates = match filter.as_ref().map(Filter::from_json) {
         Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err {
             milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
@@ -1730,7 +1739,7 @@ fn edit_documents_by_function<'a>(
         || must_stop_processing.get(),
     )?;
 
-    let (new_builder, count) = builder.edit_documents(&candidates, code)?;
+    let (new_builder, count) = builder.edit_documents(&candidates, context, code)?;
     builder = new_builder;
 
     let _ = builder.execute()?;
```
```diff
@@ -179,11 +179,15 @@ fn snapshot_details(d: &Details) -> String {
             format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents:?} }}")
         }
         Details::DocumentEdition {
+            deleted_documents,
             edited_documents,
-            edition_code,
             original_filter,
+            context,
+            function,
         } => {
-            format!("{{ edited_documents: {edited_documents:?}, edition_code: {edition_code:?}, original_filter: {original_filter:?} }}")
+            format!(
+                "{{ deleted_documents: {deleted_documents:?}, edited_documents: {edited_documents:?}, context: {context:?}, function: {function:?}, original_filter: {original_filter:?} }}"
+            )
         }
         Details::SettingsUpdate { settings } => {
             format!("{{ settings: {settings:?} }}")
```
```diff
@@ -4749,6 +4749,7 @@ mod tests {
         "types": {
           "documentAdditionOrUpdate": 0,
           "documentDeletion": 0,
+          "documentEdition": 0,
           "dumpCreation": 0,
           "indexCreation": 3,
           "indexDeletion": 0,
@@ -4780,6 +4781,7 @@ mod tests {
         "types": {
           "documentAdditionOrUpdate": 0,
           "documentDeletion": 0,
+          "documentEdition": 0,
           "dumpCreation": 0,
           "indexCreation": 3,
           "indexDeletion": 0,
@@ -4818,6 +4820,7 @@ mod tests {
         "types": {
           "documentAdditionOrUpdate": 0,
           "documentDeletion": 0,
+          "documentEdition": 0,
           "dumpCreation": 0,
           "indexCreation": 3,
           "indexDeletion": 0,
@@ -4857,6 +4860,7 @@ mod tests {
         "types": {
           "documentAdditionOrUpdate": 0,
           "documentDeletion": 0,
+          "documentEdition": 0,
           "dumpCreation": 0,
           "indexCreation": 3,
           "indexDeletion": 0,
```
```diff
@@ -1,3 +1,4 @@
+use milli::Object;
 use serde::Serialize;
 use time::{Duration, OffsetDateTime};
 
@@ -72,7 +73,9 @@ pub struct DetailsView {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub dump_uid: Option<Option<String>>,
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub edition_code: Option<String>,
+    pub context: Option<Option<Object>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub function: Option<String>,
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(flatten)]
     pub settings: Option<Box<Settings<Unchecked>>>,
@@ -90,14 +93,20 @@ impl From<Details> for DetailsView {
                     ..DetailsView::default()
                 }
             }
-            Details::DocumentEdition { edited_documents, original_filter, edition_code } => {
-                DetailsView {
-                    edited_documents: Some(edited_documents),
-                    original_filter: Some(original_filter),
-                    edition_code: Some(edition_code),
-                    ..DetailsView::default()
-                }
-            }
+            Details::DocumentEdition {
+                deleted_documents,
+                edited_documents,
+                original_filter,
+                context,
+                function,
+            } => DetailsView {
+                deleted_documents: Some(deleted_documents),
+                edited_documents: Some(edited_documents),
+                original_filter: Some(original_filter),
+                context: Some(context),
+                function: Some(function),
+                ..DetailsView::default()
+            },
             Details::SettingsUpdate { mut settings } => {
                 settings.hide_secrets();
                 DetailsView { settings: Some(settings), ..DetailsView::default() }
```
```diff
@@ -5,6 +5,7 @@ use std::str::FromStr;
 
 use enum_iterator::Sequence;
 use milli::update::IndexDocumentsMethod;
+use milli::Object;
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize, Serializer};
 use time::{Duration, OffsetDateTime};
@@ -99,7 +100,8 @@ pub enum KindWithContent {
     DocumentEdition {
         index_uid: String,
         filter_expr: Option<serde_json::Value>,
-        edition_code: String,
+        context: Option<milli::Object>,
+        function: String,
     },
     DocumentDeletion {
         index_uid: String,
@@ -211,11 +213,13 @@ impl KindWithContent {
                     indexed_documents: None,
                 })
             }
-            KindWithContent::DocumentEdition { index_uid: _, edition_code, filter_expr } => {
+            KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => {
                 Some(Details::DocumentEdition {
+                    deleted_documents: None,
                     edited_documents: None,
                     original_filter: filter_expr.as_ref().map(|v| v.to_string()),
-                    edition_code: edition_code.clone(),
+                    context: context.clone(),
+                    function: function.clone(),
                 })
             }
             KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
@@ -266,11 +270,13 @@ impl KindWithContent {
                     indexed_documents: Some(0),
                 })
             }
-            KindWithContent::DocumentEdition { index_uid: _, filter_expr, edition_code } => {
+            KindWithContent::DocumentEdition { index_uid: _, filter_expr, context, function } => {
                 Some(Details::DocumentEdition {
+                    deleted_documents: Some(0),
                     edited_documents: Some(0),
                     original_filter: filter_expr.as_ref().map(|v| v.to_string()),
-                    edition_code: edition_code.clone(),
+                    context: context.clone(),
+                    function: function.clone(),
                 })
             }
             KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
@@ -529,9 +535,11 @@ pub enum Details {
         indexed_documents: Option<u64>,
     },
     DocumentEdition {
+        deleted_documents: Option<u64>,
         edited_documents: Option<u64>,
         original_filter: Option<String>,
-        edition_code: String,
+        context: Option<Object>,
+        function: String,
     },
     SettingsUpdate {
         settings: Box<Settings<Unchecked>>,
```
```diff
@@ -559,12 +559,14 @@ pub async fn delete_documents_by_filter(
 pub struct DocumentEditionByFunction {
     #[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)]
     filter: Option<Value>,
+    #[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)]
+    context: Option<Value>,
     #[deserr(error = DeserrJsonError<InvalidDocumentFilter>, missing_field_error = DeserrJsonError::missing_document_filter)]
     function: String,
 }
 
 pub async fn edit_documents_by_function(
-    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
+    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ALL }>, Data<IndexScheduler>>,
    index_uid: web::Path<String>,
     body: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
     req: HttpRequest,
@@ -574,7 +576,7 @@ pub async fn edit_documents_by_function(
     debug!(parameters = ?body, "Edit documents by function");
     let index_uid = IndexUid::try_from(index_uid.into_inner())?;
     let index_uid = index_uid.into_inner();
-    let DocumentEditionByFunction { filter, function } = body.into_inner();
+    let DocumentEditionByFunction { filter, context, function } = body.into_inner();
 
     // analytics.delete_documents(DocumentDeletionKind::PerFilter, &req);
 
@@ -591,8 +593,15 @@ pub async fn edit_documents_by_function(
             // and whatever was the error, the error code should always be an InvalidDocumentFilter
             .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
     }
-    let task =
-        KindWithContent::DocumentEdition { index_uid, filter_expr: filter, edition_code: function };
+    let task = KindWithContent::DocumentEdition {
+        index_uid,
+        filter_expr: filter,
+        context: context.map(|v| match v {
+            serde_json::Value::Object(m) => m,
+            _ => panic!("The context must be an Object"),
+        }),
+        function,
+    };
 
     let uid = get_task_id(&req, &opt)?;
     let dry_run = is_dry_run(&req, &opt)?;
```
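For reference, a payload accepted by the `DocumentEditionByFunction` body above could look like the sketch below. Only the three field names (`filter`, `context`, `function`) come from the struct in this diff; the concrete filter string and Rhai snippet are invented for illustration.

```rust
// Hypothetical request body for the edit-documents-by-function route above.
// The values are made up; only the field names come from the prototype.
use serde_json::json;

fn example_payload() -> serde_json::Value {
    json!({
        // Optional filter selecting which documents are edited.
        "filter": "kind = comics",
        // Optional context object, exposed to the Rhai function as `context`
        // (it must be a JSON object, otherwise the handler above panics).
        "context": { "suffix": " (edited)" },
        // The Rhai code run against every selected document.
        "function": "doc.title += context.suffix"
    })
}
```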
```diff
@@ -591,7 +591,7 @@ mod tests {
         let err = deserr_query_params::<TaskDeletionOrCancelationQuery>(params).unwrap_err();
         snapshot!(meili_snap::json_string!(err), @r###"
         {
-          "message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
+          "message": "Invalid value in parameter `types`: `createIndex` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
           "code": "invalid_task_types",
           "type": "invalid_request",
           "link": "https://docs.meilisearch.com/errors#invalid_task_types"
```
```diff
@@ -97,7 +97,7 @@ async fn task_bad_types() {
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
-      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
+      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
       "code": "invalid_task_types",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#invalid_task_types"
@@ -108,7 +108,7 @@ async fn task_bad_types() {
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
-      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
+      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
       "code": "invalid_task_types",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#invalid_task_types"
@@ -119,7 +119,7 @@ async fn task_bad_types() {
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
-      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
+      "message": "Invalid value in parameter `types`: `doggo` is not a valid task type. Available types are `documentAdditionOrUpdate`, `documentEdition`, `documentDeletion`, `settingsUpdate`, `indexCreation`, `indexDeletion`, `indexUpdate`, `indexSwap`, `taskCancelation`, `taskDeletion`, `dumpCreation`, `snapshotCreation`.",
       "code": "invalid_task_types",
       "type": "invalid_request",
       "link": "https://docs.meilisearch.com/errors#invalid_task_types"
```
```diff
@@ -82,12 +82,13 @@ hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls",
 ] }
 tiktoken-rs = "0.5.8"
 liquid = "0.26.4"
+rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax", "no_time", "sync"] }
 arroy = "0.2.0"
 rand = "0.8.5"
 tracing = "0.1.40"
 ureq = { version = "2.9.7", features = ["json"] }
 url = "2.5.0"
-rhai = { version = "1.18.0", features = ["serde", "no_module", "no_custom_syntax"] }
+rayon-par-bridge = "0.1.0"
 
 [dev-dependencies]
 mimalloc = { version = "0.1.39", default-features = false }
```
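The new `rayon-par-bridge` dependency is what lets the document edition code further down evaluate documents in parallel while consuming the results on a single thread. Below is a minimal usage sketch; the `par_bridge(buffer_size, parallel_iterator, serial_consumer)` shape is assumed from the call site in this diff, so check the crate documentation before relying on it.

```rust
// Minimal sketch: produce values in parallel, consume them sequentially.
// The serial closure can mutate captured state without that state being Sync,
// which is exactly why the indexing code below uses it for its batch builder.
use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    let processing = (0u32..1_000).into_par_iter().map(|n| u64::from(n) * 2);

    let mut sum = 0u64;
    rayon_par_bridge::par_bridge(100, processing, |iterator| {
        for n in iterator {
            sum += n; // sequential consumption on one thread
        }
    });

    assert_eq!(sum, 999_000);
}
```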
```diff
@@ -195,7 +195,7 @@ mod tests {
     fn merge_cbo_roaring_bitmaps() {
         let mut buffer = Vec::new();
 
-        let small_data = vec![
+        let small_data = [
             RoaringBitmap::from_sorted_iter(1..4).unwrap(),
             RoaringBitmap::from_sorted_iter(2..5).unwrap(),
             RoaringBitmap::from_sorted_iter(4..6).unwrap(),
@@ -209,7 +209,7 @@ mod tests {
         let expected = RoaringBitmap::from_sorted_iter(1..6).unwrap();
         assert_eq!(bitmap, expected);
 
-        let medium_data = vec![
+        let medium_data = [
             RoaringBitmap::from_sorted_iter(1..4).unwrap(),
             RoaringBitmap::from_sorted_iter(2..5).unwrap(),
             RoaringBitmap::from_sorted_iter(4..8).unwrap(),
```
```diff
@@ -1,6 +1,7 @@
 mod enrich;
 mod extract;
 mod helpers;
+mod parallel;
 mod transform;
 mod typed_chunk;
 
@@ -15,7 +16,8 @@ use grenad::{Merger, MergerBuilder};
 use heed::types::Str;
 use heed::Database;
 use rand::SeedableRng;
-use rhai::{Dynamic, Engine, Scope};
+use rayon::iter::{ParallelBridge, ParallelIterator};
+use rhai::{Dynamic, Engine, OptimizationLevel, Scope};
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize};
 use slice_group_by::GroupBy;
@@ -36,11 +38,12 @@ use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchRead
 use crate::error::{Error, InternalError, UserError};
 use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
+use crate::update::index_documents::parallel::ImmutableObkvs;
 use crate::update::{
     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
 };
 use crate::vector::EmbeddingConfigs;
-use crate::{all_obkv_to_json, CboRoaringBitmapCodec, FieldsIdsMap, Index, Object, Result};
+use crate::{CboRoaringBitmapCodec, Index, Object, Result};
 
 static MERGED_DATABASE_COUNT: usize = 7;
 static PREFIX_DATABASE_COUNT: usize = 4;
@@ -178,35 +181,12 @@ where
     pub fn edit_documents(
         self,
         documents: &RoaringBitmap,
+        context: Option<Object>,
         code: &str,
-    ) -> Result<(Self, StdResult<u64, UserError>)> {
+    ) -> Result<(Self, StdResult<(u64, u64), UserError>)> {
         // Early return when there is no document to add
         if documents.is_empty() {
-            return Ok((self, Ok(0)));
-        }
-
-        /// Transform every field of a raw obkv store into a Rhai Map.
-        pub fn all_obkv_to_rhaimap(
-            obkv: obkv::KvReaderU16,
-            fields_ids_map: &FieldsIdsMap,
-        ) -> Result<rhai::Map> {
-            let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
-            all_keys
-                .iter()
-                .copied()
-                .flat_map(|id| obkv.get(id).map(|value| (id, value)))
-                .map(|(id, value)| {
-                    let name = fields_ids_map.name(id).ok_or(
-                        crate::error::FieldIdMapMissingEntry::FieldId {
-                            field_id: id,
-                            process: "allobkv_to_rhaimap",
-                        },
-                    )?;
-                    let value = serde_json::from_slice(value)
-                        .map_err(crate::error::InternalError::SerdeJson)?;
-                    Ok((name.into(), value))
-                })
-                .collect()
+            return Ok((self, Ok((0, 0))));
         }
 
         fn rhaimap_to_object(map: rhai::Map) -> Object {
@@ -218,47 +198,98 @@ where
             output
         }
 
-        let engine = Engine::new();
+        let mut engine = Engine::new();
+        engine.set_optimization_level(OptimizationLevel::Full);
+        // It is an arbitrary value. We need to let users define this in the settings.
+        engine.set_max_operations(1_000_000);
+
         let ast = engine.compile(code).unwrap();
         let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
         let primary_key = self.index.primary_key(self.wtxn)?.unwrap();
         let primary_key_id = fields_ids_map.id(primary_key).unwrap();
         let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?;
+        let mut documents_to_remove = RoaringBitmap::new();
 
-        for docid in documents {
-            let (document, document_object, document_id) =
-                match self.index.documents.get(self.wtxn, &docid)? {
-                    Some(obkv) => {
-                        let document_id_bytes = obkv.get(primary_key_id).unwrap();
-                        let document_id: serde_json::Value =
-                            serde_json::from_slice(document_id_bytes).unwrap();
-                        let document = all_obkv_to_rhaimap(obkv, &fields_ids_map)?;
-                        let document_object = all_obkv_to_json(obkv, &fields_ids_map)?;
-                        (document, document_object, document_id)
-                    }
-                    None => panic!("documents must exist"),
-                };
+        let context: Dynamic = match context {
+            Some(context) => serde_json::from_value(context.into()).unwrap(),
+            None => Dynamic::from(()),
+        };
+
+        enum DocumentEdition {
+            Deleted(crate::DocumentId),
+            Edited(Object),
+            Nothing,
+        }
+
+        let immutable_obkvs = ImmutableObkvs::new(
+            self.wtxn,
+            self.index.documents,
+            fields_ids_map.clone(),
+            documents.clone(),
+        )?;
+
+        let processing = documents.into_iter().par_bridge().map(|docid| {
+            let rhai_document = immutable_obkvs.rhai_map(docid)?.unwrap();
+            let json_document = immutable_obkvs.json_map(docid)?.unwrap();
+            let document_id = &json_document[primary_key];
 
             let mut scope = Scope::new();
-            scope.push("doc", document);
+            scope.push_constant_dynamic("context", context.clone());
+            scope.push("doc", rhai_document);
             let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast).unwrap();
-            let new_document = scope.remove("doc").unwrap();
-            let new_document = rhaimap_to_object(new_document);
-
-            if document_object != new_document {
-                assert_eq!(
-                    Some(&document_id),
-                    new_document.get(primary_key),
-                    "you cannot change the document id when editing documents"
-                );
-                documents_batch_builder.append_json_object(&new_document)?;
+            match scope.remove::<Dynamic>("doc") {
+                // If the "doc" variable has been removed from the scope
+                // or set to (), we effectively delete the document.
+                Some(doc) if doc.is_unit() => {
+                    return Ok(DocumentEdition::Deleted(docid));
+                }
+                None => unreachable!(),
+                Some(document) => match document.try_cast() {
+                    Some(document) => {
+                        let new_document = rhaimap_to_object(document);
+                        if json_document != new_document {
+                            assert_eq!(
+                                Some(document_id),
+                                new_document.get(primary_key),
+                                "you cannot change the document id when editing documents"
+                            );
+                            return Ok(DocumentEdition::Edited(new_document));
+                        }
+                    }
+                    None => panic!("Why is \"doc\" no longer a Map?"),
+                },
             }
-        }
 
+            Ok(DocumentEdition::Nothing) as Result<_>
+        });
+
+        rayon_par_bridge::par_bridge(100, processing, |iterator| {
+            for result in iterator {
+                if (self.should_abort)() {
+                    return Err(Error::InternalError(InternalError::AbortedIndexation));
+                }
+
+                match result? {
+                    DocumentEdition::Deleted(docid) => {
+                        documents_to_remove.push(docid);
+                    }
+                    DocumentEdition::Edited(new_document) => {
+                        documents_batch_builder.append_json_object(&new_document)?;
+                    }
+                    DocumentEdition::Nothing => (),
+                }
            }
+
+            Ok(())
+        })?;
+
         let file = documents_batch_builder.into_inner()?;
         let reader = DocumentsBatchReader::from_reader(file)?;
 
-        self.add_documents(reader)
+        let (this, removed) = self.remove_documents_from_db_no_batch(&documents_to_remove)?;
+        let (this, result) = this.add_documents(reader)?;
+
+        Ok((this, result.map(|added| (removed, added))))
     }
 
     pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self {
```
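The evaluation contract implemented by the new `edit_documents` body is easier to see in isolation: the Rhai script gets the document as `doc` (plus the optional `context`), and whatever is left in `doc` afterwards decides the outcome, a map meaning edit and unit meaning delete. The following is a self-contained sketch assuming the rhai API used in the diff above; the script and field names are invented.

```rust
use rhai::{Dynamic, Engine, Scope};

fn main() -> Result<(), Box<rhai::EvalAltResult>> {
    let engine = Engine::new();
    // Invented script: delete the document when it is hidden, otherwise bump a counter.
    let ast = engine.compile("if doc.hidden { doc = () } else { doc.views += 1 }")?;

    let mut doc = rhai::Map::new();
    doc.insert("hidden".into(), Dynamic::from(false));
    doc.insert("views".into(), Dynamic::from(10_i64));

    let mut scope = Scope::new();
    scope.push("doc", doc);
    let _ = engine.eval_ast_with_scope::<Dynamic>(&mut scope, &ast)?;

    // Same decision rule as the indexing code: unit deletes, a map edits.
    match scope.remove::<Dynamic>("doc") {
        Some(doc) if doc.is_unit() => println!("document would be deleted"),
        Some(doc) => println!("document would be edited: {doc:?}"),
        None => unreachable!("`doc` is always pushed into the scope"),
    }
    Ok(())
}
```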
milli/src/update/index_documents/parallel.rs (new file, 86 lines)
```diff
@@ -0,0 +1,86 @@
+use heed::types::Bytes;
+use heed::{Database, RoTxn};
+use obkv::KvReaderU16;
+use roaring::RoaringBitmap;
+
+use crate::{all_obkv_to_json, DocumentId, FieldsIdsMap, Object, ObkvCodec, Result, BEU32};
+
+pub struct ImmutableObkvs<'t> {
+    ids: RoaringBitmap,
+    fields_ids_map: FieldsIdsMap,
+    slices: Vec<&'t [u8]>,
+}
+
+impl<'t> ImmutableObkvs<'t> {
+    /// Creates the structure by fetching all the OBKVs
+    /// and keeping the transaction making the pointers valid.
+    pub fn new(
+        rtxn: &'t RoTxn,
+        documents_database: Database<BEU32, ObkvCodec>,
+        fields_ids_map: FieldsIdsMap,
+        subset: RoaringBitmap,
+    ) -> heed::Result<Self> {
+        let mut slices = Vec::new();
+        let documents_database = documents_database.remap_data_type::<Bytes>();
+        for docid in &subset {
+            let slice = documents_database.get(rtxn, &docid)?.unwrap();
+            slices.push(slice);
+        }
+
+        Ok(ImmutableObkvs { ids: subset, fields_ids_map, slices })
+    }
+
+    /// Returns the OBKVs identified by the given ID.
+    pub fn obkv(&self, docid: DocumentId) -> heed::Result<Option<KvReaderU16<'t>>> {
+        match self
+            .ids
+            .rank(docid)
+            .checked_sub(1)
+            .and_then(|offset| self.slices.get(offset as usize))
+        {
+            Some(bytes) => Ok(Some(KvReaderU16::new(bytes))),
+            None => Ok(None),
+        }
+    }
+
+    /// Returns the owned rhai::Map identified by the given ID.
+    pub fn rhai_map(&self, docid: DocumentId) -> Result<Option<rhai::Map>> {
+        let obkv = match self.obkv(docid) {
+            Ok(Some(obkv)) => obkv,
+            Ok(None) => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+
+        let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>();
+        let map: Result<rhai::Map> = all_keys
+            .iter()
+            .copied()
+            .flat_map(|id| obkv.get(id).map(|value| (id, value)))
+            .map(|(id, value)| {
+                let name = self.fields_ids_map.name(id).ok_or(
+                    crate::error::FieldIdMapMissingEntry::FieldId {
+                        field_id: id,
+                        process: "allobkv_to_rhaimap",
+                    },
+                )?;
+                let value = serde_json::from_slice(value)
+                    .map_err(crate::error::InternalError::SerdeJson)?;
+                Ok((name.into(), value))
+            })
+            .collect();
+
+        map.map(Some)
+    }
+
+    pub fn json_map(&self, docid: DocumentId) -> Result<Option<Object>> {
+        let obkv = match self.obkv(docid) {
+            Ok(Some(obkv)) => obkv,
+            Ok(None) => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+
+        all_obkv_to_json(obkv, &self.fields_ids_map).map(Some)
+    }
+}
+
+unsafe impl Sync for ImmutableObkvs<'_> {}
```
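The only non-obvious part of this new file is the rank-based lookup in `obkv`: slices are stored in ascending docid order, and `rank(docid)`, the count of ids less than or equal to `docid`, minus one gives the slice position. A tiny standalone illustration of that arithmetic follows; the helper name is hypothetical, and unlike the method above it checks membership explicitly.

```rust
use roaring::RoaringBitmap;

// Hypothetical helper mirroring the rank arithmetic used by `ImmutableObkvs::obkv`.
fn offset_of(ids: &RoaringBitmap, docid: u32) -> Option<usize> {
    if ids.contains(docid) {
        // rank() counts the ids <= docid, so the offset into the sorted slices is rank - 1.
        ids.rank(docid).checked_sub(1).map(|offset| offset as usize)
    } else {
        None
    }
}

fn main() {
    let ids = RoaringBitmap::from_sorted_iter([2u32, 5, 9]).unwrap();
    assert_eq!(offset_of(&ids, 2), Some(0));
    assert_eq!(offset_of(&ids, 5), Some(1)); // 5 is the second stored document
    assert_eq!(offset_of(&ids, 3), None); // not part of the subset
}
```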