diff --git a/crates/index-scheduler/src/queue/tasks.rs b/crates/index-scheduler/src/queue/tasks.rs index 83c698ebe..631202500 100644 --- a/crates/index-scheduler/src/queue/tasks.rs +++ b/crates/index-scheduler/src/queue/tasks.rs @@ -3,7 +3,8 @@ use std::ops::{Bound, RangeBounds}; use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; -use meilisearch_types::tasks::{Kind, Status, Task}; +use meilisearch_types::tasks::network::DbTaskNetwork; +use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::{MultiOps, RoaringBitmap}; use time::OffsetDateTime; @@ -114,14 +115,15 @@ impl TaskQueue { /// - CorruptedTaskQueue: The task doesn't exist in the database pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &mut Task) -> Result<()> { let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?; - let reprocessing = old_task.status != Status::Enqueued; + // network topology tasks may be processed multiple times. + let maybe_reprocessing = old_task.status != Status::Enqueued + || task.kind.as_kind() == Kind::NetworkTopologyChange; - debug_assert!(old_task != *task); debug_assert_eq!(old_task.uid, task.uid); // If we're processing a task that failed it may already contains a batch_uid debug_assert!( - reprocessing || (old_task.batch_uid.is_none() && task.batch_uid.is_some()), + maybe_reprocessing || (old_task.batch_uid.is_none() && task.batch_uid.is_some()), "\n==> old: {old_task:?}\n==> new: {task:?}" ); @@ -143,13 +145,24 @@ impl TaskQueue { })?; } + // Avoids rewriting part of the network topology change because of TOCTOU errors + if let ( + KindWithContent::NetworkTopologyChange(old_state), + KindWithContent::NetworkTopologyChange(new_state), + ) = (old_task.kind, &mut task.kind) + { + new_state.merge(old_state); + // the state possibly just changed, rewrite the details + task.details = Some(new_state.to_details()); + } + assert_eq!( old_task.enqueued_at, task.enqueued_at, "Cannot update a task's enqueued_at time" ); if old_task.started_at != task.started_at { assert!( - reprocessing || old_task.started_at.is_none(), + maybe_reprocessing || old_task.started_at.is_none(), "Cannot update a task's started_at time" ); if let Some(started_at) = old_task.started_at { @@ -161,7 +174,7 @@ impl TaskQueue { } if old_task.finished_at != task.finished_at { assert!( - reprocessing || old_task.finished_at.is_none(), + maybe_reprocessing || old_task.finished_at.is_none(), "Cannot update a task's finished_at time" ); if let Some(finished_at) = old_task.finished_at { @@ -175,7 +188,16 @@ impl TaskQueue { task.network = match (old_task.network, task.network.take()) { (None, None) => None, (None, Some(network)) | (Some(network), None) => Some(network), - (Some(_), Some(network)) => Some(network), + (Some(left), Some(right)) => Some(match (left, right) { + ( + DbTaskNetwork::Remotes { remote_tasks: mut left, network_version: _ }, + DbTaskNetwork::Remotes { remote_tasks: mut right, network_version }, + ) => { + left.append(&mut right); + DbTaskNetwork::Remotes { remote_tasks: left, network_version } + } + (_, right) => right, + }), }; self.all_tasks.put(wtxn, &task.uid, task)?;