Update tick

This commit is contained in:
Louis Dureuil
2025-12-08 11:13:24 +01:00
parent 961a960fff
commit 7ff517bf3a
2 changed files with 126 additions and 82 deletions

View File

@@ -4,9 +4,11 @@ use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use std::sync::Arc;
use convert_case::{Case, Casing as _};
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::{CboRoaringBitmapCodec, ChannelCongestion};
use meilisearch_types::task_view::DetailsView;
use meilisearch_types::tasks::{
BatchStopReason, Details, IndexSwap, Kind, KindWithContent, Status,
@@ -119,17 +121,8 @@ impl ProcessingBatch {
self.stats.total_nb_tasks = 0;
}
/// Update the timestamp of the tasks and the inner structure of this structure.
pub fn update(&mut self, task: &mut Task) {
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state or that failed.
task.batch_uid = Some(self.uid);
// Same
task.started_at = Some(self.started_at);
task.finished_at = self.finished_at;
/// Update batch task from a processed task
pub fn update_from_task(&mut self, task: &Task) {
self.statuses.insert(task.status);
// Craft an aggregation of the details of all the tasks encountered in this batch.
@@ -144,6 +137,63 @@ impl ProcessingBatch {
}
}
/// Update the timestamp of the tasks after they're done
pub fn finish_task(&self, task: &mut Task) {
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state or that failed.
task.batch_uid = Some(self.uid);
// Same
task.started_at = Some(self.started_at);
task.finished_at = self.finished_at;
}
pub fn write_stats(
&mut self,
progress: &Progress,
congestion: Option<ChannelCongestion>,
pre_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
post_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
) {
self.stats.progress_trace =
progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
self.stats.write_channel_congestion = congestion.map(|congestion| {
let mut congestion_info = serde_json::Map::new();
congestion_info.insert("attempts".into(), congestion.attempts.into());
congestion_info.insert("blocking_attempts".into(), congestion.blocking_attempts.into());
congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
congestion_info
});
self.stats.internal_database_sizes = pre_commit_dabases_sizes
.iter()
.flat_map(|(dbname, pre_size)| {
post_commit_dabases_sizes
.get(dbname)
.map(|post_size| {
use std::cmp::Ordering::{Equal, Greater, Less};
use byte_unit::Byte;
use byte_unit::UnitType::Binary;
let post = Byte::from_u64(*post_size as u64).get_appropriate_unit(Binary);
let diff_size = post_size.abs_diff(*pre_size) as u64;
let diff = Byte::from_u64(diff_size).get_appropriate_unit(Binary);
let sign = match post_size.cmp(pre_size) {
Equal => return None,
Greater => "+",
Less => "-",
};
Some((
dbname.to_case(Case::Camel),
format!("{post:#.2} ({sign}{diff:#.2})").into(),
))
})
.into_iter()
.flatten()
})
.collect();
}
pub fn to_batch(&self) -> Batch {
Batch {
uid: self.uid,