mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-12-13 07:57:02 +00:00
Compare commits
4 Commits
prototype-
...
change-net
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e4d6dfe2c | ||
|
|
65944df325 | ||
|
|
ed3cb36dca | ||
|
|
316998ce97 |
@@ -745,6 +745,7 @@ impl IndexScheduler {
|
||||
mut current_batch: ProcessingBatch,
|
||||
) -> Result<Option<(Batch, ProcessingBatch)>> {
|
||||
current_batch.processing(Some(&mut task));
|
||||
current_batch.reason(BatchStopReason::NetworkTask { id: task.uid });
|
||||
|
||||
let change_version =
|
||||
task.network.as_ref().map(|network| network.network_version()).unwrap_or_default();
|
||||
@@ -777,11 +778,16 @@ impl IndexScheduler {
|
||||
task_version >= change_version
|
||||
});
|
||||
|
||||
let (batch, current_batch) = res?;
|
||||
let (batch, mut current_batch) = res?;
|
||||
|
||||
let batch = match batch {
|
||||
Some(batch) => {
|
||||
let inner_batch = Box::new(batch);
|
||||
let inner_reason = current_batch.reason.to_string();
|
||||
current_batch.reason(BatchStopReason::NetworkTaskOlderTasks {
|
||||
id: task.uid,
|
||||
inner_reason,
|
||||
});
|
||||
|
||||
Batch::NetworkIndexBatch { network_task: task, inner_batch }
|
||||
}
|
||||
@@ -819,10 +825,15 @@ impl IndexScheduler {
|
||||
task_version != change_version
|
||||
});
|
||||
|
||||
let (batch, current_batch) = res?;
|
||||
let (batch, mut current_batch) = res?;
|
||||
|
||||
let batch = batch.map(|batch| {
|
||||
let inner_batch = Box::new(batch);
|
||||
let inner_reason = current_batch.reason.to_string();
|
||||
current_batch.reason(BatchStopReason::NetworkTaskOlderTasks {
|
||||
id: task.uid,
|
||||
inner_reason,
|
||||
});
|
||||
|
||||
(Batch::NetworkIndexBatch { network_task: task, inner_batch }, current_batch)
|
||||
});
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use bumpalo::Bump;
|
||||
@@ -31,13 +30,18 @@ impl IndexScheduler {
|
||||
current_batch: &mut ProcessingBatch,
|
||||
progress: Progress,
|
||||
) -> Result<(Vec<Task>, ProcessBatchInfo)> {
|
||||
let (mut tasks, info) = self.process_batch(*inner_batch, current_batch, progress)?;
|
||||
let KindWithContent::NetworkTopologyChange(network_topology_change) =
|
||||
&mut network_task.kind
|
||||
else {
|
||||
tracing::error!("unexpected network kind for network task while processing batch");
|
||||
return Err(Error::CorruptedTaskQueue);
|
||||
};
|
||||
|
||||
let network = network_topology_change.network_for_state();
|
||||
|
||||
let (mut tasks, info) =
|
||||
self.process_batch(*inner_batch, current_batch, progress, network)?;
|
||||
|
||||
for task in &tasks {
|
||||
let Some(network) = task.network.as_ref() else {
|
||||
continue;
|
||||
@@ -88,15 +92,21 @@ impl IndexScheduler {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((remotes, out_name)) = network_topology_change.export_to_process() {
|
||||
let moved_documents = self.balance_documents(
|
||||
let mut moved_documents = None;
|
||||
if let (Some((remotes, out_name)), Some(new_shards)) =
|
||||
(network_topology_change.export_to_process(), network_topology_change.new_shards())
|
||||
{
|
||||
moved_documents = Some(self.balance_documents(
|
||||
remotes,
|
||||
out_name,
|
||||
network_topology_change.in_name(),
|
||||
new_shards,
|
||||
origin,
|
||||
&progress,
|
||||
&self.scheduler.must_stop_processing,
|
||||
)?;
|
||||
)?);
|
||||
}
|
||||
if let Some(moved_documents) = moved_documents {
|
||||
// we need the mut moved documents to avoid a lifetime error in the previous if let.
|
||||
network_topology_change.set_moved(moved_documents);
|
||||
}
|
||||
network_topology_change.update_state();
|
||||
@@ -108,18 +118,15 @@ impl IndexScheduler {
|
||||
Ok((vec![task], Default::default()))
|
||||
}
|
||||
|
||||
fn balance_documents(
|
||||
fn balance_documents<'a, I: Iterator<Item = (&'a str, &'a Remote)> + Clone>(
|
||||
&self,
|
||||
remotes: &BTreeMap<String, Remote>,
|
||||
remotes: I,
|
||||
out_name: &str,
|
||||
in_name: Option<&str>,
|
||||
new_shards: Shards,
|
||||
network_change_origin: &Origin,
|
||||
progress: &Progress,
|
||||
must_stop_processing: &crate::scheduler::MustStopProcessing,
|
||||
) -> crate::Result<u64> {
|
||||
let new_shards =
|
||||
Shards::from_remotes_local(remotes.keys().map(String::as_str).chain(in_name), in_name);
|
||||
|
||||
// TECHDEBT: this spawns a `ureq` agent additionally to `reqwest`. We probably want to harmonize all of this.
|
||||
let agent = ureq::AgentBuilder::new().timeout(Duration::from_secs(5)).build();
|
||||
|
||||
@@ -182,7 +189,7 @@ impl IndexScheduler {
|
||||
|
||||
let fields_ids_map = index.fields_ids_map(&index_rtxn)?;
|
||||
|
||||
for (remote_name, remote) in remotes {
|
||||
for (remote_name, remote) in remotes.clone() {
|
||||
let documents_to_move =
|
||||
documents_to_move_to.remove(remote_name).unwrap_or_default();
|
||||
|
||||
|
||||
@@ -231,7 +231,12 @@ impl IndexScheduler {
|
||||
let handle = std::thread::Builder::new()
|
||||
.name(String::from("batch-operation"))
|
||||
.spawn_scoped(s, move || {
|
||||
cloned_index_scheduler.process_batch(batch, processing_batch, p)
|
||||
cloned_index_scheduler.process_batch(
|
||||
batch,
|
||||
processing_batch,
|
||||
p,
|
||||
&self.network(),
|
||||
)
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::heed::CompactionOption;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
use meilisearch_types::milli::{self, ChannelCongestion};
|
||||
use meilisearch_types::network::Network;
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
||||
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
|
||||
use milli::update::Settings as MilliSettings;
|
||||
@@ -55,6 +56,7 @@ impl IndexScheduler {
|
||||
batch: Batch,
|
||||
current_batch: &mut ProcessingBatch,
|
||||
progress: Progress,
|
||||
network: &Network,
|
||||
) -> Result<(Vec<Task>, ProcessBatchInfo)> {
|
||||
#[cfg(test)]
|
||||
{
|
||||
@@ -176,6 +178,7 @@ impl IndexScheduler {
|
||||
op,
|
||||
&progress,
|
||||
current_batch.embedder_stats.clone(),
|
||||
network,
|
||||
)?;
|
||||
|
||||
{
|
||||
@@ -235,6 +238,7 @@ impl IndexScheduler {
|
||||
Batch::IndexUpdate { index_uid, primary_key, new_index_uid: None, task },
|
||||
current_batch,
|
||||
progress,
|
||||
network,
|
||||
)
|
||||
}
|
||||
Batch::IndexUpdate { index_uid, primary_key, new_index_uid, mut task } => {
|
||||
|
||||
@@ -8,6 +8,7 @@ use meilisearch_types::milli::progress::{EmbedderStats, Progress};
|
||||
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
|
||||
use meilisearch_types::milli::update::DocumentAdditionResult;
|
||||
use meilisearch_types::milli::{self, ChannelCongestion, Filter};
|
||||
use meilisearch_types::network::Network;
|
||||
use meilisearch_types::settings::apply_settings_to_builder;
|
||||
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
|
||||
use meilisearch_types::Index;
|
||||
@@ -36,6 +37,7 @@ impl IndexScheduler {
|
||||
operation: IndexOperation,
|
||||
progress: &Progress,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
network: &Network,
|
||||
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
|
||||
let indexer_alloc = Bump::new();
|
||||
let started_processing_at = std::time::Instant::now();
|
||||
@@ -67,8 +69,6 @@ impl IndexScheduler {
|
||||
IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
|
||||
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
|
||||
|
||||
let network = self.network();
|
||||
|
||||
let shards = network.shards();
|
||||
|
||||
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
|
||||
@@ -504,6 +504,7 @@ impl IndexScheduler {
|
||||
},
|
||||
progress,
|
||||
embedder_stats.clone(),
|
||||
network,
|
||||
)?;
|
||||
|
||||
let (settings_tasks, _congestion) = self.apply_index_operation(
|
||||
@@ -512,6 +513,7 @@ impl IndexScheduler {
|
||||
IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
|
||||
progress,
|
||||
embedder_stats,
|
||||
network,
|
||||
)?;
|
||||
|
||||
let mut tasks = settings_tasks;
|
||||
|
||||
@@ -58,7 +58,7 @@ impl super::UpgradeIndexScheduler for MigrateNetwork {
|
||||
})
|
||||
.collect();
|
||||
|
||||
let network = Network { local, remotes, leader, version: Uuid::now_v7() };
|
||||
let network = Network { local, remotes, leader, version: Uuid::nil() };
|
||||
|
||||
set_network(env, wtxn, &network)?;
|
||||
Ok(())
|
||||
|
||||
@@ -899,6 +899,17 @@ pub enum BatchStopReason {
|
||||
SettingsWithDocumentOperation {
|
||||
id: TaskId,
|
||||
},
|
||||
NetworkTask {
|
||||
id: TaskId,
|
||||
},
|
||||
NetworkTaskOlderTasks {
|
||||
id: TaskId,
|
||||
inner_reason: String,
|
||||
},
|
||||
NetworkTaskImportTasks {
|
||||
id: TaskId,
|
||||
inner_reason: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl BatchStopReason {
|
||||
@@ -987,6 +998,24 @@ impl Display for BatchStopReason {
|
||||
"stopped before task with id {id} because it is a document operation which cannot be batched with settings changes"
|
||||
)
|
||||
}
|
||||
BatchStopReason::NetworkTask { id } => {
|
||||
write!(
|
||||
f,
|
||||
"stopped after task with id {id} because it is a network topology change task"
|
||||
)
|
||||
}
|
||||
BatchStopReason::NetworkTaskOlderTasks { id, inner_reason } => {
|
||||
write!(
|
||||
f,
|
||||
"stopped after batching network task with id {id} and a batch of older tasks: {inner_reason}"
|
||||
)
|
||||
}
|
||||
BatchStopReason::NetworkTaskImportTasks { id, inner_reason } => {
|
||||
write!(
|
||||
f,
|
||||
"stopped after batching network task with id {id} and a batch of import tasks: {inner_reason}"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ use utoipa::ToSchema;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error::ResponseError;
|
||||
use crate::network::{Network, Remote};
|
||||
use crate::network::Network;
|
||||
use crate::tasks::{Details, TaskId};
|
||||
|
||||
#[cfg(not(feature = "enterprise"))]
|
||||
@@ -165,37 +165,25 @@ impl From<Result<TaskId, ResponseError>> for RemoteTask {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NetworkTopologyChange {
|
||||
state: NetworkTopologyState,
|
||||
// in name, `None` if the node is no longer part of the network
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
in_name: Option<String>,
|
||||
// out name, `None` if the node is new to the network
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
out_name: Option<String>,
|
||||
out_remotes: BTreeMap<String, Remote>,
|
||||
in_remotes: BTreeMap<String, InRemote>,
|
||||
old_network: Network,
|
||||
new_network: Network,
|
||||
stats: NetworkTopologyStats,
|
||||
}
|
||||
|
||||
impl NetworkTopologyChange {
|
||||
pub fn new(old_network: Network, new_network: Network) -> Self {
|
||||
// we use our new name as import name
|
||||
let in_name = new_network.local;
|
||||
// we use our old name as export name
|
||||
let out_name = old_network.local.or_else(|| in_name.clone());
|
||||
let in_name = new_network.local.as_deref();
|
||||
let out_name = old_network.local.as_deref().or(in_name);
|
||||
|
||||
// we export to the new network
|
||||
let mut out_remotes = new_network.remotes;
|
||||
// don't export to ourselves
|
||||
if let Some(in_name) = &in_name {
|
||||
out_remotes.remove(in_name);
|
||||
}
|
||||
let in_remotes = if in_name.is_some() {
|
||||
old_network
|
||||
.remotes
|
||||
.into_keys()
|
||||
.chain(out_remotes.keys().cloned())
|
||||
.keys()
|
||||
.chain(new_network.remotes.keys())
|
||||
// don't await imports from ourselves
|
||||
.filter(|name| Some(name.as_str()) != out_name.as_deref())
|
||||
.filter(|name| Some(name.as_str()) != out_name)
|
||||
.cloned()
|
||||
.map(|name| (name, InRemote::new()))
|
||||
.collect()
|
||||
} else {
|
||||
@@ -203,27 +191,25 @@ impl NetworkTopologyChange {
|
||||
};
|
||||
Self {
|
||||
state: NetworkTopologyState::WaitingForOlderTasks,
|
||||
in_name,
|
||||
out_name,
|
||||
out_remotes,
|
||||
in_remotes,
|
||||
stats: NetworkTopologyStats { moved_documents: 0 },
|
||||
new_network,
|
||||
old_network,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn in_name(&self) -> Option<&str> {
|
||||
self.new_network.local.as_deref()
|
||||
}
|
||||
|
||||
pub fn out_name(&self) -> Option<&str> {
|
||||
self.old_network.local.as_deref().or_else(|| self.in_name())
|
||||
}
|
||||
|
||||
pub fn state(&self) -> NetworkTopologyState {
|
||||
self.state
|
||||
}
|
||||
|
||||
pub fn out_name(&self) -> Option<&str> {
|
||||
// unwrap: one of out name or in_name must be defined
|
||||
self.out_name.as_deref()
|
||||
}
|
||||
|
||||
pub fn in_name(&self) -> Option<&str> {
|
||||
self.in_name.as_deref()
|
||||
}
|
||||
|
||||
pub fn to_details(&self) -> Details {
|
||||
let message = match self.state {
|
||||
NetworkTopologyState::WaitingForOlderTasks => {
|
||||
@@ -326,7 +312,7 @@ impl NetworkTopologyChange {
|
||||
|
||||
pub fn merge(&mut self, other: NetworkTopologyChange) {
|
||||
// The topology change has a guarantee of forward progress, so for each field we're going to keep the "most advanced" values.
|
||||
let Self { state, in_name: _, out_name: _, out_remotes: _, in_remotes, stats } = self;
|
||||
let Self { state, new_network: _, old_network: _, in_remotes, stats } = self;
|
||||
|
||||
*state = Ord::max(*state, other.state);
|
||||
*stats = Ord::max(*stats, other.stats);
|
||||
@@ -360,6 +346,15 @@ impl NetworkTopologyChange {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn network_for_state(&self) -> &Network {
|
||||
match self.state {
|
||||
NetworkTopologyState::WaitingForOlderTasks => &self.old_network,
|
||||
NetworkTopologyState::ExportingDocuments
|
||||
| NetworkTopologyState::ImportingDocuments
|
||||
| NetworkTopologyState::Finished => &self.new_network,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_import_index_state(left: ImportIndexState, right: ImportIndexState) -> ImportIndexState {
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use milli::update::new::indexer::current_edition::sharding::Shards;
|
||||
use milli::DocumentId;
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
@@ -16,17 +17,30 @@ use crate::tasks::network::{
|
||||
};
|
||||
|
||||
impl NetworkTopologyChange {
|
||||
pub fn export_to_process(&self) -> Option<(&BTreeMap<String, Remote>, &str)> {
|
||||
pub fn export_to_process(
|
||||
&self,
|
||||
) -> Option<(impl Iterator<Item = (&str, &Remote)> + Clone, &str)> {
|
||||
if self.state != NetworkTopologyState::ExportingDocuments {
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.out_remotes.is_empty() {
|
||||
if self.new_network.remotes.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let out_name = self.out_name()?;
|
||||
Some((&self.out_remotes, out_name))
|
||||
Some((
|
||||
self.new_network.remotes.iter().filter_map(|(name, remote)| {
|
||||
// don't export to ourselves
|
||||
|
||||
(Some(name.as_str()) != self.in_name()).then_some((name.as_str(), remote))
|
||||
}),
|
||||
out_name,
|
||||
))
|
||||
}
|
||||
|
||||
pub fn new_shards(&self) -> Option<Shards> {
|
||||
self.new_network.shards()
|
||||
}
|
||||
|
||||
pub fn set_moved(&mut self, moved_documents: u64) {
|
||||
|
||||
@@ -184,7 +184,7 @@
|
||||
}
|
||||
},
|
||||
"self": "prod1",
|
||||
"version": "[uuid]"
|
||||
"version": "00000000-0000-0000-0000-000000000000"
|
||||
},
|
||||
"synchronous": "WaitForResponse"
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user