mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	Introduce the reindex_all_documents indexing function
This commit is contained in:
		| @@ -5,7 +5,7 @@ use sdset::{duo::Union, SetOperation}; | ||||
| use serde::Serialize; | ||||
|  | ||||
| use crate::raw_indexer::RawIndexer; | ||||
| use crate::serde::{extract_document_id, RamDocumentStore, Serializer}; | ||||
| use crate::serde::{extract_document_id, serialize_value, Serializer}; | ||||
| use crate::store; | ||||
| use crate::update::{apply_documents_deletion, next_update_id, Update}; | ||||
| use crate::{Error, MResult, RankedMap}; | ||||
| @@ -84,12 +84,9 @@ pub fn apply_documents_addition( | ||||
|     documents_fields_counts_store: store::DocumentsFieldsCounts, | ||||
|     postings_lists_store: store::PostingsLists, | ||||
|     docs_words_store: store::DocsWords, | ||||
|     mut ranked_map: RankedMap, | ||||
|     addition: Vec<serde_json::Value>, | ||||
| ) -> MResult<()> { | ||||
|     let mut document_ids = HashSet::new(); | ||||
|     let mut document_store = RamDocumentStore::new(); | ||||
|     let mut document_fields_counts = HashMap::new(); | ||||
|     let mut documents_ids = HashSet::new(); | ||||
|     let mut indexer = RawIndexer::new(); | ||||
|  | ||||
|     let schema = match main_store.schema(writer)? { | ||||
| @@ -99,20 +96,47 @@ pub fn apply_documents_addition( | ||||
|  | ||||
|     let identifier = schema.identifier_name(); | ||||
|  | ||||
|     // 1. store documents ids for future deletion | ||||
|     for document in addition.iter() { | ||||
|         let document_id = match extract_document_id(identifier, &document)? { | ||||
|             Some(id) => id, | ||||
|             None => return Err(Error::MissingDocumentId), | ||||
|         }; | ||||
|  | ||||
|         if !documents_ids.insert(document_id) { | ||||
|             return Err(Error::DuplicateDocument); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // 2. remove the documents posting lists | ||||
|     let number_of_inserted_documents = documents_ids.len(); | ||||
|     apply_documents_deletion( | ||||
|         writer, | ||||
|         main_store, | ||||
|         documents_fields_store, | ||||
|         documents_fields_counts_store, | ||||
|         postings_lists_store, | ||||
|         docs_words_store, | ||||
|         documents_ids.into_iter().collect(), | ||||
|     )?; | ||||
|  | ||||
|     let mut ranked_map = match main_store.ranked_map(writer)? { | ||||
|         Some(ranked_map) => ranked_map, | ||||
|         None => RankedMap::default(), | ||||
|     }; | ||||
|  | ||||
|     // 3. index the documents fields in the stores | ||||
|     for document in addition { | ||||
|         let document_id = match extract_document_id(identifier, &document)? { | ||||
|             Some(id) => id, | ||||
|             None => return Err(Error::MissingDocumentId), | ||||
|         }; | ||||
|  | ||||
|         // 1. store the document id for future deletion | ||||
|         document_ids.insert(document_id); | ||||
|  | ||||
|         // 2. index the document fields in ram stores | ||||
|         let serializer = Serializer { | ||||
|             txn: writer, | ||||
|             schema: &schema, | ||||
|             document_store: &mut document_store, | ||||
|             document_fields_counts: &mut document_fields_counts, | ||||
|             document_store: documents_fields_store, | ||||
|             document_fields_counts: documents_fields_counts_store, | ||||
|             indexer: &mut indexer, | ||||
|             ranked_map: &mut ranked_map, | ||||
|             document_id, | ||||
| @@ -121,29 +145,93 @@ pub fn apply_documents_addition( | ||||
|         document.serialize(serializer)?; | ||||
|     } | ||||
|  | ||||
|     // 1. remove the previous documents match indexes | ||||
|     let documents_to_insert = document_ids.iter().cloned().collect(); | ||||
|     apply_documents_deletion( | ||||
|     write_documents_addition_index( | ||||
|         writer, | ||||
|         main_store, | ||||
|         documents_fields_store, | ||||
|         documents_fields_counts_store, | ||||
|         postings_lists_store, | ||||
|         docs_words_store, | ||||
|         ranked_map.clone(), | ||||
|         documents_to_insert, | ||||
|     )?; | ||||
|         ranked_map, | ||||
|         number_of_inserted_documents, | ||||
|         indexer, | ||||
|     ) | ||||
| } | ||||
|  | ||||
|     // 2. insert new document attributes in the database | ||||
|     for ((id, attr), value) in document_store.into_inner() { | ||||
|         documents_fields_store.put_document_field(writer, id, attr, &value)?; | ||||
| pub fn reindex_all_documents( | ||||
|     writer: &mut heed::RwTxn, | ||||
|     main_store: store::Main, | ||||
|     documents_fields_store: store::DocumentsFields, | ||||
|     documents_fields_counts_store: store::DocumentsFieldsCounts, | ||||
|     postings_lists_store: store::PostingsLists, | ||||
|     docs_words_store: store::DocsWords, | ||||
| ) -> MResult<()> { | ||||
|     let schema = match main_store.schema(writer)? { | ||||
|         Some(schema) => schema, | ||||
|         None => return Err(Error::SchemaMissing), | ||||
|     }; | ||||
|  | ||||
|     let mut ranked_map = RankedMap::default(); | ||||
|  | ||||
|     // 1. retrieve all documents ids | ||||
|     let mut documents_ids_to_reindex = Vec::new(); | ||||
|     for result in documents_fields_counts_store.documents_ids(writer)? { | ||||
|         let document_id = result?; | ||||
|         documents_ids_to_reindex.push(document_id); | ||||
|     } | ||||
|  | ||||
|     // 3. insert new document attributes counts | ||||
|     for ((id, attr), count) in document_fields_counts { | ||||
|         documents_fields_counts_store.put_document_field_count(writer, id, attr, count)?; | ||||
|     // 2. remove the documents posting lists | ||||
|     let number_of_inserted_documents = documents_ids_to_reindex.len(); | ||||
|     main_store.put_words_fst(writer, &fst::Set::default())?; | ||||
|     main_store.put_ranked_map(writer, &ranked_map)?; | ||||
|     main_store.put_number_of_documents(writer, |_| 0)?; | ||||
|     postings_lists_store.clear(writer)?; | ||||
|  | ||||
|     // 3. re-index one document by one document (otherwise we make the borrow checker unhappy) | ||||
|     let mut indexer = RawIndexer::new(); | ||||
|     let mut ram_store = HashMap::new(); | ||||
|  | ||||
|     for document_id in documents_ids_to_reindex { | ||||
|         for result in documents_fields_store.document_fields(writer, document_id)? { | ||||
|             let (attr, bytes) = result?; | ||||
|             let value: serde_json::Value = serde_json::from_slice(bytes)?; | ||||
|             ram_store.insert((document_id, attr), value); | ||||
|         } | ||||
|  | ||||
|         for ((docid, attr), value) in ram_store.drain() { | ||||
|             serialize_value( | ||||
|                 writer, | ||||
|                 attr, | ||||
|                 schema.props(attr), | ||||
|                 docid, | ||||
|                 documents_fields_store, | ||||
|                 documents_fields_counts_store, | ||||
|                 &mut indexer, | ||||
|                 &mut ranked_map, | ||||
|                 &value, | ||||
|             )?; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // 4. write the new index in the main store | ||||
|     write_documents_addition_index( | ||||
|         writer, | ||||
|         main_store, | ||||
|         postings_lists_store, | ||||
|         docs_words_store, | ||||
|         ranked_map, | ||||
|         number_of_inserted_documents, | ||||
|         indexer, | ||||
|     ) | ||||
| } | ||||
|  | ||||
| pub fn write_documents_addition_index( | ||||
|     writer: &mut heed::RwTxn, | ||||
|     main_store: store::Main, | ||||
|     postings_lists_store: store::PostingsLists, | ||||
|     docs_words_store: store::DocsWords, | ||||
|     ranked_map: RankedMap, | ||||
|     number_of_inserted_documents: usize, | ||||
|     indexer: RawIndexer, | ||||
| ) -> MResult<()> { | ||||
|     let indexed = indexer.build(); | ||||
|     let mut delta_words_builder = SetBuilder::memory(); | ||||
|  | ||||
| @@ -186,9 +274,7 @@ pub fn apply_documents_addition( | ||||
|  | ||||
|     main_store.put_words_fst(writer, &words)?; | ||||
|     main_store.put_ranked_map(writer, &ranked_map)?; | ||||
|  | ||||
|     let inserted_documents_len = document_ids.len() as u64; | ||||
|     main_store.put_number_of_documents(writer, |old| old + inserted_documents_len)?; | ||||
|     main_store.put_number_of_documents(writer, |old| old + number_of_inserted_documents as u64)?; | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|   | ||||
| @@ -88,7 +88,6 @@ pub fn apply_documents_deletion( | ||||
|     documents_fields_counts_store: store::DocumentsFieldsCounts, | ||||
|     postings_lists_store: store::PostingsLists, | ||||
|     docs_words_store: store::DocsWords, | ||||
|     mut ranked_map: RankedMap, | ||||
|     deletion: Vec<DocumentId>, | ||||
| ) -> MResult<()> { | ||||
|     let idset = SetBuf::from_dirty(deletion); | ||||
| @@ -98,6 +97,11 @@ pub fn apply_documents_deletion( | ||||
|         None => return Err(Error::SchemaMissing), | ||||
|     }; | ||||
|  | ||||
|     let mut ranked_map = match main_store.ranked_map(writer)? { | ||||
|         Some(ranked_map) => ranked_map, | ||||
|         None => RankedMap::default(), | ||||
|     }; | ||||
|  | ||||
|     // collect the ranked attributes according to the schema | ||||
|     let ranked_attrs: Vec<_> = schema | ||||
|         .iter() | ||||
| @@ -181,7 +185,6 @@ pub fn apply_documents_deletion( | ||||
|  | ||||
|     main_store.put_words_fst(writer, &words)?; | ||||
|     main_store.put_ranked_map(writer, &ranked_map)?; | ||||
|  | ||||
|     main_store.put_number_of_documents(writer, |old| old - deleted_documents_len)?; | ||||
|  | ||||
|     Ok(()) | ||||
|   | ||||
| @@ -20,7 +20,7 @@ use heed::Result as ZResult; | ||||
| use log::debug; | ||||
| use serde::{Deserialize, Serialize}; | ||||
|  | ||||
| use crate::{store, DocumentId, MResult, RankedMap}; | ||||
| use crate::{store, DocumentId, MResult}; | ||||
| use meilidb_schema::Schema; | ||||
|  | ||||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||||
| @@ -113,7 +113,15 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt | ||||
|             let update_type = UpdateType::Schema { | ||||
|                 schema: schema.clone(), | ||||
|             }; | ||||
|             let result = apply_schema_update(writer, index.main, &schema); | ||||
|             let result = apply_schema_update( | ||||
|                 writer, | ||||
|                 &schema, | ||||
|                 index.main, | ||||
|                 index.documents_fields, | ||||
|                 index.documents_fields_counts, | ||||
|                 index.postings_lists, | ||||
|                 index.docs_words, | ||||
|             ); | ||||
|  | ||||
|             (update_type, result, start.elapsed()) | ||||
|         } | ||||
| @@ -128,11 +136,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt | ||||
|         Update::DocumentsAddition(documents) => { | ||||
|             let start = Instant::now(); | ||||
|  | ||||
|             let ranked_map = match index.main.ranked_map(writer)? { | ||||
|                 Some(ranked_map) => ranked_map, | ||||
|                 None => RankedMap::default(), | ||||
|             }; | ||||
|  | ||||
|             let update_type = UpdateType::DocumentsAddition { | ||||
|                 number: documents.len(), | ||||
|             }; | ||||
| @@ -144,7 +147,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt | ||||
|                 index.documents_fields_counts, | ||||
|                 index.postings_lists, | ||||
|                 index.docs_words, | ||||
|                 ranked_map, | ||||
|                 documents, | ||||
|             ); | ||||
|  | ||||
| @@ -153,11 +155,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt | ||||
|         Update::DocumentsDeletion(documents) => { | ||||
|             let start = Instant::now(); | ||||
|  | ||||
|             let ranked_map = match index.main.ranked_map(writer)? { | ||||
|                 Some(ranked_map) => ranked_map, | ||||
|                 None => RankedMap::default(), | ||||
|             }; | ||||
|  | ||||
|             let update_type = UpdateType::DocumentsDeletion { | ||||
|                 number: documents.len(), | ||||
|             }; | ||||
| @@ -169,7 +166,6 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt | ||||
|                 index.documents_fields_counts, | ||||
|                 index.postings_lists, | ||||
|                 index.docs_words, | ||||
|                 ranked_map, | ||||
|                 documents, | ||||
|             ); | ||||
|  | ||||
|   | ||||
| @@ -1,19 +1,58 @@ | ||||
| use meilidb_schema::{Diff, Schema}; | ||||
|  | ||||
| use crate::update::documents_addition::reindex_all_documents; | ||||
| use crate::update::{next_update_id, Update}; | ||||
| use crate::{error::UnsupportedOperation, store, MResult}; | ||||
| use meilidb_schema::Schema; | ||||
|  | ||||
| pub fn apply_schema_update( | ||||
|     writer: &mut heed::RwTxn, | ||||
|     main_store: store::Main, | ||||
|     new_schema: &Schema, | ||||
|     main_store: store::Main, | ||||
|     documents_fields_store: store::DocumentsFields, | ||||
|     documents_fields_counts_store: store::DocumentsFieldsCounts, | ||||
|     postings_lists_store: store::PostingsLists, | ||||
|     docs_words_store: store::DocsWords, | ||||
| ) -> MResult<()> { | ||||
|     if main_store.schema(writer)?.is_some() { | ||||
|         return Err(UnsupportedOperation::SchemaAlreadyExists.into()); | ||||
|     use UnsupportedOperation::{ | ||||
|         CannotIntroduceNewSchemaAttribute, CannotRemoveSchemaAttribute, | ||||
|         CannotReorderSchemaAttribute, CannotUpdateSchemaIdentifier, | ||||
|     }; | ||||
|  | ||||
|     let mut need_full_reindexing = false; | ||||
|  | ||||
|     if let Some(old_schema) = main_store.schema(writer)? { | ||||
|         for diff in meilidb_schema::diff(&old_schema, new_schema) { | ||||
|             match diff { | ||||
|                 Diff::IdentChange { .. } => return Err(CannotUpdateSchemaIdentifier.into()), | ||||
|                 Diff::AttrMove { .. } => return Err(CannotReorderSchemaAttribute.into()), | ||||
|                 Diff::AttrPropsChange { old, new, .. } => { | ||||
|                     if new.indexed != old.indexed { | ||||
|                         need_full_reindexing = true; | ||||
|                     } | ||||
|                     if new.ranked != old.ranked { | ||||
|                         need_full_reindexing = true; | ||||
|                     } | ||||
|                 } | ||||
|                 Diff::NewAttr { .. } => return Err(CannotIntroduceNewSchemaAttribute.into()), | ||||
|                 Diff::RemovedAttr { .. } => return Err(CannotRemoveSchemaAttribute.into()), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     main_store | ||||
|         .put_schema(writer, new_schema) | ||||
|         .map_err(Into::into) | ||||
|     main_store.put_schema(writer, new_schema)?; | ||||
|  | ||||
|     if need_full_reindexing { | ||||
|         reindex_all_documents( | ||||
|             writer, | ||||
|             main_store, | ||||
|             documents_fields_store, | ||||
|             documents_fields_counts_store, | ||||
|             postings_lists_store, | ||||
|             docs_words_store, | ||||
|         )? | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn push_schema_update( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user