This commit is contained in:
Louis Dureuil
2025-04-02 15:29:47 +02:00
parent 19f4c1ac98
commit 31bda976f2
5 changed files with 288 additions and 68 deletions

View File

@@ -5,9 +5,10 @@ tasks affecting a single index into a [batch](crate::batch::Batch).
The main function of the autobatcher is [`next_autobatch`]. The main function of the autobatcher is [`next_autobatch`].
*/ */
use meilisearch_types::tasks::TaskId;
use std::ops::ControlFlow::{self, Break, Continue}; use std::ops::ControlFlow::{self, Break, Continue};
use meilisearch_types::tasks::{BatchStopReason, TaskId};
use crate::KindWithContent; use crate::KindWithContent;
/// Succinctly describes a task's [`Kind`](meilisearch_types::tasks::Kind) /// Succinctly describes a task's [`Kind`](meilisearch_types::tasks::Kind)
@@ -147,14 +148,38 @@ impl BatchKind {
task_id: TaskId, task_id: TaskId,
kind: KindWithContent, kind: KindWithContent,
primary_key: Option<&str>, primary_key: Option<&str>,
) -> (ControlFlow<BatchKind, BatchKind>, bool) { ) -> (ControlFlow<(BatchKind, BatchStopReason), BatchKind>, bool) {
use AutobatchKind as K; use AutobatchKind as K;
match AutobatchKind::from(kind) { match AutobatchKind::from(kind) {
K::IndexCreation => (Break(BatchKind::IndexCreation { id: task_id }), true), K::IndexCreation => (
K::IndexDeletion => (Break(BatchKind::IndexDeletion { ids: vec![task_id] }), false), Break((
K::IndexUpdate => (Break(BatchKind::IndexUpdate { id: task_id }), false), BatchKind::IndexCreation { id: task_id },
K::IndexSwap => (Break(BatchKind::IndexSwap { id: task_id }), false), BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
true,
),
K::IndexDeletion => (
Break((
BatchKind::IndexDeletion { ids: vec![task_id] },
BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
false,
),
K::IndexUpdate => (
Break((
BatchKind::IndexUpdate { id: task_id },
BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
false,
),
K::IndexSwap => (
Break((
BatchKind::IndexSwap { id: task_id },
BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
false,
),
K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false), K::DocumentClear => (Continue(BatchKind::DocumentClear { ids: vec![task_id] }), false),
K::DocumentImport { allow_index_creation, primary_key: pk } K::DocumentImport { allow_index_creation, primary_key: pk }
if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() => if primary_key.is_none() || pk.is_none() || primary_key == pk.as_deref() =>
@@ -169,15 +194,28 @@ impl BatchKind {
) )
} }
// if the primary key set in the task was different than ours we should stop and make this batch fail asap. // if the primary key set in the task was different than ours we should stop and make this batch fail asap.
K::DocumentImport { allow_index_creation, primary_key } => ( K::DocumentImport { allow_index_creation, primary_key: pk } => (
Break(BatchKind::DocumentOperation { Break((
BatchKind::DocumentOperation {
allow_index_creation, allow_index_creation,
primary_key, primary_key: pk,
operation_ids: vec![task_id], operation_ids: vec![task_id],
}), },
BatchStopReason::PrimaryKeyIndexMismatch {
id: task_id,
in_index: primary_key.unwrap().to_owned(),
in_task: pk.unwrap(),
},
)),
allow_index_creation, allow_index_creation,
), ),
K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false), K::DocumentEdition => (
Break((
BatchKind::DocumentEdition { id: task_id },
BatchStopReason::TaskCannotBeBatched { kind, id: task_id },
)),
false,
),
K::DocumentDeletion { by_filter: includes_by_filter } => ( K::DocumentDeletion { by_filter: includes_by_filter } => (
Continue(BatchKind::DocumentDeletion { Continue(BatchKind::DocumentDeletion {
deletion_ids: vec![task_id], deletion_ids: vec![task_id],
@@ -197,43 +235,40 @@ impl BatchKind {
/// To ease the writing of the code. `true` can be returned when you don't need to create an index /// To ease the writing of the code. `true` can be returned when you don't need to create an index
/// but false can't be returned if you needs to create an index. /// but false can't be returned if you needs to create an index.
#[rustfmt::skip] #[rustfmt::skip]
fn accumulate(self, id: TaskId, kind: AutobatchKind, index_already_exists: bool, primary_key: Option<&str>) -> ControlFlow<BatchKind, BatchKind> { fn accumulate(self, id: TaskId, kind: AutobatchKind, index_already_exists: bool, primary_key: Option<&str>) -> ControlFlow<(BatchKind, BatchStopReason), BatchKind> {
use AutobatchKind as K; use AutobatchKind as K;
let pk: Option<String> = match (self.primary_key(), kind.primary_key(), primary_key) {
// 1. If both task don't interact with primary key -> we can continue
(batch_pk, None | Some(None), _) => {
batch_pk.flatten().to_owned()
},
// 2.1 If we already have a primary-key ->
// 2.1.1 If the task we're trying to accumulate have a pk it must be equal to our primary key
(batch_pk, Some(Some(task_pk)), Some(index_pk)) => if task_pk == index_pk {
Some(task_pk.to_owned())
} else {
return Break((this, BatchStopReason::PrimaryKeyMismatch { id, batch_pk: todo!(), task_pk: todo!() }))
},
// 2.2 If we don't have a primary-key ->
// 2.2.2 If the batch is set to Some(None), the task should be too
(Some(None), Some(None), None) => None,
(Some(None), Some(Some(_)), None) => return Break((this, BatchStopReason::PrimaryKeyMismatch { id, batch_pk: todo!(), task_pk: todo!() })),
(Some(Some(batch_pk)), Some(None), None) => Some(batch_pk.to_owned()),
(Some(Some(batch_pk)), Some(Some(task_pk)), None) => if task_pk == batch_pk {
Some(task_pk.to_owned())
} else {
return Break((this, BatchStopReason::PrimaryKeyMismatch { id, batch_pk: todo!(), task_pk: todo!() }))
},
(None, Some(Some(task_pk)), None) => Some(task_pk.to_owned())
};
match (self, kind) { match (self, kind) {
// We don't batch any of these operations // We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break(this), (this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break((this, BatchStopReason::TaskCannotBeBatched { kind, id })),
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists. // We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => { (this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break(this) Break((this, BatchStopReason::IndexCreationMismatch { id }))
},
// NOTE: We need to negate the whole condition since we're checking if we need to break instead of continue.
// I wrote it this way because it's easier to understand than the other way around.
(this, kind) if !(
// 1. If both task don't interact with primary key -> we can continue
(this.primary_key().is_none() && kind.primary_key().is_none()) ||
// 2. Else ->
(
// 2.1 If we already have a primary-key ->
(
primary_key.is_some() &&
// 2.1.1 If the task we're trying to accumulate have a pk it must be equal to our primary key
// 2.1.2 If the task don't have a primary-key -> we can continue
kind.primary_key().is_none_or(|pk| pk == primary_key)
) ||
// 2.2 If we don't have a primary-key ->
(
// 2.2.1 If both the batch and the task have a primary key they should be equal
// 2.2.2 If the batch is set to Some(None), the task should be too
// 2.2.3 If the batch is set to None -> we can continue
this.primary_key().zip(kind.primary_key()).map_or(true, |(this, kind)| this == kind)
)
)
) // closing the negation
=> {
Break(this)
}, },
// The index deletion can batch with everything but must stop after // The index deletion can batch with everything but must stop after
( (
@@ -244,7 +279,7 @@ impl BatchKind {
K::IndexDeletion, K::IndexDeletion,
) => { ) => {
ids.push(id); ids.push(id);
Break(BatchKind::IndexDeletion { ids }) Break((BatchKind::IndexDeletion { ids }, BatchStopReason::IndexDeletion { id }))
} }
( (
BatchKind::ClearAndSettings { settings_ids: mut ids, allow_index_creation: _, mut other }, BatchKind::ClearAndSettings { settings_ids: mut ids, allow_index_creation: _, mut other },
@@ -252,7 +287,7 @@ impl BatchKind {
) => { ) => {
ids.push(id); ids.push(id);
ids.append(&mut other); ids.append(&mut other);
Break(BatchKind::IndexDeletion { ids }) Break((BatchKind::IndexDeletion { ids }, BatchStopReason::IndexDeletion { id }))
} }
( (
@@ -265,7 +300,7 @@ impl BatchKind {
( (
this @ BatchKind::DocumentClear { .. }, this @ BatchKind::DocumentClear { .. },
K::DocumentImport { .. } | K::Settings { .. }, K::DocumentImport { .. } | K::Settings { .. },
) => Break(this), ) => Break((this, BatchStopReason::DocumentOperationWithSettings { id })),
( (
BatchKind::DocumentOperation { allow_index_creation: _, primary_key: _, mut operation_ids }, BatchKind::DocumentOperation { allow_index_creation: _, primary_key: _, mut operation_ids },
K::DocumentClear, K::DocumentClear,
@@ -277,7 +312,7 @@ impl BatchKind {
// we can autobatch different kind of document operations and mix replacements with updates // we can autobatch different kind of document operations and mix replacements with updates
( (
BatchKind::DocumentOperation { allow_index_creation, primary_key: _, mut operation_ids }, BatchKind::DocumentOperation { allow_index_creation, primary_key: _, mut operation_ids },
K::DocumentImport { primary_key: pk, .. }, K::DocumentImport { primary_key, .. },
) => { ) => {
operation_ids.push(id); operation_ids.push(id);
Continue(BatchKind::DocumentOperation { Continue(BatchKind::DocumentOperation {
@@ -287,15 +322,15 @@ impl BatchKind {
}) })
} }
( (
BatchKind::DocumentOperation { allow_index_creation, primary_key, mut operation_ids }, BatchKind::DocumentOperation { allow_index_creation, primary_key: _, mut operation_ids },
K::DocumentDeletion { by_filter: false }, K::DocumentDeletion { by_filter: false },
) => { ) => {
operation_ids.push(id); operation_ids.push(id);
Continue(BatchKind::DocumentOperation { Continue(BatchKind::DocumentOperation {
allow_index_creation, allow_index_creation,
primary_key,
operation_ids, operation_ids,
primary_key: pk,
}) })
} }
// We can't batch a document operation with a delete by filter // We can't batch a document operation with a delete by filter
@@ -303,12 +338,12 @@ impl BatchKind {
this @ BatchKind::DocumentOperation { .. }, this @ BatchKind::DocumentOperation { .. },
K::DocumentDeletion { by_filter: true }, K::DocumentDeletion { by_filter: true },
) => { ) => {
Break(this) Break((this, BatchStopReason::DocumentOperationWithDeletionByFilter { id }))
} }
( (
this @ BatchKind::DocumentOperation { .. }, this @ BatchKind::DocumentOperation { .. },
K::Settings { .. }, K::Settings { .. },
) => Break(this), ) => Break((this, BatchStopReason::DocumentOperationWithSettings { id })),
(BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: _ }, K::DocumentClear) => { (BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: _ }, K::DocumentClear) => {
deletion_ids.push(id); deletion_ids.push(id);
@@ -318,7 +353,7 @@ impl BatchKind {
( (
this @ BatchKind::DocumentDeletion { deletion_ids: _, includes_by_filter: true }, this @ BatchKind::DocumentDeletion { deletion_ids: _, includes_by_filter: true },
K::DocumentImport { .. } K::DocumentImport { .. }
) => Break(this), ) => Break((this, BatchStopReason::DeletionByFilterWithDocumentOperation { id })),
// we can autobatch the deletion and import if the index already exists // we can autobatch the deletion and import if the index already exists
( (
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false }, BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
@@ -345,18 +380,18 @@ impl BatchKind {
operation_ids: deletion_ids, operation_ids: deletion_ids,
}) })
} }
// we can't autobatch a deletion and an import if the index does not exists but would be created by an addition // we can't autobatch a deletion and an import if the index does not exist but would be created by an addition
( (
this @ BatchKind::DocumentDeletion { .. }, this @ BatchKind::DocumentDeletion { .. },
K::DocumentImport { .. } K::DocumentImport { .. }
) => { ) => {
Break(this) Break((this, BatchStopReason::IndexCreationMismatch { id }))
} }
(BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter }, K::DocumentDeletion { by_filter }) => { (BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter }, K::DocumentDeletion { by_filter }) => {
deletion_ids.push(id); deletion_ids.push(id);
Continue(BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: includes_by_filter | by_filter }) Continue(BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: includes_by_filter | by_filter })
} }
(this @ BatchKind::DocumentDeletion { .. }, K::Settings { .. }) => Break(this), (this @ BatchKind::DocumentDeletion { .. }, K::Settings { .. }) => Break((this, BatchStopReason::DocumentOperationWithSettings { id })),
( (
BatchKind::Settings { settings_ids, allow_index_creation }, BatchKind::Settings { settings_ids, allow_index_creation },
@@ -369,7 +404,7 @@ impl BatchKind {
( (
this @ BatchKind::Settings { .. }, this @ BatchKind::Settings { .. },
K::DocumentImport { .. } | K::DocumentDeletion { .. }, K::DocumentImport { .. } | K::DocumentDeletion { .. },
) => Break(this), ) => Break((this, BatchStopReason::SettingsWithDocumentOperation { id })),
( (
BatchKind::Settings { mut settings_ids, allow_index_creation }, BatchKind::Settings { mut settings_ids, allow_index_creation },
K::Settings { .. }, K::Settings { .. },
@@ -448,7 +483,7 @@ pub fn autobatch(
enqueued: Vec<(TaskId, KindWithContent)>, enqueued: Vec<(TaskId, KindWithContent)>,
index_already_exists: bool, index_already_exists: bool,
primary_key: Option<&str>, primary_key: Option<&str>,
) -> Option<(BatchKind, bool)> { ) -> Option<(BatchKind, bool, Option<BatchStopReason>)> {
let mut enqueued = enqueued.into_iter(); let mut enqueued = enqueued.into_iter();
let (id, kind) = enqueued.next()?; let (id, kind) = enqueued.next()?;
@@ -457,7 +492,9 @@ pub fn autobatch(
let (mut acc, must_create_index) = match BatchKind::new(id, kind, primary_key) { let (mut acc, must_create_index) = match BatchKind::new(id, kind, primary_key) {
(Continue(acc), create) => (acc, create), (Continue(acc), create) => (acc, create),
(Break(acc), create) => return Some((acc, create)), (Break((acc, batch_stop_reason)), create) => {
return Some((acc, create, Some(batch_stop_reason)))
}
}; };
// if an index has been created in the previous step we can consider it as existing. // if an index has been created in the previous step we can consider it as existing.
@@ -466,9 +503,11 @@ pub fn autobatch(
for (id, kind) in enqueued { for (id, kind) in enqueued {
acc = match acc.accumulate(id, kind.into(), index_exist, primary_key) { acc = match acc.accumulate(id, kind.into(), index_exist, primary_key) {
Continue(acc) => acc, Continue(acc) => acc,
Break(acc) => return Some((acc, must_create_index)), Break((acc, batch_stop_reason)) => {
return Some((acc, must_create_index, Some(batch_stop_reason)))
}
}; };
} }
Some((acc, must_create_index)) Some((acc, must_create_index, None))
} }

View File

@@ -3,7 +3,7 @@ use std::fmt;
use meilisearch_types::heed::RoTxn; use meilisearch_types::heed::RoTxn;
use meilisearch_types::milli::update::IndexDocumentsMethod; use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::settings::{Settings, Unchecked}; use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{BatchStopReason, Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use uuid::Uuid; use uuid::Uuid;
@@ -440,6 +440,7 @@ impl IndexScheduler {
let mut current_batch = ProcessingBatch::new(batch_id); let mut current_batch = ProcessingBatch::new(batch_id);
let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
let count_total_enqueued = enqueued.len();
let failed = &self.queue.tasks.get_status(rtxn, Status::Failed)?; let failed = &self.queue.tasks.get_status(rtxn, Status::Failed)?;
// 0. The priority over everything is to upgrade the instance // 0. The priority over everything is to upgrade the instance
@@ -453,6 +454,10 @@ impl IndexScheduler {
current_batch.uid = batch_uid; current_batch.uid = batch_uid;
} }
current_batch.processing(&mut tasks); current_batch.processing(&mut tasks);
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::UpgradeDatabase,
id: tasks.last().unwrap(),
});
return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch))); return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch)));
} }
@@ -462,6 +467,10 @@ impl IndexScheduler {
let mut task = let mut task =
self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task)); current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::TaskCancelation,
id: tasks.last().unwrap(),
});
return Ok(Some((Batch::TaskCancelation { task }, current_batch))); return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
} }
@@ -470,6 +479,10 @@ impl IndexScheduler {
if !to_delete.is_empty() { if !to_delete.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_delete)?; let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_delete)?;
current_batch.processing(&mut tasks); current_batch.processing(&mut tasks);
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::TaskDeletion,
id: tasks.last().unwrap(),
});
return Ok(Some((Batch::TaskDeletions(tasks), current_batch))); return Ok(Some((Batch::TaskDeletions(tasks), current_batch)));
} }
@@ -478,6 +491,10 @@ impl IndexScheduler {
if !to_snapshot.is_empty() { if !to_snapshot.is_empty() {
let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?; let mut tasks = self.queue.tasks.get_existing_tasks(rtxn, to_snapshot)?;
current_batch.processing(&mut tasks); current_batch.processing(&mut tasks);
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::SnapshotCreation,
id: tasks.last().unwrap(),
});
return Ok(Some((Batch::SnapshotCreation(tasks), current_batch))); return Ok(Some((Batch::SnapshotCreation(tasks), current_batch)));
} }
@@ -487,6 +504,10 @@ impl IndexScheduler {
let mut task = let mut task =
self.queue.tasks.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?; self.queue.tasks.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?;
current_batch.processing(Some(&mut task)); current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::DumpCreation,
id: tasks.last().unwrap(),
});
return Ok(Some((Batch::Dump(task), current_batch))); return Ok(Some((Batch::Dump(task), current_batch)));
} }
@@ -504,6 +525,10 @@ impl IndexScheduler {
} else { } else {
assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty())); assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
current_batch.processing(Some(&mut task)); current_batch.processing(Some(&mut task));
current_batch.reason(BatchStopReason::TaskCannotBeBatched {
kind: Kind::IndexSwap,
id: tasks.last().unwrap(),
});
return Ok(Some((Batch::IndexSwap { task }, current_batch))); return Ok(Some((Batch::IndexSwap { task }, current_batch)));
}; };
@@ -525,9 +550,14 @@ impl IndexScheduler {
1 1
}; };
let mut stop_reason = BatchStopReason::default();
let mut enqueued = Vec::new(); let mut enqueued = Vec::new();
let mut total_size: u64 = 0; let mut total_size: u64 = 0;
for task_id in index_tasks.into_iter().take(tasks_limit) { for task_id in index_tasks.into_iter() {
if enqueued.len() >= task_limit {
stop_reason = BatchStopReason::ReachedTaskLimit { task_limit };
break;
}
let task = self let task = self
.queue .queue
.tasks .tasks
@@ -539,16 +569,27 @@ impl IndexScheduler {
total_size = total_size.saturating_add(content_size); total_size = total_size.saturating_add(content_size);
} }
if total_size > self.scheduler.batched_tasks_size_limit && !enqueued.is_empty() { let size_limit = self.scheduler.batched_tasks_size_limit;
if total_size > size_limit && !enqueued.is_empty() {
stop_reason = BatchStopReason::ReachedSizeLimit { size_limit, size: total_size };
break; break;
} }
enqueued.push((task.uid, task.kind)); enqueued.push((task.uid, task.kind));
} }
if let Some((batchkind, create_index)) = stop_reason.replace_unspecified({
if enqueued.len() == count_total_enqueued as usize {
BatchStopReason::ExhaustedEnqueuedTasks
} else {
BatchStopReason::ExhaustedEnqueuedTasksForIndex { index: index_name.to_owned() }
}
});
if let Some((batchkind, create_index, autobatch_stop_reason)) =
autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref()) autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())
{ {
current_batch.reason(autobatch_stop_reason.unwrap_or(stop_reason));
return Ok(self return Ok(self
.create_next_batch_index( .create_next_batch_index(
rtxn, rtxn,

View File

@@ -7,7 +7,9 @@ use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec; use meilisearch_types::milli::CboRoaringBitmapCodec;
use meilisearch_types::task_view::DetailsView; use meilisearch_types::task_view::DetailsView;
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status}; use meilisearch_types::tasks::{
BatchStopReason, Details, IndexSwap, Kind, KindWithContent, Status,
};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use time::OffsetDateTime; use time::OffsetDateTime;
@@ -33,6 +35,7 @@ pub struct ProcessingBatch {
pub enqueued_at: Option<BatchEnqueuedAt>, pub enqueued_at: Option<BatchEnqueuedAt>,
pub started_at: OffsetDateTime, pub started_at: OffsetDateTime,
pub finished_at: Option<OffsetDateTime>, pub finished_at: Option<OffsetDateTime>,
pub reason: BatchStopReason,
} }
impl ProcessingBatch { impl ProcessingBatch {
@@ -53,6 +56,7 @@ impl ProcessingBatch {
enqueued_at: None, enqueued_at: None,
started_at: OffsetDateTime::now_utc(), started_at: OffsetDateTime::now_utc(),
finished_at: None, finished_at: None,
reason: Default::default(),
} }
} }
@@ -93,6 +97,10 @@ impl ProcessingBatch {
} }
} }
pub fn reason(&mut self, reason: BatchStopReason) {
self.reason = reason;
}
/// Must be called once the batch has finished processing. /// Must be called once the batch has finished processing.
pub fn finished(&mut self) { pub fn finished(&mut self) {
self.details = DetailsView::default(); self.details = DetailsView::default();
@@ -141,6 +149,7 @@ impl ProcessingBatch {
started_at: self.started_at, started_at: self.started_at,
finished_at: self.finished_at, finished_at: self.finished_at,
enqueued_at: self.enqueued_at, enqueued_at: self.enqueued_at,
stop_reason: self.reason.to_string(),
} }
} }
} }

