mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-28 17:00:32 +00:00
Allow to attach customMetadatain the document addition or update tasks
This commit is contained in:
@@ -96,6 +96,8 @@ pub struct TaskDump {
|
|||||||
pub finished_at: Option<OffsetDateTime>,
|
pub finished_at: Option<OffsetDateTime>,
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
pub network: Option<TaskNetwork>,
|
pub network: Option<TaskNetwork>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub custom_metadata: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// A `Kind` specific version made for the dump. If modified you may break the dump.
|
// A `Kind` specific version made for the dump. If modified you may break the dump.
|
||||||
@@ -178,6 +180,7 @@ impl From<Task> for TaskDump {
|
|||||||
started_at: task.started_at,
|
started_at: task.started_at,
|
||||||
finished_at: task.finished_at,
|
finished_at: task.finished_at,
|
||||||
network: task.network,
|
network: task.network,
|
||||||
|
custom_metadata: task.custom_metadata,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -396,6 +399,7 @@ pub(crate) mod test {
|
|||||||
started_at: Some(datetime!(2022-11-20 0:00 UTC)),
|
started_at: Some(datetime!(2022-11-20 0:00 UTC)),
|
||||||
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
|
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
|
||||||
network: None,
|
network: None,
|
||||||
|
custom_metadata: None,
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
),
|
),
|
||||||
@@ -421,6 +425,7 @@ pub(crate) mod test {
|
|||||||
started_at: None,
|
started_at: None,
|
||||||
finished_at: None,
|
finished_at: None,
|
||||||
network: None,
|
network: None,
|
||||||
|
custom_metadata: None,
|
||||||
},
|
},
|
||||||
Some(vec![
|
Some(vec![
|
||||||
json!({ "id": 4, "race": "leonberg" }).as_object().unwrap().clone(),
|
json!({ "id": 4, "race": "leonberg" }).as_object().unwrap().clone(),
|
||||||
@@ -441,6 +446,7 @@ pub(crate) mod test {
|
|||||||
started_at: None,
|
started_at: None,
|
||||||
finished_at: None,
|
finished_at: None,
|
||||||
network: None,
|
network: None,
|
||||||
|
custom_metadata: None,
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -164,6 +164,7 @@ impl CompatV5ToV6 {
|
|||||||
started_at: task_view.started_at,
|
started_at: task_view.started_at,
|
||||||
finished_at: task_view.finished_at,
|
finished_at: task_view.finished_at,
|
||||||
network: None,
|
network: None,
|
||||||
|
custom_metadata: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
(task, content_file)
|
(task, content_file)
|
||||||
|
|||||||
@@ -150,6 +150,7 @@ impl<'a> Dump<'a> {
|
|||||||
details: task.details,
|
details: task.details,
|
||||||
status: task.status,
|
status: task.status,
|
||||||
network: task.network,
|
network: task.network,
|
||||||
|
custom_metadata: task.custom_metadata,
|
||||||
kind: match task.kind {
|
kind: match task.kind {
|
||||||
KindDump::DocumentImport {
|
KindDump::DocumentImport {
|
||||||
primary_key,
|
primary_key,
|
||||||
|
|||||||
@@ -231,6 +231,7 @@ pub fn snapshot_task(task: &Task) -> String {
|
|||||||
status,
|
status,
|
||||||
kind,
|
kind,
|
||||||
network,
|
network,
|
||||||
|
custom_metadata,
|
||||||
} = task;
|
} = task;
|
||||||
snap.push('{');
|
snap.push('{');
|
||||||
snap.push_str(&format!("uid: {uid}, "));
|
snap.push_str(&format!("uid: {uid}, "));
|
||||||
@@ -251,6 +252,9 @@ pub fn snapshot_task(task: &Task) -> String {
|
|||||||
if let Some(network) = network {
|
if let Some(network) = network {
|
||||||
snap.push_str(&format!("network: {network:?}, "))
|
snap.push_str(&format!("network: {network:?}, "))
|
||||||
}
|
}
|
||||||
|
if let Some(custom_metadata) = custom_metadata {
|
||||||
|
snap.push_str(&format!("custom_metadata: {custom_metadata:?}"))
|
||||||
|
}
|
||||||
|
|
||||||
snap.push('}');
|
snap.push('}');
|
||||||
snap
|
snap
|
||||||
|
|||||||
@@ -726,6 +726,19 @@ impl IndexScheduler {
|
|||||||
kind: KindWithContent,
|
kind: KindWithContent,
|
||||||
task_id: Option<TaskId>,
|
task_id: Option<TaskId>,
|
||||||
dry_run: bool,
|
dry_run: bool,
|
||||||
|
) -> Result<Task> {
|
||||||
|
self.register_with_custom_metadata(kind, task_id, None, dry_run)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a new task in the scheduler, with metadata.
|
||||||
|
///
|
||||||
|
/// If it fails and data was associated with the task, it tries to delete the associated data.
|
||||||
|
pub fn register_with_custom_metadata(
|
||||||
|
&self,
|
||||||
|
kind: KindWithContent,
|
||||||
|
task_id: Option<TaskId>,
|
||||||
|
custom_metadata: Option<String>,
|
||||||
|
dry_run: bool,
|
||||||
) -> Result<Task> {
|
) -> Result<Task> {
|
||||||
// if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
|
// if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
|
||||||
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
|
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
|
||||||
@@ -736,7 +749,7 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut wtxn = self.env.write_txn()?;
|
let mut wtxn = self.env.write_txn()?;
|
||||||
let task = self.queue.register(&mut wtxn, &kind, task_id, dry_run)?;
|
let task = self.queue.register(&mut wtxn, &kind, task_id, custom_metadata, dry_run)?;
|
||||||
|
|
||||||
// If the registered task is a task cancelation
|
// If the registered task is a task cancelation
|
||||||
// we inform the processing tasks to stop (if necessary).
|
// we inform the processing tasks to stop (if necessary).
|
||||||
|
|||||||
@@ -257,6 +257,7 @@ impl Queue {
|
|||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
kind: &KindWithContent,
|
kind: &KindWithContent,
|
||||||
task_id: Option<TaskId>,
|
task_id: Option<TaskId>,
|
||||||
|
custom_metadata: Option<String>,
|
||||||
dry_run: bool,
|
dry_run: bool,
|
||||||
) -> Result<Task> {
|
) -> Result<Task> {
|
||||||
let next_task_id = self.tasks.next_task_id(wtxn)?;
|
let next_task_id = self.tasks.next_task_id(wtxn)?;
|
||||||
@@ -280,6 +281,7 @@ impl Queue {
|
|||||||
status: Status::Enqueued,
|
status: Status::Enqueued,
|
||||||
kind: kind.clone(),
|
kind: kind.clone(),
|
||||||
network: None,
|
network: None,
|
||||||
|
custom_metadata,
|
||||||
};
|
};
|
||||||
// For deletion and cancelation tasks, we want to make extra sure that they
|
// For deletion and cancelation tasks, we want to make extra sure that they
|
||||||
// don't attempt to delete/cancel tasks that are newer than themselves.
|
// don't attempt to delete/cancel tasks that are newer than themselves.
|
||||||
@@ -344,6 +346,7 @@ impl Queue {
|
|||||||
tasks: to_delete,
|
tasks: to_delete,
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
|||||||
@@ -98,6 +98,7 @@ pub fn upgrade_index_scheduler(
|
|||||||
status: Status::Enqueued,
|
status: Status::Enqueued,
|
||||||
kind: KindWithContent::UpgradeDatabase { from },
|
kind: KindWithContent::UpgradeDatabase { from },
|
||||||
network: None,
|
network: None,
|
||||||
|
custom_metadata: None,
|
||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
|
|||||||
@@ -379,6 +379,7 @@ impl crate::IndexScheduler {
|
|||||||
status,
|
status,
|
||||||
kind,
|
kind,
|
||||||
network: _,
|
network: _,
|
||||||
|
custom_metadata: _,
|
||||||
} = task;
|
} = task;
|
||||||
assert_eq!(uid, task.uid);
|
assert_eq!(uid, task.uid);
|
||||||
if task.status != Status::Enqueued {
|
if task.status != Status::Enqueued {
|
||||||
|
|||||||
@@ -254,6 +254,7 @@ InvalidSearchHybridQuery , InvalidRequest , BAD_REQU
|
|||||||
InvalidIndexLimit , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexLimit , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidIndexOffset , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexOffset , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ;
|
||||||
|
InvalidIndexCustomMetadata , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidIndexUid , InvalidRequest , BAD_REQUEST ;
|
InvalidIndexUid , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidMultiSearchFacets , InvalidRequest , BAD_REQUEST ;
|
InvalidMultiSearchFacets , InvalidRequest , BAD_REQUEST ;
|
||||||
InvalidMultiSearchFacetsByIndex , InvalidRequest , BAD_REQUEST ;
|
InvalidMultiSearchFacetsByIndex , InvalidRequest , BAD_REQUEST ;
|
||||||
|
|||||||
@@ -55,6 +55,9 @@ pub struct TaskView {
|
|||||||
|
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
pub network: Option<TaskNetwork>,
|
pub network: Option<TaskNetwork>,
|
||||||
|
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub custom_metadata: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TaskView {
|
impl TaskView {
|
||||||
@@ -73,6 +76,7 @@ impl TaskView {
|
|||||||
started_at: task.started_at,
|
started_at: task.started_at,
|
||||||
finished_at: task.finished_at,
|
finished_at: task.finished_at,
|
||||||
network: task.network.clone(),
|
network: task.network.clone(),
|
||||||
|
custom_metadata: task.custom_metadata.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,6 +45,9 @@ pub struct Task {
|
|||||||
|
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
pub network: Option<TaskNetwork>,
|
pub network: Option<TaskNetwork>,
|
||||||
|
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub custom_metadata: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Task {
|
impl Task {
|
||||||
|
|||||||
@@ -678,6 +678,10 @@ pub struct UpdateDocumentsQuery {
|
|||||||
#[param(value_type = char, default = ",", example = ";")]
|
#[param(value_type = char, default = ",", example = ";")]
|
||||||
#[deserr(default, try_from(char) = from_char_csv_delimiter -> DeserrQueryParamError<InvalidDocumentCsvDelimiter>, error = DeserrQueryParamError<InvalidDocumentCsvDelimiter>)]
|
#[deserr(default, try_from(char) = from_char_csv_delimiter -> DeserrQueryParamError<InvalidDocumentCsvDelimiter>, error = DeserrQueryParamError<InvalidDocumentCsvDelimiter>)]
|
||||||
pub csv_delimiter: Option<u8>,
|
pub csv_delimiter: Option<u8>,
|
||||||
|
|
||||||
|
#[param(example = "custom")]
|
||||||
|
#[deserr(default, error = DeserrQueryParamError<InvalidIndexCustomMetadata>)]
|
||||||
|
pub custom_metadata: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_char_csv_delimiter(
|
fn from_char_csv_delimiter(
|
||||||
@@ -819,6 +823,7 @@ pub async fn replace_documents(
|
|||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::ReplaceDocuments,
|
IndexDocumentsMethod::ReplaceDocuments,
|
||||||
uid,
|
uid,
|
||||||
|
params.custom_metadata,
|
||||||
dry_run,
|
dry_run,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
&req,
|
&req,
|
||||||
@@ -921,6 +926,7 @@ pub async fn update_documents(
|
|||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::UpdateDocuments,
|
IndexDocumentsMethod::UpdateDocuments,
|
||||||
uid,
|
uid,
|
||||||
|
params.custom_metadata,
|
||||||
dry_run,
|
dry_run,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
&req,
|
&req,
|
||||||
@@ -940,6 +946,7 @@ async fn document_addition(
|
|||||||
body: Payload,
|
body: Payload,
|
||||||
method: IndexDocumentsMethod,
|
method: IndexDocumentsMethod,
|
||||||
task_id: Option<TaskId>,
|
task_id: Option<TaskId>,
|
||||||
|
custom_metadata: Option<String>,
|
||||||
dry_run: bool,
|
dry_run: bool,
|
||||||
allow_index_creation: bool,
|
allow_index_creation: bool,
|
||||||
req: &HttpRequest,
|
req: &HttpRequest,
|
||||||
@@ -1065,8 +1072,10 @@ async fn document_addition(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let scheduler = index_scheduler.clone();
|
let scheduler = index_scheduler.clone();
|
||||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
|
let task = match tokio::task::spawn_blocking(move || {
|
||||||
.await?
|
scheduler.register_with_custom_metadata(task, task_id, custom_metadata, dry_run)
|
||||||
|
})
|
||||||
|
.await?
|
||||||
{
|
{
|
||||||
Ok(task) => task,
|
Ok(task) => task,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user