mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	chore: Simplify the update application
This commit is contained in:
		
				
					committed by
					
						 Clément Renault
						Clément Renault
					
				
			
			
				
	
			
			
			
						parent
						
							4deee93a55
						
					
				
				
					commit
					b0be06540a
				
			| @@ -1,15 +1,15 @@ | ||||
| use std::collections::HashSet; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use meilidb_core::DocumentId; | ||||
| use fst::{SetBuilder, set::OpBuilder}; | ||||
| use sdset::{SetOperation, duo::Union}; | ||||
| use serde::Serialize; | ||||
|  | ||||
| use crate::indexer::Indexer; | ||||
| use crate::serde::{extract_document_id, Serializer, RamDocumentStore}; | ||||
| use crate::RankedMap; | ||||
|  | ||||
| use super::{Error, Index, FinalDocumentsDeletion}; | ||||
| use super::{Error, Index, apply_documents_deletion}; | ||||
| use super::index::Cache; | ||||
|  | ||||
| pub struct DocumentsAddition<'a, D> { | ||||
| @@ -33,71 +33,56 @@ impl<'a, D> DocumentsAddition<'a, D> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct FinalDocumentsAddition<'a> { | ||||
|     inner: &'a Index, | ||||
|     document_ids: HashSet<DocumentId>, | ||||
|     document_store: RamDocumentStore, | ||||
|     indexer: Indexer, | ||||
|     ranked_map: RankedMap, | ||||
| } | ||||
|  | ||||
| impl<'a> FinalDocumentsAddition<'a> { | ||||
|     pub fn new(inner: &'a Index, ranked_map: RankedMap) -> FinalDocumentsAddition<'a> { | ||||
|         FinalDocumentsAddition { | ||||
|             inner, | ||||
|             document_ids: HashSet::new(), | ||||
|             document_store: RamDocumentStore::new(), | ||||
|             indexer: Indexer::new(), | ||||
|             ranked_map, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn update_document<D>(&mut self, document: D) -> Result<(), Error> | ||||
|     where D: serde::Serialize, | ||||
| pub fn apply_documents_addition( | ||||
|     index: &Index, | ||||
|     mut ranked_map: RankedMap, | ||||
|     addition: Vec<serde_json::Value>, | ||||
| ) -> Result<(), Error> | ||||
| { | ||||
|         let schema = &self.inner.schema(); | ||||
|     let mut document_ids = HashSet::new(); | ||||
|     let mut document_store = RamDocumentStore::new(); | ||||
|     let mut indexer = Indexer::new(); | ||||
|  | ||||
|     let schema = &index.schema(); | ||||
|     let identifier = schema.identifier_name(); | ||||
|  | ||||
|     for document in addition { | ||||
|         let document_id = match extract_document_id(identifier, &document)? { | ||||
|             Some(id) => id, | ||||
|             None => return Err(Error::MissingDocumentId), | ||||
|         }; | ||||
|  | ||||
|         // 1. store the document id for future deletion | ||||
|         self.document_ids.insert(document_id); | ||||
|         document_ids.insert(document_id); | ||||
|  | ||||
|         // 2. index the document fields in ram stores | ||||
|         let serializer = Serializer { | ||||
|             schema, | ||||
|             document_store: &mut self.document_store, | ||||
|             indexer: &mut self.indexer, | ||||
|             ranked_map: &mut self.ranked_map, | ||||
|             document_store: &mut document_store, | ||||
|             indexer: &mut indexer, | ||||
|             ranked_map: &mut ranked_map, | ||||
|             document_id, | ||||
|         }; | ||||
|  | ||||
|         document.serialize(serializer)?; | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn finalize(self) -> Result<(), Error> { | ||||
|         let ref_index = self.inner.as_ref(); | ||||
|     let ref_index = index.as_ref(); | ||||
|     let docs_words = ref_index.docs_words_index; | ||||
|     let documents = ref_index.documents_index; | ||||
|     let main = ref_index.main_index; | ||||
|     let words = ref_index.words_index; | ||||
|  | ||||
|     // 1. remove the previous documents match indexes | ||||
|         let mut documents_deletion = FinalDocumentsDeletion::new(self.inner, self.ranked_map.clone()); | ||||
|         documents_deletion.extend(self.document_ids); | ||||
|         documents_deletion.finalize()?; | ||||
|     let document_ids = document_ids.into_iter().collect(); | ||||
|     apply_documents_deletion(index, ranked_map.clone(), document_ids)?; | ||||
|  | ||||
|     // 2. insert new document attributes in the database | ||||
|         for ((id, attr), value) in self.document_store.into_inner() { | ||||
|     for ((id, attr), value) in document_store.into_inner() { | ||||
|         documents.set_document_field(id, attr, value)?; | ||||
|     } | ||||
|  | ||||
|         let indexed = self.indexer.build(); | ||||
|     let indexed = indexer.build(); | ||||
|     let mut delta_words_builder = SetBuilder::memory(); | ||||
|  | ||||
|     for (word, delta_set) in indexed.words_doc_indexes { | ||||
| @@ -138,18 +123,16 @@ impl<'a> FinalDocumentsAddition<'a> { | ||||
|     }; | ||||
|  | ||||
|     main.set_words_set(&words)?; | ||||
|         main.set_ranked_map(&self.ranked_map)?; | ||||
|     main.set_ranked_map(&ranked_map)?; | ||||
|  | ||||
|     // update the "consistent" view of the Index | ||||
|     let cache = ref_index.cache; | ||||
|     let words = Arc::new(words); | ||||
|         let ranked_map = self.ranked_map; | ||||
|     let synonyms = cache.synonyms.clone(); | ||||
|     let schema = cache.schema.clone(); | ||||
|  | ||||
|     let cache = Cache { words, synonyms, schema, ranked_map }; | ||||
|         self.inner.cache.store(Arc::new(cache)); | ||||
|     index.cache.store(Arc::new(cache)); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
| } | ||||
|   | ||||
| @@ -21,10 +21,25 @@ impl<'a> DocumentsDeletion<'a> { | ||||
|         DocumentsDeletion { index, documents: Vec::new() } | ||||
|     } | ||||
|  | ||||
|     pub fn delete_document(&mut self, document_id: DocumentId) { | ||||
|     pub fn delete_document_by_id(&mut self, document_id: DocumentId) { | ||||
|         self.documents.push(document_id); | ||||
|     } | ||||
|  | ||||
|     pub fn delete_document<D>(&mut self, document: D) -> Result<(), Error> | ||||
|     where D: serde::Serialize, | ||||
|     { | ||||
|         let schema = self.index.schema(); | ||||
|         let identifier = schema.identifier_name(); | ||||
|         let document_id = match extract_document_id(identifier, &document)? { | ||||
|             Some(id) => id, | ||||
|             None => return Err(Error::MissingDocumentId), | ||||
|         }; | ||||
|  | ||||
|         self.delete_document_by_id(document_id); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn finalize(self) -> Result<u64, Error> { | ||||
|         self.index.push_documents_deletion(self.documents) | ||||
|     } | ||||
| @@ -36,46 +51,20 @@ impl Extend<DocumentId> for DocumentsDeletion<'_> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct FinalDocumentsDeletion<'a> { | ||||
|     inner: &'a Index, | ||||
|     documents: Vec<DocumentId>, | ||||
|     ranked_map: RankedMap, | ||||
| } | ||||
|  | ||||
| impl<'a> FinalDocumentsDeletion<'a> { | ||||
|     pub fn new(inner: &'a Index, ranked_map: RankedMap) -> FinalDocumentsDeletion { | ||||
|         FinalDocumentsDeletion { inner, documents: Vec::new(), ranked_map } | ||||
|     } | ||||
|  | ||||
|     fn delete_document_by_id(&mut self, id: DocumentId) { | ||||
|         self.documents.push(id); | ||||
|     } | ||||
|  | ||||
|     pub fn delete_document<D>(&mut self, document: D) -> Result<(), Error> | ||||
|     where D: serde::Serialize, | ||||
| pub fn apply_documents_deletion( | ||||
|     index: &Index, | ||||
|     mut ranked_map: RankedMap, | ||||
|     deletion: Vec<DocumentId>, | ||||
| ) -> Result<(), Error> | ||||
| { | ||||
|         let schema = &self.inner.schema(); | ||||
|         let identifier = schema.identifier_name(); | ||||
|  | ||||
|         let document_id = match extract_document_id(identifier, &document)? { | ||||
|             Some(id) => id, | ||||
|             None => return Err(Error::MissingDocumentId), | ||||
|         }; | ||||
|  | ||||
|         self.delete_document_by_id(document_id); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn finalize(mut self) -> Result<(), Error> { | ||||
|         let ref_index = self.inner.as_ref(); | ||||
|         let schema = self.inner.schema(); | ||||
|     let ref_index = index.as_ref(); | ||||
|     let schema = index.schema(); | ||||
|     let docs_words = ref_index.docs_words_index; | ||||
|     let documents = ref_index.documents_index; | ||||
|     let main = ref_index.main_index; | ||||
|     let words = ref_index.words_index; | ||||
|  | ||||
|         let idset = SetBuf::from_dirty(self.documents); | ||||
|     let idset = SetBuf::from_dirty(deletion); | ||||
|  | ||||
|     // collect the ranked attributes according to the schema | ||||
|     let ranked_attrs: Vec<_> = schema.iter() | ||||
| @@ -88,7 +77,7 @@ impl<'a> FinalDocumentsDeletion<'a> { | ||||
|     for id in idset { | ||||
|         // remove all the ranked attributes from the ranked_map | ||||
|         for ranked_attr in &ranked_attrs { | ||||
|                 self.ranked_map.remove(id, *ranked_attr); | ||||
|             ranked_map.remove(id, *ranked_attr); | ||||
|         } | ||||
|  | ||||
|         if let Some(words) = docs_words.doc_words(id)? { | ||||
| @@ -141,24 +130,16 @@ impl<'a> FinalDocumentsDeletion<'a> { | ||||
|     }; | ||||
|  | ||||
|     main.set_words_set(&words)?; | ||||
|         main.set_ranked_map(&self.ranked_map)?; | ||||
|     main.set_ranked_map(&ranked_map)?; | ||||
|  | ||||
|     // update the "consistent" view of the Index | ||||
|     let cache = ref_index.cache; | ||||
|     let words = Arc::new(words); | ||||
|         let ranked_map = self.ranked_map; | ||||
|     let synonyms = cache.synonyms.clone(); | ||||
|     let schema = cache.schema.clone(); | ||||
|  | ||||
|     let cache = Cache { words, synonyms, schema, ranked_map }; | ||||
|         self.inner.cache.store(Arc::new(cache)); | ||||
|     index.cache.store(Arc::new(cache)); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
| } | ||||
|  | ||||
| impl Extend<DocumentId> for FinalDocumentsDeletion<'_> { | ||||
|     fn extend<T: IntoIterator<Item=DocumentId>>(&mut self, iter: T) { | ||||
|         self.documents.extend(iter) | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -1,5 +1,4 @@ | ||||
| use std::collections::{HashSet, BTreeMap}; | ||||
| use std::convert::TryInto; | ||||
| use std::sync::Arc; | ||||
| use std::thread; | ||||
|  | ||||
| @@ -24,10 +23,10 @@ use self::synonyms_index::SynonymsIndex; | ||||
| use self::words_index::WordsIndex; | ||||
|  | ||||
| use super::{ | ||||
|     DocumentsAddition, FinalDocumentsAddition, | ||||
|     DocumentsDeletion, FinalDocumentsDeletion, | ||||
|     SynonymsAddition, FinalSynonymsAddition, | ||||
|     SynonymsDeletion, FinalSynonymsDeletion, | ||||
|     DocumentsAddition, DocumentsDeletion, | ||||
|     SynonymsAddition, SynonymsDeletion, | ||||
|     apply_documents_addition, apply_documents_deletion, | ||||
|     apply_synonyms_addition, apply_synonyms_deletion, | ||||
| }; | ||||
|  | ||||
| mod custom_settings_index; | ||||
| @@ -71,33 +70,23 @@ fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { | ||||
|                 let results = &index.updates_results_index; | ||||
|                 (updates, results).transaction(|(updates, results)| { | ||||
|                     let update = updates.remove(&key)?.unwrap(); | ||||
|                     let array_id = key.as_ref().try_into().unwrap(); | ||||
|                     let id = u64::from_be_bytes(array_id); | ||||
|  | ||||
|                     // this is an emulation of the try block (#31436) | ||||
|                     let result: Result<(), Error> = (|| { | ||||
|                         match bincode::deserialize(&update)? { | ||||
|                             UpdateOwned::DocumentsAddition(documents) => { | ||||
|                                 let ranked_map = index.cache.load().ranked_map.clone(); | ||||
|                                 let mut addition = FinalDocumentsAddition::new(&index, ranked_map); | ||||
|                                 for document in documents { | ||||
|                                     addition.update_document(document)?; | ||||
|                                 } | ||||
|                                 addition.finalize()?; | ||||
|                                 apply_documents_addition(&index, ranked_map, documents)?; | ||||
|                             }, | ||||
|                             UpdateOwned::DocumentsDeletion(documents) => { | ||||
|                                 let ranked_map = index.cache.load().ranked_map.clone(); | ||||
|                                 let mut deletion = FinalDocumentsDeletion::new(&index, ranked_map); | ||||
|                                 deletion.extend(documents); | ||||
|                                 deletion.finalize()?; | ||||
|                                 apply_documents_deletion(&index, ranked_map, documents)?; | ||||
|                             }, | ||||
|                             UpdateOwned::SynonymsAddition(synonyms) => { | ||||
|                                 let addition = FinalSynonymsAddition::from_map(&index, synonyms); | ||||
|                                 addition.finalize()?; | ||||
|                                 apply_synonyms_addition(&index, synonyms)?; | ||||
|                             }, | ||||
|                             UpdateOwned::SynonymsDeletion(synonyms) => { | ||||
|                                 let deletion = FinalSynonymsDeletion::from_map(&index, synonyms); | ||||
|                                 deletion.finalize()?; | ||||
|                                 apply_synonyms_deletion(&index, synonyms)?; | ||||
|                             }, | ||||
|                         } | ||||
|                         Ok(()) | ||||
| @@ -105,7 +94,7 @@ fn spawn_update_system(index: Index) -> thread::JoinHandle<()> { | ||||
|  | ||||
|                     let result = result.map_err(|e| e.to_string()); | ||||
|                     let value = bincode::serialize(&result).unwrap(); | ||||
|                     results.insert(&array_id, value) | ||||
|                     results.insert(&key, value) | ||||
|                 }) | ||||
|                 .unwrap(); | ||||
|             } | ||||
| @@ -310,7 +299,12 @@ impl Index { | ||||
|         self.raw_push_update(update) | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn push_documents_deletion(&self, deletion: Vec<DocumentId>) -> Result<u64, Error> { | ||||
|     pub(crate) fn push_documents_deletion( | ||||
|         &self, | ||||
|         deletion: Vec<DocumentId>, | ||||
|     ) -> Result<u64, Error> | ||||
|     { | ||||
|         let deletion = Update::<()>::DocumentsDeletion(deletion); | ||||
|         let update = bincode::serialize(&deletion)?; | ||||
|         self.raw_push_update(update) | ||||
|     } | ||||
| @@ -320,6 +314,7 @@ impl Index { | ||||
|         addition: BTreeMap<String, Vec<String>>, | ||||
|     ) -> Result<u64, Error> | ||||
|     { | ||||
|         let addition = Update::<()>::SynonymsAddition(addition); | ||||
|         let update = bincode::serialize(&addition)?; | ||||
|         self.raw_push_update(update) | ||||
|     } | ||||
| @@ -329,6 +324,7 @@ impl Index { | ||||
|         deletion: BTreeMap<String, Option<Vec<String>>>, | ||||
|     ) -> Result<u64, Error> | ||||
|     { | ||||
|         let deletion = Update::<()>::SynonymsDeletion(deletion); | ||||
|         let update = bincode::serialize(&deletion)?; | ||||
|         self.raw_push_update(update) | ||||
|     } | ||||
|   | ||||
| @@ -15,10 +15,15 @@ mod synonyms_deletion; | ||||
| pub use self::error::Error; | ||||
| pub use self::index::{Index, CustomSettingsIndex}; | ||||
|  | ||||
| use self::documents_addition::{DocumentsAddition, FinalDocumentsAddition}; | ||||
| use self::documents_deletion::{DocumentsDeletion, FinalDocumentsDeletion}; | ||||
| use self::synonyms_addition::{SynonymsAddition, FinalSynonymsAddition}; | ||||
| use self::synonyms_deletion::{SynonymsDeletion, FinalSynonymsDeletion}; | ||||
| pub use self::documents_addition::DocumentsAddition; | ||||
| pub use self::documents_deletion::DocumentsDeletion; | ||||
| pub use self::synonyms_addition::SynonymsAddition; | ||||
| pub use self::synonyms_deletion::SynonymsDeletion; | ||||
|  | ||||
| use self::documents_addition::apply_documents_addition; | ||||
| use self::documents_deletion::apply_documents_deletion; | ||||
| use self::synonyms_addition::apply_synonyms_addition; | ||||
| use self::synonyms_deletion::apply_synonyms_deletion; | ||||
|  | ||||
| fn load_indexes(tree: &sled::Tree) -> Result<HashSet<String>, Error> { | ||||
|     match tree.get("indexes")? { | ||||
|   | ||||
| @@ -21,10 +21,10 @@ impl<'a> SynonymsAddition<'a> { | ||||
|     pub fn add_synonym<S, T, I>(&mut self, synonym: S, alternatives: I) | ||||
|     where S: AsRef<str>, | ||||
|           T: AsRef<str>, | ||||
|           I: Iterator<Item=T>, | ||||
|           I: IntoIterator<Item=T>, | ||||
|     { | ||||
|         let synonym = normalize_str(synonym.as_ref()); | ||||
|         let alternatives = alternatives.map(|s| s.as_ref().to_lowercase()); | ||||
|         let alternatives = alternatives.into_iter().map(|s| s.as_ref().to_lowercase()); | ||||
|         self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives); | ||||
|     } | ||||
|  | ||||
| @@ -33,42 +33,18 @@ impl<'a> SynonymsAddition<'a> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct FinalSynonymsAddition<'a> { | ||||
|     inner: &'a Index, | ||||
|     synonyms: BTreeMap<String, Vec<String>>, | ||||
| } | ||||
|  | ||||
| impl<'a> FinalSynonymsAddition<'a> { | ||||
|     pub fn new(inner: &'a Index) -> FinalSynonymsAddition<'a> { | ||||
|         FinalSynonymsAddition { inner, synonyms: BTreeMap::new() } | ||||
|     } | ||||
|  | ||||
|     pub fn from_map( | ||||
|         inner: &'a Index, | ||||
|         synonyms: BTreeMap<String, Vec<String>>, | ||||
|     ) -> FinalSynonymsAddition<'a> | ||||
| pub fn apply_synonyms_addition( | ||||
|     index: &Index, | ||||
|     addition: BTreeMap<String, Vec<String>>, | ||||
| ) -> Result<(), Error> | ||||
| { | ||||
|         FinalSynonymsAddition { inner, synonyms } | ||||
|     } | ||||
|  | ||||
|     pub fn add_synonym<S, T, I>(&mut self, synonym: S, alternatives: I) | ||||
|     where S: AsRef<str>, | ||||
|           T: AsRef<str>, | ||||
|           I: IntoIterator<Item=T>, | ||||
|     { | ||||
|         let synonym = normalize_str(synonym.as_ref()); | ||||
|         let alternatives = alternatives.into_iter().map(|s| s.as_ref().to_lowercase()); | ||||
|         self.synonyms.entry(synonym).or_insert_with(Vec::new).extend(alternatives); | ||||
|     } | ||||
|  | ||||
|     pub fn finalize(self) -> Result<(), Error> { | ||||
|         let ref_index = self.inner.as_ref(); | ||||
|     let ref_index = index.as_ref(); | ||||
|     let synonyms = ref_index.synonyms_index; | ||||
|     let main = ref_index.main_index; | ||||
|  | ||||
|     let mut synonyms_builder = SetBuilder::memory(); | ||||
|  | ||||
|         for (synonym, alternatives) in self.synonyms { | ||||
|     for (synonym, alternatives) in addition { | ||||
|         synonyms_builder.insert(&synonym).unwrap(); | ||||
|  | ||||
|         let alternatives = { | ||||
| @@ -112,8 +88,7 @@ impl<'a> FinalSynonymsAddition<'a> { | ||||
|     let schema = cache.schema.clone(); | ||||
|  | ||||
|     let cache = Cache { words, synonyms, schema, ranked_map }; | ||||
|         self.inner.cache.store(Arc::new(cache)); | ||||
|     index.cache.store(Arc::new(cache)); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
| } | ||||
|   | ||||
| @@ -43,32 +43,18 @@ impl<'a> SynonymsDeletion<'a> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct FinalSynonymsDeletion<'a> { | ||||
|     inner: &'a Index, | ||||
|     synonyms: BTreeMap<String, Option<Vec<String>>>, | ||||
| } | ||||
|  | ||||
| impl<'a> FinalSynonymsDeletion<'a> { | ||||
|     pub fn new(inner: &'a Index) -> FinalSynonymsDeletion<'a> { | ||||
|         FinalSynonymsDeletion { inner, synonyms: BTreeMap::new() } | ||||
|     } | ||||
|  | ||||
|     pub fn from_map( | ||||
|         inner: &'a Index, | ||||
|         synonyms: BTreeMap<String, Option<Vec<String>>>, | ||||
|     ) -> FinalSynonymsDeletion<'a> | ||||
| pub fn apply_synonyms_deletion( | ||||
|     index: &Index, | ||||
|     deletion: BTreeMap<String, Option<Vec<String>>>, | ||||
| ) -> Result<(), Error> | ||||
| { | ||||
|         FinalSynonymsDeletion { inner, synonyms } | ||||
|     } | ||||
|  | ||||
|     pub fn finalize(self) -> Result<(), Error> { | ||||
|         let ref_index = self.inner.as_ref(); | ||||
|     let ref_index = index.as_ref(); | ||||
|     let synonyms = ref_index.synonyms_index; | ||||
|     let main = ref_index.main_index; | ||||
|  | ||||
|     let mut delete_whole_synonym_builder = SetBuilder::memory(); | ||||
|  | ||||
|         for (synonym, alternatives) in self.synonyms { | ||||
|     for (synonym, alternatives) in deletion { | ||||
|         match alternatives { | ||||
|             Some(alternatives) => { | ||||
|                 let prev_alternatives = synonyms.alternatives_to(synonym.as_bytes())?; | ||||
| @@ -145,8 +131,7 @@ impl<'a> FinalSynonymsDeletion<'a> { | ||||
|     let schema = cache.schema.clone(); | ||||
|  | ||||
|     let cache = Cache { words, synonyms, schema, ranked_map }; | ||||
|         self.inner.cache.store(Arc::new(cache)); | ||||
|     index.cache.store(Arc::new(cache)); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user