mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	implement the dry run ha parameter
This commit is contained in:
		| @@ -251,7 +251,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
|             .name(String::from("register-snapshot-tasks")) | ||||
|             .spawn(move || loop { | ||||
|                 thread::sleep(snapshot_delay); | ||||
|                 if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation, None) { | ||||
|                 if let Err(e) = | ||||
|                     index_scheduler.register(KindWithContent::SnapshotCreation, None, false) | ||||
|                 { | ||||
|                     error!("Error while registering snapshot: {}", e); | ||||
|                 } | ||||
|             }) | ||||
|   | ||||
| @@ -11,7 +11,7 @@ use crate::analytics::Analytics; | ||||
| use crate::extractors::authentication::policies::*; | ||||
| use crate::extractors::authentication::GuardedData; | ||||
| use crate::extractors::sequential_extractor::SeqHandler; | ||||
| use crate::routes::{get_task_id, SummarizedTaskView}; | ||||
| use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; | ||||
| use crate::Opt; | ||||
|  | ||||
| pub fn configure(cfg: &mut web::ServiceConfig) { | ||||
| @@ -32,8 +32,11 @@ pub async fn create_dump( | ||||
|         instance_uid: analytics.instance_uid().cloned(), | ||||
|     }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Create dump"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
|   | ||||
| @@ -36,7 +36,9 @@ use crate::extractors::authentication::policies::*; | ||||
| use crate::extractors::authentication::GuardedData; | ||||
| use crate::extractors::payload::Payload; | ||||
| use crate::extractors::sequential_extractor::SeqHandler; | ||||
| use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; | ||||
| use crate::routes::{ | ||||
|     get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT, | ||||
| }; | ||||
| use crate::search::parse_filter; | ||||
| use crate::Opt; | ||||
|  | ||||
| @@ -133,8 +135,11 @@ pub async fn delete_document( | ||||
|         documents_ids: vec![document_id], | ||||
|     }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|     debug!("returns: {:?}", task); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
| } | ||||
| @@ -282,6 +287,7 @@ pub async fn replace_documents( | ||||
|  | ||||
|     let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task = document_addition( | ||||
|         extract_mime_type(&req)?, | ||||
|         index_scheduler, | ||||
| @@ -291,6 +297,7 @@ pub async fn replace_documents( | ||||
|         body, | ||||
|         IndexDocumentsMethod::ReplaceDocuments, | ||||
|         uid, | ||||
|         dry_run, | ||||
|         allow_index_creation, | ||||
|     ) | ||||
|     .await?; | ||||
| @@ -317,6 +324,7 @@ pub async fn update_documents( | ||||
|  | ||||
|     let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task = document_addition( | ||||
|         extract_mime_type(&req)?, | ||||
|         index_scheduler, | ||||
| @@ -326,6 +334,7 @@ pub async fn update_documents( | ||||
|         body, | ||||
|         IndexDocumentsMethod::UpdateDocuments, | ||||
|         uid, | ||||
|         dry_run, | ||||
|         allow_index_creation, | ||||
|     ) | ||||
|     .await?; | ||||
| @@ -344,6 +353,7 @@ async fn document_addition( | ||||
|     mut body: Payload, | ||||
|     method: IndexDocumentsMethod, | ||||
|     task_id: Option<TaskId>, | ||||
|     dry_run: bool, | ||||
|     allow_index_creation: bool, | ||||
| ) -> Result<SummarizedTaskView, MeilisearchHttpError> { | ||||
|     let format = match ( | ||||
| @@ -376,7 +386,7 @@ async fn document_addition( | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     let (uuid, mut update_file) = index_scheduler.create_update_file()?; | ||||
|     let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?; | ||||
|  | ||||
|     let temp_file = match tempfile() { | ||||
|         Ok(file) => file, | ||||
| @@ -460,7 +470,9 @@ async fn document_addition( | ||||
|     }; | ||||
|  | ||||
|     let scheduler = index_scheduler.clone(); | ||||
|     let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await? { | ||||
|     let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run)) | ||||
|         .await? | ||||
|     { | ||||
|         Ok(task) => task, | ||||
|         Err(e) => { | ||||
|             index_scheduler.delete_update_file(uuid)?; | ||||
| @@ -492,8 +504,11 @@ pub async fn delete_documents_batch( | ||||
|     let task = | ||||
|         KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Delete documents by batch"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
| @@ -530,8 +545,11 @@ pub async fn delete_documents_by_filter( | ||||
|     let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; | ||||
|  | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Delete documents by filter"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
| @@ -549,8 +567,11 @@ pub async fn clear_all_documents( | ||||
|  | ||||
|     let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Delete all documents"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
|   | ||||
| @@ -22,6 +22,7 @@ use crate::analytics::Analytics; | ||||
| use crate::extractors::authentication::policies::*; | ||||
| use crate::extractors::authentication::{AuthenticationError, GuardedData}; | ||||
| use crate::extractors::sequential_extractor::SeqHandler; | ||||
| use crate::routes::is_dry_run; | ||||
| use crate::Opt; | ||||
|  | ||||
| pub mod documents; | ||||
| @@ -140,8 +141,11 @@ pub async fn create_index( | ||||
|  | ||||
|         let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key }; | ||||
|         let uid = get_task_id(&req, &opt)?; | ||||
|         let dry_run = is_dry_run(&req, &opt)?; | ||||
|         let task: SummarizedTaskView = | ||||
|             tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|             tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|                 .await?? | ||||
|                 .into(); | ||||
|         debug!(returns = ?task, "Create index"); | ||||
|  | ||||
|         Ok(HttpResponse::Accepted().json(task)) | ||||
| @@ -211,8 +215,11 @@ pub async fn update_index( | ||||
|     }; | ||||
|  | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Update index"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
| @@ -227,8 +234,11 @@ pub async fn delete_index( | ||||
|     let index_uid = IndexUid::try_from(index_uid.into_inner())?; | ||||
|     let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|     debug!(returns = ?task, "Delete index"); | ||||
|  | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
|   | ||||
| @@ -15,7 +15,7 @@ use tracing::debug; | ||||
| use crate::analytics::Analytics; | ||||
| use crate::extractors::authentication::policies::*; | ||||
| use crate::extractors::authentication::GuardedData; | ||||
| use crate::routes::{get_task_id, SummarizedTaskView}; | ||||
| use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; | ||||
| use crate::Opt; | ||||
|  | ||||
| #[macro_export] | ||||
| @@ -36,7 +36,7 @@ macro_rules! make_setting_route { | ||||
|             use $crate::extractors::authentication::GuardedData; | ||||
|             use $crate::extractors::sequential_extractor::SeqHandler; | ||||
|             use $crate::Opt; | ||||
|             use $crate::routes::{get_task_id, SummarizedTaskView}; | ||||
|             use $crate::routes::{is_dry_run, get_task_id, SummarizedTaskView}; | ||||
|  | ||||
|             pub async fn delete( | ||||
|                 index_scheduler: GuardedData< | ||||
| @@ -61,8 +61,9 @@ macro_rules! make_setting_route { | ||||
|                     allow_index_creation, | ||||
|                 }; | ||||
|                 let uid = get_task_id(&req, &opt)?; | ||||
|                 let dry_run = is_dry_run(&req, &opt)?; | ||||
|                 let task: SummarizedTaskView = | ||||
|                     tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) | ||||
|                     tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|                         .await?? | ||||
|                         .into(); | ||||
|  | ||||
| @@ -112,8 +113,9 @@ macro_rules! make_setting_route { | ||||
|                     allow_index_creation, | ||||
|                 }; | ||||
|                 let uid = get_task_id(&req, &opt)?; | ||||
|                 let dry_run = is_dry_run(&req, &opt)?; | ||||
|                 let task: SummarizedTaskView = | ||||
|                     tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)) | ||||
|                     tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|                         .await?? | ||||
|                         .into(); | ||||
|  | ||||
| @@ -776,8 +778,11 @@ pub async fn update_all( | ||||
|         allow_index_creation, | ||||
|     }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Update all settings"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
| @@ -815,8 +820,11 @@ pub async fn delete_all( | ||||
|         allow_index_creation, | ||||
|     }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Delete all settings"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
|   | ||||
| @@ -77,6 +77,25 @@ pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result<Option<TaskId>, Respo | ||||
|     Ok(task_id) | ||||
| } | ||||
|  | ||||
| pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> { | ||||
|     if !opt.experimental_ha_parameters { | ||||
|         return Ok(false); | ||||
|     } | ||||
|     Ok(req | ||||
|         .headers() | ||||
|         .get("DryRun") | ||||
|         .map(|header| { | ||||
|             header.to_str().map_err(|e| { | ||||
|                 ResponseError::from_msg( | ||||
|                     format!("DryRun is not a valid utf-8 string: {e}"), | ||||
|                     Code::BadRequest, | ||||
|                 ) | ||||
|             }) | ||||
|         }) | ||||
|         .transpose()? | ||||
|         .map_or(false, |s| s.to_lowercase() == "true")) | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Serialize)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| pub struct SummarizedTaskView { | ||||
|   | ||||
| @@ -10,7 +10,7 @@ use crate::analytics::Analytics; | ||||
| use crate::extractors::authentication::policies::*; | ||||
| use crate::extractors::authentication::GuardedData; | ||||
| use crate::extractors::sequential_extractor::SeqHandler; | ||||
| use crate::routes::{get_task_id, SummarizedTaskView}; | ||||
| use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; | ||||
| use crate::Opt; | ||||
|  | ||||
| pub fn configure(cfg: &mut web::ServiceConfig) { | ||||
| @@ -27,8 +27,11 @@ pub async fn create_snapshot( | ||||
|  | ||||
|     let task = KindWithContent::SnapshotCreation; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|  | ||||
|     debug!(returns = ?task, "Create snapshot"); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
|   | ||||
| @@ -10,7 +10,7 @@ use meilisearch_types::index_uid::IndexUid; | ||||
| use meilisearch_types::tasks::{IndexSwap, KindWithContent}; | ||||
| use serde_json::json; | ||||
|  | ||||
| use super::{get_task_id, SummarizedTaskView}; | ||||
| use super::{get_task_id, is_dry_run, SummarizedTaskView}; | ||||
| use crate::analytics::Analytics; | ||||
| use crate::error::MeilisearchHttpError; | ||||
| use crate::extractors::authentication::policies::*; | ||||
| @@ -63,7 +63,10 @@ pub async fn swap_indexes( | ||||
|  | ||||
|     let task = KindWithContent::IndexSwap { swaps }; | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task: SummarizedTaskView = | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into(); | ||||
|         tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) | ||||
|             .await?? | ||||
|             .into(); | ||||
|     Ok(HttpResponse::Accepted().json(task)) | ||||
| } | ||||
|   | ||||
| @@ -18,7 +18,7 @@ use time::macros::format_description; | ||||
| use time::{Date, Duration, OffsetDateTime, Time}; | ||||
| use tokio::task; | ||||
|  | ||||
| use super::{get_task_id, SummarizedTaskView}; | ||||
| use super::{get_task_id, is_dry_run, SummarizedTaskView}; | ||||
| use crate::analytics::Analytics; | ||||
| use crate::extractors::authentication::policies::*; | ||||
| use crate::extractors::authentication::GuardedData; | ||||
| @@ -200,8 +200,10 @@ async fn cancel_tasks( | ||||
|         KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks }; | ||||
|  | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task = | ||||
|         task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??; | ||||
|         task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid, dry_run)) | ||||
|             .await??; | ||||
|     let task: SummarizedTaskView = task.into(); | ||||
|  | ||||
|     Ok(HttpResponse::Ok().json(task)) | ||||
| @@ -248,7 +250,9 @@ async fn delete_tasks( | ||||
|         KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks }; | ||||
|  | ||||
|     let uid = get_task_id(&req, &opt)?; | ||||
|     let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??; | ||||
|     let dry_run = is_dry_run(&req, &opt)?; | ||||
|     let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid, dry_run)) | ||||
|         .await??; | ||||
|     let task: SummarizedTaskView = task.into(); | ||||
|  | ||||
|     Ok(HttpResponse::Ok().json(task)) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user