Compare commits

...

4 Commits

Author SHA1 Message Date
Louis Dureuil
4e4d6dfe2c add batch reason 2025-12-11 22:57:10 +01:00
Louis Dureuil
65944df325 Address issue where old tasks where handled with new network 2025-12-11 17:47:51 +01:00
Louis Dureuil
ed3cb36dca Update migration test to check that the migrated network version is nil UUID 2025-12-11 15:51:55 +01:00
Louis Dureuil
316998ce97 upgrade the network to a nil version to make sure that all upgradees have the same version 2025-12-11 15:46:09 +01:00
10 changed files with 124 additions and 57 deletions

View File

@@ -745,6 +745,7 @@ impl IndexScheduler {
mut current_batch: ProcessingBatch,
) -> Result<Option<(Batch, ProcessingBatch)>> {
current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::NetworkTask { id: task.uid });
let change_version =
task.network.as_ref().map(|network| network.network_version()).unwrap_or_default();
@@ -777,11 +778,16 @@ impl IndexScheduler {
task_version >= change_version
});
let (batch, current_batch) = res?;
let (batch, mut current_batch) = res?;
let batch = match batch {
Some(batch) => {
let inner_batch = Box::new(batch);
let inner_reason = current_batch.reason.to_string();
current_batch.reason(BatchStopReason::NetworkTaskOlderTasks {
id: task.uid,
inner_reason,
});
Batch::NetworkIndexBatch { network_task: task, inner_batch }
}
@@ -819,10 +825,15 @@ impl IndexScheduler {
task_version != change_version
});
let (batch, current_batch) = res?;
let (batch, mut current_batch) = res?;
let batch = batch.map(|batch| {
let inner_batch = Box::new(batch);
let inner_reason = current_batch.reason.to_string();
current_batch.reason(BatchStopReason::NetworkTaskOlderTasks {
id: task.uid,
inner_reason,
});
(Batch::NetworkIndexBatch { network_task: task, inner_batch }, current_batch)
});

View File

