Proxy all document tasks to the network when sharding is enabled

This commit is contained in:
Louis Dureuil
2025-07-29 14:43:17 +02:00
parent cda5995922
commit be065c4c51

View File

@ -45,6 +45,7 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::indexes::proxy::{proxy, Body};
use crate::routes::indexes::search::fix_sort_query_parameters;
use crate::routes::{
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
@ -338,6 +339,7 @@ pub async fn delete_document(
) -> Result<HttpResponse, ResponseError> {
let DocumentParam { index_uid, document_id } = path.into_inner();
let index_uid = IndexUid::try_from(index_uid)?;
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
@ -355,10 +357,16 @@ pub async fn delete_document(
};
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
@ -804,7 +812,6 @@ pub async fn replace_documents(
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
index_uid,
params.primary_key,
@ -814,8 +821,10 @@ pub async fn replace_documents(
uid,
dry_run,
allow_index_creation,
&req,
)
.await?;
debug!(returns = ?task, "Replace documents");
Ok(HttpResponse::Accepted().json(task))
@ -905,7 +914,6 @@ pub async fn update_documents(
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
index_uid,
params.primary_key,
@ -915,6 +923,7 @@ pub async fn update_documents(
uid,
dry_run,
allow_index_creation,
&req,
)
.await?;
debug!(returns = ?task, "Update documents");
@ -924,7 +933,6 @@ pub async fn update_documents(
#[allow(clippy::too_many_arguments)]
async fn document_addition(
mime_type: Option<Mime>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
index_uid: IndexUid,
primary_key: Option<String>,
@ -934,7 +942,11 @@ async fn document_addition(
task_id: Option<TaskId>,
dry_run: bool,
allow_index_creation: bool,
req: &HttpRequest,
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
let mime_type = extract_mime_type(req)?;
let network = index_scheduler.network();
let format = match (
mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())),
csv_delimiter,
@ -966,7 +978,7 @@ async fn document_addition(
};
let (uuid, mut update_file) = index_scheduler.queue.create_update_file(dry_run)?;
let documents_count = match format {
let res = match format {
PayloadType::Ndjson => {
let (path, file) = update_file.into_parts();
let file = match file {
@ -981,19 +993,19 @@ async fn document_addition(
None => None,
};
let documents_count = tokio::task::spawn_blocking(move || {
let res = tokio::task::spawn_blocking(move || {
let documents_count = file.as_ref().map_or(Ok(0), |ntf| {
read_ndjson(ntf.as_file()).map_err(MeilisearchHttpError::DocumentFormat)
})?;
let update_file = file_store::File::from_parts(path, file);
update_file.persist()?;
let update_file = update_file.persist()?;
Ok(documents_count)
Ok((documents_count, update_file))
})
.await?;
Ok(documents_count)
Ok(res)
}
PayloadType::Json | PayloadType::Csv { delimiter: _ } => {
let temp_file = match tempfile() {
@ -1012,16 +1024,16 @@ async fn document_addition(
unreachable!("We already wrote the user content into the update file")
}
};
// we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist()?;
Ok(documents_count)
// we NEED to persist the file here because we moved the `update_file` in another task.
let file = update_file.persist()?;
Ok((documents_count, file))
})
.await
}
};
let documents_count = match documents_count {
Ok(Ok(documents_count)) => documents_count,
let (documents_count, file) = match res {
Ok(Ok((documents_count, file))) => (documents_count, file),
// in this case the file has not possibly be persisted.
Ok(Err(e)) => return Err(e),
Err(e) => {
@ -1063,6 +1075,12 @@ async fn document_addition(
}
};
if network.sharding {
if let Some(file) = file {
proxy(&index_scheduler, &index_uid, req, network, Body::with_file(file), &task).await?;
}
}
Ok(task.into())
}
@ -1141,6 +1159,7 @@ pub async fn delete_documents_batch(
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by batch");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
@ -1161,10 +1180,16 @@ pub async fn delete_documents_batch(
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Delete documents by batch");
Ok(HttpResponse::Accepted().json(task))
@ -1219,7 +1244,8 @@ pub async fn delete_documents_by_filter(
debug!(parameters = ?body, "Delete documents by filter");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let filter = body.into_inner().filter;
let filter = body.into_inner();
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
@ -1232,17 +1258,30 @@ pub async fn delete_documents_by_filter(
);
// we ensure the filter is well formed before enqueuing it
crate::search::parse_filter(&filter, Code::InvalidDocumentFilter, index_scheduler.features())?
.ok_or(MeilisearchHttpError::EmptyFilter)?;
crate::search::parse_filter(
&filter.filter,
Code::InvalidDocumentFilter,
index_scheduler.features(),
)?
.ok_or(MeilisearchHttpError::EmptyFilter)?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let task = KindWithContent::DocumentDeletionByFilter {
index_uid: index_uid.clone(),
filter_expr: filter.filter.clone(),
};
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(filter), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Delete documents by filter");
Ok(HttpResponse::Accepted().json(task))
@ -1336,6 +1375,8 @@ pub async fn edit_documents_by_function(
.features()
.check_edit_documents_by_function("Using the documents edit route")?;
let network = index_scheduler.network();
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let params = params.into_inner();
@ -1349,13 +1390,12 @@ pub async fn edit_documents_by_function(
&req,
);
let DocumentEditionByFunction { filter, context, function } = params;
let engine = milli::rhai::Engine::new();
if let Err(e) = engine.compile(&function) {
if let Err(e) = engine.compile(&params.function) {
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));
}
if let Some(ref filter) = filter {
if let Some(ref filter) = params.filter {
// we ensure the filter is well formed before enqueuing it
crate::search::parse_filter(
filter,
@ -1365,9 +1405,9 @@ pub async fn edit_documents_by_function(
.ok_or(MeilisearchHttpError::EmptyFilter)?;
}
let task = KindWithContent::DocumentEdition {
index_uid,
filter_expr: filter,
context: match context {
index_uid: index_uid.clone(),
filter_expr: params.filter.clone(),
context: match params.context.clone() {
Some(Value::Object(m)) => Some(m),
None => None,
_ => {
@ -1377,15 +1417,21 @@ pub async fn edit_documents_by_function(
))
}
},
function,
function: params.function.clone(),
};
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Edit documents by function");
Ok(HttpResponse::Accepted().json(task))
@ -1428,6 +1474,8 @@ pub async fn clear_all_documents(
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
clear_all: true,
@ -1441,10 +1489,18 @@ pub async fn clear_all_documents(
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Delete all documents");
Ok(HttpResponse::Accepted().json(task))