mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-10 21:56:27 +00:00
Add a process network task type
This commit is contained in:
@@ -55,6 +55,9 @@ pub(crate) enum Batch {
|
||||
UpgradeDatabase {
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
NetworkTopologyChanges {
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -116,7 +119,8 @@ impl Batch {
|
||||
Batch::SnapshotCreation(tasks)
|
||||
| Batch::TaskDeletions(tasks)
|
||||
| Batch::UpgradeDatabase { tasks }
|
||||
| Batch::IndexDeletion { tasks, .. } => {
|
||||
| Batch::IndexDeletion { tasks, .. }
|
||||
| Batch::NetworkTopologyChanges { tasks } => {
|
||||
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
|
||||
}
|
||||
Batch::IndexOperation { op, .. } => match op {
|
||||
@@ -151,6 +155,7 @@ impl Batch {
|
||||
| Dump(_)
|
||||
| Export { .. }
|
||||
| UpgradeDatabase { .. }
|
||||
| NetworkTopologyChanges { .. }
|
||||
| IndexSwap { .. } => None,
|
||||
IndexOperation { op, .. } => Some(op.index_uid()),
|
||||
IndexCreation { index_uid, .. }
|
||||
@@ -176,6 +181,7 @@ impl fmt::Display for Batch {
|
||||
Batch::IndexDeletion { .. } => f.write_str("IndexDeletion")?,
|
||||
Batch::IndexSwap { .. } => f.write_str("IndexSwap")?,
|
||||
Batch::Export { .. } => f.write_str("Export")?,
|
||||
Batch::NetworkTopologyChanges { .. } => f.write_str("NetworkTopologyChange")?,
|
||||
Batch::UpgradeDatabase { .. } => f.write_str("UpgradeDatabase")?,
|
||||
};
|
||||
match index_uid {
|
||||
@@ -545,7 +551,18 @@ impl IndexScheduler {
|
||||
return Ok(Some((Batch::Dump(task), current_batch)));
|
||||
}
|
||||
|
||||
// 6. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
|
||||
// 6. We batch the network changes.
|
||||
let to_network = self.queue.tasks.get_kind(rtxn, Kind::NetworkTopologyChange)? & enqueued;
|
||||
if !to_network.is_empty() {
|
||||
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_network)?;
|
||||
current_batch.processing(&mut tasks);
|
||||
current_batch.reason(BatchStopReason::TaskKindCannotBeBatched {
|
||||
kind: Kind::NetworkTopologyChange,
|
||||
});
|
||||
return Ok(Some((Batch::NetworkTopologyChanges { tasks }, current_batch)));
|
||||
}
|
||||
|
||||
// 7. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
|
||||
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
|
||||
let mut task =
|
||||
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
|
@@ -0,0 +1,6 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
mod process_network;
|
@@ -0,0 +1,137 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use itertools::{EitherOrBoth, Itertools};
|
||||
use meilisearch_types::enterprise_edition::network::{DbNetwork, DbRemote, Network, Remote};
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::milli::progress::Progress;
|
||||
use meilisearch_types::milli::update::Setting;
|
||||
use meilisearch_types::tasks::{KindWithContent, Status, Task};
|
||||
|
||||
use crate::{Error, IndexScheduler};
|
||||
|
||||
impl IndexScheduler {
|
||||
pub(crate) fn process_network_changes(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
) -> crate::Result<Vec<Task>> {
|
||||
let mut current_network = Some(self.network());
|
||||
for task in &tasks {
|
||||
let KindWithContent::NetworkTopologyChange { network } = &task.kind else {
|
||||
continue;
|
||||
};
|
||||
current_network = match (current_network, network) {
|
||||
(None, None) => None,
|
||||
(None, Some(network)) => Some(accumulate(DbNetwork::default(), network.clone())?),
|
||||
(Some(current_network), None) => Some(current_network),
|
||||
(Some(current_network), Some(new_network)) => {
|
||||
Some(accumulate(current_network, new_network.clone())?)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if let Some(new_network) = current_network {
|
||||
self.put_network(new_network)?;
|
||||
} else {
|
||||
self.put_network(DbNetwork::default())?;
|
||||
}
|
||||
|
||||
for task in &mut tasks {
|
||||
task.status = Status::Succeeded;
|
||||
}
|
||||
Ok(tasks)
|
||||
}
|
||||
}
|
||||
|
||||
fn accumulate(old_network: DbNetwork, new_network: Network) -> crate::Result<DbNetwork> {
|
||||
let err = |err| Err(Error::from_milli(milli::Error::UserError(err), None));
|
||||
|
||||
let merged_local = match new_network.local {
|
||||
Setting::Set(new_self) => Some(new_self),
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => old_network.local,
|
||||
};
|
||||
|
||||
let merged_sharding = match new_network.sharding {
|
||||
Setting::Set(new_sharding) => new_sharding,
|
||||
Setting::Reset => false,
|
||||
Setting::NotSet => old_network.sharding,
|
||||
};
|
||||
|
||||
if merged_sharding && merged_local.is_none() {
|
||||
return err(milli::UserError::NetworkShardingWithoutSelf);
|
||||
}
|
||||
|
||||
let merged_remotes = match new_network.remotes {
|
||||
Setting::Set(new_remotes) => {
|
||||
let mut merged_remotes = BTreeMap::new();
|
||||
for either_or_both in old_network
|
||||
.remotes
|
||||
.into_iter()
|
||||
.merge_join_by(new_remotes.into_iter(), |left, right| left.0.cmp(&right.0))
|
||||
{
|
||||
match either_or_both {
|
||||
EitherOrBoth::Both((name, old), (_, Some(new))) => {
|
||||
let DbRemote {
|
||||
url: old_url,
|
||||
search_api_key: old_search_api_key,
|
||||
write_api_key: old_write_api_key,
|
||||
} = old;
|
||||
|
||||
let Remote {
|
||||
url: new_url,
|
||||
search_api_key: new_search_api_key,
|
||||
write_api_key: new_write_api_key,
|
||||
} = new;
|
||||
|
||||
let merged = DbRemote {
|
||||
url: match new_url {
|
||||
Setting::Set(new_url) => new_url,
|
||||
Setting::Reset => {
|
||||
return err(milli::UserError::NetworkMissingUrl(name))
|
||||
}
|
||||
Setting::NotSet => old_url,
|
||||
},
|
||||
search_api_key: match new_search_api_key {
|
||||
Setting::Set(new_search_api_key) => Some(new_search_api_key),
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => old_search_api_key,
|
||||
},
|
||||
write_api_key: match new_write_api_key {
|
||||
Setting::Set(new_write_api_key) => Some(new_write_api_key),
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => old_write_api_key,
|
||||
},
|
||||
};
|
||||
merged_remotes.insert(name, merged);
|
||||
}
|
||||
EitherOrBoth::Both((_, _), (_, None)) | EitherOrBoth::Right((_, None)) => {}
|
||||
EitherOrBoth::Left((name, node)) => {
|
||||
merged_remotes.insert(name, node);
|
||||
}
|
||||
EitherOrBoth::Right((name, Some(node))) => {
|
||||
let Some(url) = node.url.set() else {
|
||||
return err(milli::UserError::NetworkMissingUrl(name));
|
||||
};
|
||||
let node = DbRemote {
|
||||
url,
|
||||
search_api_key: node.search_api_key.set(),
|
||||
write_api_key: node.write_api_key.set(),
|
||||
};
|
||||
merged_remotes.insert(name, node);
|
||||
}
|
||||
}
|
||||
}
|
||||
merged_remotes
|
||||
}
|
||||
Setting::Reset => BTreeMap::new(),
|
||||
Setting::NotSet => old_network.remotes,
|
||||
};
|
||||
|
||||
Ok(DbNetwork { local: merged_local, remotes: merged_remotes, sharding: merged_sharding })
|
||||
}
|
@@ -2,6 +2,7 @@ mod autobatcher;
|
||||
#[cfg(test)]
|
||||
mod autobatcher_test;
|
||||
mod create_batch;
|
||||
mod enterprise_edition;
|
||||
mod process_batch;
|
||||
mod process_dump_creation;
|
||||
mod process_export;
|
||||
|
@@ -135,6 +135,9 @@ impl IndexScheduler {
|
||||
Batch::Dump(task) => self
|
||||
.process_dump_creation(progress, task)
|
||||
.map(|tasks| (tasks, ProcessBatchInfo::default())),
|
||||
Batch::NetworkTopologyChanges { tasks } => self
|
||||
.process_network_changes(progress, tasks)
|
||||
.map(|tasks| (tasks, ProcessBatchInfo::default())),
|
||||
Batch::IndexOperation { op, must_create_index } => {
|
||||
let index_uid = op.index_uid().to_string();
|
||||
let index = if must_create_index {
|
||||
|
Reference in New Issue
Block a user