mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-31 07:56:28 +00:00 
			
		
		
		
	Support merging update and replacement operations
This commit is contained in:
		| @@ -27,7 +27,7 @@ pub struct Update<'doc> { | ||||
|     docid: DocumentId, | ||||
|     external_document_id: &'doc str, | ||||
|     new: Versions<'doc>, | ||||
|     has_deletion: bool, | ||||
|     from_scratch: bool, | ||||
| } | ||||
|  | ||||
| pub struct Insertion<'doc> { | ||||
| @@ -109,9 +109,9 @@ impl<'doc> Update<'doc> { | ||||
|         docid: DocumentId, | ||||
|         external_document_id: &'doc str, | ||||
|         new: Versions<'doc>, | ||||
|         has_deletion: bool, | ||||
|         from_scratch: bool, | ||||
|     ) -> Self { | ||||
|         Update { docid, new, external_document_id, has_deletion } | ||||
|         Update { docid, new, external_document_id, from_scratch } | ||||
|     } | ||||
|  | ||||
|     pub fn docid(&self) -> DocumentId { | ||||
| @@ -154,7 +154,7 @@ impl<'doc> Update<'doc> { | ||||
|         index: &'t Index, | ||||
|         mapper: &'t Mapper, | ||||
|     ) -> Result<MergedDocument<'_, 'doc, 't, Mapper>> { | ||||
|         if self.has_deletion { | ||||
|         if self.from_scratch { | ||||
|             Ok(MergedDocument::without_db(DocumentFromVersions::new(&self.new))) | ||||
|         } else { | ||||
|             MergedDocument::with_db( | ||||
| @@ -207,7 +207,7 @@ impl<'doc> Update<'doc> { | ||||
|             cached_current = Some(current); | ||||
|         } | ||||
|  | ||||
|         if !self.has_deletion { | ||||
|         if !self.from_scratch { | ||||
|             // no field deletion, so fields that don't appear in `updated` cannot have changed | ||||
|             return Ok(changed); | ||||
|         } | ||||
| @@ -257,7 +257,7 @@ impl<'doc> Update<'doc> { | ||||
|         doc_alloc: &'doc Bump, | ||||
|         embedders: &'doc EmbeddingConfigs, | ||||
|     ) -> Result<Option<MergedVectorDocument<'doc>>> { | ||||
|         if self.has_deletion { | ||||
|         if self.from_scratch { | ||||
|             MergedVectorDocument::without_db( | ||||
|                 self.external_document_id, | ||||
|                 &self.new, | ||||
|   | ||||
| @@ -26,23 +26,36 @@ use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserE | ||||
| #[derive(Default)] | ||||
| pub struct DocumentOperation<'pl> { | ||||
|     operations: Vec<Payload<'pl>>, | ||||
|     method: MergeMethod, | ||||
| } | ||||
|  | ||||
| impl<'pl> DocumentOperation<'pl> { | ||||
|     pub fn new(method: IndexDocumentsMethod) -> Self { | ||||
|         Self { operations: Default::default(), method: MergeMethod::from(method) } | ||||
|     pub fn new() -> Self { | ||||
|         Self { operations: Default::default() } | ||||
|     } | ||||
|  | ||||
|     /// TODO please give me a type | ||||
|     /// Append a replacement of documents. | ||||
|     /// | ||||
|     /// The payload is expected to be in the NDJSON format | ||||
|     pub fn add_documents(&mut self, payload: &'pl Mmap) -> Result<()> { | ||||
|     pub fn replace_documents(&mut self, payload: &'pl Mmap) -> Result<()> { | ||||
|         #[cfg(unix)] | ||||
|         payload.advise(memmap2::Advice::Sequential)?; | ||||
|         self.operations.push(Payload::Addition(&payload[..])); | ||||
|         self.operations.push(Payload::Replace(&payload[..])); | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Append an update of documents. | ||||
|     /// | ||||
|     /// The payload is expected to be in the NDJSON format | ||||
|     pub fn update_documents(&mut self, payload: &'pl Mmap) -> Result<()> { | ||||
|         #[cfg(unix)] | ||||
|         payload.advise(memmap2::Advice::Sequential)?; | ||||
|         self.operations.push(Payload::Update(&payload[..])); | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Append a deletion of documents IDs. | ||||
|     /// | ||||
|     /// The list is a set of external documents IDs. | ||||
|     pub fn delete_documents(&mut self, to_delete: &'pl [&'pl str]) { | ||||
|         self.operations.push(Payload::Deletion(to_delete)) | ||||
|     } | ||||
| @@ -63,7 +76,7 @@ impl<'pl> DocumentOperation<'pl> { | ||||
|         MSP: Fn() -> bool, | ||||
|     { | ||||
|         progress.update_progress(IndexingStep::PreparingPayloads); | ||||
|         let Self { operations, method } = self; | ||||
|         let Self { operations } = self; | ||||
|  | ||||
|         let documents_ids = index.documents_ids(rtxn)?; | ||||
|         let mut operations_stats = Vec::new(); | ||||
| @@ -83,7 +96,7 @@ impl<'pl> DocumentOperation<'pl> { | ||||
|  | ||||
|             let mut bytes = 0; | ||||
|             let result = match operation { | ||||
|                 Payload::Addition(payload) => extract_addition_payload_changes( | ||||
|                 Payload::Replace(payload) => extract_addition_payload_changes( | ||||
|                     indexer, | ||||
|                     index, | ||||
|                     rtxn, | ||||
| @@ -93,7 +106,20 @@ impl<'pl> DocumentOperation<'pl> { | ||||
|                     &mut available_docids, | ||||
|                     &mut bytes, | ||||
|                     &docids_version_offsets, | ||||
|                     method, | ||||
|                     IndexDocumentsMethod::ReplaceDocuments, | ||||
|                     payload, | ||||
|                 ), | ||||
|                 Payload::Update(payload) => extract_addition_payload_changes( | ||||
|                     indexer, | ||||
|                     index, | ||||
|                     rtxn, | ||||
|                     primary_key_from_op, | ||||
|                     &mut primary_key, | ||||
|                     new_fields_ids_map, | ||||
|                     &mut available_docids, | ||||
|                     &mut bytes, | ||||
|                     &docids_version_offsets, | ||||
|                     IndexDocumentsMethod::UpdateDocuments, | ||||
|                     payload, | ||||
|                 ), | ||||
|                 Payload::Deletion(to_delete) => extract_deletion_payload_changes( | ||||
| @@ -101,7 +127,6 @@ impl<'pl> DocumentOperation<'pl> { | ||||
|                     rtxn, | ||||
|                     &mut available_docids, | ||||
|                     &docids_version_offsets, | ||||
|                     method, | ||||
|                     to_delete, | ||||
|                 ), | ||||
|             }; | ||||
| @@ -127,20 +152,15 @@ impl<'pl> DocumentOperation<'pl> { | ||||
|             docids_version_offsets.drain().collect_in(indexer); | ||||
|  | ||||
|         // Reorder the offsets to make sure we iterate on the file sequentially | ||||
|         // And finally sort them | ||||
|         docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations)); | ||||
|         // And finally sort them. This clearly speeds up reading the update files. | ||||
|         docids_version_offsets | ||||
|             .sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0)); | ||||
|  | ||||
|         let docids_version_offsets = docids_version_offsets.into_bump_slice(); | ||||
|         Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Default for DocumentOperation<'_> { | ||||
|     fn default() -> Self { | ||||
|         DocumentOperation::new(IndexDocumentsMethod::default()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[allow(clippy::too_many_arguments)] | ||||
| fn extract_addition_payload_changes<'r, 'pl: 'r>( | ||||
|     indexer: &'pl Bump, | ||||
| @@ -152,9 +172,11 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( | ||||
|     available_docids: &mut AvailableIds, | ||||
|     bytes: &mut u64, | ||||
|     main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, | ||||
|     method: MergeMethod, | ||||
|     method: IndexDocumentsMethod, | ||||
|     payload: &'pl [u8], | ||||
| ) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> { | ||||
|     use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments}; | ||||
|  | ||||
|     let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); | ||||
|  | ||||
|     let mut previous_offset = 0; | ||||
| @@ -205,48 +227,82 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( | ||||
|             None => { | ||||
|                 match index.external_documents_ids().get(rtxn, external_id) { | ||||
|                     Ok(Some(docid)) => match new_docids_version_offsets.entry(external_id) { | ||||
|                         Entry::Occupied(mut entry) => { | ||||
|                             entry.get_mut().push_addition(document_offset) | ||||
|                         } | ||||
|                         Entry::Occupied(mut entry) => match method { | ||||
|                             ReplaceDocuments => entry.get_mut().push_replacement(document_offset), | ||||
|                             UpdateDocuments => entry.get_mut().push_update(document_offset), | ||||
|                         }, | ||||
|                         Entry::Vacant(entry) => { | ||||
|                             entry.insert(PayloadOperations::new_addition( | ||||
|                                 method, | ||||
|                                 docid, | ||||
|                                 false, // is new | ||||
|                                 document_offset, | ||||
|                             )); | ||||
|                             match method { | ||||
|                                 ReplaceDocuments => { | ||||
|                                     entry.insert(PayloadOperations::new_replacement( | ||||
|                                         docid, | ||||
|                                         false, // is new | ||||
|                                         document_offset, | ||||
|                                     )); | ||||
|                                 } | ||||
|                                 UpdateDocuments => { | ||||
|                                     entry.insert(PayloadOperations::new_update( | ||||
|                                         docid, | ||||
|                                         false, // is new | ||||
|                                         document_offset, | ||||
|                                     )); | ||||
|                                 } | ||||
|                             } | ||||
|                         } | ||||
|                     }, | ||||
|                     Ok(None) => match new_docids_version_offsets.entry(external_id) { | ||||
|                         Entry::Occupied(mut entry) => { | ||||
|                             entry.get_mut().push_addition(document_offset) | ||||
|                         } | ||||
|                         Entry::Occupied(mut entry) => match method { | ||||
|                             ReplaceDocuments => entry.get_mut().push_replacement(document_offset), | ||||
|                             UpdateDocuments => entry.get_mut().push_update(document_offset), | ||||
|                         }, | ||||
|                         Entry::Vacant(entry) => { | ||||
|                             let docid = match available_docids.next() { | ||||
|                                 Some(docid) => docid, | ||||
|                                 None => return Err(UserError::DocumentLimitReached.into()), | ||||
|                             }; | ||||
|                             entry.insert(PayloadOperations::new_addition( | ||||
|                                 method, | ||||
|                                 docid, | ||||
|                                 true, // is new | ||||
|                                 document_offset, | ||||
|                             )); | ||||
|  | ||||
|                             match method { | ||||
|                                 ReplaceDocuments => { | ||||
|                                     entry.insert(PayloadOperations::new_replacement( | ||||
|                                         docid, | ||||
|                                         true, // is new | ||||
|                                         document_offset, | ||||
|                                     )); | ||||
|                                 } | ||||
|                                 UpdateDocuments => { | ||||
|                                     entry.insert(PayloadOperations::new_update( | ||||
|                                         docid, | ||||
|                                         true, // is new | ||||
|                                         document_offset, | ||||
|                                     )); | ||||
|                                 } | ||||
|                             } | ||||
|                         } | ||||
|                     }, | ||||
|                     Err(e) => return Err(e.into()), | ||||
|                 } | ||||
|             } | ||||
|             Some(payload_operations) => match new_docids_version_offsets.entry(external_id) { | ||||
|                 Entry::Occupied(mut entry) => entry.get_mut().push_addition(document_offset), | ||||
|                 Entry::Vacant(entry) => { | ||||
|                     entry.insert(PayloadOperations::new_addition( | ||||
|                         method, | ||||
|                         payload_operations.docid, | ||||
|                         payload_operations.is_new, | ||||
|                         document_offset, | ||||
|                     )); | ||||
|                 } | ||||
|                 Entry::Occupied(mut entry) => match method { | ||||
|                     ReplaceDocuments => entry.get_mut().push_replacement(document_offset), | ||||
|                     UpdateDocuments => entry.get_mut().push_update(document_offset), | ||||
|                 }, | ||||
|                 Entry::Vacant(entry) => match method { | ||||
|                     ReplaceDocuments => { | ||||
|                         entry.insert(PayloadOperations::new_replacement( | ||||
|                             payload_operations.docid, | ||||
|                             payload_operations.is_new, | ||||
|                             document_offset, | ||||
|                         )); | ||||
|                     } | ||||
|                     UpdateDocuments => { | ||||
|                         entry.insert(PayloadOperations::new_update( | ||||
|                             payload_operations.docid, | ||||
|                             payload_operations.is_new, | ||||
|                             document_offset, | ||||
|                         )); | ||||
|                     } | ||||
|                 }, | ||||
|             }, | ||||
|         } | ||||
|  | ||||
| @@ -279,7 +335,6 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>( | ||||
|     rtxn: &RoTxn, | ||||
|     available_docids: &mut AvailableIds, | ||||
|     main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, | ||||
|     method: MergeMethod, | ||||
|     to_delete: &'pl [&'pl str], | ||||
| ) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> { | ||||
|     let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); | ||||
| @@ -293,7 +348,7 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>( | ||||
|                             Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), | ||||
|                             Entry::Vacant(entry) => { | ||||
|                                 entry.insert(PayloadOperations::new_deletion( | ||||
|                                     method, docid, false, // is new | ||||
|                                     docid, false, // is new | ||||
|                                 )); | ||||
|                             } | ||||
|                         } | ||||
| @@ -307,7 +362,7 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>( | ||||
|                             Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), | ||||
|                             Entry::Vacant(entry) => { | ||||
|                                 entry.insert(PayloadOperations::new_deletion( | ||||
|                                     method, docid, true, // is new | ||||
|                                     docid, true, // is new | ||||
|                                 )); | ||||
|                             } | ||||
|                         } | ||||
| @@ -319,7 +374,6 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>( | ||||
|                 Entry::Occupied(mut entry) => entry.get_mut().push_deletion(), | ||||
|                 Entry::Vacant(entry) => { | ||||
|                     entry.insert(PayloadOperations::new_deletion( | ||||
|                         method, | ||||
|                         payload_operations.docid, | ||||
|                         payload_operations.is_new, | ||||
|                     )); | ||||
| @@ -370,13 +424,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> { | ||||
|         'pl: 'doc, | ||||
|     { | ||||
|         let (external_doc, payload_operations) = item; | ||||
|         payload_operations.merge_method.merge( | ||||
|             payload_operations.docid, | ||||
|             external_doc, | ||||
|             payload_operations.is_new, | ||||
|             &context.doc_alloc, | ||||
|             &payload_operations.operations[..], | ||||
|         ) | ||||
|         payload_operations.merge(external_doc, &context.doc_alloc) | ||||
|     } | ||||
|  | ||||
|     fn len(&self) -> usize { | ||||
| @@ -389,7 +437,8 @@ pub struct DocumentOperationChanges<'pl> { | ||||
| } | ||||
|  | ||||
| pub enum Payload<'pl> { | ||||
|     Addition(&'pl [u8]), | ||||
|     Replace(&'pl [u8]), | ||||
|     Update(&'pl [u8]), | ||||
|     Deletion(&'pl [&'pl str]), | ||||
| } | ||||
|  | ||||
| @@ -406,31 +455,30 @@ pub struct PayloadOperations<'pl> { | ||||
|     pub is_new: bool, | ||||
|     /// The operations to perform, in order, on this document. | ||||
|     pub operations: Vec<InnerDocOp<'pl>>, | ||||
|     /// The merge method we are using to merge payloads and documents. | ||||
|     merge_method: MergeMethod, | ||||
| } | ||||
|  | ||||
| impl<'pl> PayloadOperations<'pl> { | ||||
|     fn new_deletion(merge_method: MergeMethod, docid: DocumentId, is_new: bool) -> Self { | ||||
|         Self { docid, is_new, operations: vec![InnerDocOp::Deletion], merge_method } | ||||
|     fn new_replacement(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self { | ||||
|         Self { docid, is_new, operations: vec![InnerDocOp::Replace(offset)] } | ||||
|     } | ||||
|  | ||||
|     fn new_addition( | ||||
|         merge_method: MergeMethod, | ||||
|         docid: DocumentId, | ||||
|         is_new: bool, | ||||
|         offset: DocumentOffset<'pl>, | ||||
|     ) -> Self { | ||||
|         Self { docid, is_new, operations: vec![InnerDocOp::Addition(offset)], merge_method } | ||||
|     fn new_update(docid: DocumentId, is_new: bool, offset: DocumentOffset<'pl>) -> Self { | ||||
|         Self { docid, is_new, operations: vec![InnerDocOp::Update(offset)] } | ||||
|     } | ||||
|  | ||||
|     fn new_deletion(docid: DocumentId, is_new: bool) -> Self { | ||||
|         Self { docid, is_new, operations: vec![InnerDocOp::Deletion] } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'pl> PayloadOperations<'pl> { | ||||
|     fn push_addition(&mut self, offset: DocumentOffset<'pl>) { | ||||
|         if self.merge_method.useless_previous_changes() { | ||||
|             self.operations.clear(); | ||||
|         } | ||||
|         self.operations.push(InnerDocOp::Addition(offset)) | ||||
|     fn push_replacement(&mut self, offset: DocumentOffset<'pl>) { | ||||
|         self.operations.clear(); | ||||
|         self.operations.push(InnerDocOp::Replace(offset)) | ||||
|     } | ||||
|  | ||||
|     fn push_update(&mut self, offset: DocumentOffset<'pl>) { | ||||
|         self.operations.push(InnerDocOp::Update(offset)) | ||||
|     } | ||||
|  | ||||
|     fn push_deletion(&mut self) { | ||||
| @@ -440,16 +488,114 @@ impl<'pl> PayloadOperations<'pl> { | ||||
|  | ||||
|     fn append_operations(&mut self, mut operations: Vec<InnerDocOp<'pl>>) { | ||||
|         debug_assert!(!operations.is_empty()); | ||||
|         if self.merge_method.useless_previous_changes() { | ||||
|         if matches!(operations.first(), Some(InnerDocOp::Deletion | InnerDocOp::Replace(_))) { | ||||
|             self.operations.clear(); | ||||
|         } | ||||
|         self.operations.append(&mut operations); | ||||
|     } | ||||
|  | ||||
|     /// Returns only the most recent version of a document based on the updates from the payloads. | ||||
|     /// | ||||
|     /// This function is only meant to be used when doing a replacement and not an update. | ||||
|     fn merge<'doc>( | ||||
|         &self, | ||||
|         external_doc: &'doc str, | ||||
|         doc_alloc: &'doc Bump, | ||||
|     ) -> Result<Option<DocumentChange<'doc>>> | ||||
|     where | ||||
|         'pl: 'doc, | ||||
|     { | ||||
|         match self.operations.last() { | ||||
|             Some(InnerDocOp::Replace(DocumentOffset { content })) => { | ||||
|                 let document = serde_json::from_slice(content).unwrap(); | ||||
|                 let document = | ||||
|                     RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc) | ||||
|                         .map_err(UserError::SerdeJson)?; | ||||
|  | ||||
|                 if self.is_new { | ||||
|                     Ok(Some(DocumentChange::Insertion(Insertion::create( | ||||
|                         self.docid, | ||||
|                         external_doc, | ||||
|                         Versions::single(document), | ||||
|                     )))) | ||||
|                 } else { | ||||
|                     Ok(Some(DocumentChange::Update(Update::create( | ||||
|                         self.docid, | ||||
|                         external_doc, | ||||
|                         Versions::single(document), | ||||
|                         true, | ||||
|                     )))) | ||||
|                 } | ||||
|             } | ||||
|             Some(InnerDocOp::Update(_)) => { | ||||
|                 // Search the first operation that is a tombstone which resets the document. | ||||
|                 let last_tombstone = self | ||||
|                     .operations | ||||
|                     .iter() | ||||
|                     .rposition(|op| matches!(op, InnerDocOp::Deletion | InnerDocOp::Replace(_))); | ||||
|  | ||||
|                 // Track when we must ignore previous document versions from the rtxn. | ||||
|                 let from_scratch = last_tombstone.is_some(); | ||||
|  | ||||
|                 // We ignore deletion and keep the replacement to create the appropriate versions. | ||||
|                 let operations = match last_tombstone { | ||||
|                     Some(i) => match self.operations[i] { | ||||
|                         InnerDocOp::Deletion => &self.operations[i + 1..], | ||||
|                         InnerDocOp::Replace(_) => &self.operations[i..], | ||||
|                         InnerDocOp::Update(_) => unreachable!("Found a non-tombstone operation"), | ||||
|                     }, | ||||
|                     None => &self.operations[..], | ||||
|                 }; | ||||
|  | ||||
|                 // We collect the versions to generate the appropriate document. | ||||
|                 let versions = operations.iter().map(|operation| { | ||||
|                     let DocumentOffset { content } = match operation { | ||||
|                         InnerDocOp::Replace(offset) | InnerDocOp::Update(offset) => offset, | ||||
|                         InnerDocOp::Deletion => unreachable!("Deletion in document operations"), | ||||
|                     }; | ||||
|  | ||||
|                     let document = serde_json::from_slice(content).unwrap(); | ||||
|                     let document = | ||||
|                         RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc) | ||||
|                             .map_err(UserError::SerdeJson)?; | ||||
|  | ||||
|                     Ok(document) | ||||
|                 }); | ||||
|  | ||||
|                 let Some(versions) = Versions::multiple(versions)? else { return Ok(None) }; | ||||
|  | ||||
|                 if self.is_new { | ||||
|                     Ok(Some(DocumentChange::Insertion(Insertion::create( | ||||
|                         self.docid, | ||||
|                         external_doc, | ||||
|                         versions, | ||||
|                     )))) | ||||
|                 } else { | ||||
|                     Ok(Some(DocumentChange::Update(Update::create( | ||||
|                         self.docid, | ||||
|                         external_doc, | ||||
|                         versions, | ||||
|                         from_scratch, | ||||
|                     )))) | ||||
|                 } | ||||
|             } | ||||
|             Some(InnerDocOp::Deletion) => { | ||||
|                 return if self.is_new { | ||||
|                     Ok(None) | ||||
|                 } else { | ||||
|                     let deletion = Deletion::create(self.docid, external_doc); | ||||
|                     Ok(Some(DocumentChange::Deletion(deletion))) | ||||
|                 }; | ||||
|             } | ||||
|             None => unreachable!("We must not have an empty set of operations on a document"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub enum InnerDocOp<'pl> { | ||||
|     Addition(DocumentOffset<'pl>), | ||||
|     Replace(DocumentOffset<'pl>), | ||||
|     Update(DocumentOffset<'pl>), | ||||
|     Deletion, | ||||
| } | ||||
|  | ||||
| @@ -461,231 +607,14 @@ pub struct DocumentOffset<'pl> { | ||||
|     pub content: &'pl [u8], | ||||
| } | ||||
|  | ||||
| trait MergeChanges { | ||||
|     /// Whether the payloads in the list of operations are useless or not. | ||||
|     fn useless_previous_changes(&self) -> bool; | ||||
|  | ||||
|     /// Returns a key that is used to order the payloads the right way. | ||||
|     fn sort_key(&self, docops: &[InnerDocOp]) -> usize; | ||||
|  | ||||
|     fn merge<'doc>( | ||||
|         &self, | ||||
|         docid: DocumentId, | ||||
|         external_docid: &'doc str, | ||||
|         is_new: bool, | ||||
|         doc_alloc: &'doc Bump, | ||||
|         operations: &'doc [InnerDocOp], | ||||
|     ) -> Result<Option<DocumentChange<'doc>>>; | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone, Copy)] | ||||
| enum MergeMethod { | ||||
|     ForReplacement(MergeDocumentForReplacement), | ||||
|     ForUpdates(MergeDocumentForUpdates), | ||||
| } | ||||
|  | ||||
| impl MergeChanges for MergeMethod { | ||||
|     fn useless_previous_changes(&self) -> bool { | ||||
|         match self { | ||||
|             MergeMethod::ForReplacement(merge) => merge.useless_previous_changes(), | ||||
|             MergeMethod::ForUpdates(merge) => merge.useless_previous_changes(), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn sort_key(&self, docops: &[InnerDocOp]) -> usize { | ||||
|         match self { | ||||
|             MergeMethod::ForReplacement(merge) => merge.sort_key(docops), | ||||
|             MergeMethod::ForUpdates(merge) => merge.sort_key(docops), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn merge<'doc>( | ||||
|         &self, | ||||
|         docid: DocumentId, | ||||
|         external_docid: &'doc str, | ||||
|         is_new: bool, | ||||
|         doc_alloc: &'doc Bump, | ||||
|         operations: &'doc [InnerDocOp], | ||||
|     ) -> Result<Option<DocumentChange<'doc>>> { | ||||
|         match self { | ||||
|             MergeMethod::ForReplacement(merge) => { | ||||
|                 merge.merge(docid, external_docid, is_new, doc_alloc, operations) | ||||
|             } | ||||
|             MergeMethod::ForUpdates(merge) => { | ||||
|                 merge.merge(docid, external_docid, is_new, doc_alloc, operations) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<IndexDocumentsMethod> for MergeMethod { | ||||
|     fn from(method: IndexDocumentsMethod) -> Self { | ||||
|         match method { | ||||
|             IndexDocumentsMethod::ReplaceDocuments => { | ||||
|                 MergeMethod::ForReplacement(MergeDocumentForReplacement) | ||||
|             } | ||||
|             IndexDocumentsMethod::UpdateDocuments => { | ||||
|                 MergeMethod::ForUpdates(MergeDocumentForUpdates) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone, Copy)] | ||||
| struct MergeDocumentForReplacement; | ||||
|  | ||||
| impl MergeChanges for MergeDocumentForReplacement { | ||||
|     fn useless_previous_changes(&self) -> bool { | ||||
|         true | ||||
|     } | ||||
|  | ||||
|     /// Reorders to read only the last change. | ||||
|     fn sort_key(&self, docops: &[InnerDocOp]) -> usize { | ||||
|         let f = |ido: &_| match ido { | ||||
|             InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize), | ||||
|             InnerDocOp::Deletion => None, | ||||
|         }; | ||||
|         docops.iter().rev().find_map(f).unwrap_or(0) | ||||
|     } | ||||
|  | ||||
|     /// Returns only the most recent version of a document based on the updates from the payloads. | ||||
|     /// | ||||
|     /// This function is only meant to be used when doing a replacement and not an update. | ||||
|     fn merge<'doc>( | ||||
|         &self, | ||||
|         docid: DocumentId, | ||||
|         external_doc: &'doc str, | ||||
|         is_new: bool, | ||||
|         doc_alloc: &'doc Bump, | ||||
|         operations: &'doc [InnerDocOp], | ||||
|     ) -> Result<Option<DocumentChange<'doc>>> { | ||||
|         match operations.last() { | ||||
|             Some(InnerDocOp::Addition(DocumentOffset { content })) => { | ||||
|                 let document = serde_json::from_slice(content).unwrap(); | ||||
|                 let document = | ||||
|                     RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc) | ||||
|                         .map_err(UserError::SerdeJson)?; | ||||
|  | ||||
|                 if is_new { | ||||
|                     Ok(Some(DocumentChange::Insertion(Insertion::create( | ||||
|                         docid, | ||||
|                         external_doc, | ||||
|                         Versions::single(document), | ||||
|                     )))) | ||||
|                 } else { | ||||
|                     Ok(Some(DocumentChange::Update(Update::create( | ||||
|                         docid, | ||||
|                         external_doc, | ||||
|                         Versions::single(document), | ||||
|                         true, | ||||
|                     )))) | ||||
|                 } | ||||
|             } | ||||
|             Some(InnerDocOp::Deletion) => { | ||||
|                 return if is_new { | ||||
|                     Ok(None) | ||||
|                 } else { | ||||
|                     let deletion = Deletion::create(docid, external_doc); | ||||
|                     Ok(Some(DocumentChange::Deletion(deletion))) | ||||
|                 }; | ||||
|             } | ||||
|             None => unreachable!("We must not have empty set of operations on a document"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone, Copy)] | ||||
| struct MergeDocumentForUpdates; | ||||
|  | ||||
| impl MergeChanges for MergeDocumentForUpdates { | ||||
    /// Whether operations preceding the last one may be discarded before merging.
    ///
    /// Updates are cumulative: every addition since the last deletion is merged
    /// into the final document (see `merge` below), so previous changes are
    /// never useless for this strategy.
    fn useless_previous_changes(&self) -> bool {
        false
    }
|  | ||||
|     /// Reorders to read the first changes first so that it's faster to read the first one and then the rest. | ||||
|     fn sort_key(&self, docops: &[InnerDocOp]) -> usize { | ||||
|         let f = |ido: &_| match ido { | ||||
|             InnerDocOp::Addition(add) => Some(add.content.as_ptr() as usize), | ||||
|             InnerDocOp::Deletion => None, | ||||
|         }; | ||||
|         docops.iter().find_map(f).unwrap_or(0) | ||||
|     } | ||||
|  | ||||
|     /// Reads the previous version of a document from the database, the new versions | ||||
|     /// in the grenad update files and merges them to generate a new boxed obkv. | ||||
|     /// | ||||
|     /// This function is only meant to be used when doing an update and not a replacement. | ||||
|     fn merge<'doc>( | ||||
|         &self, | ||||
|         docid: DocumentId, | ||||
|         external_docid: &'doc str, | ||||
|         is_new: bool, | ||||
|         doc_alloc: &'doc Bump, | ||||
|         operations: &'doc [InnerDocOp], | ||||
|     ) -> Result<Option<DocumentChange<'doc>>> { | ||||
|         if operations.is_empty() { | ||||
|             unreachable!("We must not have empty set of operations on a document"); | ||||
|         } | ||||
|  | ||||
|         let last_deletion = operations.iter().rposition(|op| matches!(op, InnerDocOp::Deletion)); | ||||
|         let operations = &operations[last_deletion.map_or(0, |i| i + 1)..]; | ||||
|  | ||||
|         let has_deletion = last_deletion.is_some(); | ||||
|  | ||||
|         if operations.is_empty() { | ||||
|             return if is_new { | ||||
|                 Ok(None) | ||||
|             } else { | ||||
|                 let deletion = Deletion::create(docid, external_docid); | ||||
|                 Ok(Some(DocumentChange::Deletion(deletion))) | ||||
|             }; | ||||
|         } | ||||
|  | ||||
|         let versions = match operations { | ||||
|             [single] => { | ||||
|                 let DocumentOffset { content } = match single { | ||||
|                     InnerDocOp::Addition(offset) => offset, | ||||
|                     InnerDocOp::Deletion => { | ||||
|                         unreachable!("Deletion in document operations") | ||||
|                     } | ||||
|                 }; | ||||
|                 let document = serde_json::from_slice(content).unwrap(); | ||||
|                 let document = | ||||
|                     RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc) | ||||
|                         .map_err(UserError::SerdeJson)?; | ||||
|  | ||||
|                 Some(Versions::single(document)) | ||||
|             } | ||||
|             operations => { | ||||
|                 let versions = operations.iter().map(|operation| { | ||||
|                     let DocumentOffset { content } = match operation { | ||||
|                         InnerDocOp::Addition(offset) => offset, | ||||
|                         InnerDocOp::Deletion => { | ||||
|                             unreachable!("Deletion in document operations") | ||||
|                         } | ||||
|                     }; | ||||
|  | ||||
|                     let document = serde_json::from_slice(content).unwrap(); | ||||
|                     let document = | ||||
|                         RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc) | ||||
|                             .map_err(UserError::SerdeJson)?; | ||||
|                     Ok(document) | ||||
|                 }); | ||||
|                 Versions::multiple(versions)? | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         let Some(versions) = versions else { return Ok(None) }; | ||||
|  | ||||
|         if is_new { | ||||
|             Ok(Some(DocumentChange::Insertion(Insertion::create(docid, external_docid, versions)))) | ||||
|         } else { | ||||
|             Ok(Some(DocumentChange::Update(Update::create( | ||||
|                 docid, | ||||
|                 external_docid, | ||||
|                 versions, | ||||
|                 has_deletion, | ||||
|             )))) | ||||
|         } | ||||
|     } | ||||
| /// Returns the first pointer of the first change in a document. | ||||
| /// | ||||
| /// This is used to sort the documents in update file content order | ||||
| /// and read the update file in order to largely speed up the indexation. | ||||
| pub fn first_update_pointer(docops: &[InnerDocOp]) -> Option<usize> { | ||||
|     docops.iter().find_map(|ido: &_| match ido { | ||||
|         InnerDocOp::Replace(replace) => Some(replace.content.as_ptr() as usize), | ||||
|         InnerDocOp::Update(update) => Some(update.content.as_ptr() as usize), | ||||
|         InnerDocOp::Deletion => None, | ||||
|     }) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user