mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-06 12:46:31 +00:00
Make code more readable
This commit is contained in:
@ -11,7 +11,6 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status
|
||||
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
|
||||
use milli::update::Settings as MilliSettings;
|
||||
use roaring::RoaringBitmap;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use super::create_batch::Batch;
|
||||
use crate::processing::{
|
||||
@ -538,7 +537,6 @@ impl IndexScheduler {
|
||||
|
||||
fn remove_datetimes(
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
range: RangeInclusive<i128>,
|
||||
mut to_remove: HashMap<i128, RoaringBitmap>,
|
||||
db: Database<BEI128, CboRoaringBitmapCodec>,
|
||||
) -> Result<()> {
|
||||
@ -546,6 +544,10 @@ impl IndexScheduler {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let min = to_remove.keys().min().cloned().unwrap_or(i128::MAX);
|
||||
let max = to_remove.keys().max().cloned().unwrap_or(i128::MIN);
|
||||
let range = min..=max;
|
||||
|
||||
let mut iter = db.rev_range_mut(wtxn, &range)?;
|
||||
while let Some(i) = iter.next() {
|
||||
let (timestamp, mut tasks) = i?;
|
||||
@ -564,31 +566,6 @@ impl IndexScheduler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
trait UnixTimestampNanosOpt {
|
||||
fn unix_timestamp_nanos(self) -> Option<i128>;
|
||||
}
|
||||
impl UnixTimestampNanosOpt for Option<OffsetDateTime> {
|
||||
fn unix_timestamp_nanos(self) -> Option<i128> {
|
||||
self.map(|dt| dt.unix_timestamp_nanos())
|
||||
}
|
||||
}
|
||||
impl UnixTimestampNanosOpt for i128 {
|
||||
fn unix_timestamp_nanos(self) -> Option<i128> {
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Extends a range to include the given value
|
||||
fn extend_range(value: impl UnixTimestampNanosOpt, range: &mut RangeInclusive<i128>) {
|
||||
let Some(value) = value.unix_timestamp_nanos() else { return };
|
||||
if value < *range.start() {
|
||||
*range = value..=*range.end();
|
||||
}
|
||||
if value > *range.end() {
|
||||
*range = *range.start()..=value;
|
||||
}
|
||||
}
|
||||
|
||||
progress.update_progress(TaskDeletionProgress::RetrievingTasks);
|
||||
|
||||
let rtxn = self.env.read_txn()?;
|
||||
@ -607,9 +584,6 @@ impl IndexScheduler {
|
||||
let mut affected_kinds = HashSet::new();
|
||||
let mut affected_canceled_by = RoaringBitmap::new();
|
||||
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new(); // The tasks that have been removed *per batches*.
|
||||
let mut enqueued_range = i128::MAX..=i128::MIN;
|
||||
let mut started_range = i128::MAX..=i128::MIN;
|
||||
let mut finished_range = i128::MAX..=i128::MIN;
|
||||
let mut tasks_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
let mut tasks_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
let mut tasks_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
@ -625,17 +599,14 @@ impl IndexScheduler {
|
||||
affected_kinds.insert(task.kind.as_kind());
|
||||
|
||||
let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
|
||||
extend_range(enqueued_at, &mut enqueued_range);
|
||||
tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
|
||||
|
||||
if let Some(started_at) = task.started_at.unix_timestamp_nanos() {
|
||||
extend_range(started_at, &mut started_range);
|
||||
tasks_started_to_remove.entry(started_at).or_default().insert(task_id);
|
||||
if let Some(started_at) = task.started_at {
|
||||
tasks_started_to_remove.entry(started_at.unix_timestamp_nanos()).or_default().insert(task_id);
|
||||
}
|
||||
|
||||
if let Some(finished_at) = task.finished_at.unix_timestamp_nanos() {
|
||||
extend_range(finished_at, &mut finished_range);
|
||||
tasks_finished_to_remove.entry(finished_at).or_default().insert(task_id);
|
||||
if let Some(finished_at) = task.finished_at {
|
||||
tasks_finished_to_remove.entry(finished_at.unix_timestamp_nanos()).or_default().insert(task_id);
|
||||
}
|
||||
|
||||
if let Some(canceled_by) = task.canceled_by {
|
||||
@ -669,9 +640,6 @@ impl IndexScheduler {
|
||||
let mut to_remove_from_kinds = HashMap::new();
|
||||
|
||||
// 4. Read affected batches' tasks
|
||||
let mut batches_enqueued_range = i128::MAX..=i128::MIN;
|
||||
let mut batches_started_range = i128::MAX..=i128::MIN;
|
||||
let mut batches_finished_range = i128::MAX..=i128::MIN;
|
||||
let mut batches_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
let mut batches_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
let mut batches_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
|
||||
@ -745,8 +713,6 @@ impl IndexScheduler {
|
||||
if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
|
||||
let earliest = earliest.unix_timestamp_nanos();
|
||||
let oldest = oldest.unix_timestamp_nanos();
|
||||
extend_range(earliest, &mut batches_enqueued_range);
|
||||
extend_range(oldest, &mut batches_enqueued_range);
|
||||
batches_enqueued_to_remove.entry(earliest).or_default().insert(batch_id);
|
||||
batches_enqueued_to_remove.entry(oldest).or_default().insert(batch_id);
|
||||
} else {
|
||||
@ -759,11 +725,9 @@ impl IndexScheduler {
|
||||
));
|
||||
}
|
||||
let started_at = batch.started_at.unix_timestamp_nanos();
|
||||
extend_range(started_at, &mut batches_started_range);
|
||||
batches_started_to_remove.entry(started_at).or_default().insert(batch_id);
|
||||
if let Some(finished_at) = batch.finished_at {
|
||||
let finished_at = finished_at.unix_timestamp_nanos();
|
||||
extend_range(finished_at, &mut batches_finished_range);
|
||||
batches_finished_to_remove.entry(finished_at).or_default().insert(batch_id);
|
||||
}
|
||||
|
||||
@ -780,21 +744,18 @@ impl IndexScheduler {
|
||||
|
||||
remove_datetimes(
|
||||
wtxn,
|
||||
enqueued_range,
|
||||
tasks_enqueued_to_remove,
|
||||
self.queue.tasks.enqueued_at,
|
||||
)?;
|
||||
|
||||
remove_datetimes(
|
||||
wtxn,
|
||||
started_range,
|
||||
tasks_started_to_remove,
|
||||
self.queue.tasks.started_at,
|
||||
)?;
|
||||
|
||||
remove_datetimes(
|
||||
wtxn,
|
||||
finished_range,
|
||||
tasks_finished_to_remove,
|
||||
self.queue.tasks.finished_at,
|
||||
)?;
|
||||
@ -814,21 +775,18 @@ impl IndexScheduler {
|
||||
|
||||
remove_datetimes(
|
||||
wtxn,
|
||||
batches_enqueued_range,
|
||||
batches_enqueued_to_remove,
|
||||
self.queue.batches.enqueued_at,
|
||||
)?;
|
||||
|
||||
remove_datetimes(
|
||||
wtxn,
|
||||
batches_started_range,
|
||||
batches_started_to_remove,
|
||||
self.queue.batches.started_at,
|
||||
)?;
|
||||
|
||||
remove_datetimes(
|
||||
wtxn,
|
||||
batches_finished_range,
|
||||
batches_finished_to_remove,
|
||||
self.queue.batches.finished_at,
|
||||
)?;
|
||||
|
Reference in New Issue
Block a user