From 860c993ef782d12b5111cdbb6815eb3c69b558ce Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 8 Feb 2023 20:53:19 +0100 Subject: [PATCH] Handle the autobatching of deletion and addition in the scheduler --- index-scheduler/src/batch.rs | 182 +++++++++++------- index-scheduler/src/lib.rs | 94 +++++++++ .../after_processing_the_batch.snap | 42 ++++ .../documents.snap | 9 + .../registered_the_first_task.snap | 37 ++++ .../registered_the_second_task.snap | 40 ++++ .../registered_the_first_task.snap | 36 ++++ .../registered_the_second_task.snap | 40 ++++ 8 files changed, 409 insertions(+), 71 deletions(-) create mode 100644 index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/after_processing_the_batch.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/documents.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_first_task.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_second_task.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_first_task.snap create mode 100644 index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_second_task.snap diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 50833c0b7..c3c4229a5 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -86,15 +86,27 @@ pub(crate) enum Batch { }, } +#[derive(Debug)] +pub(crate) enum DocumentOperation { + Add(Uuid), + Delete(Vec), +} + +#[derive(Debug)] +pub(crate) enum DocumentOperationResult { + Add(DocumentAdditionResult), + Delete(DocumentDeletionResult), +} + /// A [batch](Batch) that combines multiple tasks operating on an index. #[derive(Debug)] pub(crate) enum IndexOperation { - DocumentImport { + DocumentOperation { index_uid: String, primary_key: Option, method: IndexDocumentsMethod, documents_counts: Vec, - content_files: Vec, + operations: Vec, tasks: Vec, }, DocumentDeletion { @@ -121,13 +133,13 @@ pub(crate) enum IndexOperation { settings: Vec<(bool, Settings)>, settings_tasks: Vec, }, - SettingsAndDocumentImport { + SettingsAndDocumentOperation { index_uid: String, primary_key: Option, method: IndexDocumentsMethod, documents_counts: Vec, - content_files: Vec, + operations: Vec, document_import_tasks: Vec, // The boolean indicates if it's a settings deletion or creation. @@ -149,13 +161,13 @@ impl Batch { tasks.iter().map(|task| task.uid).collect() } Batch::IndexOperation { op, .. } => match op { - IndexOperation::DocumentImport { tasks, .. } + IndexOperation::DocumentOperation { tasks, .. } | IndexOperation::DocumentDeletion { tasks, .. } | IndexOperation::Settings { tasks, .. } | IndexOperation::DocumentClear { tasks, .. } => { tasks.iter().map(|task| task.uid).collect() } - IndexOperation::SettingsAndDocumentImport { + IndexOperation::SettingsAndDocumentOperation { document_import_tasks: tasks, settings_tasks: other, .. @@ -174,12 +186,12 @@ impl Batch { impl IndexOperation { pub fn index_uid(&self) -> &str { match self { - IndexOperation::DocumentImport { index_uid, .. } + IndexOperation::DocumentOperation { index_uid, .. } | IndexOperation::DocumentDeletion { index_uid, .. } | IndexOperation::DocumentClear { index_uid, .. } | IndexOperation::Settings { index_uid, .. } | IndexOperation::DocumentClearAndSetting { index_uid, .. } - | IndexOperation::SettingsAndDocumentImport { index_uid, .. } => index_uid, + | IndexOperation::SettingsAndDocumentOperation { index_uid, .. } => index_uid, } } } @@ -206,17 +218,22 @@ impl IndexScheduler { }, must_create_index, })), - BatchKind::DocumentOperation { method, operation_ids: import_ids, .. } => { - let tasks = self.get_existing_tasks(rtxn, import_ids)?; - let primary_key = match &tasks[0].kind { - KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => { - primary_key.clone() - } - _ => unreachable!(), - }; + BatchKind::DocumentOperation { method, operation_ids, .. } => { + let tasks = self.get_existing_tasks(rtxn, operation_ids)?; + let primary_key = tasks + .iter() + .find_map(|task| match task.kind { + KindWithContent::DocumentAdditionOrUpdate { ref primary_key, .. } => { + // we want to stop on the first document addition + Some(primary_key.clone()) + } + KindWithContent::DocumentDeletion { .. } => None, + _ => unreachable!(), + }) + .flatten(); let mut documents_counts = Vec::new(); - let mut content_files = Vec::new(); + let mut operations = Vec::new(); for task in tasks.iter() { match task.kind { @@ -226,19 +243,23 @@ impl IndexScheduler { .. } => { documents_counts.push(documents_count); - content_files.push(content_file); + operations.push(DocumentOperation::Add(content_file)); + } + KindWithContent::DocumentDeletion { ref documents_ids, .. } => { + documents_counts.push(documents_ids.len() as u64); + operations.push(DocumentOperation::Delete(documents_ids.clone())); } _ => unreachable!(), } } Ok(Some(Batch::IndexOperation { - op: IndexOperation::DocumentImport { + op: IndexOperation::DocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, tasks, }, must_create_index, @@ -327,7 +348,7 @@ impl IndexScheduler { method, allow_index_creation, primary_key, - operation_ids: import_ids, + operation_ids, } => { let settings = self.create_next_batch_index( rtxn, @@ -343,7 +364,7 @@ impl IndexScheduler { method, allow_index_creation, primary_key, - operation_ids: import_ids, + operation_ids, }, must_create_index, )?; @@ -352,10 +373,10 @@ impl IndexScheduler { ( Some(Batch::IndexOperation { op: - IndexOperation::DocumentImport { + IndexOperation::DocumentOperation { primary_key, documents_counts, - content_files, + operations, tasks: document_import_tasks, .. }, @@ -366,12 +387,12 @@ impl IndexScheduler { .. }), ) => Ok(Some(Batch::IndexOperation { - op: IndexOperation::SettingsAndDocumentImport { + op: IndexOperation::SettingsAndDocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, document_import_tasks, settings, settings_tasks, @@ -987,12 +1008,12 @@ impl IndexScheduler { Ok(tasks) } - IndexOperation::DocumentImport { + IndexOperation::DocumentOperation { index_uid: _, primary_key, method, documents_counts, - content_files, + operations, mut tasks, } => { let mut primary_key_has_been_set = false; @@ -1037,26 +1058,68 @@ impl IndexScheduler { || must_stop_processing.get(), )?; - let mut results = Vec::new(); - for content_uuid in content_files.into_iter() { - let content_file = self.file_store.get_update(content_uuid)?; - let reader = DocumentsBatchReader::from_reader(content_file) - .map_err(milli::Error::from)?; - let (new_builder, user_result) = builder.add_documents(reader)?; - builder = new_builder; + for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { + match operation { + DocumentOperation::Add(content_uuid) => { + let content_file = self.file_store.get_update(content_uuid)?; + let reader = DocumentsBatchReader::from_reader(content_file) + .map_err(milli::Error::from)?; + let (new_builder, user_result) = builder.add_documents(reader)?; + builder = new_builder; - let user_result = match user_result { - Ok(count) => Ok(DocumentAdditionResult { - indexed_documents: count, - number_of_documents: count, // TODO: this is wrong, we should use the value stored in the Details. - }), - Err(e) => Err(milli::Error::from(e)), - }; + let Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) = task.details + // In the case of a `documentAdditionOrUpdate` the details MUST be set + else { unreachable!(); }; - results.push(user_result); + match user_result { + Ok(count) => { + task.status = Status::Succeeded; + task.details = Some(Details::DocumentAdditionOrUpdate { + received_documents, + indexed_documents: Some(count), + }) + } + Err(e) => { + task.status = Status::Failed; + task.details = Some(Details::DocumentAdditionOrUpdate { + received_documents, + indexed_documents: Some(0), + }); + task.error = Some(milli::Error::from(e).into()); + } + } + } + DocumentOperation::Delete(document_ids) => { + let (new_builder, user_result) = + builder.remove_documents(document_ids)?; + builder = new_builder; + + let Some(Details::DocumentDeletion { provided_ids, .. }) = task.details + // In the case of a `documentAdditionOrUpdate` the details MUST be set + else { unreachable!(); }; + + match user_result { + Ok(count) => { + task.status = Status::Succeeded; + task.details = Some(Details::DocumentDeletion { + provided_ids, + deleted_documents: Some(count), + }); + } + Err(e) => { + task.status = Status::Failed; + task.details = Some(Details::DocumentDeletion { + provided_ids, + deleted_documents: Some(0), + }); + task.error = Some(milli::Error::from(e).into()); + } + } + } + } } - if results.iter().any(|res| res.is_ok()) { + if !tasks.iter().all(|res| res.error.is_some()) { let addition = builder.execute()?; info!("document addition done: {:?}", addition); } else if primary_key_has_been_set { @@ -1071,29 +1134,6 @@ impl IndexScheduler { )?; } - for (task, (ret, count)) in - tasks.iter_mut().zip(results.into_iter().zip(documents_counts)) - { - match ret { - Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) => { - task.status = Status::Succeeded; - task.details = Some(Details::DocumentAdditionOrUpdate { - received_documents: number_of_documents, - indexed_documents: Some(indexed_documents), - }); - } - Err(error) => { - task.status = Status::Failed; - task.details = Some(Details::DocumentAdditionOrUpdate { - received_documents: count, - // if there was an error we indexed 0 documents. - indexed_documents: Some(0), - }); - task.error = Some(error.into()) - } - } - } - Ok(tasks) } IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => { @@ -1136,12 +1176,12 @@ impl IndexScheduler { Ok(tasks) } - IndexOperation::SettingsAndDocumentImport { + IndexOperation::SettingsAndDocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, document_import_tasks, settings, settings_tasks, @@ -1159,12 +1199,12 @@ impl IndexScheduler { let mut import_tasks = self.apply_index_operation( index_wtxn, index, - IndexOperation::DocumentImport { + IndexOperation::DocumentOperation { index_uid, primary_key, method, documents_counts, - content_files, + operations, tasks: document_import_tasks, }, )?; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 387dac2d0..f1b177cb2 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1679,6 +1679,100 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded"); } + #[test] + fn document_addition_and_document_deletion() { + let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + + let content = r#"[ + { "id": 1, "doggo": "jean bob" }, + { "id": 2, "catto": "jorts" }, + { "id": 3, "doggo": "bork" } + ]"#; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); + index_scheduler + .register(KindWithContent::DocumentDeletion { + index_uid: S("doggos"), + documents_ids: vec![S("1"), S("2")], + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); + + handle.advance_one_successful_batch(); // The addition AND deletion should've been batched together + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch"); + + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); + } + + #[test] + fn document_deletion_and_document_addition() { + let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); + index_scheduler + .register(KindWithContent::DocumentDeletion { + index_uid: S("doggos"), + documents_ids: vec![S("1"), S("2")], + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); + + let content = r#"[ + { "id": 1, "doggo": "jean bob" }, + { "id": 2, "catto": "jorts" }, + { "id": 3, "doggo": "bork" } + ]"#; + + let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); + let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + file.persist().unwrap(); + index_scheduler + .register(KindWithContent::DocumentAdditionOrUpdate { + index_uid: S("doggos"), + primary_key: Some(S("id")), + method: ReplaceDocuments, + content_file: uuid, + documents_count, + allow_index_creation: true, + }) + .unwrap(); + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); + + handle.advance_one_successful_batch(); // The deletion AND addition should've been batched together + snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch"); + + let index = index_scheduler.index("doggos").unwrap(); + let rtxn = index.read_txn().unwrap(); + let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let field_ids = field_ids_map.ids().collect::>(); + let documents = index + .all_documents(&rtxn) + .unwrap() + .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .collect::>(); + snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); + } + #[test] fn do_not_batch_task_of_different_indexes() { let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/after_processing_the_batch.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/after_processing_the_batch.snap new file mode 100644 index 000000000..f70496b81 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/after_processing_the_batch.snap @@ -0,0 +1,42 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +1 {uid: 1, status: succeeded, details: { received_document_ids: 2, deleted_documents: Some(2) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +---------------------------------------------------------------------- +### Status: +enqueued [] +succeeded [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +"documentDeletion" [1,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +["doggos"] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +[timestamp] [0,1,] +---------------------------------------------------------------------- +### Finished At: +[timestamp] [0,1,] +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/documents.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/documents.snap new file mode 100644 index 000000000..2b56b71d1 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/documents.snap @@ -0,0 +1,9 @@ +--- +source: index-scheduler/src/lib.rs +--- +[ + { + "id": 3, + "doggo": "bork" + } +] diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_first_task.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_first_task.snap new file mode 100644 index 000000000..35dc0b41a --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_first_task.snap @@ -0,0 +1,37 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_second_task.snap b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_second_task.snap new file mode 100644 index 000000000..bd65a6d99 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_addition_and_document_deletion/registered_the_second_task.snap @@ -0,0 +1,40 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +1 {uid: 1, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [0,] +"documentDeletion" [1,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_first_task.snap b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_first_task.snap new file mode 100644 index 000000000..9356e6dba --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_first_task.snap @@ -0,0 +1,36 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +---------------------------------------------------------------------- +### Status: +enqueued [0,] +---------------------------------------------------------------------- +### Kind: +"documentDeletion" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: + +---------------------------------------------------------------------- + diff --git a/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_second_task.snap b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_second_task.snap new file mode 100644 index 000000000..89e341184 --- /dev/null +++ b/index-scheduler/src/snapshots/lib.rs/document_deletion_and_document_addition/registered_the_second_task.snap @@ -0,0 +1,40 @@ +--- +source: index-scheduler/src/lib.rs +--- +### Autobatching Enabled = true +### Processing Tasks: +[] +---------------------------------------------------------------------- +### All Tasks: +0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} +1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} +---------------------------------------------------------------------- +### Status: +enqueued [0,1,] +---------------------------------------------------------------------- +### Kind: +"documentAdditionOrUpdate" [1,] +"documentDeletion" [0,] +---------------------------------------------------------------------- +### Index Tasks: +doggos [0,1,] +---------------------------------------------------------------------- +### Index Mapper: +[] +---------------------------------------------------------------------- +### Canceled By: + +---------------------------------------------------------------------- +### Enqueued At: +[timestamp] [0,] +[timestamp] [1,] +---------------------------------------------------------------------- +### Started At: +---------------------------------------------------------------------- +### Finished At: +---------------------------------------------------------------------- +### File Store: +00000000-0000-0000-0000-000000000000 + +---------------------------------------------------------------------- +