Move function to utils

This commit is contained in:
Mubelotix
2025-08-12 16:47:21 +02:00
parent 84244a74df
commit d7776ec82b
2 changed files with 27 additions and 28 deletions

View File

@ -1,5 +1,4 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::ops::RangeInclusive;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
@ -18,7 +17,7 @@ use crate::processing::{
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress,
};
use crate::utils::{swap_index_uid_in_task, ProcessingBatch};
use crate::utils::{consecutive_ranges, swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, Result, TaskId, BEI128};
#[derive(Debug, Default)]
@ -511,30 +510,6 @@ impl IndexScheduler {
matched_tasks: &RoaringBitmap,
progress: &Progress,
) -> Result<RoaringBitmap> {
/// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains.
fn consecutive_ranges(
iter: impl IntoIterator<Item = u32>,
) -> impl Iterator<Item = RangeInclusive<u32>> {
let mut iter = iter.into_iter().peekable();
std::iter::from_fn(move || {
let start = iter.next()?;
let mut end = start;
while let Some(&next) = iter.peek() {
if next == end + 1 {
end = next;
iter.next();
} else {
break;
}
}
Some(start..=end)
})
}
fn remove_task_datetimes(
wtxn: &mut RwTxn<'_>,
mut to_remove: HashMap<i128, RoaringBitmap>,
@ -575,7 +550,7 @@ impl IndexScheduler {
}
}
if let Some(delete_range_start) = delete_range_start.take() {
delete_ranges.push(delete_range_start..(max+1));
delete_ranges.push(delete_range_start..(max + 1));
}
for range in delete_ranges {

View File

@ -2,7 +2,7 @@
use crate::milli::progress::EmbedderStats;
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use std::ops::{Bound, RangeInclusive};
use std::sync::Arc;
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
@ -159,6 +159,30 @@ impl ProcessingBatch {
}
}
/// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains.
pub(crate) fn consecutive_ranges(
iter: impl IntoIterator<Item = u32>,
) -> impl Iterator<Item = RangeInclusive<u32>> {
let mut iter = iter.into_iter().peekable();
std::iter::from_fn(move || {
let start = iter.next()?;
let mut end = start;
while let Some(&next) = iter.peek() {
if next == end + 1 {
end = next;
iter.next();
} else {
break;
}
}
Some(start..=end)
})
}
pub(crate) fn insert_task_datetime(
wtxn: &mut RwTxn,
database: Database<BEI128, CboRoaringBitmapCodec>,