From 0703767fc689e5026545d2c29eddf1c4200253a2 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 23 Sep 2025 16:35:48 +0200 Subject: [PATCH] Add a process network task type --- .../src/scheduler/create_batch.rs | 21 ++- .../src/scheduler/enterprise_edition/mod.rs | 6 + .../enterprise_edition/process_network.rs | 137 ++++++++++++++++++ crates/index-scheduler/src/scheduler/mod.rs | 1 + .../src/scheduler/process_batch.rs | 3 + 5 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs create mode 100644 crates/index-scheduler/src/scheduler/enterprise_edition/process_network.rs diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index c598ad405..0d1e02113 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -55,6 +55,9 @@ pub(crate) enum Batch { UpgradeDatabase { tasks: Vec, }, + NetworkTopologyChanges { + tasks: Vec, + }, } #[derive(Debug)] @@ -116,7 +119,8 @@ impl Batch { Batch::SnapshotCreation(tasks) | Batch::TaskDeletions(tasks) | Batch::UpgradeDatabase { tasks } - | Batch::IndexDeletion { tasks, .. } => { + | Batch::IndexDeletion { tasks, .. } + | Batch::NetworkTopologyChanges { tasks } => { RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid)) } Batch::IndexOperation { op, .. } => match op { @@ -151,6 +155,7 @@ impl Batch { | Dump(_) | Export { .. } | UpgradeDatabase { .. } + | NetworkTopologyChanges { .. } | IndexSwap { .. } => None, IndexOperation { op, .. } => Some(op.index_uid()), IndexCreation { index_uid, .. } @@ -176,6 +181,7 @@ impl fmt::Display for Batch { Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?, Batch::IndexSwap { .. } => f.write_str("IndexSwap")?, Batch::Export { .. } => f.write_str("Export")?, + Batch::NetworkTopologyChanges { .. } => f.write_str("NetworkTopologyChange")?, Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?, }; match index_uid { @@ -545,7 +551,18 @@ impl IndexScheduler { return Ok(Some((Batch::Dump(task), current_batch))); } - // 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. + // 6. We batch the network changes. + let to_network = self.queue.tasks.get_kind(rtxn, Kind::NetworkTopologyChange)? & enqueued; + if !to_network.is_empty() { + let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_network)?; + current_batch.processing(&mut tasks); + current_batch.reason(BatchStopReason::TaskKindCannotBeBatched { + kind: Kind::NetworkTopologyChange, + }); + return Ok(Some((Batch::NetworkTopologyChanges { tasks }, current_batch))); + } + + // 7. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) }; let mut task = self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; diff --git a/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs b/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs new file mode 100644 index 000000000..c6afbbc6e --- /dev/null +++ b/crates/index-scheduler/src/scheduler/enterprise_edition/mod.rs @@ -0,0 +1,6 @@ +// Copyright © 2025 Meilisearch Some Rights Reserved +// This file is part of Meilisearch Enterprise Edition (EE). +// Use of this source code is governed by the Business Source License 1.1, +// as found in the LICENSE-EE file or at + +mod process_network; diff --git a/crates/index-scheduler/src/scheduler/enterprise_edition/process_network.rs b/crates/index-scheduler/src/scheduler/enterprise_edition/process_network.rs new file mode 100644 index 000000000..dac98687a --- /dev/null +++ b/crates/index-scheduler/src/scheduler/enterprise_edition/process_network.rs @@ -0,0 +1,137 @@ +// Copyright © 2025 Meilisearch Some Rights Reserved +// This file is part of Meilisearch Enterprise Edition (EE). +// Use of this source code is governed by the Business Source License 1.1, +// as found in the LICENSE-EE file or at + +use std::collections::BTreeMap; + +use itertools::{EitherOrBoth, Itertools}; +use meilisearch_types::enterprise_edition::network::{DbNetwork, DbRemote, Network, Remote}; +use meilisearch_types::milli; +use meilisearch_types::milli::progress::Progress; +use meilisearch_types::milli::update::Setting; +use meilisearch_types::tasks::{KindWithContent, Status, Task}; + +use crate::{Error, IndexScheduler}; + +impl IndexScheduler { + pub(crate) fn process_network_changes( + &self, + progress: Progress, + mut tasks: Vec, + ) -> crate::Result> { + let mut current_network = Some(self.network()); + for task in &tasks { + let KindWithContent::NetworkTopologyChange { network } = &task.kind else { + continue; + }; + current_network = match (current_network, network) { + (None, None) => None, + (None, Some(network)) => Some(accumulate(DbNetwork::default(), network.clone())?), + (Some(current_network), None) => Some(current_network), + (Some(current_network), Some(new_network)) => { + Some(accumulate(current_network, new_network.clone())?) + } + }; + } + + if let Some(new_network) = current_network { + self.put_network(new_network)?; + } else { + self.put_network(DbNetwork::default())?; + } + + for task in &mut tasks { + task.status = Status::Succeeded; + } + Ok(tasks) + } +} + +fn accumulate(old_network: DbNetwork, new_network: Network) -> crate::Result { + let err = |err| Err(Error::from_milli(milli::Error::UserError(err), None)); + + let merged_local = match new_network.local { + Setting::Set(new_self) => Some(new_self), + Setting::Reset => None, + Setting::NotSet => old_network.local, + }; + + let merged_sharding = match new_network.sharding { + Setting::Set(new_sharding) => new_sharding, + Setting::Reset => false, + Setting::NotSet => old_network.sharding, + }; + + if merged_sharding && merged_local.is_none() { + return err(milli::UserError::NetworkShardingWithoutSelf); + } + + let merged_remotes = match new_network.remotes { + Setting::Set(new_remotes) => { + let mut merged_remotes = BTreeMap::new(); + for either_or_both in old_network + .remotes + .into_iter() + .merge_join_by(new_remotes.into_iter(), |left, right| left.0.cmp(&right.0)) + { + match either_or_both { + EitherOrBoth::Both((name, old), (_, Some(new))) => { + let DbRemote { + url: old_url, + search_api_key: old_search_api_key, + write_api_key: old_write_api_key, + } = old; + + let Remote { + url: new_url, + search_api_key: new_search_api_key, + write_api_key: new_write_api_key, + } = new; + + let merged = DbRemote { + url: match new_url { + Setting::Set(new_url) => new_url, + Setting::Reset => { + return err(milli::UserError::NetworkMissingUrl(name)) + } + Setting::NotSet => old_url, + }, + search_api_key: match new_search_api_key { + Setting::Set(new_search_api_key) => Some(new_search_api_key), + Setting::Reset => None, + Setting::NotSet => old_search_api_key, + }, + write_api_key: match new_write_api_key { + Setting::Set(new_write_api_key) => Some(new_write_api_key), + Setting::Reset => None, + Setting::NotSet => old_write_api_key, + }, + }; + merged_remotes.insert(name, merged); + } + EitherOrBoth::Both((_, _), (_, None)) | EitherOrBoth::Right((_, None)) => {} + EitherOrBoth::Left((name, node)) => { + merged_remotes.insert(name, node); + } + EitherOrBoth::Right((name, Some(node))) => { + let Some(url) = node.url.set() else { + return err(milli::UserError::NetworkMissingUrl(name)); + }; + let node = DbRemote { + url, + search_api_key: node.search_api_key.set(), + write_api_key: node.write_api_key.set(), + }; + merged_remotes.insert(name, node); + } + } + } + merged_remotes + } + Setting::Reset => BTreeMap::new(), + Setting::NotSet => old_network.remotes, + }; + + Ok(DbNetwork { local: merged_local, remotes: merged_remotes, sharding: merged_sharding }) +} diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index c57bbf70d..4b2360adf 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -2,6 +2,7 @@ mod autobatcher; #[cfg(test)] mod autobatcher_test; mod create_batch; +mod enterprise_edition; mod process_batch; mod process_dump_creation; mod process_export; diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index efa137cdb..7dde352ca 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -135,6 +135,9 @@ impl IndexScheduler { Batch::Dump(task) => self .process_dump_creation(progress, task) .map(|tasks| (tasks, ProcessBatchInfo::default())), + Batch::NetworkTopologyChanges { tasks } => self + .process_network_changes(progress, tasks) + .map(|tasks| (tasks, ProcessBatchInfo::default())), Batch::IndexOperation { op, must_create_index } => { let index_uid = op.index_uid().to_string(); let index = if must_create_index {