This commit is contained in:
Louis Dureuil
2025-11-26 16:19:40 +01:00
parent c7986ca8c9
commit 60716f13e1
11 changed files with 283 additions and 206 deletions

View File

@@ -467,10 +467,9 @@ impl From<Details> for DetailsView {
..Default::default()
}
}
Details::NetworkTopologyChange { moved_documents, received_documents, message } => {
Details::NetworkTopologyChange { moved_documents, message } => {
DetailsView {
moved_documents: Some(moved_documents),
received_documents: Some(received_documents),
message: Some(message),
..Default::default()
}

View File

@@ -109,7 +109,7 @@ pub struct ImportData {
/// Remote that this task is imported from
pub remote_name: String,
/// Index relevant to this task
pub index_name: String,
pub index_name: Option<String>,
/// Number of documents in this task
pub document_count: u64,
}
@@ -122,7 +122,7 @@ pub struct ImportMetadata {
/// Key unique to this (network_change, index, host, key).
///
/// In practice, an internal document id of one of the documents to import.
pub task_key: DocumentId,
pub task_key: Option<DocumentId>,
/// Total number of documents to import for this index from this host.
pub total_index_documents: u64,
}
@@ -198,7 +198,7 @@ impl NetworkTopologyChange {
out_name,
out_remotes,
in_remotes,
stats: NetworkTopologyStats { received_documents: 0, moved_documents: 0 },
stats: NetworkTopologyStats { moved_documents: 0 },
}
}
@@ -228,6 +228,10 @@ impl NetworkTopologyChange {
Some((&self.out_remotes, out_name))
}
pub fn set_moved(&mut self, moved_documents: u64) {
self.stats.moved_documents = moved_documents;
}
/// Compute the next state from the current state of the task.
pub fn update_state(&mut self) {
self.state = match self.state {
@@ -257,8 +261,8 @@ impl NetworkTopologyChange {
pub fn receive_remote_task(
&mut self,
remote_name: &str,
index_name: &str,
task_key: DocumentId,
index_name: Option<&str>,
task_key: Option<DocumentId>,
document_count: u64,
total_indexes: u64,
total_index_documents: u64,
@@ -273,59 +277,73 @@ impl NetworkTopologyChange {
ImportState::Finished { total_indexes, total_documents: 0 }
} else {
let mut task_keys = BTreeSet::new();
task_keys.insert(task_key);
let mut import_index_state = BTreeMap::new();
import_index_state.insert(
index_name.to_owned(),
ImportIndexState::Ongoing {
total_documents: total_index_documents,
received_documents: document_count,
task_keys,
processed_documents: 0,
},
);
ImportState::Ongoing { import_index_state, total_indexes }
if let Some(index_name) = index_name {
if let Some(task_key) = task_key {
task_keys.insert(task_key);
}
let mut import_index_state = BTreeMap::new();
import_index_state.insert(
index_name.to_owned(),
ImportIndexState::Ongoing {
total_documents: total_index_documents,
received_documents: document_count,
task_keys,
processed_documents: 0,
},
);
ImportState::Ongoing { import_index_state, total_indexes }
} else {
ImportState::WaitingForInitialTask
}
}
}
ImportState::Ongoing { mut import_index_state, total_indexes } => {
if let Some((index_name, mut index_state)) =
import_index_state.remove_entry(index_name)
{
index_state = match index_state {
ImportIndexState::Ongoing {
total_documents,
received_documents: previously_received,
processed_documents,
mut task_keys,
} => {
if !task_keys.insert(task_key) {
return Err(ReceiveTaskError::DuplicateTask(task_key));
}
if let Some(index_name) = index_name {
if let Some((index_name, mut index_state)) =
import_index_state.remove_entry(index_name)
{
index_state = match index_state {
ImportIndexState::Ongoing {
total_documents,
received_documents: previously_received + document_count,
received_documents: previously_received,
processed_documents,
task_keys,
mut task_keys,
} => {
if let Some(task_key) = task_key {
if !task_keys.insert(task_key) {
return Err(ReceiveTaskError::DuplicateTask(task_key));
}
}
ImportIndexState::Ongoing {
total_documents,
received_documents: previously_received + document_count,
processed_documents,
task_keys,
}
}
ImportIndexState::Finished { total_documents } => {
ImportIndexState::Finished { total_documents }
}
};
import_index_state.insert(index_name, index_state);
} else {
let mut task_keys = BTreeSet::new();
if let Some(task_key) = task_key {
task_keys.insert(task_key);
}
ImportIndexState::Finished { total_documents } => {
ImportIndexState::Finished { total_documents }
}
};
import_index_state.insert(index_name, index_state);
let state = ImportIndexState::Ongoing {
total_documents: total_index_documents,
received_documents: document_count,
processed_documents: 0,
task_keys,
};
import_index_state.insert(index_name.to_string(), state);
}
ImportState::Ongoing { import_index_state, total_indexes: total_indexes }
} else {
let mut task_keys = BTreeSet::new();
task_keys.insert(task_key);
let state = ImportIndexState::Ongoing {
total_documents: total_index_documents,
received_documents: document_count,
processed_documents: 0,
task_keys,
};
import_index_state.insert(index_name.to_string(), state);
ImportState::Ongoing { import_index_state, total_indexes }
}
ImportState::Ongoing { import_index_state, total_indexes: total_indexes }
}
ImportState::Finished { total_indexes, total_documents } => {
ImportState::Finished { total_indexes, total_documents }
@@ -484,11 +502,7 @@ impl NetworkTopologyChange {
}
NetworkTopologyState::Finished => "Finished".into(),
};
Details::NetworkTopologyChange {
moved_documents: self.stats.moved_documents,
received_documents: self.stats.received_documents,
message,
}
Details::NetworkTopologyChange { moved_documents: self.stats.moved_documents, message }
}
pub fn is_import_finished(&self) -> bool {
@@ -585,7 +599,7 @@ pub enum NetworkTopologyState {
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Eq, PartialOrd, Ord)]
#[serde(rename_all = "camelCase")]
pub struct NetworkTopologyStats {
pub received_documents: u64,
#[serde(default)]
pub moved_documents: u64,
}

View File

@@ -349,7 +349,6 @@ impl KindWithContent {
}),
KindWithContent::NetworkTopologyChange { .. } => Some(Details::NetworkTopologyChange {
moved_documents: 0,
received_documents: 0,
message: "processing tasks for previous network versions".into(),
}),
}
@@ -798,7 +797,6 @@ pub enum Details {
},
NetworkTopologyChange {
moved_documents: u64,
received_documents: u64,
message: String,
},
}
@@ -843,7 +841,7 @@ impl Details {
| Self::Export { .. }
| Self::UpgradeDatabase { .. }
| Self::IndexSwap { .. } => (),
Self::NetworkTopologyChange { moved_documents: _, received_documents: _, message } => {
Self::NetworkTopologyChange { moved_documents: _, message } => {
*message = format!("Failed. Previous status: {}", message);
}
}