diff --git a/crates/meilisearch/src/routes/indexes/documents.rs b/crates/meilisearch/src/routes/indexes/documents.rs index 138f5140f..29886c17d 100644 --- a/crates/meilisearch/src/routes/indexes/documents.rs +++ b/crates/meilisearch/src/routes/indexes/documents.rs @@ -45,6 +45,7 @@ use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; use crate::extractors::payload::Payload; use crate::extractors::sequential_extractor::SeqHandler; +use crate::routes::indexes::proxy::{proxy, Body}; use crate::routes::indexes::search::fix_sort_query_parameters; use crate::routes::{ get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT, @@ -334,6 +335,7 @@ pub async fn delete_document( ) -> Result { let DocumentParam { index_uid, document_id } = path.into_inner(); let index_uid = IndexUid::try_from(index_uid)?; + let network = index_scheduler.network(); analytics.publish( DocumentsDeletionAggregator { @@ -351,10 +353,16 @@ pub async fn delete_document( }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? 
+ }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!("returns: {:?}", task); Ok(HttpResponse::Accepted().json(task)) } @@ -792,7 +800,6 @@ pub async fn replace_documents( let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; let task = document_addition( - extract_mime_type(&req)?, index_scheduler, index_uid, params.primary_key, @@ -802,8 +809,10 @@ pub async fn replace_documents( uid, dry_run, allow_index_creation, + &req, ) .await?; + debug!(returns = ?task, "Replace documents"); Ok(HttpResponse::Accepted().json(task)) @@ -893,7 +902,6 @@ pub async fn update_documents( let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; let task = document_addition( - extract_mime_type(&req)?, index_scheduler, index_uid, params.primary_key, @@ -903,6 +911,7 @@ pub async fn update_documents( uid, dry_run, allow_index_creation, + &req, ) .await?; debug!(returns = ?task, "Update documents"); @@ -912,7 +921,6 @@ pub async fn update_documents( #[allow(clippy::too_many_arguments)] async fn document_addition( - mime_type: Option, index_scheduler: GuardedData, Data>, index_uid: IndexUid, primary_key: Option, @@ -922,7 +930,11 @@ async fn document_addition( task_id: Option, dry_run: bool, allow_index_creation: bool, + req: &HttpRequest, ) -> Result { + let mime_type = extract_mime_type(req)?; + let network = index_scheduler.network(); + let format = match ( mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())), csv_delimiter, @@ -954,7 +966,7 @@ async fn document_addition( }; let (uuid, mut update_file) = index_scheduler.queue.create_update_file(dry_run)?; - let documents_count = match format { + let res = match format { PayloadType::Ndjson => { let (path, file) = update_file.into_parts(); let file = match file { @@ -969,19 +981,19 @@ async fn document_addition( None => None, }; - let documents_count 
= tokio::task::spawn_blocking(move || { + let res = tokio::task::spawn_blocking(move || { let documents_count = file.as_ref().map_or(Ok(0), |ntf| { read_ndjson(ntf.as_file()).map_err(MeilisearchHttpError::DocumentFormat) })?; let update_file = file_store::File::from_parts(path, file); - update_file.persist()?; + let update_file = update_file.persist()?; - Ok(documents_count) + Ok((documents_count, update_file)) }) .await?; - Ok(documents_count) + Ok(res) } PayloadType::Json | PayloadType::Csv { delimiter: _ } => { let temp_file = match tempfile() { @@ -1000,16 +1012,16 @@ async fn document_addition( unreachable!("We already wrote the user content into the update file") } }; - // we NEED to persist the file here because we moved the `udpate_file` in another task. - update_file.persist()?; - Ok(documents_count) + // we NEED to persist the file here because we moved the `update_file` in another task. + let file = update_file.persist()?; + Ok((documents_count, file)) }) .await } }; - let documents_count = match documents_count { - Ok(Ok(documents_count)) => documents_count, + let (documents_count, file) = match res { + Ok(Ok((documents_count, file))) => (documents_count, file), // in this case the file has not possibly be persisted. 
Ok(Err(e)) => return Err(e), Err(e) => { @@ -1051,6 +1063,12 @@ async fn document_addition( } }; + if network.sharding { + if let Some(file) = file { + proxy(&index_scheduler, &index_uid, req, network, Body::with_file(file), &task).await?; + } + } + Ok(task.into()) } @@ -1129,6 +1147,7 @@ pub async fn delete_documents_batch( ) -> Result { debug!(parameters = ?body, "Delete documents by batch"); let index_uid = IndexUid::try_from(index_uid.into_inner())?; + let network = index_scheduler.network(); analytics.publish( DocumentsDeletionAggregator { @@ -1149,10 +1168,16 @@ pub async fn delete_documents_batch( KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Delete documents by batch"); Ok(HttpResponse::Accepted().json(task)) @@ -1207,7 +1232,8 @@ pub async fn delete_documents_by_filter( debug!(parameters = ?body, "Delete documents by filter"); let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = index_uid.into_inner(); - let filter = body.into_inner().filter; + let filter = body.into_inner(); + let network = index_scheduler.network(); analytics.publish( DocumentsDeletionAggregator { @@ -1220,17 +1246,30 @@ pub async fn delete_documents_by_filter( ); // we ensure the filter is well formed before enqueuing it - crate::search::parse_filter(&filter, Code::InvalidDocumentFilter, index_scheduler.features())? 
- .ok_or(MeilisearchHttpError::EmptyFilter)?; + crate::search::parse_filter( + &filter.filter, + Code::InvalidDocumentFilter, + index_scheduler.features(), + )? + .ok_or(MeilisearchHttpError::EmptyFilter)?; - let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; + let task = KindWithContent::DocumentDeletionByFilter { + index_uid: index_uid.clone(), + filter_expr: filter.filter.clone(), + }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(filter), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Delete documents by filter"); Ok(HttpResponse::Accepted().json(task)) @@ -1324,6 +1363,8 @@ pub async fn edit_documents_by_function( .features() .check_edit_documents_by_function("Using the documents edit route")?; + let network = index_scheduler.network(); + let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = index_uid.into_inner(); let params = params.into_inner(); @@ -1337,13 +1378,12 @@ pub async fn edit_documents_by_function( &req, ); - let DocumentEditionByFunction { filter, context, function } = params; let engine = milli::rhai::Engine::new(); - if let Err(e) = engine.compile(&function) { + if let Err(e) = engine.compile(&params.function) { return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest)); } - if let Some(ref filter) = filter { + if let Some(ref filter) = params.filter { // we ensure the filter is well formed before enqueuing it crate::search::parse_filter( filter, @@ -1353,9 +1393,9 @@ pub async fn edit_documents_by_function( 
.ok_or(MeilisearchHttpError::EmptyFilter)?; } let task = KindWithContent::DocumentEdition { - index_uid, - filter_expr: filter, - context: match context { + index_uid: index_uid.clone(), + filter_expr: params.filter.clone(), + context: match params.context.clone() { Some(Value::Object(m)) => Some(m), None => None, _ => { @@ -1365,15 +1405,21 @@ pub async fn edit_documents_by_function( )) } }, - function, + function: params.function.clone(), }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = { + let index_scheduler = index_scheduler.clone(); + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? + }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Edit documents by function"); Ok(HttpResponse::Accepted().json(task)) @@ -1416,6 +1462,8 @@ pub async fn clear_all_documents( analytics: web::Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; + let network = index_scheduler.network(); + analytics.publish( DocumentsDeletionAggregator { clear_all: true, @@ -1429,10 +1477,18 @@ pub async fn clear_all_documents( let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + + let task = { + let index_scheduler = index_scheduler.clone(); + + tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await?? 
+ }; + + if network.sharding && !dry_run { + proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?; + } + + let task: SummarizedTaskView = task.into(); debug!(returns = ?task, "Delete all documents"); Ok(HttpResponse::Accepted().json(task))