View File

@@ -6,7 +6,7 @@ use time::OffsetDateTime;
use utoipa::ToSchema; use utoipa::ToSchema;
use crate::task_view::DetailsView; use crate::task_view::DetailsView;
use crate::tasks::{Kind, Status}; use crate::tasks::{BatchStopReason, Kind, Status};
pub type BatchId = u32; pub type BatchId = u32;
@@ -28,11 +28,26 @@ pub struct Batch {
// Enqueued at is never displayed and is only required when removing a batch. // Enqueued at is never displayed and is only required when removing a batch.
// It's always some except when upgrading from a database pre v1.12 // It's always some except when upgrading from a database pre v1.12
pub enqueued_at: Option<BatchEnqueuedAt>, pub enqueued_at: Option<BatchEnqueuedAt>,
#[serde(default = "default_stop_reason")]
pub stop_reason: String,
}
fn default_stop_reason() -> String {
BatchStopReason::default().to_string()
} }
impl PartialEq for Batch { impl PartialEq for Batch {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
let Self { uid, progress, details, stats, started_at, finished_at, enqueued_at } = self; let Self {
uid,
progress,
details,
stats,
started_at,
finished_at,
enqueued_at,
stop_reason,
} = self;
*uid == other.uid *uid == other.uid
&& progress.is_none() == other.progress.is_none() && progress.is_none() == other.progress.is_none()
@@ -41,6 +56,7 @@ impl PartialEq for Batch {
&& started_at == &other.started_at && started_at == &other.started_at
&& finished_at == &other.finished_at && finished_at == &other.finished_at
&& enqueued_at == &other.enqueued_at && enqueued_at == &other.enqueued_at
&& stop_reason == &other.stop_reason
} }
} }

