Add canceledBy task filter

This commit is contained in:
Loïc Lecrenier
2022-11-08 11:46:41 +01:00
parent d5638d2c27
commit 20fa103992
82 changed files with 415 additions and 29 deletions

View File

@@ -85,7 +85,9 @@ pub struct Query {
pub index_uids: Option<Vec<String>>,
/// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched
pub uids: Option<Vec<TaskId>>,
/// The [task ids](`meilisearch_types::tasks::Task::uid`) of the [`TaskCancelation`](meilisearch_types::tasks::Task::Kind::TaskCancelation) tasks
/// that canceled the matched tasks.
pub canceled_by: Option<Vec<TaskId>>,
/// Exclusive upper bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
pub before_enqueued_at: Option<OffsetDateTime>,
/// Exclusive lower bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
@@ -113,6 +115,7 @@ impl Query {
types: None,
index_uids: None,
uids: None,
canceled_by: None,
before_enqueued_at: None,
after_enqueued_at: None,
before_started_at: None,
@@ -185,6 +188,7 @@ mod db_name {
pub const STATUS: &str = "status";
pub const KIND: &str = "kind";
pub const INDEX_TASKS: &str = "index-tasks";
pub const CANCELED_BY: &str = "canceled_by";
pub const ENQUEUED_AT: &str = "enqueued-at";
pub const STARTED_AT: &str = "started-at";
pub const FINISHED_AT: &str = "finished-at";
@@ -256,6 +260,9 @@ pub struct IndexScheduler {
/// Store the tasks associated to an index.
pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,
/// Store, for each cancelation task uid (the key), the set of task ids that were canceled by that task.
pub(crate) canceled_by: Database<OwnedType<BEU32>, RoaringBitmapCodec>,
/// Store the task ids of tasks which were enqueued at a specific date
pub(crate) enqueued_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
@@ -316,6 +323,7 @@ impl IndexScheduler {
status: self.status,
kind: self.kind,
index_tasks: self.index_tasks,
canceled_by: self.canceled_by,
enqueued_at: self.enqueued_at,
started_at: self.started_at,
finished_at: self.finished_at,
@@ -349,7 +357,7 @@ impl IndexScheduler {
std::fs::create_dir_all(&options.dumps_path)?;
let env = heed::EnvOpenOptions::new()
.max_dbs(9)
.max_dbs(10)
.map_size(options.task_db_size)
.open(options.tasks_path)?;
let file_store = FileStore::new(&options.update_file_path)?;
@@ -363,6 +371,7 @@ impl IndexScheduler {
status: env.create_database(Some(db_name::STATUS))?,
kind: env.create_database(Some(db_name::KIND))?,
index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?,
canceled_by: env.create_database(Some(db_name::CANCELED_BY))?,
enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?,
started_at: env.create_database(Some(db_name::STARTED_AT))?,
finished_at: env.create_database(Some(db_name::FINISHED_AT))?,
@@ -403,7 +412,6 @@ impl IndexScheduler {
/// only once per index scheduler.
fn run(&self) {
let run = self.private_clone();
std::thread::spawn(move || loop {
run.wake_up.wait();
@@ -422,6 +430,7 @@ impl IndexScheduler {
) {
std::thread::sleep(Duration::from_secs(1));
}
run.wake_up.signal();
}
}
});
@@ -480,6 +489,16 @@ impl IndexScheduler {
tasks &= &uids;
}
if let Some(canceled_by) = &query.canceled_by {
    // Collect the union of the tasks canceled by any of the requested
    // cancelation task uids. Intersecting the per-uid sets directly into
    // `tasks` would require a task to have been canceled by *every* listed
    // uid, and would skip the filter entirely for uids without an entry.
    let mut all_canceled_tasks = RoaringBitmap::new();
    for cancel_task_uid in canceled_by {
        if let Some(canceled_by_uid) =
            self.canceled_by.get(rtxn, &BEU32::new(*cancel_task_uid))?
        {
            all_canceled_tasks |= canceled_by_uid;
        }
    }
    // When the filter is specified but none of the uids canceled anything,
    // the union is empty and the query matches zero tasks rather than all
    // of them.
    tasks &= all_canceled_tasks;
}
if let Some(kind) = &query.types {
let mut kind_tasks = RoaringBitmap::new();
for kind in kind {
@@ -590,9 +609,9 @@ impl IndexScheduler {
) -> Result<RoaringBitmap> {
let mut tasks = self.get_task_ids(rtxn, query)?;
// If the query contains a list of `index_uid`, then we must exclude all the kind that
// arn't associated to one and only one index.
if query.index_uids.is_some() {
// If the query contains a list of index uid or there is a finite list of authorized indexes,
// then we must exclude all the kinds that aren't associated to one and only one index.
if query.index_uids.is_some() || authorized_indexes.is_some() {
for kind in enum_iterator::all::<Kind>().filter(|kind| !kind.related_to_one_index()) {
tasks -= self.get_kind(rtxn, kind)?;
}
@@ -1786,6 +1805,7 @@ mod tests {
.unwrap();
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_task_registered");
// Now we check that we can reach the AbortedIndexation error handling
handle.wait_till(Breakpoint::AbortedIndexation);
index_scheduler.assert_internally_consistent();
@@ -2449,7 +2469,7 @@ mod tests {
.unwrap();
// we asked for all the tasks, but we are only authorized to retrieve the doggo and catto tasks
// -> all tasks except the swap of catto with whalo are returned
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
snapshot!(snapshot_bitmap(&tasks), @"[0,1,]");
let query = Query::default();
let tasks =
@@ -2459,23 +2479,43 @@ mod tests {
}
#[test]
fn fail_in_create_batch_for_index_creation() {
fn query_tasks_canceled_by() {
let (index_scheduler, handle) =
IndexScheduler::test(true, vec![(1, FailureLocation::InsideCreateBatch)]);
IndexScheduler::test(true, vec![(3, FailureLocation::InsideProcessBatch)]);
let kinds = [index_creation_task("catto", "mouse")];
let kind = index_creation_task("catto", "mouse");
let _ = index_scheduler.register(kind).unwrap();
let kind = index_creation_task("doggo", "sheep");
let _ = index_scheduler.register(kind).unwrap();
let kind = KindWithContent::IndexSwap {
swaps: vec![IndexSwap { indexes: ("catto".to_owned(), "doggo".to_owned()) }],
};
let _task = index_scheduler.register(kind).unwrap();
for kind in kinds {
let _task = index_scheduler.register(kind).unwrap();
index_scheduler.assert_internally_consistent();
}
handle.wait_till(Breakpoint::BatchCreated);
handle.advance_n_batch(1);
let kind = KindWithContent::TaskCancelation {
query: "test_query".to_string(),
tasks: [0, 1, 2, 3].into_iter().collect(),
};
let task_cancelation = index_scheduler.register(kind).unwrap();
handle.advance_n_batch(1);
// We skipped an iteration of `tick` to reach BatchCreated
assert_eq!(*index_scheduler.run_loop_iteration.read().unwrap(), 2);
// Otherwise nothing weird happened
index_scheduler.assert_internally_consistent();
snapshot!(snapshot_index_scheduler(&index_scheduler));
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "start");
let rtxn = index_scheduler.read_txn().unwrap();
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
let tasks =
index_scheduler.get_task_ids_from_authorized_indexes(&rtxn, &query, &None).unwrap();
// Task 0 is not returned because it was not canceled, and task 3 is not returned because it is
// the `TaskCancelation` task itself.
snapshot!(snapshot_bitmap(&tasks), @"[1,2,]");
let query = Query { canceled_by: Some(vec![task_cancelation.uid]), ..Query::default() };
let tasks = index_scheduler
.get_task_ids_from_authorized_indexes(&rtxn, &query, &Some(vec!["doggo".to_string()]))
.unwrap();
// Return only 1 because the user is not authorized to see task 2
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
}
#[test]