mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 04:36:32 +00:00
Merge pull request #5963 from meilisearch/engprod-2128-allow-attaching-user-defined-metadata-to-tasks-and-return
Allow to attach `customMetadata` in the document addition or update tasks
This commit is contained in:
@@ -96,6 +96,8 @@ pub struct TaskDump {
|
||||
pub finished_at: Option<OffsetDateTime>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub network: Option<TaskNetwork>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub custom_metadata: Option<String>,
|
||||
}
|
||||
|
||||
// A `Kind` specific version made for the dump. If modified you may break the dump.
|
||||
@@ -178,6 +180,7 @@ impl From<Task> for TaskDump {
|
||||
started_at: task.started_at,
|
||||
finished_at: task.finished_at,
|
||||
network: task.network,
|
||||
custom_metadata: task.custom_metadata,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -396,6 +399,7 @@ pub(crate) mod test {
|
||||
started_at: Some(datetime!(2022-11-20 0:00 UTC)),
|
||||
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
|
||||
network: None,
|
||||
custom_metadata: None,
|
||||
},
|
||||
None,
|
||||
),
|
||||
@@ -421,6 +425,7 @@ pub(crate) mod test {
|
||||
started_at: None,
|
||||
finished_at: None,
|
||||
network: None,
|
||||
custom_metadata: None,
|
||||
},
|
||||
Some(vec![
|
||||
json!({ "id": 4, "race": "leonberg" }).as_object().unwrap().clone(),
|
||||
@@ -441,6 +446,7 @@ pub(crate) mod test {
|
||||
started_at: None,
|
||||
finished_at: None,
|
||||
network: None,
|
||||
custom_metadata: None,
|
||||
},
|
||||
None,
|
||||
),
|
||||
|
||||
@@ -164,6 +164,7 @@ impl CompatV5ToV6 {
|
||||
started_at: task_view.started_at,
|
||||
finished_at: task_view.finished_at,
|
||||
network: None,
|
||||
custom_metadata: None,
|
||||
};
|
||||
|
||||
(task, content_file)
|
||||
|
||||
@@ -150,6 +150,7 @@ impl<'a> Dump<'a> {
|
||||
details: task.details,
|
||||
status: task.status,
|
||||
network: task.network,
|
||||
custom_metadata: task.custom_metadata,
|
||||
kind: match task.kind {
|
||||
KindDump::DocumentImport {
|
||||
primary_key,
|
||||
|
||||
@@ -232,6 +232,7 @@ pub fn snapshot_task(task: &Task) -> String {
|
||||
status,
|
||||
kind,
|
||||
network,
|
||||
custom_metadata,
|
||||
} = task;
|
||||
snap.push('{');
|
||||
snap.push_str(&format!("uid: {uid}, "));
|
||||
@@ -252,6 +253,9 @@ pub fn snapshot_task(task: &Task) -> String {
|
||||
if let Some(network) = network {
|
||||
snap.push_str(&format!("network: {network:?}, "))
|
||||
}
|
||||
if let Some(custom_metadata) = custom_metadata {
|
||||
snap.push_str(&format!("custom_metadata: {custom_metadata:?}"))
|
||||
}
|
||||
|
||||
snap.push('}');
|
||||
snap
|
||||
|
||||
@@ -756,6 +756,19 @@ impl IndexScheduler {
|
||||
kind: KindWithContent,
|
||||
task_id: Option<TaskId>,
|
||||
dry_run: bool,
|
||||
) -> Result<Task> {
|
||||
self.register_with_custom_metadata(kind, task_id, None, dry_run)
|
||||
}
|
||||
|
||||
/// Register a new task in the scheduler, with metadata.
|
||||
///
|
||||
/// If it fails and data was associated with the task, it tries to delete the associated data.
|
||||
pub fn register_with_custom_metadata(
|
||||
&self,
|
||||
kind: KindWithContent,
|
||||
task_id: Option<TaskId>,
|
||||
custom_metadata: Option<String>,
|
||||
dry_run: bool,
|
||||
) -> Result<Task> {
|
||||
// if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
|
||||
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
|
||||
@@ -766,7 +779,7 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let task = self.queue.register(&mut wtxn, &kind, task_id, dry_run)?;
|
||||
let task = self.queue.register(&mut wtxn, &kind, task_id, custom_metadata, dry_run)?;
|
||||
|
||||
// If the registered task is a task cancelation
|
||||
// we inform the processing tasks to stop (if necessary).
|
||||
|
||||
@@ -257,6 +257,7 @@ impl Queue {
|
||||
wtxn: &mut RwTxn,
|
||||
kind: &KindWithContent,
|
||||
task_id: Option<TaskId>,
|
||||
custom_metadata: Option<String>,
|
||||
dry_run: bool,
|
||||
) -> Result<Task> {
|
||||
let next_task_id = self.tasks.next_task_id(wtxn)?;
|
||||
@@ -280,6 +281,7 @@ impl Queue {
|
||||
status: Status::Enqueued,
|
||||
kind: kind.clone(),
|
||||
network: None,
|
||||
custom_metadata,
|
||||
};
|
||||
// For deletion and cancelation tasks, we want to make extra sure that they
|
||||
// don't attempt to delete/cancel tasks that are newer than themselves.
|
||||
@@ -344,6 +346,7 @@ impl Queue {
|
||||
tasks: to_delete,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
)?;
|
||||
|
||||
|
||||
@@ -98,6 +98,7 @@ pub fn upgrade_index_scheduler(
|
||||
status: Status::Enqueued,
|
||||
kind: KindWithContent::UpgradeDatabase { from },
|
||||
network: None,
|
||||
custom_metadata: None,
|
||||
},
|
||||
)?;
|
||||
wtxn.commit()?;
|
||||
|
||||
@@ -379,6 +379,7 @@ impl crate::IndexScheduler {
|
||||
status,
|
||||
kind,
|
||||
network: _,
|
||||
custom_metadata: _,
|
||||
} = task;
|
||||
assert_eq!(uid, task.uid);
|
||||
if task.status != Status::Enqueued {
|
||||
|
||||
@@ -254,6 +254,7 @@ InvalidSearchHybridQuery , InvalidRequest , BAD_REQU
|
||||
InvalidIndexLimit , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidIndexOffset , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidIndexCustomMetadata , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidIndexUid , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidMultiSearchFacets , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidMultiSearchFacetsByIndex , InvalidRequest , BAD_REQUEST ;
|
||||
|
||||
@@ -55,6 +55,9 @@ pub struct TaskView {
|
||||
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub network: Option<TaskNetwork>,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub custom_metadata: Option<String>,
|
||||
}
|
||||
|
||||
impl TaskView {
|
||||
@@ -73,6 +76,7 @@ impl TaskView {
|
||||
started_at: task.started_at,
|
||||
finished_at: task.finished_at,
|
||||
network: task.network.clone(),
|
||||
custom_metadata: task.custom_metadata.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,9 @@ pub struct Task {
|
||||
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub network: Option<TaskNetwork>,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub custom_metadata: Option<String>,
|
||||
}
|
||||
|
||||
impl Task {
|
||||
|
||||
@@ -333,10 +333,12 @@ impl Aggregate for DocumentsDeletionAggregator {
|
||||
pub async fn delete_document(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
||||
path: web::Path<DocumentParam>,
|
||||
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
|
||||
req: HttpRequest,
|
||||
opt: web::Data<Opt>,
|
||||
analytics: web::Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let CustomMetadataQuery { custom_metadata } = params.into_inner();
|
||||
let DocumentParam { index_uid, document_id } = path.into_inner();
|
||||
let index_uid = IndexUid::try_from(index_uid)?;
|
||||
let network = index_scheduler.network();
|
||||
@@ -359,7 +361,10 @@ pub async fn delete_document(
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
tokio::task::spawn_blocking(move || {
|
||||
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
|
||||
})
|
||||
.await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
@@ -678,6 +683,19 @@ pub struct UpdateDocumentsQuery {
|
||||
#[param(value_type = char, default = ",", example = ";")]
|
||||
#[deserr(default, try_from(char) = from_char_csv_delimiter -> DeserrQueryParamError<InvalidDocumentCsvDelimiter>, error = DeserrQueryParamError<InvalidDocumentCsvDelimiter>)]
|
||||
pub csv_delimiter: Option<u8>,
|
||||
|
||||
#[param(example = "custom")]
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidIndexCustomMetadata>)]
|
||||
pub custom_metadata: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Deserr, IntoParams)]
|
||||
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
|
||||
#[into_params(parameter_in = Query, rename_all = "camelCase")]
|
||||
pub struct CustomMetadataQuery {
|
||||
#[param(example = "custom")]
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidIndexCustomMetadata>)]
|
||||
pub custom_metadata: Option<String>,
|
||||
}
|
||||
|
||||
fn from_char_csv_delimiter(
|
||||
@@ -819,6 +837,7 @@ pub async fn replace_documents(
|
||||
body,
|
||||
IndexDocumentsMethod::ReplaceDocuments,
|
||||
uid,
|
||||
params.custom_metadata,
|
||||
dry_run,
|
||||
allow_index_creation,
|
||||
&req,
|
||||
@@ -921,6 +940,7 @@ pub async fn update_documents(
|
||||
body,
|
||||
IndexDocumentsMethod::UpdateDocuments,
|
||||
uid,
|
||||
params.custom_metadata,
|
||||
dry_run,
|
||||
allow_index_creation,
|
||||
&req,
|
||||
@@ -940,6 +960,7 @@ async fn document_addition(
|
||||
body: Payload,
|
||||
method: IndexDocumentsMethod,
|
||||
task_id: Option<TaskId>,
|
||||
custom_metadata: Option<String>,
|
||||
dry_run: bool,
|
||||
allow_index_creation: bool,
|
||||
req: &HttpRequest,
|
||||
@@ -1065,8 +1086,10 @@ async fn document_addition(
|
||||
};
|
||||
|
||||
let scheduler = index_scheduler.clone();
|
||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
|
||||
.await?
|
||||
let task = match tokio::task::spawn_blocking(move || {
|
||||
scheduler.register_with_custom_metadata(task, task_id, custom_metadata, dry_run)
|
||||
})
|
||||
.await?
|
||||
{
|
||||
Ok(task) => task,
|
||||
Err(e) => {
|
||||
@@ -1130,7 +1153,7 @@ async fn copy_body_to_file(
|
||||
/// Delete a set of documents based on an array of document ids.
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "{indexUid}/delete-batch",
|
||||
path = "{indexUid}/documents/delete-batch",
|
||||
tag = "Documents",
|
||||
security(("Bearer" = ["documents.delete", "documents.*", "*"])),
|
||||
params(
|
||||
@@ -1161,11 +1184,14 @@ pub async fn delete_documents_batch(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
||||
index_uid: web::Path<String>,
|
||||
body: web::Json<Vec<Value>>,
|
||||
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
|
||||
req: HttpRequest,
|
||||
opt: web::Data<Opt>,
|
||||
analytics: web::Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
debug!(parameters = ?body, "Delete documents by batch");
|
||||
let CustomMetadataQuery { custom_metadata } = params.into_inner();
|
||||
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
@@ -1190,7 +1216,10 @@ pub async fn delete_documents_batch(
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
tokio::task::spawn_blocking(move || {
|
||||
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
|
||||
})
|
||||
.await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
@@ -1244,12 +1273,15 @@ pub struct DocumentDeletionByFilter {
|
||||
pub async fn delete_documents_by_filter(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
||||
index_uid: web::Path<String>,
|
||||
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
|
||||
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
|
||||
req: HttpRequest,
|
||||
opt: web::Data<Opt>,
|
||||
analytics: web::Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
debug!(parameters = ?body, "Delete documents by filter");
|
||||
let CustomMetadataQuery { custom_metadata } = params.into_inner();
|
||||
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let index_uid = index_uid.into_inner();
|
||||
let filter = body.into_inner();
|
||||
@@ -1282,7 +1314,10 @@ pub async fn delete_documents_by_filter(
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
tokio::task::spawn_blocking(move || {
|
||||
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
|
||||
})
|
||||
.await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
@@ -1372,12 +1407,14 @@ impl Aggregate for EditDocumentsByFunctionAggregator {
|
||||
pub async fn edit_documents_by_function(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ALL }>, Data<IndexScheduler>>,
|
||||
index_uid: web::Path<String>,
|
||||
params: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
|
||||
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
|
||||
body: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
|
||||
req: HttpRequest,
|
||||
opt: web::Data<Opt>,
|
||||
analytics: web::Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
debug!(parameters = ?params, "Edit documents by function");
|
||||
debug!(parameters = ?body, "Edit documents by function");
|
||||
let CustomMetadataQuery { custom_metadata } = params.into_inner();
|
||||
|
||||
index_scheduler
|
||||
.features()
|
||||
@@ -1387,23 +1424,23 @@ pub async fn edit_documents_by_function(
|
||||
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let index_uid = index_uid.into_inner();
|
||||
let params = params.into_inner();
|
||||
let body = body.into_inner();
|
||||
|
||||
analytics.publish(
|
||||
EditDocumentsByFunctionAggregator {
|
||||
filtered: params.filter.is_some(),
|
||||
with_context: params.context.is_some(),
|
||||
filtered: body.filter.is_some(),
|
||||
with_context: body.context.is_some(),
|
||||
index_creation: index_scheduler.index(&index_uid).is_err(),
|
||||
},
|
||||
&req,
|
||||
);
|
||||
|
||||
let engine = milli::rhai::Engine::new();
|
||||
if let Err(e) = engine.compile(¶ms.function) {
|
||||
if let Err(e) = engine.compile(&body.function) {
|
||||
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));
|
||||
}
|
||||
|
||||
if let Some(ref filter) = params.filter {
|
||||
if let Some(ref filter) = body.filter {
|
||||
// we ensure the filter is well formed before enqueuing it
|
||||
crate::search::parse_filter(
|
||||
filter,
|
||||
@@ -1414,8 +1451,8 @@ pub async fn edit_documents_by_function(
|
||||
}
|
||||
let task = KindWithContent::DocumentEdition {
|
||||
index_uid: index_uid.clone(),
|
||||
filter_expr: params.filter.clone(),
|
||||
context: match params.context.clone() {
|
||||
filter_expr: body.filter.clone(),
|
||||
context: match body.context.clone() {
|
||||
Some(Value::Object(m)) => Some(m),
|
||||
None => None,
|
||||
_ => {
|
||||
@@ -1425,18 +1462,21 @@ pub async fn edit_documents_by_function(
|
||||
))
|
||||
}
|
||||
},
|
||||
function: params.function.clone(),
|
||||
function: body.function.clone(),
|
||||
};
|
||||
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
tokio::task::spawn_blocking(move || {
|
||||
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
|
||||
})
|
||||
.await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?;
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
@@ -1477,12 +1517,14 @@ pub async fn edit_documents_by_function(
|
||||
pub async fn clear_all_documents(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
||||
index_uid: web::Path<String>,
|
||||
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
|
||||
req: HttpRequest,
|
||||
opt: web::Data<Opt>,
|
||||
analytics: web::Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let network = index_scheduler.network();
|
||||
let CustomMetadataQuery { custom_metadata } = params.into_inner();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
@@ -1501,7 +1543,10 @@ pub async fn clear_all_documents(
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
tokio::task::spawn_blocking(move || {
|
||||
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
|
||||
})
|
||||
.await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
|
||||
@@ -218,6 +218,8 @@ pub struct SummarizedTaskView {
|
||||
deserialize_with = "time::serde::rfc3339::deserialize"
|
||||
)]
|
||||
enqueued_at: OffsetDateTime,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
custom_metadata: Option<String>,
|
||||
}
|
||||
|
||||
impl From<Task> for SummarizedTaskView {
|
||||
@@ -228,6 +230,7 @@ impl From<Task> for SummarizedTaskView {
|
||||
status: task.status,
|
||||
kind: task.kind.as_kind(),
|
||||
enqueued_at: task.enqueued_at,
|
||||
custom_metadata: task.custom_metadata,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,7 +91,16 @@ impl<'a> Index<'a, Owned> {
|
||||
documents: Value,
|
||||
primary_key: Option<&str>,
|
||||
) -> (Value, StatusCode) {
|
||||
self._add_documents(documents, primary_key).await
|
||||
self._add_documents(documents, primary_key, None).await
|
||||
}
|
||||
|
||||
pub async fn add_documents_with_custom_metadata(
|
||||
&self,
|
||||
documents: Value,
|
||||
primary_key: Option<&str>,
|
||||
custom_metadata: Option<&str>,
|
||||
) -> (Value, StatusCode) {
|
||||
self._add_documents(documents, primary_key, custom_metadata).await
|
||||
}
|
||||
|
||||
pub async fn raw_add_documents(
|
||||
@@ -352,12 +361,25 @@ impl<State> Index<'_, State> {
|
||||
&self,
|
||||
documents: Value,
|
||||
primary_key: Option<&str>,
|
||||
custom_metadata: Option<&str>,
|
||||
) -> (Value, StatusCode) {
|
||||
let url = match primary_key {
|
||||
Some(key) => {
|
||||
format!("/indexes/{}/documents?primaryKey={}", urlencode(self.uid.as_ref()), key)
|
||||
let url = match (primary_key, custom_metadata) {
|
||||
(Some(key), Some(meta)) => {
|
||||
format!(
|
||||
"/indexes/{}/documents?primaryKey={key}&customMetadata={meta}",
|
||||
urlencode(self.uid.as_ref()),
|
||||
)
|
||||
}
|
||||
None => format!("/indexes/{}/documents", urlencode(self.uid.as_ref())),
|
||||
(None, Some(meta)) => {
|
||||
format!(
|
||||
"/indexes/{}/documents?&customMetadata={meta}",
|
||||
urlencode(self.uid.as_ref()),
|
||||
)
|
||||
}
|
||||
(Some(key), None) => {
|
||||
format!("/indexes/{}/documents?&primaryKey={key}", urlencode(self.uid.as_ref()),)
|
||||
}
|
||||
(None, None) => format!("/indexes/{}/documents", urlencode(self.uid.as_ref())),
|
||||
};
|
||||
self.service.post_encoded(url, documents, self.encoder).await
|
||||
}
|
||||
|
||||
@@ -241,7 +241,7 @@ pub async fn shared_index_with_documents() -> &'static Index<'static, Shared> {
|
||||
let server = Server::new_shared();
|
||||
let index = server._index("SHARED_DOCUMENTS").to_shared();
|
||||
let documents = DOCUMENTS.clone();
|
||||
let (response, _code) = index._add_documents(documents, None).await;
|
||||
let (response, _code) = index._add_documents(documents, None, None).await;
|
||||
server.wait_task(response.uid()).await.succeeded();
|
||||
let (response, _code) = index
|
||||
._update_settings(
|
||||
@@ -284,7 +284,7 @@ pub async fn shared_index_with_score_documents() -> &'static Index<'static, Shar
|
||||
let server = Server::new_shared();
|
||||
let index = server._index("SHARED_SCORE_DOCUMENTS").to_shared();
|
||||
let documents = SCORE_DOCUMENTS.clone();
|
||||
let (response, _code) = index._add_documents(documents, None).await;
|
||||
let (response, _code) = index._add_documents(documents, None, None).await;
|
||||
server.wait_task(response.uid()).await.succeeded();
|
||||
let (response, _code) = index
|
||||
._update_settings(
|
||||
@@ -361,7 +361,7 @@ pub async fn shared_index_with_nested_documents() -> &'static Index<'static, Sha
|
||||
let server = Server::new_shared();
|
||||
let index = server._index("SHARED_NESTED_DOCUMENTS").to_shared();
|
||||
let documents = NESTED_DOCUMENTS.clone();
|
||||
let (response, _code) = index._add_documents(documents, None).await;
|
||||
let (response, _code) = index._add_documents(documents, None, None).await;
|
||||
server.wait_task(response.uid()).await.succeeded();
|
||||
let (response, _code) = index
|
||||
._update_settings(
|
||||
@@ -508,7 +508,7 @@ pub async fn shared_index_with_geo_documents() -> &'static Index<'static, Shared
|
||||
.get_or_init(|| async {
|
||||
let server = Server::new_shared();
|
||||
let index = server._index("SHARED_GEO_DOCUMENTS").to_shared();
|
||||
let (response, _code) = index._add_documents(GEO_DOCUMENTS.clone(), None).await;
|
||||
let (response, _code) = index._add_documents(GEO_DOCUMENTS.clone(), None, None).await;
|
||||
server.wait_task(response.uid()).await.succeeded();
|
||||
|
||||
let (response, _code) = index
|
||||
@@ -531,7 +531,7 @@ pub async fn shared_index_geojson_documents() -> &'static Index<'static, Shared>
|
||||
let index = server._index("SHARED_GEOJSON_DOCUMENTS").to_shared();
|
||||
let countries = include_str!("../documents/geojson/assets/countries.json");
|
||||
let lille = serde_json::from_str::<serde_json::Value>(countries).unwrap();
|
||||
let (response, _code) = index._add_documents(Value(lille), Some("name")).await;
|
||||
let (response, _code) = index._add_documents(Value(lille), Some("name"), None).await;
|
||||
server.wait_task(response.uid()).await.succeeded();
|
||||
|
||||
let (response, _code) =
|
||||
|
||||
@@ -3141,3 +3141,513 @@ fn fail(override_response_body: Option<&str>) -> ResponseTemplate {
|
||||
response.set_body_json(json!({"error": "provoked error", "code": "test_error", "link": "https://docs.meilisearch.com/errors#test_error"}))
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn remote_auto_sharding() {
|
||||
let ms0 = Server::new().await;
|
||||
let ms1 = Server::new().await;
|
||||
let ms2 = Server::new().await;
|
||||
|
||||
// enable feature
|
||||
|
||||
let (response, code) = ms0.set_features(json!({"network": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["network"]), @"true");
|
||||
let (response, code) = ms1.set_features(json!({"network": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["network"]), @"true");
|
||||
let (response, code) = ms2.set_features(json!({"network": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["network"]), @"true");
|
||||
|
||||
// set self & sharding
|
||||
|
||||
let (response, code) = ms0.set_network(json!({"self": "ms0", "sharding": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {},
|
||||
"sharding": true
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1", "sharding": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {},
|
||||
"sharding": true
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms2.set_network(json!({"self": "ms2", "sharding": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms2",
|
||||
"remotes": {},
|
||||
"sharding": true
|
||||
}
|
||||
"###);
|
||||
|
||||
// wrap servers
|
||||
let ms0 = Arc::new(ms0);
|
||||
let ms1 = Arc::new(ms1);
|
||||
let ms2 = Arc::new(ms2);
|
||||
|
||||
let rms0 = LocalMeili::new(ms0.clone()).await;
|
||||
let rms1 = LocalMeili::new(ms1.clone()).await;
|
||||
let rms2 = LocalMeili::new(ms2.clone()).await;
|
||||
|
||||
// set network
|
||||
let network = json!({"remotes": {
|
||||
"ms0": {
|
||||
"url": rms0.url()
|
||||
},
|
||||
"ms1": {
|
||||
"url": rms1.url()
|
||||
},
|
||||
"ms2": {
|
||||
"url": rms2.url()
|
||||
}
|
||||
}});
|
||||
|
||||
println!("{}", serde_json::to_string_pretty(&network).unwrap());
|
||||
|
||||
let (_response, status_code) = ms0.set_network(network.clone()).await;
|
||||
snapshot!(status_code, @"200 OK");
|
||||
let (_response, status_code) = ms1.set_network(network.clone()).await;
|
||||
snapshot!(status_code, @"200 OK");
|
||||
let (_response, status_code) = ms2.set_network(network.clone()).await;
|
||||
snapshot!(status_code, @"200 OK");
|
||||
|
||||
// add documents
|
||||
let documents = SCORE_DOCUMENTS.clone();
|
||||
let documents = documents.as_array().unwrap();
|
||||
let index0 = ms0.index("test");
|
||||
let _index1 = ms1.index("test");
|
||||
let _index2 = ms2.index("test");
|
||||
|
||||
let (task, _status_code) = index0.add_documents(json!(documents), None).await;
|
||||
|
||||
let t0 = task.uid();
|
||||
let (t, _) = ms0.get_task(task.uid()).await;
|
||||
let t1 = t["network"]["remote_tasks"]["ms1"]["taskUid"].as_u64().unwrap();
|
||||
let t2 = t["network"]["remote_tasks"]["ms2"]["taskUid"].as_u64().unwrap();
|
||||
|
||||
ms0.wait_task(t0).await.succeeded();
|
||||
ms1.wait_task(t1).await.succeeded();
|
||||
ms2.wait_task(t2).await.succeeded();
|
||||
|
||||
// perform multi-search
|
||||
let query = "badman returns";
|
||||
let request = json!({
|
||||
"federation": {},
|
||||
"queries": [
|
||||
{
|
||||
"q": query,
|
||||
"indexUid": "test",
|
||||
"federationOptions": {
|
||||
"remote": "ms0"
|
||||
}
|
||||
},
|
||||
{
|
||||
"q": query,
|
||||
"indexUid": "test",
|
||||
"federationOptions": {
|
||||
"remote": "ms1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"q": query,
|
||||
"indexUid": "test",
|
||||
"federationOptions": {
|
||||
"remote": "ms2"
|
||||
}
|
||||
},
|
||||
]
|
||||
});
|
||||
|
||||
let (response, _status_code) = ms0.multi_search(request.clone()).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]", ".requestUid" => "[uuid]" }), @r###"
|
||||
{
|
||||
"hits": [
|
||||
{
|
||||
"title": "Batman Returns",
|
||||
"id": "C",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 2,
|
||||
"weightedRankingScore": 0.8317901234567902,
|
||||
"remote": "ms2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman the dark knight returns: Part 1",
|
||||
"id": "A",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 1,
|
||||
"weightedRankingScore": 0.7028218694885362,
|
||||
"remote": "ms1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman the dark knight returns: Part 2",
|
||||
"id": "B",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 1,
|
||||
"weightedRankingScore": 0.7028218694885362,
|
||||
"remote": "ms1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Badman",
|
||||
"id": "E",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 2,
|
||||
"weightedRankingScore": 0.5,
|
||||
"remote": "ms2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman",
|
||||
"id": "D",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 0,
|
||||
"weightedRankingScore": 0.23106060606060605,
|
||||
"remote": "ms0"
|
||||
}
|
||||
}
|
||||
],
|
||||
"processingTimeMs": "[time]",
|
||||
"limit": 20,
|
||||
"offset": 0,
|
||||
"estimatedTotalHits": 5,
|
||||
"requestUid": "[uuid]",
|
||||
"remoteErrors": {}
|
||||
}
|
||||
"###);
|
||||
let (response, _status_code) = ms1.multi_search(request.clone()).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]", ".requestUid" => "[uuid]" }), @r###"
|
||||
{
|
||||
"hits": [
|
||||
{
|
||||
"title": "Batman Returns",
|
||||
"id": "C",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 2,
|
||||
"weightedRankingScore": 0.8317901234567902,
|
||||
"remote": "ms2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman the dark knight returns: Part 1",
|
||||
"id": "A",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 1,
|
||||
"weightedRankingScore": 0.7028218694885362,
|
||||
"remote": "ms1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman the dark knight returns: Part 2",
|
||||
"id": "B",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 1,
|
||||
"weightedRankingScore": 0.7028218694885362,
|
||||
"remote": "ms1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Badman",
|
||||
"id": "E",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 2,
|
||||
"weightedRankingScore": 0.5,
|
||||
"remote": "ms2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman",
|
||||
"id": "D",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 0,
|
||||
"weightedRankingScore": 0.23106060606060605,
|
||||
"remote": "ms0"
|
||||
}
|
||||
}
|
||||
],
|
||||
"processingTimeMs": "[time]",
|
||||
"limit": 20,
|
||||
"offset": 0,
|
||||
"estimatedTotalHits": 5,
|
||||
"requestUid": "[uuid]",
|
||||
"remoteErrors": {}
|
||||
}
|
||||
"###);
|
||||
let (response, _status_code) = ms2.multi_search(request.clone()).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]", ".requestUid" => "[uuid]" }), @r###"
|
||||
{
|
||||
"hits": [
|
||||
{
|
||||
"title": "Batman Returns",
|
||||
"id": "C",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 2,
|
||||
"weightedRankingScore": 0.8317901234567902,
|
||||
"remote": "ms2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman the dark knight returns: Part 1",
|
||||
"id": "A",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 1,
|
||||
"weightedRankingScore": 0.7028218694885362,
|
||||
"remote": "ms1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman the dark knight returns: Part 2",
|
||||
"id": "B",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 1,
|
||||
"weightedRankingScore": 0.7028218694885362,
|
||||
"remote": "ms1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Badman",
|
||||
"id": "E",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 2,
|
||||
"weightedRankingScore": 0.5,
|
||||
"remote": "ms2"
|
||||
}
|
||||
},
|
||||
{
|
||||
"title": "Batman",
|
||||
"id": "D",
|
||||
"_federation": {
|
||||
"indexUid": "test",
|
||||
"queriesPosition": 0,
|
||||
"weightedRankingScore": 0.23106060606060605,
|
||||
"remote": "ms0"
|
||||
}
|
||||
}
|
||||
],
|
||||
"processingTimeMs": "[time]",
|
||||
"limit": 20,
|
||||
"offset": 0,
|
||||
"estimatedTotalHits": 5,
|
||||
"requestUid": "[uuid]",
|
||||
"remoteErrors": {}
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn remote_auto_sharding_with_custom_metadata() {
|
||||
let ms0 = Server::new().await;
|
||||
let ms1 = Server::new().await;
|
||||
let ms2 = Server::new().await;
|
||||
|
||||
// enable feature
|
||||
|
||||
let (response, code) = ms0.set_features(json!({"network": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["network"]), @"true");
|
||||
let (response, code) = ms1.set_features(json!({"network": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["network"]), @"true");
|
||||
let (response, code) = ms2.set_features(json!({"network": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response["network"]), @"true");
|
||||
|
||||
// set self & sharding
|
||||
|
||||
let (response, code) = ms0.set_network(json!({"self": "ms0", "sharding": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {},
|
||||
"sharding": true
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1", "sharding": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {},
|
||||
"sharding": true
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms2.set_network(json!({"self": "ms2", "sharding": true})).await;
|
||||
snapshot!(code, @"200 OK");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms2",
|
||||
"remotes": {},
|
||||
"sharding": true
|
||||
}
|
||||
"###);
|
||||
|
||||
// wrap servers
|
||||
let ms0 = Arc::new(ms0);
|
||||
let ms1 = Arc::new(ms1);
|
||||
let ms2 = Arc::new(ms2);
|
||||
|
||||
let rms0 = LocalMeili::new(ms0.clone()).await;
|
||||
let rms1 = LocalMeili::new(ms1.clone()).await;
|
||||
let rms2 = LocalMeili::new(ms2.clone()).await;
|
||||
|
||||
// set network
|
||||
let network = json!({"remotes": {
|
||||
"ms0": {
|
||||
"url": rms0.url()
|
||||
},
|
||||
"ms1": {
|
||||
"url": rms1.url()
|
||||
},
|
||||
"ms2": {
|
||||
"url": rms2.url()
|
||||
}
|
||||
}});
|
||||
|
||||
println!("{}", serde_json::to_string_pretty(&network).unwrap());
|
||||
|
||||
let (_response, status_code) = ms0.set_network(network.clone()).await;
|
||||
snapshot!(status_code, @"200 OK");
|
||||
let (_response, status_code) = ms1.set_network(network.clone()).await;
|
||||
snapshot!(status_code, @"200 OK");
|
||||
let (_response, status_code) = ms2.set_network(network.clone()).await;
|
||||
snapshot!(status_code, @"200 OK");
|
||||
|
||||
// add documents
|
||||
let documents = SCORE_DOCUMENTS.clone();
|
||||
let documents = documents.as_array().unwrap();
|
||||
let index0 = ms0.index("test");
|
||||
let _index1 = ms1.index("test");
|
||||
let _index2 = ms2.index("test");
|
||||
|
||||
let (task, _status_code) = index0
|
||||
.add_documents_with_custom_metadata(
|
||||
json!(documents),
|
||||
None,
|
||||
Some("remote_auto_sharding_with_custom_metadata"),
|
||||
)
|
||||
.await;
|
||||
|
||||
let t0 = task.uid();
|
||||
let (t, _) = ms0.get_task(task.uid()).await;
|
||||
let t1 = t["network"]["remote_tasks"]["ms1"]["taskUid"].as_u64().unwrap();
|
||||
let t2 = t["network"]["remote_tasks"]["ms2"]["taskUid"].as_u64().unwrap();
|
||||
|
||||
let t = ms0.wait_task(t0).await.succeeded();
|
||||
snapshot!(t, @r###"
|
||||
{
|
||||
"uid": "[uid]",
|
||||
"batchUid": "[batch_uid]",
|
||||
"indexUid": "test",
|
||||
"status": "succeeded",
|
||||
"type": "documentAdditionOrUpdate",
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"receivedDocuments": 5,
|
||||
"indexedDocuments": 1
|
||||
},
|
||||
"error": null,
|
||||
"duration": "[duration]",
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"network": {
|
||||
"remote_tasks": {
|
||||
"ms1": {
|
||||
"taskUid": 0,
|
||||
"error": null
|
||||
},
|
||||
"ms2": {
|
||||
"taskUid": 0,
|
||||
"error": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"customMetadata": "remote_auto_sharding_with_custom_metadata"
|
||||
}
|
||||
"###);
|
||||
|
||||
let t = ms1.wait_task(t1).await.succeeded();
|
||||
snapshot!(t, @r###"
|
||||
{
|
||||
"uid": "[uid]",
|
||||
"batchUid": "[batch_uid]",
|
||||
"indexUid": "test",
|
||||
"status": "succeeded",
|
||||
"type": "documentAdditionOrUpdate",
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"receivedDocuments": 5,
|
||||
"indexedDocuments": 2
|
||||
},
|
||||
"error": null,
|
||||
"duration": "[duration]",
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"network": {
|
||||
"origin": {
|
||||
"remoteName": "ms0",
|
||||
"taskUid": 0
|
||||
}
|
||||
},
|
||||
"customMetadata": "remote_auto_sharding_with_custom_metadata"
|
||||
}
|
||||
"###);
|
||||
|
||||
let t = ms2.wait_task(t2).await.succeeded();
|
||||
snapshot!(t, @r###"
|
||||
{
|
||||
"uid": "[uid]",
|
||||
"batchUid": "[batch_uid]",
|
||||
"indexUid": "test",
|
||||
"status": "succeeded",
|
||||
"type": "documentAdditionOrUpdate",
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"receivedDocuments": 5,
|
||||
"indexedDocuments": 2
|
||||
},
|
||||
"error": null,
|
||||
"duration": "[duration]",
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"network": {
|
||||
"origin": {
|
||||
"remoteName": "ms0",
|
||||
"taskUid": 0
|
||||
}
|
||||
},
|
||||
"customMetadata": "remote_auto_sharding_with_custom_metadata"
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
||||
@@ -656,3 +656,119 @@ async fn forbidden_fields() {
|
||||
}
|
||||
"#);
|
||||
}
|
||||
|
||||
#[actix_web::test]
|
||||
async fn receive_custom_metadata() {
|
||||
let WebhookHandle { server_handle: handle1, url: url1, receiver: mut receiver1 } =
|
||||
create_webhook_server().await;
|
||||
let WebhookHandle { server_handle: handle2, url: url2, receiver: mut receiver2 } =
|
||||
create_webhook_server().await;
|
||||
let WebhookHandle { server_handle: handle3, url: url3, receiver: mut receiver3 } =
|
||||
create_webhook_server().await;
|
||||
|
||||
let db_path = tempfile::tempdir().unwrap();
|
||||
let server = Server::new_with_options(Opt {
|
||||
task_webhook_url: Some(Url::parse(&url3).unwrap()),
|
||||
..default_settings(db_path.path())
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for url in [url1, url2] {
|
||||
let (value, code) = server.create_webhook(json!({ "url": url })).await;
|
||||
snapshot!(code, @"201 Created");
|
||||
snapshot!(json_string!(value, { ".uuid" => "[uuid]", ".url" => "[ignored]" }), @r#"
|
||||
{
|
||||
"uuid": "[uuid]",
|
||||
"isEditable": true,
|
||||
"url": "[ignored]",
|
||||
"headers": {}
|
||||
}
|
||||
"#);
|
||||
}
|
||||
let index = server.index("tamo");
|
||||
let (response, code) = index
|
||||
.add_documents_with_custom_metadata(
|
||||
json!({ "id": 1, "doggo": "bone" }),
|
||||
None,
|
||||
Some("test_meta"),
|
||||
)
|
||||
.await;
|
||||
|
||||
snapshot!(response, @r###"
|
||||
{
|
||||
"taskUid": 0,
|
||||
"indexUid": "tamo",
|
||||
"status": "enqueued",
|
||||
"type": "documentAdditionOrUpdate",
|
||||
"enqueuedAt": "[date]",
|
||||
"customMetadata": "test_meta"
|
||||
}
|
||||
"###);
|
||||
snapshot!(code, @"202 Accepted");
|
||||
|
||||
let mut count1 = 0;
|
||||
let mut count2 = 0;
|
||||
let mut count3 = 0;
|
||||
while count1 == 0 || count2 == 0 || count3 == 0 {
|
||||
tokio::select! {
|
||||
msg = receiver1.recv() => {
|
||||
if let Some(msg) = msg {
|
||||
count1 += 1;
|
||||
check_metadata(msg);
|
||||
}
|
||||
},
|
||||
msg = receiver2.recv() => {
|
||||
if let Some(msg) = msg {
|
||||
count2 += 1;
|
||||
check_metadata(msg);
|
||||
}
|
||||
},
|
||||
msg = receiver3.recv() => {
|
||||
if let Some(msg) = msg {
|
||||
count3 += 1;
|
||||
check_metadata(msg);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(count1, 1);
|
||||
assert_eq!(count2, 1);
|
||||
assert_eq!(count3, 1);
|
||||
|
||||
handle1.abort();
|
||||
handle2.abort();
|
||||
handle3.abort();
|
||||
}
|
||||
|
||||
fn check_metadata(msg: Vec<u8>) {
|
||||
let msg = String::from_utf8(msg).unwrap();
|
||||
let tasks = msg.split('\n');
|
||||
for task in tasks {
|
||||
if task.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let task: serde_json::Value = serde_json::from_str(task).unwrap();
|
||||
snapshot!(common::Value(task), @r###"
|
||||
{
|
||||
"uid": "[uid]",
|
||||
"batchUid": "[batch_uid]",
|
||||
"indexUid": "tamo",
|
||||
"status": "succeeded",
|
||||
"type": "documentAdditionOrUpdate",
|
||||
"canceledBy": null,
|
||||
"details": {
|
||||
"receivedDocuments": 1,
|
||||
"indexedDocuments": 1
|
||||
},
|
||||
"error": null,
|
||||
"duration": "[duration]",
|
||||
"enqueuedAt": "[date]",
|
||||
"startedAt": "[date]",
|
||||
"finishedAt": "[date]",
|
||||
"customMetadata": "test_meta"
|
||||
}
|
||||
"###);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user