mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	Merge #3670
3670: Fix addition deletion bug r=irevoire a=irevoire The first commit of this PR is a revert of https://github.com/meilisearch/meilisearch/pull/3667. It re-enables the auto-batching of addition and deletion of tasks. No new changes have been introduced outside of `milli`. So all the changes you see on the autobatcher have actually already been reviewed. It fixes https://github.com/meilisearch/meilisearch/issues/3440. ### What was happening? The issue was that the `external_documents_ids` generated in the `transform` were used in a very strange way that wasn’t compatible with the deletion of documents. Instead of doing a clear merge between the external document IDs of the DB and the ones returned by the transform + writing it on disk, we were doing some weird tricks with the soft-deleted to avoid writing the fst on disk as much as possible. The new algorithm may be a bit slower but is way more straightforward and doesn’t change depending on whether the soft deletion was used or not. Here is a list of the changes introduced: 1. We now make a clear distinction between the `new_external_documents_ids` coming from the transform and only held in RAM and the `external_documents_ids` coming from the DB. 2. The `new_external_documents_ids` (coming out of the transform) are now represented as an `fst`. We don't need to struggle with the hard, soft distinction + the soft_deleted => That's easier to understand 3. When indexing documents, we merge the `external_documents_ids` coming from the DB and the `new_external_documents_ids` coming from the transform. ### Other things introduced in this PR Since we constantly have to write small, very specialized fuzzers for this kind of bug, we decided to push the one used to reproduce this bug. It's not perfect, but it's easy to improve in the future. It'll also run for as long as possible on every merge on the main branch. Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Loïc Lecrenier <loic.lecrenier@icloud.com>
This commit is contained in:
		
							
								
								
									
										24
									
								
								.github/workflows/fuzzer-indexing.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								.github/workflows/fuzzer-indexing.yml
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | ||||
| name: Run the indexing fuzzer | ||||
|  | ||||
| on: | ||||
|   push: | ||||
|     branches: | ||||
|       - main | ||||
|  | ||||
| jobs: | ||||
|   fuzz: | ||||
|     name: Setup the action | ||||
|     runs-on: ubuntu-latest | ||||
|     timeout-minutes: 4320 # 72h | ||||
|     steps: | ||||
|       - uses: actions/checkout@v3 | ||||
|       - uses: actions-rs/toolchain@v1 | ||||
|         with: | ||||
|           profile: minimal | ||||
|           toolchain: stable | ||||
|           override: true | ||||
|  | ||||
|       # Run the indexing fuzzer | ||||
|       - name: Run the fuzzer | ||||
|         run: | | ||||
|           cargo run --release --bin fuzz-indexing | ||||
							
								
								
									
										726
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										726
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -13,7 +13,8 @@ members = [ | ||||
|     "filter-parser", | ||||
|     "flatten-serde-json", | ||||
|     "json-depth-checker", | ||||
|     "benchmarks" | ||||
|     "benchmarks", | ||||
|     "fuzzers", | ||||
| ] | ||||
|  | ||||
| [workspace.package] | ||||
|   | ||||
							
								
								
									
										20
									
								
								fuzzers/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								fuzzers/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | ||||
| [package] | ||||
| name = "fuzzers" | ||||
| publish = false | ||||
|  | ||||
| version.workspace = true | ||||
| authors.workspace = true | ||||
| description.workspace = true | ||||
| homepage.workspace = true | ||||
| readme.workspace = true | ||||
| edition.workspace = true | ||||
| license.workspace = true | ||||
|  | ||||
| [dependencies] | ||||
| arbitrary = { version = "1.3.0", features = ["derive"] } | ||||
| clap = { version = "4.3.0", features = ["derive"] } | ||||
| fastrand = "1.9.0" | ||||
| milli = { path = "../milli" } | ||||
| serde = { version = "1.0.160", features = ["derive"] } | ||||
| serde_json = { version = "1.0.95", features = ["preserve_order"] } | ||||
| tempfile = "3.5.0" | ||||
							
								
								
									
										3
									
								
								fuzzers/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								fuzzers/README.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,3 @@ | ||||
| # Fuzzers | ||||
|  | ||||
| The purpose of this crate is to contain all the handmade "fuzzers" we may need. | ||||
							
								
								
									
										152
									
								
								fuzzers/src/bin/fuzz-indexing.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										152
									
								
								fuzzers/src/bin/fuzz-indexing.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,152 @@ | ||||