@@ -3,7 +3,6 @@
// Use of this source code is governed by the Business Source License 1.1,
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
use std::collections::BTreeMap;
use std::time::Duration;
use bumpalo::Bump;
@@ -31,13 +30,18 @@ impl IndexScheduler {
current_batch: &mut ProcessingBatch,
progress: Progress,
) -> Result<(Vec<Task>, ProcessBatchInfo)> {
let (mut tasks, info) = self.process_batch(*inner_batch, current_batch, progress)?;
let KindWithContent::NetworkTopologyChange(network_topology_change) =
&mut network_task.kind
else {
tracing::error!("unexpected network kind for network task while processing batch");
return Err(Error::CorruptedTaskQueue);
};
let network = network_topology_change.network_for_state();
let (mut tasks, info) =
self.process_batch(*inner_batch, current_batch, progress, network)?;
for task in &tasks {
let Some(network) = task.network.as_ref() else {
continue;
@@ -88,15 +92,21 @@ impl IndexScheduler {
}
};
if let Some((remotes, out_name)) = network_topology_change.export_to_process() {
let moved_documents = self.balance_documents(
let mut moved_documents = None;
if let (Some((remotes, out_name)), Some(new_shards)) =
(network_topology_change.export_to_process(), network_topology_change.new_shards())
{
moved_documents = Some(self.balance_documents(
remotes,
out_name,
network_topology_change.in_name(),
new_shards,
origin,
&progress,
&self.scheduler.must_stop_processing,
)?;
)?);
}
if let Some(moved_documents) = moved_documents {
// we need the mut moved documents to avoid a lifetime error in the previous if let.
network_topology_change.set_moved(moved_documents);
}
network_topology_change.update_state();
@@ -108,18 +118,15 @@ impl IndexScheduler {
Ok((vec![task], Default::default()))
}
fn balance_documents(
fn balance_documents<'a, I: Iterator<Item = (&'a str, &'a Remote)> + Clone>(
&self,
remotes: &BTreeMap<String, Remote>,
remotes: I,
out_name: &str,
in_name: Option<&str>,
new_shards: Shards,
network_change_origin: &Origin,
progress: &Progress,
must_stop_processing: &crate::scheduler::MustStopProcessing,
) -> crate::Result<u64> {
let new_shards =
Shards::from_remotes_local(remotes.keys().map(String::as_str).chain(in_name), in_name);
// TECHDEBT: this spawns a `ureq` agent additionally to `reqwest`. We probably want to harmonize all of this.
let agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build();
@@ -182,7 +189,7 @@ impl IndexScheduler {
let fields_ids_map = index.fields_ids_map(&index_rtxn)?;
for (remote_name, remote) in remotes {
for (remote_name, remote) in remotes.clone() {
let documents_to_move =
documents_to_move_to.remove(remote_name).unwrap_or_default();

View File

@@ -231,7 +231,12 @@ impl IndexScheduler {
let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn_scoped(s, move || {
cloned_index_scheduler.process_batch(batch, processing_batch, p)
cloned_index_scheduler.process_batch(
batch,
processing_batch,
p,
&self.network(),
)
})
.unwrap();

View File

@@ -10,6 +10,7 @@ use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::network::Network;
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings;
@@ -55,6 +56,7 @@ impl IndexScheduler {
batch: Batch,
current_batch: &mut ProcessingBatch,
progress: Progress,
network: &Network,
) -> Result<(Vec<Task>, ProcessBatchInfo)> {
#[cfg(test)]
{
@@ -176,6 +178,7 @@ impl IndexScheduler {
op,
&progress,
current_batch.embedder_stats.clone(),
network,
)?;
{
@@ -235,6 +238,7 @@ impl IndexScheduler {
Batch::IndexUpdate { index_uid, primary_key, new_index_uid: None, task },
current_batch,
progress,
network,
)
}
Batch::IndexUpdate { index_uid, primary_key, new_index_uid, mut task } => {

View File

@@ -8,6 +8,7 @@ use meilisearch_types::milli::progress::{EmbedderStats, Progress};
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
use meilisearch_types::milli::update::DocumentAdditionResult;
use meilisearch_types::milli::{self, ChannelCongestion, Filter};
use meilisearch_types::network::Network;
use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::Index;
@@ -36,6 +37,7 @@ impl IndexScheduler {
operation: IndexOperation,
progress: &Progress,
embedder_stats: Arc<EmbedderStats>,
network: &Network,
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
let indexer_alloc = Bump::new();
let started_processing_at = std::time::Instant::now();
@@ -67,8 +69,6 @@ impl IndexScheduler {
IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
let network = self.network();
let shards = network.shards();
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
@@ -504,6 +504,7 @@ impl IndexScheduler {
},
progress,
embedder_stats.clone(),
network,
)?;
let (settings_tasks, _congestion) = self.apply_index_operation(
@@ -512,6 +513,7 @@ impl IndexScheduler {
IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
progress,
embedder_stats,
network,
)?;
let mut tasks = settings_tasks;

View File

@@ -58,7 +58,7 @@ impl super::UpgradeIndexScheduler for MigrateNetwork {
})
.collect();
let network = Network { local, remotes, leader, version: Uuid::now_v7() };
let network = Network { local, remotes, leader, version: Uuid::nil() };
set_network(env, wtxn, &network)?;
Ok(())

View File

@@ -899,6 +899,17 @@ pub enum BatchStopReason {
SettingsWithDocumentOperation {
id: TaskId,
},
NetworkTask {
id: TaskId,
},
NetworkTaskOlderTasks {
id: TaskId,
inner_reason: String,
},
NetworkTaskImportTasks {
id: TaskId,
inner_reason: String,
},
}
impl BatchStopReason {
@@ -987,6 +998,24 @@ impl Display for BatchStopReason {
"stopped before task with id {id} because it is a document operation which cannot be batched with settings changes"
)
}
BatchStopReason::NetworkTask { id } => {
write!(
f,
"stopped after task with id {id} because it is a network topology change task"
)
}
BatchStopReason::NetworkTaskOlderTasks { id, inner_reason } => {
write!(
f,
"stopped after batching network task with id {id} and a batch of older tasks: {inner_reason}"
)
}
BatchStopReason::NetworkTaskImportTasks { id, inner_reason } => {
write!(
f,
"stopped after batching network task with id {id} and a batch of import tasks: {inner_reason}"
)
}
}
}
}

View File

@@ -9,7 +9,7 @@ use utoipa::ToSchema;
use uuid::Uuid;
use crate::error::ResponseError;
use crate::network::{Network, Remote};
use crate::network::Network;
use crate::tasks::{Details, TaskId};
#[cfg(not(feature = "enterprise"))]
@@ -165,37 +165,25 @@ impl From<Result<TaskId, ResponseError>> for RemoteTask {
#[serde(rename_all = "camelCase")]
pub struct NetworkTopologyChange {
state: NetworkTopologyState,
// in name, `None` if the node is no longer part of the network
#[serde(skip_serializing_if = "Option::is_none")]
in_name: Option<String>,
// out name, `None` if the node is new to the network
#[serde(skip_serializing_if = "Option::is_none")]
out_name: Option<String>,
out_remotes: BTreeMap<String, Remote>,
in_remotes: BTreeMap<String, InRemote>,
old_network: Network,
new_network: Network,
stats: NetworkTopologyStats,
}
impl NetworkTopologyChange {
pub fn new(old_network: Network, new_network: Network) -> Self {
// we use our new name as import name
let in_name = new_network.local;
// we use our old name as export name
let out_name = old_network.local.or_else(|| in_name.clone());
let in_name = new_network.local.as_deref();
let out_name = old_network.local.as_deref().or(in_name);
// we export to the new network
let mut out_remotes = new_network.remotes;
// don't export to ourselves
if let Some(in_name) = &in_name {
out_remotes.remove(in_name);
}
let in_remotes = if in_name.is_some() {
old_network
.remotes
.into_keys()
.chain(out_remotes.keys().cloned())
.keys()
.chain(new_network.remotes.keys())
// don't await imports from ourselves
.filter(|name| Some(name.as_str()) != out_name.as_deref())
.filter(|name| Some(name.as_str()) != out_name)
.cloned()
.map(|name| (name, InRemote::new()))
.collect()
} else {
@@ -203,27 +191,25 @@ impl NetworkTopologyChange {
};
Self {
state: NetworkTopologyState::WaitingForOlderTasks,
in_name,
out_name,
out_remotes,
in_remotes,
stats: NetworkTopologyStats { moved_documents: 0 },
new_network,
old_network,
}
}
pub fn in_name(&self) -> Option<&str> {
self.new_network.local.as_deref()
}
pub fn out_name(&self) -> Option<&str> {
self.old_network.local.as_deref().or_else(|| self.in_name())
}
pub fn state(&self) -> NetworkTopologyState {
self.state
}
pub fn out_name(&self) -> Option<&str> {
// unwrap: one of out name or in_name must be defined
self.out_name.as_deref()
}
pub fn in_name(&self) -> Option<&str> {
self.in_name.as_deref()
}
pub fn to_details(&self) -> Details {
let message = match self.state {
NetworkTopologyState::WaitingForOlderTasks => {
@@ -326,7 +312,7 @@ impl NetworkTopologyChange {
pub fn merge(&mut self, other: NetworkTopologyChange) {
// The topology change has a guarantee of forward progress, so for each field we're going to keep the "most advanced" values.
let Self { state, in_name: _, out_name: _, out_remotes: _, in_remotes, stats } = self;
let Self { state, new_network: _, old_network: _, in_remotes, stats } = self;
*state = Ord::max(*state, other.state);
*stats = Ord::max(*stats, other.stats);
@@ -360,6 +346,15 @@ impl NetworkTopologyChange {
}
}
}
pub fn network_for_state(&self) -> &Network {
match self.state {
NetworkTopologyState::WaitingForOlderTasks => &self.old_network,
NetworkTopologyState::ExportingDocuments
| NetworkTopologyState::ImportingDocuments
| NetworkTopologyState::Finished => &self.new_network,
}
}
}
fn merge_import_index_state(left: ImportIndexState, right: ImportIndexState) -> ImportIndexState {

View File

@@ -5,6 +5,7 @@
use std::collections::BTreeMap;
use milli::update::new::indexer::current_edition::sharding::Shards;
use milli::DocumentId;
use roaring::RoaringBitmap;
@@ -16,17 +17,30 @@ use crate::tasks::network::{
};
impl NetworkTopologyChange {
pub fn export_to_process(&self) -> Option<(&BTreeMap<String, Remote>, &str)> {
pub fn export_to_process(
&self,
) -> Option<(impl Iterator<Item = (&str, &Remote)> + Clone, &str)> {
if self.state != NetworkTopologyState::ExportingDocuments {
return None;
}
if self.out_remotes.is_empty() {
if self.new_network.remotes.is_empty() {
return None;
}
let out_name = self.out_name()?;
Some((&self.out_remotes, out_name))
Some((
self.new_network.remotes.iter().filter_map(|(name, remote)| {
// don't export to ourselves
(Some(name.as_str()) != self.in_name()).then_some((name.as_str(), remote))
}),
out_name,
))
}
pub fn new_shards(&self) -> Option<Shards> {
self.new_network.shards()
}
pub fn set_moved(&mut self, moved_documents: u64) {

View File

@@ -184,7 +184,7 @@
}
},
"self": "prod1",
"version": "[uuid]"
"version": "00000000-0000-0000-0000-000000000000"
},
"synchronous": "WaitForResponse"
},