add some stats on the batches

This commit is contained in:
Tamo
2024-11-19 18:19:41 +01:00
parent 229fa0f902
commit 4abcd9c04e
110 changed files with 688 additions and 323 deletions

View File

@ -3,7 +3,7 @@
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::batches::{Batch, BatchId, BatchStats};
use meilisearch_types::heed::types::DecodeIgnore;
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
@ -16,10 +16,16 @@ use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
/// This structure contains all the information required to write a batch in the database without reading the tasks.
/// It'll stay in RAM so it must be small.
/// The usage is the following:
/// 1. Create the structure with its batch id.
/// 2. Call `processing` on all the task that we know are currently processing in the batch (it can change in the future)
/// 3. Call `finished` once the batch has been processed.
/// 4. Call `update` on all the tasks.
#[derive(Debug, Clone)]
pub(crate) struct ProcessingBatch {
pub uid: BatchId,
pub details: DetailsView,
pub stats: BatchStats,
pub statuses: HashSet<Status>,
pub kinds: HashSet<Kind>,
@ -28,6 +34,7 @@ pub(crate) struct ProcessingBatch {
pub oldest_enqueued_at: Option<OffsetDateTime>,
pub earliest_enqueued_at: Option<OffsetDateTime>,
pub started_at: OffsetDateTime,
pub finished_at: Option<OffsetDateTime>,
}
impl ProcessingBatch {
@ -39,6 +46,7 @@ impl ProcessingBatch {
Self {
uid,
details: DetailsView::default(),
stats: BatchStats::default(),
statuses,
kinds: HashSet::default(),
@ -47,21 +55,10 @@ impl ProcessingBatch {
oldest_enqueued_at: None,
earliest_enqueued_at: None,
started_at: OffsetDateTime::now_utc(),
finished_at: None,
}
}
/// Remove the Processing status and update the real statuses of the tasks.
pub fn update(&mut self, task: &Task) {
// Craft an aggregation of the details of all the tasks encountered in this batch.
if task.status != Status::Failed {
if let Some(ref details) = task.details {
self.details.accumulate(&DetailsView::from(details.clone()));
}
}
self.statuses.clear();
self.statuses.insert(task.status);
}
/// Update itself with the content of the task and update the batch id in the task.
pub fn processing<'a>(&mut self, tasks: impl IntoIterator<Item = &'a mut Task>) {
for task in tasks.into_iter() {
@ -82,6 +79,45 @@ impl ProcessingBatch {
}));
}
}
/// Must be called once the batch has finished processing.
pub fn finished(&mut self) {
self.finished_at = Some(OffsetDateTime::now_utc());
// Initially we inserted ourselves as a processing batch, that's not the case anymore.
self.statuses.clear();
// We're going to recount the number of tasks AFTER processing the batch because
// tasks may add themselves to a batch while its processing.
self.stats.total_nb_tasks = 0;
}
/// Update the timestamp of the tasks and the inner structure of this sturcture.
pub fn update(&mut self, task: &mut Task) {
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state or that failed.
task.batch_uid = Some(self.uid);
// Same
task.started_at = Some(self.started_at);
task.finished_at = self.finished_at;
self.statuses.insert(task.status);
// Craft an aggregation of the details of all the tasks encountered in this batch.
if task.status != Status::Failed {
if let Some(ref details) = task.details {
self.details.accumulate(&DetailsView::from(details.clone()));
}
}
self.stats.total_nb_tasks += 1;
*self.stats.status.entry(task.status).or_default() += 1;
*self.stats.types.entry(task.kind.as_kind()).or_default() += 1;
if let Some(index_uid) = task.index_uid() {
*self.stats.index_uids.entry(index_uid.to_string()).or_default() += 1;
}
}
}
impl IndexScheduler {
@ -123,7 +159,6 @@ impl IndexScheduler {
wtxn: &mut RwTxn,
batch: ProcessingBatch,
tasks: &RoaringBitmap,
finished_at: OffsetDateTime,
) -> Result<()> {
self.all_batches.put(
wtxn,
@ -131,8 +166,9 @@ impl IndexScheduler {
&Batch {
uid: batch.uid,
details: batch.details,
stats: batch.stats,
started_at: batch.started_at,
finished_at: Some(finished_at),
finished_at: batch.finished_at,
},
)?;
self.batch_to_tasks_mapping.put(wtxn, &batch.uid, tasks)?;
@ -162,7 +198,7 @@ impl IndexScheduler {
insert_task_datetime(wtxn, self.batch_enqueued_at, enqueued_at, batch.uid)?;
}
insert_task_datetime(wtxn, self.batch_started_at, batch.started_at, batch.uid)?;
insert_task_datetime(wtxn, self.batch_finished_at, finished_at, batch.uid)?;
insert_task_datetime(wtxn, self.batch_finished_at, batch.finished_at.unwrap(), batch.uid)?;
Ok(())
}