Implements the get all batches route with filters working

This commit is contained in:
Tamo
2024-11-14 17:31:02 +01:00
parent 6062914654
commit a1251c3c83
179 changed files with 5362 additions and 849 deletions

View File

@ -3,7 +3,7 @@
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use meilisearch_types::batches::BatchId;
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::types::DecodeIgnore;
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
@ -11,13 +11,64 @@ use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status
use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, Task, TaskId, BEI128};
/// This structure contains all the information required to write a batch in the database without reading the tasks.
/// It'll stay in RAM so it must be small.
pub(crate) struct CachedBatch {
uid: BatchId,
statuses: HashSet<Status>,
kinds: HashSet<Kind>,
indexes: HashSet<String>,
canceled_by: HashSet<TaskId>,
oldest_enqueued_at: Option<OffsetDateTime>,
earliest_enqueued_at: Option<OffsetDateTime>,
started_at: OffsetDateTime,
finished_at: OffsetDateTime,
}
impl CachedBatch {
pub fn new(uid: BatchId, started_at: OffsetDateTime, finished_at: OffsetDateTime) -> Self {
Self {
uid,
statuses: HashSet::default(),
kinds: HashSet::default(),
indexes: HashSet::default(),
canceled_by: HashSet::default(),
oldest_enqueued_at: None,
earliest_enqueued_at: None,
started_at,
finished_at,
}
}
pub fn update(&mut self, task: &Task) {
self.statuses.insert(task.status);
self.kinds.insert(task.kind.as_kind());
self.indexes.extend(task.indexes().iter().map(|s| s.to_string()));
if let Some(canceled_by) = task.canceled_by {
self.canceled_by.insert(canceled_by);
}
self.oldest_enqueued_at =
Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| {
task.enqueued_at.min(oldest_enqueued_at)
}));
self.earliest_enqueued_at =
Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| {
task.enqueued_at.max(earliest_enqueued_at)
}));
}
}
impl IndexScheduler {
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
enum_iterator::all().map(|s| self.get_status(rtxn, s)).union()
}
pub(crate) fn all_batch_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
enum_iterator::all().map(|s| self.get_batch_status(rtxn, s)).union()
}
pub(crate) fn last_task_id(&self, rtxn: &RoTxn) -> Result<Option<TaskId>> {
Ok(self.all_tasks.remap_data_type::<DecodeIgnore>().last(rtxn)?.map(|(k, _)| k + 1))
}
@ -39,6 +90,50 @@ impl IndexScheduler {
Ok(self.all_tasks.get(rtxn, &task_id)?)
}
pub(crate) fn get_batch(&self, rtxn: &RoTxn, batch_id: BatchId) -> Result<Option<Batch>> {
Ok(self.all_batches.get(rtxn, &batch_id)?)
}
pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: CachedBatch) -> Result<()> {
self.all_batches.put(
wtxn,
&batch.uid,
&Batch {
uid: batch.uid,
started_at: batch.started_at,
finished_at: Some(batch.finished_at),
},
)?;
for status in batch.statuses {
self.update_batch_status(wtxn, status, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
for kind in batch.kinds {
self.update_batch_kind(wtxn, kind, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
for index in batch.indexes {
self.update_batch_index(wtxn, &index, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
if let Some(enqueued_at) = batch.oldest_enqueued_at {
insert_task_datetime(wtxn, self.batch_enqueued_at, enqueued_at, batch.uid)?;
}
if let Some(enqueued_at) = batch.earliest_enqueued_at {
insert_task_datetime(wtxn, self.batch_enqueued_at, enqueued_at, batch.uid)?;
}
insert_task_datetime(wtxn, self.batch_started_at, batch.started_at, batch.uid)?;
insert_task_datetime(wtxn, self.batch_finished_at, batch.finished_at, batch.uid)?;
Ok(())
}
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
/// `CorruptedTaskQueue` error will be throwed.
pub(crate) fn get_existing_tasks_with_batch_id(
@ -72,11 +167,29 @@ impl IndexScheduler {
.collect::<Result<_>>()
}
/// Convert an iterator to a `Vec` of batches. The batches MUST exist or a
/// `CorruptedTaskQueue` error will be throwed.
pub(crate) fn get_existing_batches(
&self,
rtxn: &RoTxn,
tasks: impl IntoIterator<Item = BatchId>,
) -> Result<Vec<Batch>> {
tasks
.into_iter()
.map(|batch_id| {
self.get_batch(rtxn, batch_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
})
.collect::<Result<_>>()
}
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
debug_assert_eq!(old_task.uid, task.uid);
debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
// TODO: This shouldn't ever happen, we should assert it
if old_task == *task {
return Ok(());
}
@ -142,6 +255,28 @@ impl IndexScheduler {
Ok(())
}
/// Returns the whole set of batches that belongs to this index.
pub(crate) fn index_batches(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
Ok(self.batch_index_tasks.get(rtxn, index)?.unwrap_or_default())
}
pub(crate) fn update_batch_index(
&self,
wtxn: &mut RwTxn,
index: &str,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
let mut batches = self.index_batches(wtxn, index)?;
f(&mut batches);
if batches.is_empty() {
self.batch_index_tasks.delete(wtxn, index)?;
} else {
self.batch_index_tasks.put(wtxn, index, &batches)?;
}
Ok(())
}
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
Ok(self.status.get(rtxn, &status)?.unwrap_or_default())
}
@ -168,6 +303,32 @@ impl IndexScheduler {
Ok(())
}
pub(crate) fn get_batch_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
Ok(self.batch_status.get(rtxn, &status)?.unwrap_or_default())
}
pub(crate) fn put_batch_status(
&self,
wtxn: &mut RwTxn,
status: Status,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.batch_status.put(wtxn, &status, bitmap)?)
}
pub(crate) fn update_batch_status(
&self,
wtxn: &mut RwTxn,
status: Status,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
let mut tasks = self.get_batch_status(wtxn, status)?;
f(&mut tasks);
self.put_batch_status(wtxn, status, &tasks)?;
Ok(())
}
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
Ok(self.kind.get(rtxn, &kind)?.unwrap_or_default())
}
@ -193,6 +354,32 @@ impl IndexScheduler {
Ok(())
}
pub(crate) fn get_batch_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
Ok(self.batch_kind.get(rtxn, &kind)?.unwrap_or_default())
}
pub(crate) fn put_batch_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.batch_kind.put(wtxn, &kind, bitmap)?)
}
pub(crate) fn update_batch_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
let mut tasks = self.get_batch_kind(wtxn, kind)?;
f(&mut tasks);
self.put_batch_kind(wtxn, kind, &tasks)?;
Ok(())
}
}
pub(crate) fn insert_task_datetime(
@ -227,6 +414,7 @@ pub(crate) fn remove_task_datetime(
Ok(())
}
// TODO: Rename the function since it also applies to batches
pub(crate) fn keep_tasks_within_datetimes(
rtxn: &RoTxn,
tasks: &mut RoaringBitmap,