add the mapping between the task and batches

This commit is contained in:
Tamo
2024-11-18 10:46:56 +01:00
parent a1251c3c83
commit d489f5635f
167 changed files with 730 additions and 68 deletions

View File

@ -94,7 +94,12 @@ impl IndexScheduler {
Ok(self.all_batches.get(rtxn, &batch_id)?)
}
pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: CachedBatch) -> Result<()> {
pub(crate) fn write_batch(
&self,
wtxn: &mut RwTxn,
batch: CachedBatch,
tasks: &RoaringBitmap,
) -> Result<()> {
self.all_batches.put(
wtxn,
&batch.uid,
@ -104,6 +109,7 @@ impl IndexScheduler {
finished_at: Some(batch.finished_at),
},
)?;
self.batch_to_tasks_mapping.put(wtxn, &batch.uid, tasks)?;
for status in batch.statuses {
self.update_batch_status(wtxn, status, |bitmap| {
@ -233,6 +239,11 @@ impl IndexScheduler {
Ok(())
}
/// Returns the whole set of tasks that belongs to this batch.
pub(crate) fn tasks_in_batch(&self, rtxn: &RoTxn, batch_id: BatchId) -> Result<RoaringBitmap> {
Ok(self.batch_to_tasks_mapping.get(rtxn, &batch_id)?.unwrap_or_default())
}
/// Returns the whole set of tasks that belongs to this index.
pub(crate) fn index_tasks(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
Ok(self.index_tasks.get(rtxn, index)?.unwrap_or_default())
@ -558,7 +569,6 @@ impl IndexScheduler {
let Task {
uid,
/// We should iterate over the list of batch to ensure this task is effectively in the right batch
batch_uid,
enqueued_at,
started_at,
@ -570,6 +580,14 @@ impl IndexScheduler {
kind,
} = task;
assert_eq!(uid, task.uid);
if let Some(ref batch) = batch_uid {
assert!(self
.batch_to_tasks_mapping
.get(&rtxn, batch)
.unwrap()
.unwrap()
.contains(uid));
}
if let Some(task_index_uid) = &task_index_uid {
assert!(self
.index_tasks