mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-05 20:26:31 +00:00
Proxy all document tasks to the network when sharding is enabled
This commit is contained in:
@ -45,6 +45,7 @@ use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::extractors::payload::Payload;
|
||||
use crate::extractors::sequential_extractor::SeqHandler;
|
||||
use crate::routes::indexes::proxy::{proxy, Body};
|
||||
use crate::routes::indexes::search::fix_sort_query_parameters;
|
||||
use crate::routes::{
|
||||
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
|
||||
@ -338,6 +339,7 @@ pub async fn delete_document(
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let DocumentParam { index_uid, document_id } = path.into_inner();
|
||||
let index_uid = IndexUid::try_from(index_uid)?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
@ -355,10 +357,16 @@ pub async fn delete_document(
|
||||
};
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
@ -804,7 +812,6 @@ pub async fn replace_documents(
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = document_addition(
|
||||
extract_mime_type(&req)?,
|
||||
index_scheduler,
|
||||
index_uid,
|
||||
params.primary_key,
|
||||
@ -814,8 +821,10 @@ pub async fn replace_documents(
|
||||
uid,
|
||||
dry_run,
|
||||
allow_index_creation,
|
||||
&req,
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(returns = ?task, "Replace documents");
|
||||
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -905,7 +914,6 @@ pub async fn update_documents(
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = document_addition(
|
||||
extract_mime_type(&req)?,
|
||||
index_scheduler,
|
||||
index_uid,
|
||||
params.primary_key,
|
||||
@ -915,6 +923,7 @@ pub async fn update_documents(
|
||||
uid,
|
||||
dry_run,
|
||||
allow_index_creation,
|
||||
&req,
|
||||
)
|
||||
.await?;
|
||||
debug!(returns = ?task, "Update documents");
|
||||
@ -924,7 +933,6 @@ pub async fn update_documents(
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn document_addition(
|
||||
mime_type: Option<Mime>,
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
||||
index_uid: IndexUid,
|
||||
primary_key: Option<String>,
|
||||
@ -934,7 +942,11 @@ async fn document_addition(
|
||||
task_id: Option<TaskId>,
|
||||
dry_run: bool,
|
||||
allow_index_creation: bool,
|
||||
req: &HttpRequest,
|
||||
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
||||
let mime_type = extract_mime_type(req)?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
let format = match (
|
||||
mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())),
|
||||
csv_delimiter,
|
||||
@ -966,7 +978,7 @@ async fn document_addition(
|
||||
};
|
||||
|
||||
let (uuid, mut update_file) = index_scheduler.queue.create_update_file(dry_run)?;
|
||||
let documents_count = match format {
|
||||
let res = match format {
|
||||
PayloadType::Ndjson => {
|
||||
let (path, file) = update_file.into_parts();
|
||||
let file = match file {
|
||||
@ -981,19 +993,19 @@ async fn document_addition(
|
||||
None => None,
|
||||
};
|
||||
|
||||
let documents_count = tokio::task::spawn_blocking(move || {
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
let documents_count = file.as_ref().map_or(Ok(0), |ntf| {
|
||||
read_ndjson(ntf.as_file()).map_err(MeilisearchHttpError::DocumentFormat)
|
||||
})?;
|
||||
|
||||
let update_file = file_store::File::from_parts(path, file);
|
||||
update_file.persist()?;
|
||||
let update_file = update_file.persist()?;
|
||||
|
||||
Ok(documents_count)
|
||||
Ok((documents_count, update_file))
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(documents_count)
|
||||
Ok(res)
|
||||
}
|
||||
PayloadType::Json | PayloadType::Csv { delimiter: _ } => {
|
||||
let temp_file = match tempfile() {
|
||||
@ -1012,16 +1024,16 @@ async fn document_addition(
|
||||
unreachable!("We already wrote the user content into the update file")
|
||||
}
|
||||
};
|
||||
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||
update_file.persist()?;
|
||||
Ok(documents_count)
|
||||
// we NEED to persist the file here because we moved the `update_file` in another task.
|
||||
let file = update_file.persist()?;
|
||||
Ok((documents_count, file))
|
||||
})
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
let documents_count = match documents_count {
|
||||
Ok(Ok(documents_count)) => documents_count,
|
||||
let (documents_count, file) = match res {
|
||||
Ok(Ok((documents_count, file))) => (documents_count, file),
|
||||
// in this case the file has not possibly be persisted.
|
||||
Ok(Err(e)) => return Err(e),
|
||||
Err(e) => {
|
||||
@ -1063,6 +1075,12 @@ async fn document_addition(
|
||||
}
|
||||
};
|
||||
|
||||
if network.sharding {
|
||||
if let Some(file) = file {
|
||||
proxy(&index_scheduler, &index_uid, req, network, Body::with_file(file), &task).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(task.into())
|
||||
}
|
||||
|
||||
@ -1141,6 +1159,7 @@ pub async fn delete_documents_batch(
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
debug!(parameters = ?body, "Delete documents by batch");
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
@ -1161,10 +1180,16 @@ pub async fn delete_documents_batch(
|
||||
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Delete documents by batch");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -1219,7 +1244,8 @@ pub async fn delete_documents_by_filter(
|
||||
debug!(parameters = ?body, "Delete documents by filter");
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let index_uid = index_uid.into_inner();
|
||||
let filter = body.into_inner().filter;
|
||||
let filter = body.into_inner();
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
@ -1232,17 +1258,30 @@ pub async fn delete_documents_by_filter(
|
||||
);
|
||||
|
||||
// we ensure the filter is well formed before enqueuing it
|
||||
crate::search::parse_filter(&filter, Code::InvalidDocumentFilter, index_scheduler.features())?
|
||||
.ok_or(MeilisearchHttpError::EmptyFilter)?;
|
||||
crate::search::parse_filter(
|
||||
&filter.filter,
|
||||
Code::InvalidDocumentFilter,
|
||||
index_scheduler.features(),
|
||||
)?
|
||||
.ok_or(MeilisearchHttpError::EmptyFilter)?;
|
||||
|
||||
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
||||
let task = KindWithContent::DocumentDeletionByFilter {
|
||||
index_uid: index_uid.clone(),
|
||||
filter_expr: filter.filter.clone(),
|
||||
};
|
||||
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(filter), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Delete documents by filter");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -1336,6 +1375,8 @@ pub async fn edit_documents_by_function(
|
||||
.features()
|
||||
.check_edit_documents_by_function("Using the documents edit route")?;
|
||||
|
||||
let network = index_scheduler.network();
|
||||
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let index_uid = index_uid.into_inner();
|
||||
let params = params.into_inner();
|
||||
@ -1349,13 +1390,12 @@ pub async fn edit_documents_by_function(
|
||||
&req,
|
||||
);
|
||||
|
||||
let DocumentEditionByFunction { filter, context, function } = params;
|
||||
let engine = milli::rhai::Engine::new();
|
||||
if let Err(e) = engine.compile(&function) {
|
||||
if let Err(e) = engine.compile(¶ms.function) {
|
||||
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));
|
||||
}
|
||||
|
||||
if let Some(ref filter) = filter {
|
||||
if let Some(ref filter) = params.filter {
|
||||
// we ensure the filter is well formed before enqueuing it
|
||||
crate::search::parse_filter(
|
||||
filter,
|
||||
@ -1365,9 +1405,9 @@ pub async fn edit_documents_by_function(
|
||||
.ok_or(MeilisearchHttpError::EmptyFilter)?;
|
||||
}
|
||||
let task = KindWithContent::DocumentEdition {
|
||||
index_uid,
|
||||
filter_expr: filter,
|
||||
context: match context {
|
||||
index_uid: index_uid.clone(),
|
||||
filter_expr: params.filter.clone(),
|
||||
context: match params.context.clone() {
|
||||
Some(Value::Object(m)) => Some(m),
|
||||
None => None,
|
||||
_ => {
|
||||
@ -1377,15 +1417,21 @@ pub async fn edit_documents_by_function(
|
||||
))
|
||||
}
|
||||
},
|
||||
function,
|
||||
function: params.function.clone(),
|
||||
};
|
||||
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Edit documents by function");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -1428,6 +1474,8 @@ pub async fn clear_all_documents(
|
||||
analytics: web::Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
clear_all: true,
|
||||
@ -1441,10 +1489,18 @@ pub async fn clear_all_documents(
|
||||
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Delete all documents");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
Reference in New Issue
Block a user