diff --git a/crates/meilisearch/src/routes/indexes/documents.rs b/crates/meilisearch/src/routes/indexes/documents.rs index 5ced4603e..90abddc86 100644 --- a/crates/meilisearch/src/routes/indexes/documents.rs +++ b/crates/meilisearch/src/routes/indexes/documents.rs @@ -45,6 +45,7 @@ use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::payload::Payload; use crate::extractors::sequential_extractor::SeqHandler; +use crate::routes::indexes::proxy::{proxy, Body}; use crate::routes::indexes::search::fix_sort_query_parameters; use crate::routes::{ get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT, @@ -338,6 +339,7 @@ pub async fn delete_document( ) -> Result { let DocumentParam { index_uid, document_id } = path.into_inner(); let index_uid = IndexUid::try_from(index_uid)?; + let network = index_scheduler.network(); analytics.publish( DocumentsDeletionAggregator { @@ -355,10 +357,16 @@ pub async fn delete_document( }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) } @@ -804,7 +812,6 @@ pub async fn replace_documents( let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; let task = document_addition( - extract_mime_type(&req)?, index_scheduler, index_uid, params.primary_key, @@ -814,8 +821,10 @@ pub async fn replace_documents( uid, dry_run, allow_index_creation, + &req, ) .await?; + debug!(returns = ?task, "Replace documents"); Ok(HttpResponse::Accepted().json(task)) @@ -905,7 +914,6 @@ pub async fn update_documents( let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; let task = document_addition( - extract_mime_type(&req)?, index_scheduler, index_uid, params.primary_key, @@ -915,6 +923,7 @@ pub async fn update_documents( uid, dry_run, allow_index_creation, + &req, ) .await?; debug!(returns = ?task, "Update documents"); @@ -924,7 +933,6 @@ pub async fn update_documents( #[allow(clippy::too_many_arguments)] async fn document_addition( - mime_type: Option, index_scheduler: GuardedData, Data>, index_uid: IndexUid, primary_key: Option, @@ -934,7 +942,11 @@ async fn document_addition( task_id: Option, dry_run: bool, allow_index_creation: bool, + req: &HttpRequest, ) -> Result { + let mime_type = extract_mime_type(req)?; + let network = index_scheduler.network(); + let format = match ( mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())), csv_delimiter, @@ -966,7 +978,7 @@ async fn document_addition( }; let (uuid, mut update_file) = index_scheduler.queue.create_update_file(dry_run)?; - let documents_count = match format { + let res = match format { PayloadType::Ndjson => { let (path, file) = update_file.into_parts(); let file = match file { @@ -981,19 +993,19 @@ async fn document_addition( None => None, }; - let documents_count = tokio::task::spawn_blocking(move || { + let res = tokio::task::spawn_blocking(move || { let documents_count = file.as_ref().map_or(Ok(0), |ntf| { read_ndjson(ntf.as_file()).map_err(MeilisearchHttpError::DocumentFormat) })?; let update_file = file_store::File::from_parts(path, file); - update_file.persist()?; + let update_file = update_file.persist()?; - Ok(documents_count) + Ok((documents_count, update_file)) }) .await?; - Ok(documents_count) + Ok(res) } PayloadType::Json | PayloadType::Csv { delimiter: _ } => { let temp_file = match tempfile() { @@ -1012,16 +1024,16 @@ async fn document_addition( unreachable!("We already wrote the user content into the update file") } }; - // we NEED to persist the file here because we moved the `udpate_file` in another task. - update_file.persist()?; - Ok(documents_count) + // we NEED to persist the file here because we moved the `update_file` in another task. + let file = update_file.persist()?; + Ok((documents_count, file)) }) .await } }; - let documents_count = match documents_count { - Ok(Ok(documents_count)) => documents_count, + let (documents_count, file) = match res { + Ok(Ok((documents_count, file))) => (documents_count, file), // in this case the file has not possibly be persisted. Ok(Err(e)) => return Err(e), Err(e) => { @@ -1063,6 +1075,12 @@ async fn document_addition( } }; + if network.sharding { + if let Some(file) = file { + proxy(&index_scheduler, &index_uid, req, network, Body::with_file(file), &task).await?; + } + } + Ok(task.into()) } @@ -1141,6 +1159,7 @@ pub async fn delete_documents_batch( ) -> Result { debug!(parameters = ?body, "Delete documents by batch"); let index_uid = IndexUid::try_from(index_uid.into_inner())?; + let network = index_scheduler.network(); analytics.publish( DocumentsDeletionAggregator { @@ -1161,10 +1180,16 @@ pub async fn delete_documents_batch( KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Delete documents by batch"); Ok(HttpResponse::Accepted().json(task)) @@ -1219,7 +1244,8 @@ pub async fn delete_documents_by_filter( debug!(parameters = ?body, "Delete documents by filter"); let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = index_uid.into_inner(); - let filter = body.into_inner().filter; + let filter = body.into_inner(); + let network = index_scheduler.network(); analytics.publish( DocumentsDeletionAggregator { @@ -1232,17 +1258,30 @@ pub async fn delete_documents_by_filter( ); // we ensure the filter is well formed before enqueuing it - crate::search::parse_filter(&filter, Code::InvalidDocumentFilter, index_scheduler.features())? - .ok_or(MeilisearchHttpError::EmptyFilter)?; + crate::search::parse_filter( + &filter.filter, + Code::InvalidDocumentFilter, + index_scheduler.features(), + )? + .ok_or(MeilisearchHttpError::EmptyFilter)?; - let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; + let task = KindWithContent::DocumentDeletionByFilter { + index_uid: index_uid.clone(), + filter_expr: filter.filter.clone(), + }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(filter), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Delete documents by filter"); Ok(HttpResponse::Accepted().json(task)) @@ -1336,6 +1375,8 @@ pub async fn edit_documents_by_function( .features() .check_edit_documents_by_function("Using the documents edit route")?; + let network = index_scheduler.network(); + let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = index_uid.into_inner(); let params = params.into_inner(); @@ -1349,13 +1390,12 @@ pub async fn edit_documents_by_function( &req, ); - let DocumentEditionByFunction { filter, context, function } = params; let engine = milli::rhai::Engine::new(); - if let Err(e) = engine.compile(&function) { + if let Err(e) = engine.compile(¶ms.function) { return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest)); } - if let Some(ref filter) = filter { + if let Some(ref filter) = params.filter { // we ensure the filter is well formed before enqueuing it crate::search::parse_filter( filter, @@ -1365,9 +1405,9 @@ pub async fn edit_documents_by_function( .ok_or(MeilisearchHttpError::EmptyFilter)?; } let task = KindWithContent::DocumentEdition { - index_uid, - filter_expr: filter, - context: match context { + index_uid: index_uid.clone(), + filter_expr: params.filter.clone(), + context: match params.context.clone() { Some(Value::Object(m)) => Some(m), None => None, _ => { @@ -1377,15 +1417,21 @@ pub async fn edit_documents_by_function( )) } }, - function, + function: params.function.clone(), }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Edit documents by function"); Ok(HttpResponse::Accepted().json(task)) @@ -1428,6 +1474,8 @@ pub async fn clear_all_documents( analytics: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; + let network = index_scheduler.network(); + analytics.publish( DocumentsDeletionAggregator { clear_all: true, @@ -1441,10 +1489,18 @@ pub async fn clear_all_documents( let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + + let task = { + let index_scheduler = index_scheduler.clone(); + + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Delete all documents"); Ok(HttpResponse::Accepted().json(task))