| use std::num::NonZeroUsize; | ||||
| use std::path::PathBuf; | ||||
| use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; | ||||
| use std::time::Duration; | ||||
|  | ||||
| use arbitrary::{Arbitrary, Unstructured}; | ||||
| use clap::Parser; | ||||
| use fuzzers::Operation; | ||||
| use milli::heed::EnvOpenOptions; | ||||
| use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig}; | ||||
| use milli::Index; | ||||
| use tempfile::TempDir; | ||||
|  | ||||
| /// A fixed-size group of five [`Operation`]s. | ||||
| /// | ||||
| /// In `main`, every operation of one `Batch` is fed to the same `IndexDocuments` | ||||
| /// builder, which is then executed once for the whole group. | ||||
| #[derive(Debug, Arbitrary)] | ||||
| struct Batch([Operation; 5]); | ||||
|  | ||||
| // Command-line options of the indexing fuzzer, parsed by clap. | ||||
| // NOTE: the `///` comments below are turned into the `--help` text by the clap derive, | ||||
| // so they are user-facing; keep maintainer-only notes in `//` comments. | ||||
| #[derive(Debug, Clone, Parser)] | ||||
| struct Opt { | ||||
|     // When `--par` is absent, `main` falls back to `std::thread::available_parallelism()`. | ||||
|     /// The number of fuzzers to run in parallel. | ||||
|     #[clap(long)] | ||||
|     par: Option<NonZeroUsize>, | ||||
|     // When `--path` is absent, `main` creates the temporary directories with `TempDir::new()`. | ||||
|     // We need to put a lot of newlines in the following documentation or else everything gets collapsed on one line | ||||
|     /// The path in which the databases will be created. | ||||
|     /// Using a ramdisk is recommended. | ||||
|     /// | ||||
|     /// Linux: | ||||
|     /// | ||||
|     /// sudo mount -t tmpfs -o size=2g tmpfs ramdisk # to create it | ||||
|     /// | ||||
|     /// sudo umount ramdisk # to remove it | ||||
|     /// | ||||
|     /// MacOS: | ||||
|     /// | ||||
|     /// diskutil erasevolume HFS+ 'RAM Disk' `hdiutil attach -nobrowse -nomount ram://4194304` # create it | ||||
|     /// | ||||
|     /// hdiutil detach /dev/:the_disk | ||||
|     #[clap(long)] | ||||
|     path: Option<PathBuf>, | ||||
| } | ||||
|  | ||||
| /// Entry point of the indexing fuzzer. | ||||
| /// | ||||
| /// Spawns `par` fuzzing threads (from `--par`, or the available parallelism), each | ||||
| /// working on its own `Index` created in its own temporary directory, plus one | ||||
| /// reporting thread that prints the progression every second. | ||||
| /// | ||||
| /// The process exits with code 0 once the reporting thread sees more than 3600 | ||||
| /// seconds elapsed. If a fuzzing iteration panics, the `stop` flag is raised and the | ||||
| /// panic is re-raised with the debug dump of the generated input as its message. | ||||
| fn main() { | ||||
|     let opt = Opt::parse(); | ||||
|     // Both atomics are leaked on purpose: this yields `&'static` references that can be | ||||
|     // captured by every spawned thread. | ||||
|     let progression: &'static AtomicUsize = Box::leak(Box::new(AtomicUsize::new(0))); | ||||
|     let stop: &'static AtomicBool = Box::leak(Box::new(AtomicBool::new(false))); | ||||
|  | ||||
|     let par = opt.par.unwrap_or_else(|| std::thread::available_parallelism().unwrap()).get(); | ||||
|     let mut handles = Vec::with_capacity(par); | ||||
|  | ||||
|     for _ in 0..par { | ||||
|         let opt = opt.clone(); | ||||
|  | ||||
|         let handle = std::thread::spawn(move || { | ||||
|             let mut options = EnvOpenOptions::new(); | ||||
|             // 1024^4 bytes, i.e. 1 TiB | ||||
|             options.map_size(1024 * 1024 * 1024 * 1024); | ||||
|             // one temporary directory per fuzzing thread, inside `--path` when it was given | ||||
|             let tempdir = match opt.path { | ||||
|                 Some(path) => TempDir::new_in(path).unwrap(), | ||||
|                 None => TempDir::new().unwrap(), | ||||
|             }; | ||||
|             let index = Index::new(options, tempdir.path()).unwrap(); | ||||
|             let indexer_config = IndexerConfig::default(); | ||||
|             let index_documents_config = IndexDocumentsConfig::default(); | ||||
|  | ||||
|             std::thread::scope(|s| { | ||||
|                 loop { | ||||
|                     // another fuzzing thread hit a failure: stop generating new inputs | ||||
|                     if stop.load(Ordering::Relaxed) { | ||||
|                         return; | ||||
|                     } | ||||
|                     // 1000 random bytes used as the entropy source for `arbitrary` | ||||
|                     let v: Vec<u8> = | ||||
|                         std::iter::repeat_with(|| fastrand::u8(..)).take(1000).collect(); | ||||
|  | ||||
|                     let mut data = Unstructured::new(&v); | ||||
|                     let batches = <[Batch; 5]>::arbitrary(&mut data).unwrap(); | ||||
|                     // will be used to display the error once a thread crashes | ||||
|                     let dbg_input = format!("{:#?}", batches); | ||||
|  | ||||
|                     // The iteration runs in its own scoped thread so that a panic is | ||||
|                     // reported through `join()` below instead of unwinding this loop. | ||||
|                     let handle = s.spawn(|| { | ||||
|                         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|                         for batch in batches { | ||||
|                             let mut builder = IndexDocuments::new( | ||||
|                                 &mut wtxn, | ||||
|                                 &index, | ||||
|                                 &indexer_config, | ||||
|                                 index_documents_config.clone(), | ||||
|                                 |_| (), | ||||
|                                 || false, | ||||
|                             ) | ||||
|                             .unwrap(); | ||||
|  | ||||
|                             // `add_documents` / `remove_documents` consume the builder and | ||||
|                             // hand it back, hence the re-assignment after each operation | ||||
|                             for op in batch.0 { | ||||
|                                 match op { | ||||
|                                     Operation::AddDoc(doc) => { | ||||
|                                         let documents = | ||||
|                                             milli::documents::objects_from_json_value(doc.to_d()); | ||||
|                                         let documents = | ||||
|                                             milli::documents::documents_batch_reader_from_objects( | ||||
|                                                 documents, | ||||
|                                             ); | ||||
|                                         let (b, _added) = builder.add_documents(documents).unwrap(); | ||||
|                                         builder = b; | ||||
|                                     } | ||||
|                                     Operation::DeleteDoc(id) => { | ||||
|                                         let (b, _removed) = | ||||
|                                             builder.remove_documents(vec![id.to_s()]).unwrap(); | ||||
|                                         builder = b; | ||||
|                                     } | ||||
|                                 } | ||||
|                             } | ||||
|                             builder.execute().unwrap(); | ||||
|  | ||||
|                             // after executing a batch we check if the database is corrupted | ||||
|                             let res = index.search(&wtxn).execute().unwrap(); | ||||
|                             index.documents(&wtxn, res.documents_ids).unwrap(); | ||||
|                             progression.fetch_add(1, Ordering::Relaxed); | ||||
|                         } | ||||
|                         // the write transaction is aborted, never committed | ||||
|                         wtxn.abort().unwrap(); | ||||
|                     }); | ||||
|                     // `join()` returns `Err` when the scoped thread panicked: raise the stop | ||||
|                     // flag, then panic with the generated input as the message | ||||
|                     if let err @ Err(_) = handle.join() { | ||||
|                         stop.store(true, Ordering::Relaxed); | ||||
|                         err.expect(&dbg_input); | ||||
|                     } | ||||
|                 } | ||||
|             }); | ||||
|         }); | ||||
|         handles.push(handle); | ||||
|     } | ||||
|  | ||||
|     // reporting thread: prints the progression once per second and ends the run after an hour | ||||
|     std::thread::spawn(|| { | ||||
|         let mut last_value = 0; | ||||
|         let start = std::time::Instant::now(); | ||||
|         loop { | ||||
|             let total = progression.load(Ordering::Relaxed); | ||||
|             let elapsed = start.elapsed().as_secs(); | ||||
|             if elapsed > 3600 { | ||||
|                 // after 1 hour, stop the fuzzer, success | ||||
|                 std::process::exit(0); | ||||
|             } | ||||
|             println!( | ||||
|                 "Has been running for {:?} seconds. Tested {} new values for a total of {}.", | ||||
|                 elapsed, | ||||
|                 total - last_value, | ||||
|                 total | ||||
|             ); | ||||
|             last_value = total; | ||||
|             std::thread::sleep(Duration::from_secs(1)); | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|     // a panicking fuzzing thread makes `join()` return `Err`, which `unwrap()` turns into a panic here | ||||
|     for handle in handles { | ||||
|         handle.join().unwrap(); | ||||
|     } | ||||
| } | ||||
							
								
								
									
										46
									
								
								fuzzers/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								fuzzers/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,46 @@ | ||||
| use arbitrary::Arbitrary; | ||||
| use serde_json::{json, Value}; | ||||
|  | ||||
| /// The closed set of documents the fuzzer can add. | ||||
| /// | ||||
| /// Each variant stands for one fixed JSON object, see [`Document::to_d`]. | ||||
| /// `One`, `Two` and `Three` all use the id `0`; `Four`, `Five` and `Six` all use the id `1`. | ||||
| #[derive(Debug, Arbitrary)] | ||||
| pub enum Document { | ||||
|     One, | ||||
|     Two, | ||||
|     Three, | ||||
|     Four, | ||||
|     Five, | ||||
|     Six, | ||||
| } | ||||
|  | ||||
| impl Document { | ||||
|     /// Returns the JSON object this variant stands for. | ||||
|     /// | ||||
|     /// Only two ids (`0` and `1`) are used, each with three different payloads, so | ||||
|     /// distinct variants can target the same document id. | ||||
|     pub fn to_d(&self) -> Value { | ||||
|         match self { | ||||
|             Document::One => json!({ "id": 0, "doggo": "bernese" }), | ||||
|             Document::Two => json!({ "id": 0, "doggo": "golden" }), | ||||
|             Document::Three => json!({ "id": 0, "catto": "jorts" }), | ||||
|             Document::Four => json!({ "id": 1, "doggo": "bernese" }), | ||||
|             Document::Five => json!({ "id": 1, "doggo": "golden" }), | ||||
|             Document::Six => json!({ "id": 1, "catto": "jorts" }), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// The document ids the fuzzer can delete. | ||||
| /// | ||||
| /// They render as `"0"` and `"1"` (see [`DocId::to_s`]), the same two ids used by | ||||
| /// the objects returned from [`Document::to_d`]. | ||||
| #[derive(Debug, Arbitrary)] | ||||
| pub enum DocId { | ||||
|     Zero, | ||||
|     One, | ||||
| } | ||||
|  | ||||
| impl DocId { | ||||
|     /// Returns the id as an owned string: `"0"` for `Zero`, `"1"` for `One`. | ||||
|     pub fn to_s(&self) -> String { | ||||
|         match self { | ||||
|             DocId::Zero => "0".to_string(), | ||||
|             DocId::One => "1".to_string(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// A single step generated by the fuzzer. | ||||
| #[derive(Debug, Arbitrary)] | ||||
| pub enum Operation { | ||||
|     /// Add the given [`Document`]. | ||||
|     AddDoc(Document), | ||||
|     /// Delete the document with the given [`DocId`]. | ||||
|     DeleteDoc(DocId), | ||||
| } | ||||
| @@ -321,9 +321,18 @@ impl BatchKind { | ||||
|                 }) | ||||
|             } | ||||
|             ( | ||||
|                 this @ BatchKind::DocumentOperation { .. }, | ||||
|                 BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids }, | ||||
|                 K::DocumentDeletion, | ||||
|             ) => Break(this), | ||||
|             ) => { | ||||
|                 operation_ids.push(id); | ||||
|  | ||||
|                 Continue(BatchKind::DocumentOperation { | ||||
|                     method, | ||||
|                     allow_index_creation, | ||||
|                     primary_key, | ||||
|                     operation_ids, | ||||
|                 }) | ||||
|             } | ||||
|             // but we can't autobatch documents if it's not the same kind | ||||
|             // this match branch MUST be AFTER the previous one | ||||
|             ( | ||||
| @@ -346,7 +355,35 @@ impl BatchKind { | ||||
|                 deletion_ids.push(id); | ||||
|                 Continue(BatchKind::DocumentClear { ids: deletion_ids }) | ||||
|             } | ||||
|             // we can't autobatch a deletion and an import | ||||
|             // we can autobatch the deletion and import if the index already exists | ||||
|             ( | ||||
|                 BatchKind::DocumentDeletion { mut deletion_ids }, | ||||
|                 K::DocumentImport { method, allow_index_creation, primary_key } | ||||
|             ) if index_already_exists => { | ||||
|                 deletion_ids.push(id); | ||||
|  | ||||
|                 Continue(BatchKind::DocumentOperation { | ||||
|                     method, | ||||
|                     allow_index_creation, | ||||
|                     primary_key, | ||||
|                     operation_ids: deletion_ids, | ||||
|                 }) | ||||
|             } | ||||
|             // we can autobatch the deletion and import if both can't create an index | ||||
|             ( | ||||
|                 BatchKind::DocumentDeletion { mut deletion_ids }, | ||||
|                 K::DocumentImport { method, allow_index_creation, primary_key } | ||||
|             ) if !allow_index_creation => { | ||||
|                 deletion_ids.push(id); | ||||
|  | ||||
|                 Continue(BatchKind::DocumentOperation { | ||||
|                     method, | ||||
|                     allow_index_creation, | ||||
|                     primary_key, | ||||
|                     operation_ids: deletion_ids, | ||||
|                 }) | ||||
|             } | ||||
|             // we can't autobatch a deletion and an import if the index does not exist but would be created by an addition | ||||
|             ( | ||||
|                 this @ BatchKind::DocumentDeletion { .. }, | ||||
|                 K::DocumentImport { .. } | ||||
| @@ -648,36 +685,36 @@ mod tests { | ||||
|         debug_snapshot!(autobatch_from(false,None,  [settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false,None,  [settings(false), settings(false), settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0, 1, 2] }, false))"); | ||||
|  | ||||
|         // We can't autobatch document addition with document deletion | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###); | ||||
|         // we also can't do the only way around | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))"); | ||||
|         // We can autobatch document addition with document deletion | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         // And the other way around | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|         debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|   | ||||
| @@ -2075,6 +2075,105 @@ mod tests { | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded"); | ||||
|     } | ||||
|  | ||||
|     /// Registers a document addition (three documents, primary key `id`) followed by a deletion | ||||
|     /// of the documents `1` and `2` on the same `doggos` index, processes a single batch, and | ||||
|     /// snapshots both the scheduler state and the documents left in the index. | ||||
|     #[test] | ||||
|     fn document_addition_and_document_deletion() { | ||||
|         // NOTE(review): the `true` argument presumably enables autobatching -- confirm against `IndexScheduler::test`. | ||||
|         let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); | ||||
|  | ||||
|         let content = r#"[ | ||||
|             { "id": 1, "doggo": "jean bob" }, | ||||
|             { "id": 2, "catto": "jorts" }, | ||||
|             { "id": 3, "doggo": "bork" } | ||||
|         ]"#; | ||||
|  | ||||
|         // Write the JSON payload into an update file; the addition task refers to it through `uuid`. | ||||
|         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); | ||||
|         let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); | ||||
|         file.persist().unwrap(); | ||||
|         // First task: the addition, which is allowed to create the index. | ||||
|         index_scheduler | ||||
|             .register(KindWithContent::DocumentAdditionOrUpdate { | ||||
|                 index_uid: S("doggos"), | ||||
|                 primary_key: Some(S("id")), | ||||
|                 method: ReplaceDocuments, | ||||
|                 content_file: uuid, | ||||
|                 documents_count, | ||||
|                 allow_index_creation: true, | ||||
|             }) | ||||
|             .unwrap(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); | ||||
|         // Second task: delete two of the three documents registered above. | ||||
|         index_scheduler | ||||
|             .register(KindWithContent::DocumentDeletion { | ||||
|                 index_uid: S("doggos"), | ||||
|                 documents_ids: vec![S("1"), S("2")], | ||||
|             }) | ||||
|             .unwrap(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); | ||||
|  | ||||
|         handle.advance_one_successful_batch(); // The addition AND deletion should've been batched together | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch"); | ||||
|  | ||||
|         // Read every document back from the index and snapshot them as pretty-printed JSON. | ||||
|         let index = index_scheduler.index("doggos").unwrap(); | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); | ||||
|         let field_ids = field_ids_map.ids().collect::<Vec<_>>(); | ||||
|         let documents = index | ||||
|             .all_documents(&rtxn) | ||||
|             .unwrap() | ||||
|             .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) | ||||
|             .collect::<Vec<_>>(); | ||||
|         snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); | ||||
|     } | ||||
|  | ||||
|     /// Registers a deletion of the documents `1` and `2` on the `doggos` index before any addition, | ||||
|     /// then a document addition on the same index. Expects one failed batch followed by one | ||||
|     /// successful batch, and snapshots the scheduler state and the documents present at the end. | ||||
|     #[test] | ||||
|     fn document_deletion_and_document_addition() { | ||||
|         // NOTE(review): the `true` argument presumably enables autobatching -- confirm against `IndexScheduler::test`. | ||||
|         let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); | ||||
|         // First task: the deletion, registered before the index has been created. | ||||
|         index_scheduler | ||||
|             .register(KindWithContent::DocumentDeletion { | ||||
|                 index_uid: S("doggos"), | ||||
|                 documents_ids: vec![S("1"), S("2")], | ||||
|             }) | ||||
|             .unwrap(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); | ||||
|  | ||||
|         let content = r#"[ | ||||
|             { "id": 1, "doggo": "jean bob" }, | ||||
|             { "id": 2, "catto": "jorts" }, | ||||
|             { "id": 3, "doggo": "bork" } | ||||
|         ]"#; | ||||
|  | ||||
|         // Write the JSON payload into an update file; the addition task refers to it through `uuid`. | ||||
|         let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); | ||||
|         let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); | ||||
|         file.persist().unwrap(); | ||||
|         // Second task: the addition, which is allowed to create the index. | ||||
|         index_scheduler | ||||
|             .register(KindWithContent::DocumentAdditionOrUpdate { | ||||
|                 index_uid: S("doggos"), | ||||
|                 primary_key: Some(S("id")), | ||||
|                 method: ReplaceDocuments, | ||||
|                 content_file: uuid, | ||||
|                 documents_count, | ||||
|                 allow_index_creation: true, | ||||
|             }) | ||||
|             .unwrap(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task"); | ||||
|  | ||||
|         // The deletion should have failed because it can't create an index | ||||
|         handle.advance_one_failed_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_failing_the_deletion"); | ||||
|  | ||||
|         // The addition should work | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_last_successful_addition"); | ||||
|  | ||||
|         // Read every document back from the index and snapshot them as pretty-printed JSON. | ||||
|         let index = index_scheduler.index("doggos").unwrap(); | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); | ||||
|         let field_ids = field_ids_map.ids().collect::<Vec<_>>(); | ||||
|         let documents = index | ||||
|             .all_documents(&rtxn) | ||||
|             .unwrap() | ||||
|             .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) | ||||
|             .collect::<Vec<_>>(); | ||||
|         snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn do_not_batch_task_of_different_indexes() { | ||||
|         let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); | ||||
|   | ||||
| @@ -0,0 +1,43 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing Tasks: | ||||
| [] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} | ||||
| 1 {uid: 1, status: succeeded, details: { received_document_ids: 2, deleted_documents: Some(2) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} | ||||
| ---------------------------------------------------------------------- | ||||
| ### Status: | ||||
| enqueued [] | ||||
| succeeded [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Kind: | ||||
| "documentAdditionOrUpdate" [0,] | ||||
| "documentDeletion" [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Tasks: | ||||
| doggos [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
| doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} } | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Enqueued At: | ||||
| [timestamp] [0,] | ||||
| [timestamp] [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Started At: | ||||
| [timestamp] [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Finished At: | ||||
| [timestamp] [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### File Store: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
|  | ||||
| @@ -0,0 +1,9 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| [ | ||||
|   { | ||||
|     "id": 3, | ||||
|     "doggo": "bork" | ||||
|   } | ||||
| ] | ||||
| @@ -0,0 +1,37 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing Tasks: | ||||
| [] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} | ||||
| ---------------------------------------------------------------------- | ||||
| ### Status: | ||||
| enqueued [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Kind: | ||||
| "documentAdditionOrUpdate" [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Tasks: | ||||
| doggos [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Enqueued At: | ||||
| [timestamp] [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Started At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### Finished At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### File Store: | ||||
| 00000000-0000-0000-0000-000000000000 | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
|  | ||||
| @@ -0,0 +1,40 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing Tasks: | ||||
| [] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} | ||||
| 1 {uid: 1, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} | ||||
| ---------------------------------------------------------------------- | ||||
| ### Status: | ||||
| enqueued [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Kind: | ||||
| "documentAdditionOrUpdate" [0,] | ||||
| "documentDeletion" [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Tasks: | ||||
| doggos [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Enqueued At: | ||||
| [timestamp] [0,] | ||||
| [timestamp] [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Started At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### Finished At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### File Store: | ||||
| 00000000-0000-0000-0000-000000000000 | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
|  | ||||
| @@ -0,0 +1,43 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing Tasks: | ||||
| [] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Index `doggos` not found.", error_code: "index_not_found", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_not_found" }, details: { received_document_ids: 2, deleted_documents: Some(0) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} | ||||
| 1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} | ||||
| ---------------------------------------------------------------------- | ||||
| ### Status: | ||||
| enqueued [1,] | ||||
| failed [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Kind: | ||||
| "documentAdditionOrUpdate" [1,] | ||||
| "documentDeletion" [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Tasks: | ||||
| doggos [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Enqueued At: | ||||
| [timestamp] [0,] | ||||
| [timestamp] [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Started At: | ||||
| [timestamp] [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Finished At: | ||||
| [timestamp] [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### File Store: | ||||
| 00000000-0000-0000-0000-000000000000 | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
|  | ||||
| @@ -0,0 +1,46 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing Tasks: | ||||
| [] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: failed, error: ResponseError { code: 200, message: "Index `doggos` not found.", error_code: "index_not_found", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_not_found" }, details: { received_document_ids: 2, deleted_documents: Some(0) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} | ||||
| 1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} | ||||
| ---------------------------------------------------------------------- | ||||
| ### Status: | ||||
| enqueued [] | ||||
| succeeded [1,] | ||||
| failed [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Kind: | ||||
| "documentAdditionOrUpdate" [1,] | ||||
| "documentDeletion" [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Tasks: | ||||
| doggos [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
| doggos: { number_of_documents: 3, field_distribution: {"catto": 1, "doggo": 2, "id": 3} } | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Enqueued At: | ||||
| [timestamp] [0,] | ||||
| [timestamp] [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Started At: | ||||
| [timestamp] [0,] | ||||
| [timestamp] [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Finished At: | ||||
| [timestamp] [0,] | ||||
| [timestamp] [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### File Store: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
|  | ||||
| @@ -0,0 +1,17 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| [ | ||||
|   { | ||||
|     "id": 1, | ||||
|     "doggo": "jean bob" | ||||
|   }, | ||||
|   { | ||||
|     "id": 2, | ||||
|     "catto": "jorts" | ||||
|   }, | ||||
|   { | ||||
|     "id": 3, | ||||
|     "doggo": "bork" | ||||
|   } | ||||
| ] | ||||
| @@ -0,0 +1,36 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing Tasks: | ||||
| [] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} | ||||
| ---------------------------------------------------------------------- | ||||
| ### Status: | ||||
| enqueued [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Kind: | ||||
| "documentDeletion" [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Tasks: | ||||
| doggos [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Enqueued At: | ||||
| [timestamp] [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Started At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### Finished At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### File Store: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
|  | ||||
| @@ -0,0 +1,40 @@ | ||||
| --- | ||||
| source: index-scheduler/src/lib.rs | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| ### Processing Tasks: | ||||
| [] | ||||
| ---------------------------------------------------------------------- | ||||
| ### All Tasks: | ||||
| 0 {uid: 0, status: enqueued, details: { received_document_ids: 2, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1", "2"] }} | ||||
| 1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }} | ||||
| ---------------------------------------------------------------------- | ||||
| ### Status: | ||||
| enqueued [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Kind: | ||||
| "documentAdditionOrUpdate" [1,] | ||||
| "documentDeletion" [0,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Tasks: | ||||
| doggos [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Enqueued At: | ||||
| [timestamp] [0,] | ||||
| [timestamp] [1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Started At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### Finished At: | ||||
| ---------------------------------------------------------------------- | ||||
| ### File Store: | ||||
| 00000000-0000-0000-0000-000000000000 | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
|  | ||||
| @@ -75,9 +75,6 @@ maplit = "1.0.2" | ||||
| md5 = "0.7.0" | ||||
| rand = { version = "0.8.5", features = ["small_rng"] } | ||||
|  | ||||
| [target.'cfg(fuzzing)'.dev-dependencies] | ||||
| fuzzcheck = "0.12.1" | ||||
|  | ||||
| [features] | ||||
| all-tokenizations = ["charabia/default"] | ||||
|  | ||||
|   | ||||
| @@ -111,7 +111,6 @@ pub enum Error { | ||||
|     Io(#[from] io::Error), | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| pub fn objects_from_json_value(json: serde_json::Value) -> Vec<crate::Object> { | ||||
|     let documents = match json { | ||||
|         object @ serde_json::Value::Object(_) => vec![object], | ||||
| @@ -141,7 +140,6 @@ macro_rules! documents { | ||||
|     }}; | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| pub fn documents_batch_reader_from_objects( | ||||
|     objects: impl IntoIterator<Item = Object>, | ||||
| ) -> DocumentsBatchReader<std::io::Cursor<Vec<u8>>> { | ||||
|   | ||||
| @@ -106,22 +106,30 @@ impl<'a> ExternalDocumentsIds<'a> { | ||||
|         map | ||||
|     } | ||||
|  | ||||
|     /// Returns a single fst combining the `hard` and `soft` maps. | ||||
|     /// | ||||
|     /// When `soft` is empty, `hard` is returned borrowed. Otherwise a new in-memory map is built | ||||
|     /// from the union of both maps: each external id gets the value picked by | ||||
|     /// `indexed_last_value`, and entries whose value is `DELETED_ID` are left out. | ||||
|     /// | ||||
|     /// Errors reported by the fst builder are propagated to the caller. | ||||
|     pub fn to_fst<'b>(&'b self) -> fst::Result<Cow<'b, fst::Map<Cow<'a, [u8]>>>> { | ||||
|         // Nothing to merge: hand back the hard map without copying it. | ||||
|         if self.soft.is_empty() { | ||||
|             return Ok(Cow::Borrowed(&self.hard)); | ||||
|         } | ||||
|         let union_op = self.hard.op().add(&self.soft).r#union(); | ||||
|  | ||||
|         let mut iter = union_op.into_stream(); | ||||
|         let mut new_hard_builder = fst::MapBuilder::memory(); | ||||
|         while let Some((external_id, marked_docids)) = iter.next() { | ||||
|             // NOTE(review): the unwrap assumes every key of the union carries at least one value -- confirm. | ||||
|             let value = indexed_last_value(marked_docids).unwrap(); | ||||
|             // Ids marked as deleted are not carried over into the merged map. | ||||
|             if value != DELETED_ID { | ||||
|                 new_hard_builder.insert(external_id, value)?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         drop(iter); | ||||
|  | ||||
|         Ok(Cow::Owned(new_hard_builder.into_map().map_data(Cow::Owned)?)) | ||||
|     } | ||||
|  | ||||
|     fn merge_soft_into_hard(&mut self) -> fst::Result<()> { | ||||
|         if self.soft.len() >= self.hard.len() / 2 { | ||||
|             let union_op = self.hard.op().add(&self.soft).r#union(); | ||||
|  | ||||
|             let mut iter = union_op.into_stream(); | ||||
|             let mut new_hard_builder = fst::MapBuilder::memory(); | ||||
|             while let Some((external_id, marked_docids)) = iter.next() { | ||||
|                 let value = indexed_last_value(marked_docids).unwrap(); | ||||
|                 if value != DELETED_ID { | ||||
|                     new_hard_builder.insert(external_id, value)?; | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             drop(iter); | ||||
|  | ||||
|             self.hard = new_hard_builder.into_map().map_data(Cow::Owned)?; | ||||
|             self.hard = self.to_fst()?.into_owned(); | ||||
|             self.soft = fst::Map::default().map_data(Cow::Owned)?; | ||||
|         } | ||||
|  | ||||
|   | ||||
| @@ -71,7 +71,6 @@ impl std::fmt::Display for DeletionStrategy { | ||||
| pub(crate) struct DetailedDocumentDeletionResult { | ||||
|     pub deleted_documents: u64, | ||||
|     pub remaining_documents: u64, | ||||
|     pub soft_deletion_used: bool, | ||||
| } | ||||
|  | ||||
| impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | ||||
| @@ -108,11 +107,8 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | ||||
|         Some(docid) | ||||
|     } | ||||
|     pub fn execute(self) -> Result<DocumentDeletionResult> { | ||||
|         let DetailedDocumentDeletionResult { | ||||
|             deleted_documents, | ||||
|             remaining_documents, | ||||
|             soft_deletion_used: _, | ||||
|         } = self.execute_inner()?; | ||||
|         let DetailedDocumentDeletionResult { deleted_documents, remaining_documents } = | ||||
|             self.execute_inner()?; | ||||
|  | ||||
|         Ok(DocumentDeletionResult { deleted_documents, remaining_documents }) | ||||
|     } | ||||
| @@ -133,7 +129,6 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | ||||
|             return Ok(DetailedDocumentDeletionResult { | ||||
|                 deleted_documents: 0, | ||||
|                 remaining_documents: 0, | ||||
|                 soft_deletion_used: false, | ||||
|             }); | ||||
|         } | ||||
|  | ||||
| @@ -149,7 +144,6 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | ||||
|             return Ok(DetailedDocumentDeletionResult { | ||||
|                 deleted_documents: current_documents_ids_len, | ||||
|                 remaining_documents, | ||||
|                 soft_deletion_used: false, | ||||
|             }); | ||||
|         } | ||||
|  | ||||
| @@ -218,7 +212,6 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | ||||
|             return Ok(DetailedDocumentDeletionResult { | ||||
|                 deleted_documents: self.to_delete_docids.len(), | ||||
|                 remaining_documents: documents_ids.len(), | ||||
|                 soft_deletion_used: true, | ||||
|             }); | ||||
|         } | ||||
|  | ||||
| @@ -441,7 +434,6 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | ||||
|         Ok(DetailedDocumentDeletionResult { | ||||
|             deleted_documents: self.to_delete_docids.len(), | ||||
|             remaining_documents: documents_ids.len(), | ||||
|             soft_deletion_used: false, | ||||
|         }) | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -236,7 +236,7 @@ where | ||||
|             primary_key, | ||||
|             fields_ids_map, | ||||
|             field_distribution, | ||||
|             mut external_documents_ids, | ||||
|             new_external_documents_ids, | ||||
|             new_documents_ids, | ||||
|             replaced_documents_ids, | ||||
|             documents_count, | ||||
| @@ -363,9 +363,6 @@ where | ||||
|             deletion_builder.delete_documents(&replaced_documents_ids); | ||||
|             let deleted_documents_result = deletion_builder.execute_inner()?; | ||||
|             debug!("{} documents actually deleted", deleted_documents_result.deleted_documents); | ||||
|             if !deleted_documents_result.soft_deletion_used { | ||||
|                 external_documents_ids.delete_soft_deleted_documents_ids_from_fsts()?; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         let index_documents_ids = self.index.documents_ids(self.wtxn)?; | ||||
| @@ -445,6 +442,9 @@ where | ||||
|         self.index.put_primary_key(self.wtxn, &primary_key)?; | ||||
|  | ||||
|         // We write the external documents ids into the main database. | ||||
|         let mut external_documents_ids = self.index.external_documents_ids(self.wtxn)?; | ||||
|         external_documents_ids.insert_ids(&new_external_documents_ids)?; | ||||
|         let external_documents_ids = external_documents_ids.into_static(); | ||||
|         self.index.put_external_documents_ids(self.wtxn, &external_documents_ids)?; | ||||
|  | ||||
|         let all_documents_ids = index_documents_ids | new_documents_ids; | ||||
| @@ -2514,4 +2514,170 @@ mod tests { | ||||
|         db_snap!(index, word_fid_docids, 3, @"4c2e2a1832e5802796edc1638136d933"); | ||||
|         db_snap!(index, word_position_docids, 3, @"74f556b91d161d997a89468b4da1cb8f"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn reproduce_the_bug() { | ||||
|         /* | ||||
|             [milli/examples/fuzz.rs:69] &batches = [ | ||||
|             Batch( | ||||
|                 [ | ||||
|                     AddDoc( | ||||
|                         { "id": 1, "doggo": "bernese" }, => internal 0 | ||||
|                     ), | ||||
|                 ], | ||||
|             ), | ||||
|             Batch( | ||||
|                 [ | ||||
|                     DeleteDoc( | ||||
|                         1, => delete internal 0 | ||||
|                     ), | ||||
|                     AddDoc( | ||||
|                         { "id": 0, "catto": "jorts" }, => internal 1 | ||||
|                     ), | ||||
|                 ], | ||||
|             ), | ||||
|             Batch( | ||||
|                 [ | ||||
|                     AddDoc( | ||||
|                         { "id": 1, "catto": "jorts" }, => internal 2 | ||||
|                     ), | ||||
|                 ], | ||||
|             ), | ||||
|         ] | ||||
|         */ | ||||
|         let mut index = TempIndex::new(); | ||||
|         index.index_documents_config.deletion_strategy = DeletionStrategy::AlwaysHard; | ||||
|  | ||||
|         // START OF BATCH | ||||
|  | ||||
|         println!("--- ENTERING BATCH 1"); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         let builder = IndexDocuments::new( | ||||
|             &mut wtxn, | ||||
|             &index, | ||||
|             &index.indexer_config, | ||||
|             index.index_documents_config.clone(), | ||||
|             |_| (), | ||||
|             || false, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|  | ||||
|         // OP | ||||
|  | ||||
|         let documents = documents!([ | ||||
|             { "id": 1, "doggo": "bernese" }, | ||||
|         ]); | ||||
|         let (builder, added) = builder.add_documents(documents).unwrap(); | ||||
|         insta::assert_display_snapshot!(added.unwrap(), @"1"); | ||||
|  | ||||
|         // FINISHING | ||||
|         let addition = builder.execute().unwrap(); | ||||
|         insta::assert_debug_snapshot!(addition, @r###" | ||||
|         DocumentAdditionResult { | ||||
|             indexed_documents: 1, | ||||
|             number_of_documents: 1, | ||||
|         } | ||||
|         "###); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
|         db_snap!(index, documents, @r###" | ||||
|         {"id":1,"doggo":"bernese"} | ||||
|         "###); | ||||
|         db_snap!(index, external_documents_ids, @r###" | ||||
|         soft: | ||||
|         hard: | ||||
|         1                        0 | ||||
|         "###); | ||||
|  | ||||
|         // A first batch of documents has been inserted | ||||
|  | ||||
|         // BATCH 2 | ||||
|  | ||||
|         println!("--- ENTERING BATCH 2"); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         let builder = IndexDocuments::new( | ||||
|             &mut wtxn, | ||||
|             &index, | ||||
|             &index.indexer_config, | ||||
|             index.index_documents_config.clone(), | ||||
|             |_| (), | ||||
|             || false, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|  | ||||
|         let (builder, removed) = builder.remove_documents(vec![S("1")]).unwrap(); | ||||
|         insta::assert_display_snapshot!(removed.unwrap(), @"1"); | ||||
|  | ||||
|         let documents = documents!([ | ||||
|             { "id": 0, "catto": "jorts" }, | ||||
|         ]); | ||||
|         let (builder, added) = builder.add_documents(documents).unwrap(); | ||||
|         insta::assert_display_snapshot!(added.unwrap(), @"1"); | ||||
|  | ||||
|         let addition = builder.execute().unwrap(); | ||||
|         insta::assert_debug_snapshot!(addition, @r###" | ||||
|         DocumentAdditionResult { | ||||
|             indexed_documents: 1, | ||||
|             number_of_documents: 1, | ||||
|         } | ||||
|         "###); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
|         db_snap!(index, documents, @r###" | ||||
|         {"id":0,"catto":"jorts"} | ||||
|         "###); | ||||
|  | ||||
|         db_snap!(index, external_documents_ids, @r###" | ||||
|         soft: | ||||
|         hard: | ||||
|         0                        1 | ||||
|         "###); | ||||
|  | ||||
|         db_snap!(index, soft_deleted_documents_ids, @"[]"); | ||||
|  | ||||
|         // BATCH 3 | ||||
|  | ||||
|         println!("--- ENTERING BATCH 3"); | ||||
|  | ||||
|         let mut wtxn = index.write_txn().unwrap(); | ||||
|  | ||||
|         let builder = IndexDocuments::new( | ||||
|             &mut wtxn, | ||||
|             &index, | ||||
|             &index.indexer_config, | ||||
|             index.index_documents_config.clone(), | ||||
|             |_| (), | ||||
|             || false, | ||||
|         ) | ||||
|         .unwrap(); | ||||
|  | ||||
|         let documents = documents!([ | ||||
|             { "id": 1, "catto": "jorts" }, | ||||
|         ]); | ||||
|         let (builder, added) = builder.add_documents(documents).unwrap(); | ||||
|         insta::assert_display_snapshot!(added.unwrap(), @"1"); | ||||
|  | ||||
|         let addition = builder.execute().unwrap(); | ||||
|         insta::assert_debug_snapshot!(addition, @r###" | ||||
|         DocumentAdditionResult { | ||||
|             indexed_documents: 1, | ||||
|             number_of_documents: 2, | ||||
|         } | ||||
|         "###); | ||||
|         wtxn.commit().unwrap(); | ||||
|  | ||||
|         db_snap!(index, documents, @r###" | ||||
|         {"id":1,"catto":"jorts"} | ||||
|         {"id":0,"catto":"jorts"} | ||||
|         "###); | ||||
|  | ||||
|         // Ensuring all the returned IDs actually exist | ||||
|         let rtxn = index.read_txn().unwrap(); | ||||
|         let res = index.search(&rtxn).execute().unwrap(); | ||||
|         index.documents(&rtxn, res.documents_ids).unwrap(); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -21,15 +21,14 @@ use crate::error::{Error, InternalError, UserError}; | ||||
| use crate::index::{db_name, main_key}; | ||||
| use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep}; | ||||
| use crate::{ | ||||
|     ExternalDocumentsIds, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, | ||||
|     Result, BEU32, | ||||
|     FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result, BEU32, | ||||
| }; | ||||
|  | ||||
| pub struct TransformOutput { | ||||
|     pub primary_key: String, | ||||
|     pub fields_ids_map: FieldsIdsMap, | ||||
|     pub field_distribution: FieldDistribution, | ||||
|     pub external_documents_ids: ExternalDocumentsIds<'static>, | ||||
|     pub new_external_documents_ids: fst::Map<Cow<'static, [u8]>>, | ||||
|     pub new_documents_ids: RoaringBitmap, | ||||
|     pub replaced_documents_ids: RoaringBitmap, | ||||
|     pub documents_count: usize, | ||||
| @@ -568,8 +567,6 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|             }))? | ||||
|             .to_string(); | ||||
|  | ||||
|         let mut external_documents_ids = self.index.external_documents_ids(wtxn)?; | ||||
|  | ||||
|         // We create a final writer to write the new documents in order from the sorter. | ||||
|         let mut writer = create_writer( | ||||
|             self.indexer_settings.chunk_compression_type, | ||||
| @@ -651,13 +648,12 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|             fst_new_external_documents_ids_builder.insert(key, value) | ||||
|         })?; | ||||
|         let new_external_documents_ids = fst_new_external_documents_ids_builder.into_map(); | ||||
|         external_documents_ids.insert_ids(&new_external_documents_ids)?; | ||||
|  | ||||
|         Ok(TransformOutput { | ||||
|             primary_key, | ||||
|             fields_ids_map: self.fields_ids_map, | ||||
|             field_distribution, | ||||
|             external_documents_ids: external_documents_ids.into_static(), | ||||
|             new_external_documents_ids: new_external_documents_ids.map_data(Cow::Owned).unwrap(), | ||||
|             new_documents_ids: self.new_documents_ids, | ||||
|             replaced_documents_ids: self.replaced_documents_ids, | ||||
|             documents_count: self.documents_count, | ||||
| @@ -691,7 +687,8 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|         let new_external_documents_ids = { | ||||
|             let mut external_documents_ids = self.index.external_documents_ids(wtxn)?; | ||||
|             external_documents_ids.delete_soft_deleted_documents_ids_from_fsts()?; | ||||
|             external_documents_ids | ||||
|             // This call should be free and can't fail since the previous method merged both fsts. | ||||
|             external_documents_ids.into_static().to_fst()?.into_owned() | ||||
|         }; | ||||
|  | ||||
|         let documents_ids = self.index.documents_ids(wtxn)?; | ||||
| @@ -776,7 +773,7 @@ impl<'a, 'i> Transform<'a, 'i> { | ||||
|             primary_key, | ||||
|             fields_ids_map: new_fields_ids_map, | ||||
|             field_distribution, | ||||
|             external_documents_ids: new_external_documents_ids.into_static(), | ||||
|             new_external_documents_ids, | ||||
|             new_documents_ids: documents_ids, | ||||
|             replaced_documents_ids: RoaringBitmap::default(), | ||||
|             documents_count, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user