From 60716f13e1875af7a05d1e64d4ad8ba0a175aa0b Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Wed, 26 Nov 2025 16:19:40 +0100 Subject: [PATCH] WIP --- crates/index-scheduler/src/insta_snapshot.rs | 4 +- crates/index-scheduler/src/lib.rs | 6 +- crates/index-scheduler/src/queue/tasks.rs | 9 +- .../src/scheduler/enterprise_edition/mod.rs | 51 +++++--- .../src/scheduler/process_export.rs | 108 ++++++++-------- crates/index-scheduler/src/utils.rs | 1 - crates/meilisearch-types/src/task_view.rs | 3 +- .../src/tasks/enterprise_edition/network.rs | 122 ++++++++++-------- crates/meilisearch-types/src/tasks/mod.rs | 4 +- .../indexes/enterprise_edition/proxy.rs | 70 ++++++++-- .../src/routes/indexes/settings.rs | 111 ++++++++-------- 11 files changed, 283 insertions(+), 206 deletions(-) diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index cf3caaa34..0f732218b 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -325,8 +325,8 @@ fn snapshot_details(d: &Details) -> String { Details::IndexCompaction { index_uid, pre_compaction_size, post_compaction_size } => { format!("{{ index_uid: {index_uid:?}, pre_compaction_size: {pre_compaction_size:?}, post_compaction_size: {post_compaction_size:?} }}") } - Details::NetworkTopologyChange { moved_documents, received_documents, message } => { - format!("{{ moved_documents: {moved_documents:?}, received_documents: {received_documents:?}, message: {message:?}") + Details::NetworkTopologyChange { moved_documents, message } => { + format!("{{ moved_documents: {moved_documents:?}, message: {message:?}") } } } diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 07033389f..303917d11 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -841,9 +841,9 @@ impl IndexScheduler { self.update_network_task( &mut wtxn, - &ImportData { remote_name, index_name: "null".into(), document_count: 0 }, + &ImportData { remote_name, index_name: None, document_count: 0 }, &origin, - &ImportMetadata { index_count: 0, task_key: 0, total_index_documents: 0 }, + &ImportMetadata { index_count: 0, task_key: None, total_index_documents: 0 }, )?; wtxn.commit()?; @@ -907,7 +907,7 @@ impl IndexScheduler { }; network_topology_change.receive_remote_task( &import_from.remote_name, - &import_from.index_name, + import_from.index_name.as_deref(), metadata.task_key, import_from.document_count, metadata.index_count, diff --git a/crates/index-scheduler/src/queue/tasks.rs b/crates/index-scheduler/src/queue/tasks.rs index 853fd6cc6..75fbb4c81 100644 --- a/crates/index-scheduler/src/queue/tasks.rs +++ b/crates/index-scheduler/src/queue/tasks.rs @@ -115,14 +115,15 @@ impl TaskQueue { /// - CorruptedTaskQueue: The task doesn't exist in the database pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &mut Task) -> Result<()> { let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?; - let reprocessing = old_task.status != Status::Enqueued; + // network topology tasks may be processed multiple times. + let maybe_reprocessing = old_task.status != Status::Enqueued || task.kind.as_kind() == Kind::NetworkTopologyChange; debug_assert!(old_task != *task); debug_assert_eq!(old_task.uid, task.uid); // If we're processing a task that failed it may already contains a batch_uid debug_assert!( - reprocessing || (old_task.batch_uid.is_none() && task.batch_uid.is_some()), + maybe_reprocessing || (old_task.batch_uid.is_none() && task.batch_uid.is_some()), "\n==> old: {old_task:?}\n==> new: {task:?}" ); @@ -161,7 +162,7 @@ impl TaskQueue { ); if old_task.started_at != task.started_at { assert!( - reprocessing || old_task.started_at.is_none(), + maybe_reprocessing || old_task.started_at.is_none(), "Cannot update a task's started_at time" ); if let Some(started_at) = old_task.started_at { @@ -173,7 +174,7 @@ impl TaskQueue { } if old_task.finished_at != task.finished_at { assert!( - reprocessing || old_task.finished_at.is_none(), + maybe_reprocessing || old_task.finished_at.is_none(), "Cannot update a task's finished_at time" ); if let Some(finished_at) = old_task.finished_at { diff --git a/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs b/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs index 823a5b033..e239b6bdd 100644 --- a/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs +++ b/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs @@ -40,11 +40,13 @@ impl IndexScheduler { let Some(import) = network.import_data() else { continue; }; - network_topology_change.process_remote_tasks( - &import.remote_name, - &import.index_name, - import.document_count, - ); + if let Some(index_name) = import.index_name.as_deref() { + network_topology_change.process_remote_tasks( + &import.remote_name, + index_name, + import.document_count, + ); + } } network_task.details = Some(network_topology_change.to_details()); @@ -82,14 +84,14 @@ impl IndexScheduler { }; if let Some((remotes, out_name)) = network_topology_change.export_to_process() { - self.balance_documents( + network_topology_change.set_moved(self.balance_documents( remotes, out_name, network_topology_change.in_name(), origin, &progress, &self.scheduler.must_stop_processing, - )?; + )?); } network_topology_change.update_state(); if network_topology_change.state() == NetworkTopologyState::Finished { @@ -108,7 +110,7 @@ impl IndexScheduler { network_change_origin: &Origin, progress: &Progress, must_stop_processing: &crate::scheduler::MustStopProcessing, - ) -> crate::Result<()> { + ) -> crate::Result { let new_shards = Shards::from_remotes_local( remotes.keys().map(String::as_str).chain(in_name.into_iter()), in_name, @@ -123,28 +125,36 @@ impl IndexScheduler { let index_count = self.index_mapper.index_count(&scheduler_rtxn)?; - // when the instance is empty, we still need to that to remotes, as they cannot know of that fact. + // when the instance is empty, we still need to tell that to remotes, as they cannot know of that fact and will be waiting for + // data if index_count == 0 { - for remote in remotes.values() { + for (remote_name, remote) in remotes { let target = TargetInstance { base_url: &remote.url, api_key: remote.write_api_key.as_deref(), }; - self.export_no_index( + let res = self.export_no_index( target, out_name, network_change_origin, &agent, must_stop_processing, - )?; + ); + + match res { + Ok(_) => {} + Err(err) => { + tracing::warn!("Could not signal not to wait documents to `{remote_name}` due to error: {err}"); + } + } } - return Ok(()); + return Ok(0); } - let _: Vec<()> = self.index_mapper.try_for_each_index( + let moved_documents: Vec = self.index_mapper.try_for_each_index( &scheduler_rtxn, - |index_uid, index| -> crate::Result<()> { + |index_uid, index| -> crate::Result { indexer_alloc.reset(); let err = |err| Error::from_milli(err, Some(index_uid.to_string())); let index_rtxn = index.read_txn()?; @@ -208,9 +218,11 @@ impl IndexScheduler { } if documents_to_delete.is_empty() { - return Ok(()); + return Ok(0); } + let moved_count = documents_to_delete.len(); + let mut new_fields_ids_map = fields_ids_map.clone(); // candidates not empty => index not empty => a primary key is set @@ -260,9 +272,12 @@ impl IndexScheduler { // update stats after committing changes to index mapper_wtxn.commit()?; - Ok(()) + Ok(moved_count) }, )?; - Ok(()) + + let moved_documents: u64 = moved_documents.into_iter().sum(); + + Ok(moved_documents) } } diff --git a/crates/index-scheduler/src/scheduler/process_export.rs b/crates/index-scheduler/src/scheduler/process_export.rs index 7a34e2d48..5a5814102 100644 --- a/crates/index-scheduler/src/scheduler/process_export.rs +++ b/crates/index-scheduler/src/scheduler/process_export.rs @@ -113,6 +113,8 @@ impl IndexScheduler { ctx: ExportContext<'_>, ) -> Result { let err = |err| Error::from_milli(err, Some(options.index_uid.to_string())); + let total_index_documents = ctx.universe.len(); + let task_network = options.task_network(total_index_documents); let bearer = target.api_key.map(|api_key| format!("Bearer {api_key}")); let url = format!( @@ -161,7 +163,6 @@ impl IndexScheduler { })?; } if !index_exists || options.override_settings { - /// TODO: attach a version to the settings let mut settings = settings::settings(&ctx.index, &ctx.index_rtxn, SecretPolicy::RevealSecrets) .map_err(err)?; @@ -175,13 +176,19 @@ impl IndexScheduler { base_url = target.base_url, index_uid = options.index_uid ); - retry(ctx.must_stop_processing, || { + + let _ = handle_response(retry(ctx.must_stop_processing, || { let mut request = ctx.agent.patch(&url); + + if let Some((import_data, origin, metadata)) = &task_network { + request = set_network_ureq_headers(request, import_data, origin, metadata); + } + if let Some(bearer) = bearer.as_ref() { request = request.set("Authorization", bearer); } request.send_json(settings.clone()).map_err(into_backoff_error) - })?; + }))?; } let fields_ids_map = ctx.index.fields_ids_map(&ctx.index_rtxn)?; @@ -199,12 +206,7 @@ impl IndexScheduler { // no document to send, but we must still send a task when performing network balancing if ctx.universe.is_empty() { - if let ExportMode::NetworkBalancing { - index_count, - export_old_remote_name, - network_change_origin, - } = options.export_mode - { + if let Some((import_data, network_change_origin, metadata)) = task_network { let mut compressed_buffer = Vec::new(); // ignore control flow, we're returning anyway let _ = send_buffer( @@ -214,19 +216,7 @@ impl IndexScheduler { ctx.agent, &documents_url, bearer.as_deref(), - Some(&( - ImportData { - remote_name: export_old_remote_name.to_string(), - index_name: options.index_uid.to_string(), - document_count: 0, - }, - network_change_origin.clone(), - ImportMetadata { - index_count, - task_key: 0, - total_index_documents: ctx.universe.len(), - }, - )), + Some(&(import_data, network_change_origin.clone(), metadata)), &err, )?; } @@ -235,28 +225,7 @@ impl IndexScheduler { let results = request_threads() .broadcast(|broadcast| { - let mut task_network = if let ExportMode::NetworkBalancing { - index_count, - export_old_remote_name, - network_change_origin, - } = options.export_mode - { - Some(( - ImportData { - remote_name: export_old_remote_name.to_string(), - index_name: options.index_uid.to_string(), - document_count: 0, - }, - network_change_origin.clone(), - ImportMetadata { - index_count, - task_key: 0, - total_index_documents: ctx.universe.len(), - }, - )) - } else { - None - }; + let mut task_network = options.task_network(total_index_documents); let index_rtxn = ctx.index.read_txn().map_err(milli::Error::from).map_err(err)?; @@ -269,7 +238,7 @@ impl IndexScheduler { } if let Some((import_data, _, metadata)) = &mut task_network { import_data.document_count += 1; - metadata.task_key = docid; + metadata.task_key = Some(docid); } let document = ctx.index.document(&index_rtxn, docid).map_err(err)?; @@ -352,7 +321,7 @@ impl IndexScheduler { compressed_buffer.clear(); if let Some((import_data, _, metadata)) = &mut task_network { import_data.document_count = 0; - metadata.task_key = 0; + metadata.task_key = None; } if control_flow.is_break() { return Ok(()); @@ -408,11 +377,11 @@ impl IndexScheduler { request, &ImportData { remote_name: export_old_remote_name.to_string(), - index_name: "null".to_string(), + index_name: None, document_count: 0, }, &network_change_origin, - &ImportMetadata { index_count: 0, task_key: 0, total_index_documents: 0 }, + &ImportMetadata { index_count: 0, task_key: None, total_index_documents: 0 }, ); request = request.set("Content-Type", "application/json"); if let Some(bearer) = &bearer { @@ -437,19 +406,28 @@ fn set_network_ureq_headers( origin: &Origin, metadata: &ImportMetadata, ) -> ureq::Request { - request + let request = request .set(headers::PROXY_ORIGIN_REMOTE_HEADER, &origin.remote_name) .set(headers::PROXY_ORIGIN_TASK_UID_HEADER, &origin.task_uid.to_string()) .set(headers::PROXY_ORIGIN_NETWORK_VERSION_HEADER, &origin.network_version.to_string()) .set(headers::PROXY_IMPORT_REMOTE_HEADER, &import_data.remote_name) - .set(headers::PROXY_IMPORT_INDEX_HEADER, &import_data.index_name) - .set(headers::PROXY_IMPORT_TASK_KEY_HEADER, &metadata.task_key.to_string()) .set(headers::PROXY_IMPORT_DOCS_HEADER, &import_data.document_count.to_string()) .set(headers::PROXY_IMPORT_INDEX_COUNT_HEADER, &metadata.index_count.to_string()) .set( headers::PROXY_IMPORT_TOTAL_INDEX_DOCS_HEADER, &metadata.total_index_documents.to_string(), - ) + ); + let request = if let Some(index_name) = import_data.index_name.as_deref() { + request.set(headers::PROXY_IMPORT_INDEX_HEADER, index_name) + } else { + request + }; + let request = if let Some(task_key) = metadata.task_key { + request.set(headers::PROXY_IMPORT_TASK_KEY_HEADER, &task_key.to_string()) + } else { + request + }; + request } fn send_buffer<'a, 'b>( @@ -581,6 +559,32 @@ pub(super) struct ExportOptions<'a> { pub(super) export_mode: ExportMode<'a>, } +impl ExportOptions<'_> { + fn task_network( + &self, + total_index_documents: u64, + ) -> Option<(ImportData, Origin, ImportMetadata)> { + if let ExportMode::NetworkBalancing { + index_count, + export_old_remote_name, + network_change_origin, + } = self.export_mode + { + Some(( + ImportData { + remote_name: export_old_remote_name.to_string(), + index_name: Some(self.index_uid.to_string()), + document_count: 0, + }, + network_change_origin.clone(), + ImportMetadata { index_count, task_key: None, total_index_documents }, + )) + } else { + None + } + } +} + pub(super) struct ExportContext<'a> { pub(super) index: &'a meilisearch_types::milli::Index, pub(super) index_rtxn: &'a milli::heed::RoTxn<'a>, diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 138791b78..388e816b4 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -680,7 +680,6 @@ impl crate::IndexScheduler { } Details::NetworkTopologyChange { moved_documents: _, - received_documents: _, message: _, } => { assert_eq!(kind.as_kind(), Kind::NetworkTopologyChange); diff --git a/crates/meilisearch-types/src/task_view.rs b/crates/meilisearch-types/src/task_view.rs index 6b7290fdb..594db8b9c 100644 --- a/crates/meilisearch-types/src/task_view.rs +++ b/crates/meilisearch-types/src/task_view.rs @@ -467,10 +467,9 @@ impl From
for DetailsView { ..Default::default() } } - Details::NetworkTopologyChange { moved_documents, received_documents, message } => { + Details::NetworkTopologyChange { moved_documents, message } => { DetailsView { moved_documents: Some(moved_documents), - received_documents: Some(received_documents), message: Some(message), ..Default::default() } diff --git a/crates/meilisearch-types/src/tasks/enterprise_edition/network.rs b/crates/meilisearch-types/src/tasks/enterprise_edition/network.rs index bf4e0ddfb..22f6f2db6 100644 --- a/crates/meilisearch-types/src/tasks/enterprise_edition/network.rs +++ b/crates/meilisearch-types/src/tasks/enterprise_edition/network.rs @@ -109,7 +109,7 @@ pub struct ImportData { /// Remote that this task is imported from pub remote_name: String, /// Index relevant to this task - pub index_name: String, + pub index_name: Option, /// Number of documents in this task pub document_count: u64, } @@ -122,7 +122,7 @@ pub struct ImportMetadata { /// Key unique to this (network_change, index, host, key). /// /// In practice, an internal document id of one of the documents to import. - pub task_key: DocumentId, + pub task_key: Option, /// Total number of documents to import for this index from this host. pub total_index_documents: u64, } @@ -198,7 +198,7 @@ impl NetworkTopologyChange { out_name, out_remotes, in_remotes, - stats: NetworkTopologyStats { received_documents: 0, moved_documents: 0 }, + stats: NetworkTopologyStats { moved_documents: 0 }, } } @@ -228,6 +228,10 @@ impl NetworkTopologyChange { Some((&self.out_remotes, out_name)) } + pub fn set_moved(&mut self, moved_documents: u64) { + self.stats.moved_documents = moved_documents; + } + /// Compute the next state from the current state of the task. pub fn update_state(&mut self) { self.state = match self.state { @@ -257,8 +261,8 @@ impl NetworkTopologyChange { pub fn receive_remote_task( &mut self, remote_name: &str, - index_name: &str, - task_key: DocumentId, + index_name: Option<&str>, + task_key: Option, document_count: u64, total_indexes: u64, total_index_documents: u64, @@ -273,59 +277,73 @@ impl NetworkTopologyChange { ImportState::Finished { total_indexes, total_documents: 0 } } else { let mut task_keys = BTreeSet::new(); - task_keys.insert(task_key); - let mut import_index_state = BTreeMap::new(); - import_index_state.insert( - index_name.to_owned(), - ImportIndexState::Ongoing { - total_documents: total_index_documents, - received_documents: document_count, - task_keys, - processed_documents: 0, - }, - ); - ImportState::Ongoing { import_index_state, total_indexes } + if let Some(index_name) = index_name { + if let Some(task_key) = task_key { + task_keys.insert(task_key); + } + let mut import_index_state = BTreeMap::new(); + import_index_state.insert( + index_name.to_owned(), + ImportIndexState::Ongoing { + total_documents: total_index_documents, + received_documents: document_count, + task_keys, + processed_documents: 0, + }, + ); + ImportState::Ongoing { import_index_state, total_indexes } + } else { + ImportState::WaitingForInitialTask + } } } ImportState::Ongoing { mut import_index_state, total_indexes } => { - if let Some((index_name, mut index_state)) = - import_index_state.remove_entry(index_name) - { - index_state = match index_state { - ImportIndexState::Ongoing { - total_documents, - received_documents: previously_received, - processed_documents, - mut task_keys, - } => { - if !task_keys.insert(task_key) { - return Err(ReceiveTaskError::DuplicateTask(task_key)); - } - + if let Some(index_name) = index_name { + if let Some((index_name, mut index_state)) = + import_index_state.remove_entry(index_name) + { + index_state = match index_state { ImportIndexState::Ongoing { total_documents, - received_documents: previously_received + document_count, + received_documents: previously_received, processed_documents, - task_keys, + mut task_keys, + } => { + if let Some(task_key) = task_key { + if !task_keys.insert(task_key) { + return Err(ReceiveTaskError::DuplicateTask(task_key)); + } + } + + ImportIndexState::Ongoing { + total_documents, + received_documents: previously_received + document_count, + processed_documents, + task_keys, + } } + ImportIndexState::Finished { total_documents } => { + ImportIndexState::Finished { total_documents } + } + }; + import_index_state.insert(index_name, index_state); + } else { + let mut task_keys = BTreeSet::new(); + if let Some(task_key) = task_key { + task_keys.insert(task_key); } - ImportIndexState::Finished { total_documents } => { - ImportIndexState::Finished { total_documents } - } - }; - import_index_state.insert(index_name, index_state); + let state = ImportIndexState::Ongoing { + total_documents: total_index_documents, + received_documents: document_count, + processed_documents: 0, + task_keys, + }; + import_index_state.insert(index_name.to_string(), state); + } + ImportState::Ongoing { import_index_state, total_indexes: total_indexes } } else { - let mut task_keys = BTreeSet::new(); - task_keys.insert(task_key); - let state = ImportIndexState::Ongoing { - total_documents: total_index_documents, - received_documents: document_count, - processed_documents: 0, - task_keys, - }; - import_index_state.insert(index_name.to_string(), state); + ImportState::Ongoing { import_index_state, total_indexes } } - ImportState::Ongoing { import_index_state, total_indexes: total_indexes } } ImportState::Finished { total_indexes, total_documents } => { ImportState::Finished { total_indexes, total_documents } @@ -484,11 +502,7 @@ impl NetworkTopologyChange { } NetworkTopologyState::Finished => "Finished".into(), }; - Details::NetworkTopologyChange { - moved_documents: self.stats.moved_documents, - received_documents: self.stats.received_documents, - message, - } + Details::NetworkTopologyChange { moved_documents: self.stats.moved_documents, message } } pub fn is_import_finished(&self) -> bool { @@ -585,7 +599,7 @@ pub enum NetworkTopologyState { #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Eq, PartialOrd, Ord)] #[serde(rename_all = "camelCase")] pub struct NetworkTopologyStats { - pub received_documents: u64, + #[serde(default)] pub moved_documents: u64, } diff --git a/crates/meilisearch-types/src/tasks/mod.rs b/crates/meilisearch-types/src/tasks/mod.rs index a497cf5f2..266534d68 100644 --- a/crates/meilisearch-types/src/tasks/mod.rs +++ b/crates/meilisearch-types/src/tasks/mod.rs @@ -349,7 +349,6 @@ impl KindWithContent { }), KindWithContent::NetworkTopologyChange { .. } => Some(Details::NetworkTopologyChange { moved_documents: 0, - received_documents: 0, message: "processing tasks for previous network versions".into(), }), } @@ -798,7 +797,6 @@ pub enum Details { }, NetworkTopologyChange { moved_documents: u64, - received_documents: u64, message: String, }, } @@ -843,7 +841,7 @@ impl Details { | Self::Export { .. } | Self::UpgradeDatabase { .. } | Self::IndexSwap { .. } => (), - Self::NetworkTopologyChange { moved_documents: _, received_documents: _, message } => { + Self::NetworkTopologyChange { moved_documents: _, message } => { *message = format!("Failed. Previous status: {}", message); } } diff --git a/crates/meilisearch/src/routes/indexes/enterprise_edition/proxy.rs b/crates/meilisearch/src/routes/indexes/enterprise_edition/proxy.rs index 937f08e79..b1eeedecb 100644 --- a/crates/meilisearch/src/routes/indexes/enterprise_edition/proxy.rs +++ b/crates/meilisearch/src/routes/indexes/enterprise_edition/proxy.rs @@ -616,7 +616,31 @@ pub fn import_data_from_req(req: &HttpRequest) -> Result, Mei header_name: PROXY_IMPORT_DOCS_HEADER, msg: format!("while URL-decoding documents: {err}"), })?; - (remote_name, index_name, documents) + (remote_name, Some(index_name), documents) + } + (Some(remote_name), None, Some(documents)) => { + let remote_name = urlencoding::decode(remote_name.to_str().map_err(|err| { + MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_REMOTE_HEADER, + msg: format!("while parsing import remote name as UTF-8: {err}"), + } + })?) + .map_err(|err| MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_REMOTE_HEADER, + msg: format!("while URL-decoding import remote name: {err}"), + })?; + + let documents = urlencoding::decode(documents.to_str().map_err(|err| { + MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_DOCS_HEADER, + msg: format!("while parsing documents as UTF-8: {err}"), + } + })?) + .map_err(|err| MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_DOCS_HEADER, + msg: format!("while URL-decoding documents: {err}"), + })?; + (remote_name, None, documents) } // catch-all pattern that has to contain an inconsistency since we already matched (None, None, None) and (Some, Some, Some) (remote_name, index_name, documents) => { @@ -636,7 +660,7 @@ pub fn import_data_from_req(req: &HttpRequest) -> Result, Mei Ok(Some(ImportData { remote_name: remote_name.to_string(), - index_name: index_name.to_string(), + index_name: index_name.map(|index_name| index_name.to_string()), document_count, })) } @@ -684,7 +708,32 @@ pub fn import_metadata_from_req( header_name: PROXY_IMPORT_TOTAL_INDEX_DOCS_HEADER, msg: format!("while URL-decoding total index documents: {err}"), })?; - (index_count, task_key, total_index_documents) + (index_count, Some(task_key), total_index_documents) + } + (Some(index_count), None, Some(total_index_documents)) => { + let index_count = urlencoding::decode(index_count.to_str().map_err(|err| { + MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_REMOTE_HEADER, + msg: format!("while parsing import index count as UTF-8: {err}"), + } + })?) + .map_err(|err| MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_INDEX_COUNT_HEADER, + msg: format!("while URL-decoding import index count: {err}"), + })?; + + let total_index_documents = + urlencoding::decode(total_index_documents.to_str().map_err(|err| { + MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_TOTAL_INDEX_DOCS_HEADER, + msg: format!("while parsing total index documents as UTF-8: {err}"), + } + })?) + .map_err(|err| MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_TOTAL_INDEX_DOCS_HEADER, + msg: format!("while URL-decoding total index documents: {err}"), + })?; + (index_count, None, total_index_documents) } // catch-all pattern that has to contain an inconsistency since we already matched (None, None, None) and (Some, Some, Some) (index_count, task_key, total_index_documents) => { @@ -702,11 +751,16 @@ pub fn import_metadata_from_req( msg: format!("while parsing the index count as an integer: {err}"), })?; - let task_key: DocumentId = - task_key.parse().map_err(|err| MeilisearchHttpError::InvalidHeaderValue { - header_name: PROXY_IMPORT_TASK_KEY_HEADER, - msg: format!("while parsing import task key as an integer: {err}"), - })?; + let task_key = task_key + .map(|task_key| { + let task_key: Result = + task_key.parse().map_err(|err| MeilisearchHttpError::InvalidHeaderValue { + header_name: PROXY_IMPORT_TASK_KEY_HEADER, + msg: format!("while parsing import task key as an integer: {err}"), + }); + task_key + }) + .transpose()?; let total_index_documents: u64 = total_index_documents.parse().map_err(|err| MeilisearchHttpError::InvalidHeaderValue { diff --git a/crates/meilisearch/src/routes/indexes/settings.rs b/crates/meilisearch/src/routes/indexes/settings.rs index f67664b9e..a7bae4795 100644 --- a/crates/meilisearch/src/routes/indexes/settings.rs +++ b/crates/meilisearch/src/routes/indexes/settings.rs @@ -17,6 +17,9 @@ use super::settings_analytics::*; use crate::analytics::Analytics; use crate::extractors::authentication::policies::*; use crate::extractors::authentication::GuardedData; +use crate::routes::indexes::enterprise_edition::proxy::{ + proxy, task_network_and_check_leader_and_version, Body, +}; use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView}; use crate::Opt; @@ -76,14 +79,13 @@ macro_rules! make_setting_route { use meilisearch_types::index_uid::IndexUid; use meilisearch_types::milli::update::Setting; use meilisearch_types::settings::{settings, Settings}; - use meilisearch_types::tasks::KindWithContent; use tracing::debug; use $crate::analytics::Analytics; use $crate::extractors::authentication::policies::*; use $crate::extractors::authentication::GuardedData; use $crate::extractors::sequential_extractor::SeqHandler; use $crate::Opt; - use $crate::routes::{is_dry_run, get_task_id, SummarizedTaskView}; + use $crate::routes::SummarizedTaskView; #[allow(unused_imports)] use super::*; @@ -130,21 +132,7 @@ macro_rules! make_setting_route { let new_settings = Settings { $attr: Setting::Reset.into(), ..Default::default() }; - let allow_index_creation = - index_scheduler.filters().allow_index_creation(&index_uid); - - let task = KindWithContent::SettingsUpdate { - index_uid: index_uid.to_string(), - new_settings: Box::new(new_settings), - is_deletion: true, - allow_index_creation, - }; - let uid = get_task_id(&req, &opt)?; - let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = register_new_settings(new_settings, true, index_scheduler, &req, index_uid, opt).await?; debug!(returns = ?task, "Delete settings"); Ok(HttpResponse::Accepted().json(task)) @@ -216,21 +204,7 @@ macro_rules! make_setting_route { &index_scheduler, )?; - let allow_index_creation = - index_scheduler.filters().allow_index_creation(&index_uid); - - let task = KindWithContent::SettingsUpdate { - index_uid: index_uid.to_string(), - new_settings: Box::new(new_settings), - is_deletion: false, - allow_index_creation, - }; - let uid = get_task_id(&req, &opt)?; - let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = register_new_settings(new_settings, false, index_scheduler, &req, index_uid, opt).await?; debug!(returns = ?task, "Update settings"); Ok(HttpResponse::Accepted().json(task)) @@ -571,12 +545,12 @@ pub async fn update_all( index_uid: web::Path, body: AwebJson, DeserrJsonError>, req: HttpRequest, - opt: web::Data, - analytics: web::Data, + opt: Data, + analytics: Data, ) -> Result { let index_uid = IndexUid::try_from(index_uid.into_inner())?; - let new_settings = body.into_inner(); + let new_settings: Settings = body.into_inner(); debug!(parameters = ?new_settings, "Update all settings"); let new_settings = validate_settings(new_settings, &index_scheduler)?; @@ -626,24 +600,55 @@ pub async fn update_all( &req, ); + let task = + register_new_settings(new_settings, false, index_scheduler, &req, index_uid, opt).await?; + + debug!(returns = ?task, "Update all settings"); + Ok(HttpResponse::Accepted().json(task)) +} + +async fn register_new_settings( + new_settings: Settings, + is_deletion: bool, + index_scheduler: GuardedData, Data>, + req: &HttpRequest, + index_uid: IndexUid, + opt: Data, +) -> Result { + let network = index_scheduler.network(); + let task_network = task_network_and_check_leader_and_version(req, &network)?; + let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); let index_uid = IndexUid::try_from(index_uid.into_inner())?.into_inner(); let task = KindWithContent::SettingsUpdate { - index_uid, - new_settings: Box::new(new_settings), - is_deletion: false, + index_uid: index_uid.clone(), + new_settings: Box::new(new_settings.clone()), + is_deletion, allow_index_creation, }; let uid = get_task_id(&req, &opt)?; let dry_run = is_dry_run(&req, &opt)?; - /// TODO: make sure to proxy all settings tasks - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); - debug!(returns = ?task, "Update all settings"); - Ok(HttpResponse::Accepted().json(task)) + let scheduler = index_scheduler.clone(); + let mut task = tokio::task::spawn_blocking(move || { + scheduler.register_with_custom_metadata(task, uid, None, dry_run, task_network) + }) + .await??; + + if let Some(task_network) = task.network.take() { + proxy( + &index_scheduler, + Some(&index_uid), + &req, + task_network, + network, + Body::inline(new_settings), + &task, + ) + .await?; + } + + Ok(task.into()) } #[utoipa::path( @@ -732,20 +737,8 @@ pub async fn delete_all( let new_settings = Settings::cleared().into_unchecked(); - let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); - let index_uid = IndexUid::try_from(index_uid.into_inner())?.into_inner(); - let task = KindWithContent::SettingsUpdate { - index_uid, - new_settings: Box::new(new_settings), - is_deletion: true, - allow_index_creation, - }; - let uid = get_task_id(&req, &opt)?; - let dry_run = is_dry_run(&req, &opt)?; - let task: SummarizedTaskView = - tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)) - .await?? - .into(); + let task = + register_new_settings(new_settings, true, index_scheduler, &req, index_uid, opt).await?; debug!(returns = ?task, "Delete all settings"); Ok(HttpResponse::Accepted().json(task))