mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-12-23 21:06:58 +00:00
Fix update task to support updating multiple times the same task
This commit is contained in:
@@ -3,7 +3,8 @@ use std::ops::{Bound, RangeBounds};
|
||||
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
|
||||
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
|
||||
use meilisearch_types::tasks::{Kind, Status, Task};
|
||||
use meilisearch_types::tasks::network::DbTaskNetwork;
|
||||
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
||||
use roaring::{MultiOps, RoaringBitmap};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
@@ -114,14 +115,15 @@ impl TaskQueue {
|
||||
/// - CorruptedTaskQueue: The task doesn't exist in the database
|
||||
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &mut Task) -> Result<()> {
|
||||
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
let reprocessing = old_task.status != Status::Enqueued;
|
||||
// network topology tasks may be processed multiple times.
|
||||
let maybe_reprocessing = old_task.status != Status::Enqueued
|
||||
|| task.kind.as_kind() == Kind::NetworkTopologyChange;
|
||||
|
||||
debug_assert!(old_task != *task);
|
||||
debug_assert_eq!(old_task.uid, task.uid);
|
||||
|
||||
// If we're processing a task that failed it may already contains a batch_uid
|
||||
debug_assert!(
|
||||
reprocessing || (old_task.batch_uid.is_none() && task.batch_uid.is_some()),
|
||||
maybe_reprocessing || (old_task.batch_uid.is_none() && task.batch_uid.is_some()),
|
||||
"\n==> old: {old_task:?}\n==> new: {task:?}"
|
||||
);
|
||||
|
||||
@@ -143,13 +145,24 @@ impl TaskQueue {
|
||||
})?;
|
||||
}
|
||||
|
||||
// Avoids rewriting part of the network topology change because of TOCTOU errors
|
||||
if let (
|
||||
KindWithContent::NetworkTopologyChange(old_state),
|
||||
KindWithContent::NetworkTopologyChange(new_state),
|
||||
) = (old_task.kind, &mut task.kind)
|
||||
{
|
||||
new_state.merge(old_state);
|
||||
// the state possibly just changed, rewrite the details
|
||||
task.details = Some(new_state.to_details());
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
old_task.enqueued_at, task.enqueued_at,
|
||||
"Cannot update a task's enqueued_at time"
|
||||
);
|
||||
if old_task.started_at != task.started_at {
|
||||
assert!(
|
||||
reprocessing || old_task.started_at.is_none(),
|
||||
maybe_reprocessing || old_task.started_at.is_none(),
|
||||
"Cannot update a task's started_at time"
|
||||
);
|
||||
if let Some(started_at) = old_task.started_at {
|
||||
@@ -161,7 +174,7 @@ impl TaskQueue {
|
||||
}
|
||||
if old_task.finished_at != task.finished_at {
|
||||
assert!(
|
||||
reprocessing || old_task.finished_at.is_none(),
|
||||
maybe_reprocessing || old_task.finished_at.is_none(),
|
||||
"Cannot update a task's finished_at time"
|
||||
);
|
||||
if let Some(finished_at) = old_task.finished_at {
|
||||
@@ -175,7 +188,16 @@ impl TaskQueue {
|
||||
task.network = match (old_task.network, task.network.take()) {
|
||||
(None, None) => None,
|
||||
(None, Some(network)) | (Some(network), None) => Some(network),
|
||||
(Some(_), Some(network)) => Some(network),
|
||||
(Some(left), Some(right)) => Some(match (left, right) {
|
||||
(
|
||||
DbTaskNetwork::Remotes { remote_tasks: mut left, network_version: _ },
|
||||
DbTaskNetwork::Remotes { remote_tasks: mut right, network_version },
|
||||
) => {
|
||||
left.append(&mut right);
|
||||
DbTaskNetwork::Remotes { remote_tasks: left, network_version }
|
||||
}
|
||||
(_, right) => right,
|
||||
}),
|
||||
};
|
||||
|
||||
self.all_tasks.put(wtxn, &task.uid, task)?;
|
||||
|
||||
Reference in New Issue
Block a user