Implement TaskDeletion in the index scheduler

This commit is contained in:
Loïc Lecrenier
2022-10-06 16:53:21 +02:00
committed by Clément Renault
parent fe84f2648b
commit dc81992eb2
4 changed files with 248 additions and 14 deletions

View File

@@ -71,7 +71,7 @@ impl BatchKind {
allow_index_creation,
settings_ids: vec![task_id],
}),
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask => unreachable!(),
Kind::DumpExport | Kind::Snapshot | Kind::CancelTask | Kind::DeleteTasks => unreachable!(),
}
}
@@ -320,7 +320,9 @@ impl BatchKind {
import_ids,
})
}
(_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(),
(_, Kind::CancelTask | Kind::DeleteTasks | Kind::DumpExport | Kind::Snapshot) => {
unreachable!()
}
(
BatchKind::IndexCreation { .. }
| BatchKind::IndexDeletion { .. }

View File

@@ -7,14 +7,16 @@ use index::apply_settings_to_builder;
use index::error::IndexError;
use index::{Settings, Unchecked};
use log::{debug, info};
use milli::documents::DocumentsBatchReader;
use milli::heed::{RoTxn, RwTxn};
use milli::update::IndexDocumentsConfig;
use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod};
use milli::{documents::DocumentsBatchReader, BEU32};
use roaring::RoaringBitmap;
use uuid::Uuid;
pub(crate) enum Batch {
Cancel(Task),
DeleteTasks(Task),
Snapshot(Vec<Task>),
Dump(Vec<Task>),
IndexOperation(IndexOperation),
@@ -89,6 +91,7 @@ impl Batch {
pub fn ids(&self) -> Vec<TaskId> {
match self {
Batch::Cancel(task)
| Batch::DeleteTasks(task)
| Batch::IndexCreation { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid],
Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::IndexDeletion { tasks, .. } => {
@@ -355,9 +358,10 @@ impl IndexScheduler {
/// Create the next batch to be processed;
/// 1. We get the *last* task to cancel.
/// 2. We get the *next* snapshot to process.
/// 3. We get the *next* dump to process.
/// 4. We get the *next* tasks to process for a specific index.
/// 2. We get the *next* task to delete.
/// 3. We get the *next* snapshot to process.
/// 4. We get the *next* dump to process.
/// 5. We get the *next* tasks to process for a specific index.
pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> {
let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued;
@@ -370,7 +374,17 @@ impl IndexScheduler {
)));
}
// 2. we batch the snapshot.
// 2. we get the next task to delete
let to_delete = self.get_kind(rtxn, Kind::DeleteTasks)?;
if let Some(task_id) = to_delete.min() {
let task = self
.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
println!("DeletionTask: {task:?}");
return Ok(Some(Batch::DeleteTasks(task)));
}
// 3. we batch the snapshot.
let to_snapshot = self.get_kind(rtxn, Kind::Snapshot)? & enqueued;
if !to_snapshot.is_empty() {
return Ok(Some(Batch::Snapshot(
@@ -378,13 +392,13 @@ impl IndexScheduler {
)));
}
// 3. we batch the dumps.
// 4. we batch the dumps.
let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued;
if !to_dump.is_empty() {
return Ok(Some(Batch::Dump(self.get_existing_tasks(rtxn, to_dump)?)));
}
// 4. We take the next task and try to batch all the tasks associated with this index.
// 5. We take the next task and try to batch all the tasks associated with this index.
if let Some(task_id) = enqueued.min() {
let task = self
.get_task(rtxn, task_id)?
@@ -427,6 +441,34 @@ impl IndexScheduler {
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
match batch {
Batch::Cancel(_) => todo!(),
Batch::DeleteTasks(mut task) => {
println!("delete task: {task:?}");
// 1. Retrieve the tasks that matched the quety at enqueue-time
let matched_tasks =
if let KindWithContent::DeleteTasks { tasks, query: _ } = &task.kind {
tasks
} else {
unreachable!()
};
println!("matched tasks: {matched_tasks:?}");
let mut wtxn = self.env.write_txn()?;
let nbr_deleted_tasks = self.delete_matched_tasks(&mut wtxn, matched_tasks)?;
println!("nbr_deleted_tasks: {nbr_deleted_tasks}");
task.status = Status::Succeeded;
match &mut task.details {
Some(Details::DeleteTasks {
matched_tasks: _,
deleted_tasks,
original_query: _,
}) => {
*deleted_tasks = Some(nbr_deleted_tasks);
}
_ => unreachable!(),
}
wtxn.commit()?;
Ok(vec![task])
}
Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(),
Batch::IndexOperation(operation) => {
@@ -481,6 +523,7 @@ impl IndexScheduler {
primary_key,
mut task,
} => {
println!("IndexUpdate task: {task:?}");
let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?;
@@ -767,4 +810,55 @@ impl IndexScheduler {
}
}
}
/// Delete each given task from all the databases (if it is deleteable).
///
/// Return the number of tasks that were actually deleted
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &[u32]) -> Result<usize> {
// 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().1;
let to_delete_tasks = matched_tasks
.iter()
.filter(|&&task_id| {
!processing_tasks.contains(task_id) && !enqueued_tasks.contains(task_id)
})
.copied();
let to_delete_tasks = RoaringBitmap::from_iter(to_delete_tasks);
// 2. We now have a list of tasks to delete, delete them
for task_id in to_delete_tasks.iter() {
let task = self.all_tasks.get(wtxn, &BEU32::new(task_id))?.unwrap();
self.delete_task(wtxn, &task)?;
}
Ok(to_delete_tasks.len() as usize)
}
/// Delete the given task from all the databases
fn delete_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
let task_id = BEU32::new(task.uid);
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(wtxn, index, |bitmap| {
bitmap.remove(task.uid);
})?;
}
}
self.update_status(wtxn, task.status, |bitmap| {
bitmap.remove(task.uid);
})?;
self.update_kind(wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.remove(task.uid));
})?;
task.remove_data()?;
self.all_tasks.delete(wtxn, &task_id)?;
Ok(())
}
}

View File

@@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock};
use file_store::{File, FileStore};
use meilisearch_types::error::ResponseError;
use roaring::RoaringBitmap;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use uuid::Uuid;
@@ -34,7 +34,7 @@ use crate::task::Task;
const DEFAULT_LIMIT: fn() -> u32 = || 20;
#[derive(derive_builder::Builder, Debug, Clone, Deserialize)]
#[derive(derive_builder::Builder, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Query {
#[serde(default = "DEFAULT_LIMIT")]
@@ -319,7 +319,7 @@ impl IndexScheduler {
started_at: None,
finished_at: None,
error: None,
details: None,
details: task.default_details(),
status: Status::Enqueued,
kind: task,
};
@@ -449,6 +449,8 @@ impl IndexScheduler {
#[cfg(test)]
mod tests {
use std::os::unix::process;
use big_s::S;
use insta::*;
use milli::update::IndexDocumentsMethod::ReplaceDocuments;
@@ -688,6 +690,100 @@ mod tests {
assert_eq!(tasks[3].status, Status::Succeeded);
}
#[test]
fn task_deletion() {
let (index_scheduler, handle) = IndexScheduler::test();
let to_enqueue = [
KindWithContent::IndexCreation {
index_uid: S("catto"),
primary_key: Some(S("mouse")),
},
KindWithContent::DocumentImport {
index_uid: S("catto"),
primary_key: None,
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
documents_count: 12,
allow_index_creation: true,
},
KindWithContent::DocumentImport {
index_uid: S("doggo"),
primary_key: Some(S("bone")),
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
documents_count: 5000,
allow_index_creation: true,
},
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
}
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
all_tasks.push(ret.unwrap().0);
}
rtxn.commit().unwrap();
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2)]");
index_scheduler.register(KindWithContent::DeleteTasks {
query: "test_query".to_owned(),
tasks: vec![0, 1],
});
let rtxn = index_scheduler.env.read_txn().unwrap();
let task = index_scheduler
.all_tasks
.get(&rtxn, &BEU32::new(3))
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
all_tasks.push(ret.unwrap().0);
}
rtxn.commit().unwrap();
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
handle.wait_till(Breakpoint::BatchCreated);
// the last task, with uid = 3, should be marked as processing
let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>");
let rtxn = index_scheduler.env.read_txn().unwrap();
let task = index_scheduler
.all_tasks
.get(&rtxn, &BEU32::new(3))
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
// handle.wait_till(Breakpoint::AfterProcessing);
// let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
// assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>");
// let rtxn = index_scheduler.env.read_txn().unwrap();
// let mut all_tasks = Vec::new();
// for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
// all_tasks.push(ret.unwrap().0);
// }
// rtxn.commit().unwrap();
// assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
// index_scheduler.register(KindWithContent::DocumentClear { index_uid: 0 });
// index_scheduler.register(KindWithContent::CancelTask { tasks: vec![0] });
// index_scheduler.register(KindWithContendt::DeleteTasks { tasks: vec![0] });
}
#[test]
fn document_addition() {
let (index_scheduler, handle) = IndexScheduler::test(true);

View File

@@ -8,7 +8,7 @@ use std::{fmt::Write, path::PathBuf, str::FromStr};
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
use crate::{Error, TaskId};
use crate::{Error, Query, TaskId};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -170,6 +170,10 @@ pub enum KindWithContent {
CancelTask {
tasks: Vec<TaskId>,
},
DeleteTasks {
query: String,
tasks: Vec<TaskId>,
},
DumpExport {
output: PathBuf,
},
@@ -200,6 +204,7 @@ impl KindWithContent {
KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate,
KindWithContent::IndexSwap { .. } => Kind::IndexSwap,
KindWithContent::CancelTask { .. } => Kind::CancelTask,
KindWithContent::DeleteTasks { .. } => Kind::DeleteTasks,
KindWithContent::DumpExport { .. } => Kind::DumpExport,
KindWithContent::Snapshot => Kind::Snapshot,
}
@@ -222,6 +227,7 @@ impl KindWithContent {
| IndexUpdate { .. }
| IndexSwap { .. }
| CancelTask { .. }
| DeleteTasks { .. }
| DumpExport { .. }
| Snapshot => Ok(()), // There is nothing to persist for all these tasks
}
@@ -244,6 +250,7 @@ impl KindWithContent {
| IndexUpdate { .. }
| IndexSwap { .. }
| CancelTask { .. }
| DeleteTasks { .. }
| DumpExport { .. }
| Snapshot => Ok(()), // There is no data associated with all these tasks
}
@@ -253,7 +260,7 @@ impl KindWithContent {
use KindWithContent::*;
match self {
DumpExport { .. } | Snapshot | CancelTask { .. } => None,
DumpExport { .. } | Snapshot | CancelTask { .. } | DeleteTasks { .. } => None,
DocumentImport { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid }
@@ -299,6 +306,11 @@ impl KindWithContent {
KindWithContent::CancelTask { .. } => {
todo!()
}
KindWithContent::DeleteTasks { query, tasks } => Some(Details::DeleteTasks {
matched_tasks: tasks.len(),
deleted_tasks: None,
original_query: query.clone(),
}),
KindWithContent::DumpExport { .. } => None,
KindWithContent::Snapshot => None,
}
@@ -322,6 +334,7 @@ pub enum Kind {
IndexUpdate,
IndexSwap,
CancelTask,
DeleteTasks,
DumpExport,
Snapshot,
}
@@ -352,6 +365,7 @@ impl FromStr for Kind {
"index_update" => Ok(Kind::IndexUpdate),
"index_swap" => Ok(Kind::IndexSwap),
"cancel_task" => Ok(Kind::CancelTask),
"delete_tasks" => Ok(Kind::DeleteTasks),
"dump_export" => Ok(Kind::DumpExport),
"snapshot" => Ok(Kind::Snapshot),
s => Err(Error::InvalidKind(s.to_string())),
@@ -384,6 +398,12 @@ pub enum Details {
#[serde(rename_all = "camelCase")]
ClearAll { deleted_documents: Option<u64> },
#[serde(rename_all = "camelCase")]
DeleteTasks {
matched_tasks: usize,
deleted_tasks: Option<usize>,
original_query: String,
},
#[serde(rename_all = "camelCase")]
Dump { dump_uid: String },
}
@@ -437,3 +457,25 @@ fn serialize_duration<S: Serializer>(
None => serializer.serialize_none(),
}
}
#[cfg(test)]
mod tests {
use milli::heed::{types::SerdeJson, BytesDecode, BytesEncode};
use crate::assert_smol_debug_snapshot;
use super::Details;
#[test]
fn bad_deser() {
let details = Details::DeleteTasks {
matched_tasks: 1,
deleted_tasks: None,
original_query: "hello".to_owned(),
};
let serialised = SerdeJson::<Details>::bytes_encode(&details).unwrap();
let deserialised = SerdeJson::<Details>::bytes_decode(&serialised).unwrap();
assert_smol_debug_snapshot!(details, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###);
assert_smol_debug_snapshot!(deserialised, @"Settings { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, synonyms: NotSet, distinct_attribute: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, _kind: PhantomData } }");
}
}