Introduce the task cancelation task type

This commit is contained in:
Kerollmops
2022-10-17 17:19:17 +02:00
committed by Clément Renault
parent f177c97671
commit 1ca9a67c49
7 changed files with 131 additions and 41 deletions

View File

@@ -113,7 +113,8 @@ pub enum KindDump {
lhs: String, lhs: String,
rhs: String, rhs: String,
}, },
CancelTask { TaskCancelation {
query: String,
tasks: Vec<TaskId>, tasks: Vec<TaskId>,
}, },
DeleteTasks { DeleteTasks {
@@ -181,7 +182,9 @@ impl From<KindWithContent> for KindDump {
KindDump::IndexUpdate { primary_key } KindDump::IndexUpdate { primary_key }
} }
KindWithContent::IndexSwap { lhs, rhs } => KindDump::IndexSwap { lhs, rhs }, KindWithContent::IndexSwap { lhs, rhs } => KindDump::IndexSwap { lhs, rhs },
KindWithContent::CancelTask { tasks } => KindDump::CancelTask { tasks }, KindWithContent::TaskCancelation { query, tasks } => {
KindDump::TaskCancelation { query, tasks }
}
KindWithContent::TaskDeletion { query, tasks } => { KindWithContent::TaskDeletion { query, tasks } => {
KindDump::DeleteTasks { query, tasks } KindDump::DeleteTasks { query, tasks }
} }

View File

@@ -22,7 +22,7 @@ enum AutobatchKind {
IndexDeletion, IndexDeletion,
IndexUpdate, IndexUpdate,
IndexSwap, IndexSwap,
CancelTask, TaskCancelation,
TaskDeletion, TaskDeletion,
DumpExport, DumpExport,
Snapshot, Snapshot,
@@ -62,7 +62,7 @@ impl From<KindWithContent> for AutobatchKind {
KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation, KindWithContent::IndexCreation { .. } => AutobatchKind::IndexCreation,
KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate, KindWithContent::IndexUpdate { .. } => AutobatchKind::IndexUpdate,
KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap, KindWithContent::IndexSwap { .. } => AutobatchKind::IndexSwap,
KindWithContent::CancelTask { .. } => AutobatchKind::CancelTask, KindWithContent::TaskCancelation { .. } => AutobatchKind::TaskCancelation,
KindWithContent::TaskDeletion { .. } => AutobatchKind::TaskDeletion, KindWithContent::TaskDeletion { .. } => AutobatchKind::TaskDeletion,
KindWithContent::DumpExport { .. } => AutobatchKind::DumpExport, KindWithContent::DumpExport { .. } => AutobatchKind::DumpExport,
KindWithContent::Snapshot => AutobatchKind::Snapshot, KindWithContent::Snapshot => AutobatchKind::Snapshot,
@@ -153,7 +153,7 @@ impl BatchKind {
allow_index_creation, allow_index_creation,
settings_ids: vec![task_id], settings_ids: vec![task_id],
}), }),
K::DumpExport | K::Snapshot | K::CancelTask | K::TaskDeletion => { K::DumpExport | K::Snapshot | K::TaskCancelation | K::TaskDeletion => {
unreachable!() unreachable!()
} }
} }
@@ -378,7 +378,7 @@ impl BatchKind {
import_ids, import_ids,
}) })
} }
(_, K::CancelTask | K::TaskDeletion | K::DumpExport | K::Snapshot) => { (_, K::TaskCancelation | K::TaskDeletion | K::DumpExport | K::Snapshot) => {
unreachable!() unreachable!()
} }
( (

View File

@@ -1,3 +1,4 @@
use std::sync::atomic::Ordering::Relaxed;
use std::collections::HashSet; use std::collections::HashSet;
use std::fs::File; use std::fs::File;
use std::io::BufWriter; use std::io::BufWriter;
@@ -5,10 +6,8 @@ use std::io::BufWriter;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId}; use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata; use dump::IndexMetadata;
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use log::{debug, info}; use log::{debug, info};
use meilisearch_types::milli::update::IndexDocumentsConfig; use meilisearch_types::milli::update::IndexDocumentsConfig;
use meilisearch_types::milli::update::{ use meilisearch_types::milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod,
@@ -16,7 +15,9 @@ use meilisearch_types::milli::update::{
use meilisearch_types::milli::{ use meilisearch_types::milli::{
self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32, self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32,
}; };
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use meilisearch_types::{ use meilisearch_types::{
heed::{RoTxn, RwTxn}, heed::{RoTxn, RwTxn},
Index, Index,
@@ -27,7 +28,7 @@ use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Batch { pub(crate) enum Batch {
Cancel(Task), TaskCancelation(Task),
TaskDeletion(Task), TaskDeletion(Task),
Snapshot(Vec<Task>), Snapshot(Vec<Task>),
Dump(Task), Dump(Task),
@@ -103,7 +104,7 @@ pub(crate) enum IndexOperation {
impl Batch { impl Batch {
pub fn ids(&self) -> Vec<TaskId> { pub fn ids(&self) -> Vec<TaskId> {
match self { match self {
Batch::Cancel(task) Batch::TaskCancelation(task)
| Batch::TaskDeletion(task) | Batch::TaskDeletion(task)
| Batch::Dump(task) | Batch::Dump(task)
| Batch::IndexCreation { task, .. } | Batch::IndexCreation { task, .. }
@@ -378,11 +379,11 @@ impl IndexScheduler {
/// 5. We get the *next* tasks to process for a specific index. /// 5. We get the *next* tasks to process for a specific index.
pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> { pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> {
let enqueued = &self.get_status(rtxn, Status::Enqueued)?; let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued; let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
// 1. we get the last task to cancel. // 1. we get the last task to cancel.
if let Some(task_id) = to_cancel.max() { if let Some(task_id) = to_cancel.max() {
return Ok(Some(Batch::Cancel( return Ok(Some(Batch::TaskCancelation(
self.get_task(rtxn, task_id)? self.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?, .ok_or(Error::CorruptedTaskQueue)?,
))); )));
@@ -457,7 +458,33 @@ impl IndexScheduler {
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> { pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
match batch { match batch {
Batch::Cancel(_) => todo!(), Batch::TaskCancelation(mut task) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks =
if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind {
tasks
} else {
unreachable!()
};
let mut wtxn = self.env.write_txn()?;
let nbr_canceled_tasks = self.cancel_matched_tasks(&mut wtxn, matched_tasks)?;
task.status = Status::Succeeded;
match &mut task.details {
Some(Details::TaskCancelation {
matched_tasks: _,
canceled_tasks,
original_query: _,
}) => {
*canceled_tasks = Some(nbr_canceled_tasks);
}
_ => unreachable!(),
}
wtxn.commit()?;
Ok(vec![task])
}
Batch::TaskDeletion(mut task) => { Batch::TaskDeletion(mut task) => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks = let matched_tasks =
@@ -652,7 +679,11 @@ impl IndexScheduler {
self.index_mapper.indexer_config(), self.index_mapper.indexer_config(),
); );
builder.set_primary_key(primary_key); builder.set_primary_key(primary_key);
builder.execute(|_| ())?; let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
)?;
index_wtxn.commit()?; index_wtxn.commit()?;
} }
@@ -730,6 +761,7 @@ impl IndexScheduler {
content_files, content_files,
mut tasks, mut tasks,
} => { } => {
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
let indexer_config = self.index_mapper.indexer_config(); let indexer_config = self.index_mapper.indexer_config();
// TODO use the code from the IndexCreate operation // TODO use the code from the IndexCreate operation
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key {
@@ -737,7 +769,10 @@ impl IndexScheduler {
let mut builder = let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config); milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.set_primary_key(primary_key); builder.set_primary_key(primary_key);
builder.execute(|_| ())?; builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.clone().load(Relaxed),
)?;
} }
} }
@@ -752,6 +787,7 @@ impl IndexScheduler {
indexer_config, indexer_config,
config, config,
|indexing_step| debug!("update: {:?}", indexing_step), |indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
)?; )?;
let mut results = Vec::new(); let mut results = Vec::new();
@@ -845,9 +881,11 @@ impl IndexScheduler {
let mut builder = let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config); milli::update::Settings::new(index_wtxn, index, indexer_config);
apply_settings_to_builder(&checked_settings, &mut builder); apply_settings_to_builder(&checked_settings, &mut builder);
builder.execute(|indexing_step| { let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
debug!("update: {:?}", indexing_step); builder.execute(
})?; |indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
)?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
} }

View File

@@ -18,6 +18,7 @@ use std::sync::{Arc, RwLock};
use file_store::FileStore; use file_store::FileStore;
use meilisearch_types::error::ResponseError; use meilisearch_types::error::ResponseError;
use meilisearch_types::milli;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
@@ -129,23 +130,26 @@ struct ProcessingTasks {
} }
impl ProcessingTasks { impl ProcessingTasks {
/// Stores the currently processing tasks, the date time at which it started
/// and resets the _must stop_ flag.
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) { fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
self.started_at = started_at; self.started_at = started_at;
self.processing = processing; self.processing = processing;
self.must_stop.store(false, Relaxed);
} }
/// Resets the processing tasks to an empty list.
fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) { fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
self.started_at = stopped_at; self.started_at = stopped_at;
self.processing = RoaringBitmap::new(); self.processing = RoaringBitmap::new();
} }
fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { /// Forces the currently processing tasks to stop running if necessary.
fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) {
// If there, at least, is one task that is currently processing we must stop. // If there, at least, is one task that is currently processing we must stop.
let must_stop = !self.processing.is_disjoint(canceled_tasks); if !self.processing.is_disjoint(canceled_tasks) {
if must_stop {
self.must_stop.store(true, Relaxed); self.must_stop.store(true, Relaxed);
} }
must_stop
} }
} }
@@ -171,6 +175,7 @@ pub struct IndexScheduler {
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>, pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
/// All the tasks ids grouped by their status. /// All the tasks ids grouped by their status.
// TODO we should not be able to serialize a `Status::Processing` in this database.
pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>, pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
/// All the tasks ids grouped by their kind. /// All the tasks ids grouped by their kind.
pub(crate) kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>, pub(crate) kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
@@ -354,7 +359,11 @@ impl IndexScheduler {
.take(query.limit.unwrap_or(u32::MAX) as usize), .take(query.limit.unwrap_or(u32::MAX) as usize),
)?; )?;
let ProcessingTasks { started_at, processing, .. } = self let ProcessingTasks {
started_at,
processing,
..
} = self
.processing_tasks .processing_tasks
.read() .read()
.map_err(|_| Error::CorruptedTaskQueue)? .map_err(|_| Error::CorruptedTaskQueue)?
@@ -379,7 +388,7 @@ impl IndexScheduler {
/// Register a new task in the scheduler. If it fails and data was associated with the task /// Register a new task in the scheduler. If it fails and data was associated with the task
/// it tries to delete the file. /// it tries to delete the file.
pub fn register(&self, task: KindWithContent) -> Result<Task> { pub fn register(&self, kind: KindWithContent) -> Result<Task> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
let task = Task { let task = Task {
@@ -388,9 +397,9 @@ impl IndexScheduler {
started_at: None, started_at: None,
finished_at: None, finished_at: None,
error: None, error: None,
details: (&task).into(), details: kind.default_details(),
status: Status::Enqueued, status: Status::Enqueued,
kind: task, kind: kind.clone(),
}; };
self.all_tasks self.all_tasks
.append(&mut wtxn, &BEU32::new(task.uid), &task)?; .append(&mut wtxn, &BEU32::new(task.uid), &task)?;
@@ -419,6 +428,16 @@ impl IndexScheduler {
} }
} }
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
self.processing_tasks
.read()
.unwrap()
.cancel_processing_tasks(&tasks_to_cancel);
}
// notify the scheduler loop to execute a new tick // notify the scheduler loop to execute a new tick
self.wake_up.signal(); self.wake_up.signal();
@@ -504,7 +523,9 @@ impl IndexScheduler {
primary_key, primary_key,
}, },
KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs }, KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs },
KindDump::CancelTask { tasks } => KindWithContent::CancelTask { tasks }, KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
KindDump::DeleteTasks { query, tasks } => { KindDump::DeleteTasks { query, tasks } => {
KindWithContent::TaskDeletion { query, tasks } KindWithContent::TaskDeletion { query, tasks }
} }
@@ -618,6 +639,14 @@ impl IndexScheduler {
} }
log::info!("A batch of tasks was successfully completed."); log::info!("A batch of tasks was successfully completed.");
} }
// If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError(
milli::InternalError::AbortedIndexation,
))) => {
// TODO should we add a breakpoint here?
wtxn.abort()?;
return Ok(0);
}
// In case of a failure we must get back and patch all the tasks with the error. // In case of a failure we must get back and patch all the tasks with the error.
Err(err) => { Err(err) => {
let error: ResponseError = err.into(); let error: ResponseError = err.into();
@@ -796,7 +825,10 @@ mod tests {
let kinds = [ let kinds = [
index_creation_task("catto", "mouse"), index_creation_task("catto", "mouse"),
replace_document_import_task("catto", None, 0, 12), replace_document_import_task("catto", None, 0, 12),
KindWithContent::CancelTask { tasks: vec![0, 1] }, KindWithContent::TaskCancelation {
query: format!("uid=0,1"),
tasks: vec![0, 1],
},
replace_document_import_task("catto", None, 1, 50), replace_document_import_task("catto", None, 1, 50),
replace_document_import_task("doggo", Some("bone"), 2, 5000), replace_document_import_task("doggo", Some("bone"), 2, 5000),
]; ];

View File

@@ -31,7 +31,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let mut snap = String::new(); let mut snap = String::new();
let processing_tasks = processing_tasks.read().unwrap().processing; let processing_tasks = processing_tasks.read().unwrap().processing.clone();
snap.push_str(&format!( snap.push_str(&format!(
"### Autobatching Enabled = {autobatching_enabled}\n" "### Autobatching Enabled = {autobatching_enabled}\n"
)); ));
@@ -143,6 +143,13 @@ fn snaphsot_details(d: &Details) -> String {
Details::ClearAll { deleted_documents } => { Details::ClearAll { deleted_documents } => {
format!("{{ deleted_documents: {deleted_documents:?} }}") format!("{{ deleted_documents: {deleted_documents:?} }}")
}, },
Details::TaskCancelation {
matched_tasks,
canceled_tasks,
original_query,
} => {
format!("{{ matched_tasks: {matched_tasks:?}, canceled_tasks: {canceled_tasks:?}, original_query: {original_query:?} }}")
}
Details::TaskDeletion { Details::TaskDeletion {
matched_tasks, matched_tasks,
deleted_tasks, deleted_tasks,

View File

@@ -23,7 +23,7 @@ async fn import_dump_v1() {
}; };
let error = Server::new_with_options(options) let error = Server::new_with_options(options)
.await .await
.map(|_| ()) .map(drop)
.unwrap_err(); .unwrap_err();
assert_eq!(error.to_string(), "The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards."); assert_eq!(error.to_string(), "The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.");

View File

@@ -44,7 +44,7 @@ impl Task {
match &self.kind { match &self.kind {
DumpExport { .. } DumpExport { .. }
| Snapshot | Snapshot
| CancelTask { .. } | TaskCancelation { .. }
| TaskDeletion { .. } | TaskDeletion { .. }
| IndexSwap { .. } => None, | IndexSwap { .. } => None,
DocumentImport { index_uid, .. } DocumentImport { index_uid, .. }
@@ -62,7 +62,7 @@ impl Task {
use KindWithContent::*; use KindWithContent::*;
match &self.kind { match &self.kind {
DumpExport { .. } | Snapshot | CancelTask { .. } | TaskDeletion { .. } => None, DumpExport { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None,
DocumentImport { index_uid, .. } DocumentImport { index_uid, .. }
| DocumentDeletion { index_uid, .. } | DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid } | DocumentClear { index_uid }
@@ -87,7 +87,7 @@ impl Task {
| KindWithContent::IndexCreation { .. } | KindWithContent::IndexCreation { .. }
| KindWithContent::IndexUpdate { .. } | KindWithContent::IndexUpdate { .. }
| KindWithContent::IndexSwap { .. } | KindWithContent::IndexSwap { .. }
| KindWithContent::CancelTask { .. } | KindWithContent::TaskCancelation { .. }
| KindWithContent::TaskDeletion { .. } | KindWithContent::TaskDeletion { .. }
| KindWithContent::DumpExport { .. } | KindWithContent::DumpExport { .. }
| KindWithContent::Snapshot => None, | KindWithContent::Snapshot => None,
@@ -95,7 +95,7 @@ impl Task {
} }
} }
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub enum KindWithContent { pub enum KindWithContent {
DocumentImport { DocumentImport {
@@ -134,7 +134,8 @@ pub enum KindWithContent {
lhs: String, lhs: String,
rhs: String, rhs: String,
}, },
CancelTask { TaskCancelation {
query: String,
tasks: Vec<TaskId>, tasks: Vec<TaskId>,
}, },
TaskDeletion { TaskDeletion {
@@ -160,7 +161,7 @@ impl KindWithContent {
KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion, KindWithContent::IndexDeletion { .. } => Kind::IndexDeletion,
KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate, KindWithContent::IndexUpdate { .. } => Kind::IndexUpdate,
KindWithContent::IndexSwap { .. } => Kind::IndexSwap, KindWithContent::IndexSwap { .. } => Kind::IndexSwap,
KindWithContent::CancelTask { .. } => Kind::CancelTask, KindWithContent::TaskCancelation { .. } => Kind::TaskCancelation,
KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion, KindWithContent::TaskDeletion { .. } => Kind::TaskDeletion,
KindWithContent::DumpExport { .. } => Kind::DumpExport, KindWithContent::DumpExport { .. } => Kind::DumpExport,
KindWithContent::Snapshot => Kind::Snapshot, KindWithContent::Snapshot => Kind::Snapshot,
@@ -171,7 +172,7 @@ impl KindWithContent {
use KindWithContent::*; use KindWithContent::*;
match self { match self {
DumpExport { .. } | Snapshot | CancelTask { .. } | TaskDeletion { .. } => None, DumpExport { .. } | Snapshot | TaskCancelation { .. } | TaskDeletion { .. } => None,
DocumentImport { index_uid, .. } DocumentImport { index_uid, .. }
| DocumentDeletion { index_uid, .. } | DocumentDeletion { index_uid, .. }
| DocumentClear { index_uid } | DocumentClear { index_uid }
@@ -214,7 +215,7 @@ impl KindWithContent {
KindWithContent::IndexSwap { .. } => { KindWithContent::IndexSwap { .. } => {
todo!() todo!()
} }
KindWithContent::CancelTask { .. } => { KindWithContent::TaskCancelation { .. } => {
None // TODO: check correctness of this return value None // TODO: check correctness of this return value
} }
KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion { KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion {
@@ -250,7 +251,7 @@ impl From<&KindWithContent> for Option<Details> {
primary_key: primary_key.clone(), primary_key: primary_key.clone(),
}), }),
KindWithContent::IndexSwap { .. } => None, KindWithContent::IndexSwap { .. } => None,
KindWithContent::CancelTask { .. } => None, KindWithContent::TaskCancelation { .. } => None,
KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion { KindWithContent::TaskDeletion { query, tasks } => Some(Details::TaskDeletion {
matched_tasks: tasks.len(), matched_tasks: tasks.len(),
deleted_tasks: None, deleted_tasks: None,
@@ -327,7 +328,7 @@ pub enum Kind {
IndexDeletion, IndexDeletion,
IndexUpdate, IndexUpdate,
IndexSwap, IndexSwap,
CancelTask, TaskCancelation,
TaskDeletion, TaskDeletion,
DumpExport, DumpExport,
Snapshot, Snapshot,
@@ -349,6 +350,10 @@ impl FromStr for Kind {
Ok(Kind::DocumentDeletion) Ok(Kind::DocumentDeletion)
} else if kind.eq_ignore_ascii_case("settingsUpdate") { } else if kind.eq_ignore_ascii_case("settingsUpdate") {
Ok(Kind::Settings) Ok(Kind::Settings)
} else if kind.eq_ignore_ascii_case("TaskCancelation") {
Ok(Kind::TaskCancelation)
} else if kind.eq_ignore_ascii_case("TaskDeletion") {
Ok(Kind::TaskDeletion)
} else if kind.eq_ignore_ascii_case("dumpCreation") { } else if kind.eq_ignore_ascii_case("dumpCreation") {
Ok(Kind::DumpExport) Ok(Kind::DumpExport)
} else { } else {
@@ -392,6 +397,11 @@ pub enum Details {
ClearAll { ClearAll {
deleted_documents: Option<u64>, deleted_documents: Option<u64>,
}, },
TaskCancelation {
matched_tasks: usize,
canceled_tasks: Option<usize>,
original_query: String,
},
TaskDeletion { TaskDeletion {
matched_tasks: u64, matched_tasks: u64,
deleted_tasks: Option<usize>, deleted_tasks: Option<usize>,