View File

@@ -675,6 +675,121 @@ impl Details {
} }
} }
#[derive(Default, Debug, Clone)]
pub enum BatchStopReason {
#[default]
Unspecified,
TaskCannotBeBatched {
kind: Kind,
id: TaskId,
},
ExhaustedEnqueuedTasks,
ExhaustedEnqueuedTasksForIndex {
index: String,
},
ReachedTaskLimit {
task_limit: usize,
},
ReachedSizeLimit {
size_limit: usize,
size: usize,
},
PrimaryKeyIndexMismatch {
id: TaskId,
in_index: String,
in_task: String,
},
IndexCreationMismatch {
id: TaskId,
},
PrimaryKeyMismatch {
id: TaskId,
batch_pk: Option<String>,
task_pk: Option<String>,
},
IndexDeletion {
id: TaskId,
},
DocumentOperationWithSettings {
id: TaskId,
},
DocumentOperationWithDeletionByFilter {
id: TaskId,
},
DeletionByFilterWithDocumentOperation {
id: TaskId,
},
SettingsWithDocumentOperation {
id: TaskId,
},
}
impl BatchStopReason {
pub fn replace_unspecified(&mut self, new: BatchStopReason) {
if let BatchStopReason::Unspecified = self {
*self = new;
}
}
}
pub enum PrimaryKeyMismatchReason {}
impl Display for BatchStopReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BatchStopReason::Unspecified => f.write_str("unspecified"),
BatchStopReason::TaskCannotBeBatched { kind, id } => {
write!(f, "task with id {id} of type `{kind}` cannot be batched")
}
BatchStopReason::ExhaustedEnqueuedTasks => f.write_str("batched all enqueued tasks"),
BatchStopReason::ExhaustedEnqueuedTasksForIndex { index } => {
write!(f, "batched all enqueued tasks for index `{index}`")
}
BatchStopReason::ReachedTaskLimit { task_limit } => {
write!(f, "reached configured batch limit of {task_limit} tasks")
}
BatchStopReason::ReachedSizeLimit { size_limit, size } => write!(
f,
"reached configured batch size limit of {size_limit}B with a total of {size}B"
),
BatchStopReason::PrimaryKeyIndexMismatch { id, in_index, in_task } => {
write!(f, "primary key `{in_task}` in task with id {id} is different from the primary key of the index `{in_index}`")
}
BatchStopReason::IndexCreationMismatch { id } => {
write!(f, "task with id {id} has different index creation rules as in the batch")
}
BatchStopReason::PrimaryKeyMismatch { id, batch_pk, task_pk } => {}
BatchStopReason::IndexDeletion { id } => {
write!(f, "task with id {id} deletes the index")
}
BatchStopReason::DocumentOperationWithSettings { id } => {
write!(
f,
"task with id {id} is a settings change in a batch of document operations"
)
}
BatchStopReason::DocumentOperationWithDeletionByFilter { id } => {
write!(
f,
"task with id {id} is a deletion by filter in a batch of document operations"
)
}
BatchStopReason::DeletionByFilterWithDocumentOperation { id } => {
write!(
f,
"task with id {id} is a document operation in a batch of deletions by filter"
)
}
BatchStopReason::SettingsWithDocumentOperation { id } => {
write!(
f,
"task with id {id} is a document operation in a batch of settings changes"
)
}
}
}
}
/// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for /// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for
/// https://github.com/time-rs/time/issues/378. /// https://github.com/time-rs/time/issues/378.
/// This code is a port of the old code of time that was removed in 0.2. /// This code is a port of the old code of time that was removed in 0.2.