mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	Merge #4626
4626: Edit Documents with Rhai r=ManyTheFish a=Kerollmops This PR introduces a first version of [the _Update Documents with Function_ (internal)](https://www.notion.so/meilisearch/Update-Documents-by-Function-45f87b13e61c4435b73943768a490808). It uses [the Rhai programming language](https://rhai.rs/) to let users express the modifications they want apply. You can read more about the way to use this functions on [the Usage PRD Page](https://meilisearch.notion.site/Edit-Documents-with-Rhai-0cff8fea7655436592e7c8a6de932062?pvs=25). The [prototype is available](https://github.com/meilisearch/meilisearch/actions/runs/9038384483) through Docker by using the following command: ``` docker run -p 7700:7700 -v $(pwd)/meili_data:/meili_data getmeili/meilisearch:prototype-edit-documents-with-rhai-3 ``` ## TODO - [x] Support the `DocumentEdition` task in dumps. - [x] Remove the unwraps and panics. - [x] Improve error codes for the `function` parameter. - [x] [Update Rhai to v1.19.0](https://github.com/rhaiscript/rhai/releases/tag/v1.19.0) 🚀 - [x] Make it an experimental feature (only restrict the HTTP calls). - [x] It must be possible not to send a context. - [x] Rebase on main. - [x] Check that the script cannot do any io. - [x] ~Introduce a `Documents.edit` action or~ require the `Documents.all` action. - [x] Change the `editionCode` to the clearer `function` field name in the tasks. - [x] Support a user provided context and maybe more (but keep function execution isolated for reproducibility). - [x] Support deleting documents when the `doc` is `()` (nil, null). - [x] Support canceling document edition. - [x] Multithread document edition by using rayon (and [rayon-par-bridge](https://docs.rs/rayon-par-bridge/latest/rayon_par_bridge/)). - [x] Limit the number of instruction by function execution. - [ ] ~Expose the limit of instructions in the settings.~ Not sure, in fact. - [x] Ignore unmodified documents in the tasks count. - [x] Make the `filter` field optional (not forced to be `null`). Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
		| @@ -5,6 +5,7 @@ use std::{io, str}; | ||||
|  | ||||
| use heed::{Error as HeedError, MdbError}; | ||||
| use rayon::ThreadPoolBuildError; | ||||
| use rhai::EvalAltResult; | ||||
| use serde_json::Value; | ||||
| use thiserror::Error; | ||||
|  | ||||
| @@ -259,6 +260,14 @@ only composed of alphanumeric characters (a-z A-Z 0-9), hyphens (-) and undersco | ||||
|     InvalidSettingsDimensions { embedder_name: String }, | ||||
|     #[error("`.embedders.{embedder_name}.url`: could not parse `{url}`: {inner_error}")] | ||||
|     InvalidUrl { embedder_name: String, inner_error: url::ParseError, url: String }, | ||||
|     #[error("Document editions cannot modify a document's primary key")] | ||||
|     DocumentEditionCannotModifyPrimaryKey, | ||||
|     #[error("Document editions must keep documents as objects")] | ||||
|     DocumentEditionDocumentMustBeObject, | ||||
|     #[error("Document edition runtime error encountered while running the function: {0}")] | ||||
|     DocumentEditionRuntimeError(Box<EvalAltResult>), | ||||
|     #[error("Document edition runtime error encountered while compiling the function: {0}")] | ||||
|     DocumentEditionCompilationError(rhai::ParseError), | ||||
| } | ||||
|  | ||||
| impl From<crate::vector::Error> for Error { | ||||
|   | ||||
| @@ -45,7 +45,7 @@ pub use search::new::{ | ||||
| }; | ||||
| use serde_json::Value; | ||||
| pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; | ||||
| pub use {charabia as tokenizer, heed}; | ||||
| pub use {charabia as tokenizer, heed, rhai}; | ||||
|  | ||||
| pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError}; | ||||
| pub use self::criterion::{default_criteria, Criterion, CriterionError}; | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| mod enrich; | ||||
| mod extract; | ||||
| mod helpers; | ||||
| mod parallel; | ||||
| mod transform; | ||||
| mod typed_chunk; | ||||
|  | ||||
| @@ -16,6 +17,8 @@ use grenad::{Merger, MergerBuilder}; | ||||
| use heed::types::Str; | ||||
| use heed::Database; | ||||
| use rand::SeedableRng; | ||||
| use rayon::iter::{ParallelBridge, ParallelIterator}; | ||||
| use rhai::{Dynamic, Engine, OptimizationLevel, Scope}; | ||||
| use roaring::RoaringBitmap; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use slice_group_by::GroupBy; | ||||
| @@ -32,15 +35,16 @@ pub use self::helpers::{ | ||||
| }; | ||||
| use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; | ||||
| pub use self::transform::{Transform, TransformOutput}; | ||||
| use crate::documents::{obkv_to_object, DocumentsBatchReader}; | ||||
| use crate::documents::{obkv_to_object, DocumentsBatchBuilder, DocumentsBatchReader}; | ||||
| use crate::error::{Error, InternalError, UserError}; | ||||
| use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; | ||||
| pub use crate::update::index_documents::helpers::CursorClonableMmap; | ||||
| use crate::update::index_documents::parallel::ImmutableObkvs; | ||||
| use crate::update::{ | ||||
|     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, | ||||
| }; | ||||
| use crate::vector::EmbeddingConfigs; | ||||
| use crate::{CboRoaringBitmapCodec, Index, Result}; | ||||
| use crate::{CboRoaringBitmapCodec, Index, Object, Result}; | ||||
|  | ||||
| static MERGED_DATABASE_COUNT: usize = 7; | ||||
| static PREFIX_DATABASE_COUNT: usize = 4; | ||||
| @@ -172,6 +176,141 @@ where | ||||
|         Ok((self, Ok(indexed_documents))) | ||||
|     } | ||||
|  | ||||
|     #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents")] | ||||
|     pub fn edit_documents( | ||||
|         self, | ||||
|         documents: &RoaringBitmap, | ||||
|         context: Option<Object>, | ||||
|         code: &str, | ||||
|     ) -> Result<(Self, StdResult<(u64, u64), UserError>)> { | ||||
|         // Early return when there is no document to edit | ||||
|         if documents.is_empty() { | ||||
|             return Ok((self, Ok((0, 0)))); | ||||
|         } | ||||
|  | ||||
|         fn rhaimap_to_object(map: rhai::Map) -> Object { | ||||
|             let mut output = Object::new(); | ||||
|             for (key, value) in map { | ||||
|                 let value = serde_json::to_value(&value).unwrap(); | ||||
|                 output.insert(key.into(), value); | ||||
|             } | ||||
|             output | ||||
|         } | ||||
|  | ||||
|         // Setup the security and limits of the Engine | ||||
|         let mut engine = Engine::new(); | ||||
|         engine.set_optimization_level(OptimizationLevel::Full); | ||||
|         engine.set_max_call_levels(1000); | ||||
|         // It is an arbitrary value. We need to let users define this in the settings. | ||||
|         engine.set_max_operations(1_000_000); | ||||
|         engine.set_max_variables(1000); | ||||
|         engine.set_max_functions(30); | ||||
|         engine.set_max_expr_depths(100, 1000); | ||||
|         engine.set_max_string_size(1024 * 1024 * 1024); // 1 GiB | ||||
|         engine.set_max_array_size(10_000); | ||||
|         engine.set_max_map_size(10_000); | ||||
|  | ||||
|         let ast = engine.compile(code).map_err(UserError::DocumentEditionCompilationError)?; | ||||
|         let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; | ||||
|         let primary_key = self.index.primary_key(self.wtxn)?.unwrap(); | ||||
|         let mut documents_batch_builder = tempfile::tempfile().map(DocumentsBatchBuilder::new)?; | ||||
|         let mut documents_to_remove = RoaringBitmap::new(); | ||||
|  | ||||
|         let context: Option<Dynamic> = match context { | ||||
|             Some(context) => { | ||||
|                 Some(serde_json::from_value(context.into()).map_err(InternalError::SerdeJson)?) | ||||
|             } | ||||
|             None => None, | ||||
|         }; | ||||
|  | ||||
|         enum DocumentEdition { | ||||
|             Deleted(crate::DocumentId), | ||||
|             Edited(Object), | ||||
|             Nothing, | ||||
|         } | ||||
|  | ||||
|         let immutable_obkvs = ImmutableObkvs::new( | ||||
|             self.wtxn, | ||||
|             self.index.documents, | ||||
|             fields_ids_map.clone(), | ||||
|             documents.clone(), | ||||
|         )?; | ||||
|  | ||||
|         let processing = documents.into_iter().par_bridge().map(|docid| { | ||||
|             // safety: Both documents *must* exists in the database as | ||||
|             //         their IDs comes from the list of documents ids. | ||||
|             let rhai_document = immutable_obkvs.rhai_map(docid)?.unwrap(); | ||||
|             let json_document = immutable_obkvs.json_map(docid)?.unwrap(); | ||||
|             let document_id = &json_document[primary_key]; | ||||
|  | ||||
|             let mut scope = Scope::new(); | ||||
|             if let Some(context) = context.as_ref().cloned() { | ||||
|                 scope.push_constant_dynamic("context", context.clone()); | ||||
|             } | ||||
|             scope.push("doc", rhai_document); | ||||
|             // That's were the magic happens. We run the user script | ||||
|             // which edits "doc" scope variable reprensenting the document | ||||
|             // and ignore the output and even the type of it, i.e., Dynamic. | ||||
|             let _ = engine | ||||
|                 .eval_ast_with_scope::<Dynamic>(&mut scope, &ast) | ||||
|                 .map_err(UserError::DocumentEditionRuntimeError)?; | ||||
|  | ||||
|             match scope.remove::<Dynamic>("doc") { | ||||
|                 // If the "doc" variable has set to (), we effectively delete the document. | ||||
|                 Some(doc) if doc.is_unit() => Ok(DocumentEdition::Deleted(docid)), | ||||
|                 None => unreachable!("missing doc variable from the Rhai scope"), | ||||
|                 Some(document) => match document.try_cast() { | ||||
|                     Some(document) => { | ||||
|                         let new_document = rhaimap_to_object(document); | ||||
|                         // Note: This condition is not perfect. Sometimes it detect changes | ||||
|                         //       like with floating points numbers and consider updating | ||||
|                         //       the document even if nothing actually changed. | ||||
|                         if json_document != new_document { | ||||
|                             if Some(document_id) != new_document.get(primary_key) { | ||||
|                                 Err(Error::UserError( | ||||
|                                     UserError::DocumentEditionCannotModifyPrimaryKey, | ||||
|                                 )) | ||||
|                             } else { | ||||
|                                 Ok(DocumentEdition::Edited(new_document)) | ||||
|                             } | ||||
|                         } else { | ||||
|                             Ok(DocumentEdition::Nothing) | ||||
|                         } | ||||
|                     } | ||||
|                     None => Err(Error::UserError(UserError::DocumentEditionDocumentMustBeObject)), | ||||
|                 }, | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|         rayon_par_bridge::par_bridge(100, processing, |iterator| { | ||||
|             for result in iterator { | ||||
|                 if (self.should_abort)() { | ||||
|                     return Err(Error::InternalError(InternalError::AbortedIndexation)); | ||||
|                 } | ||||
|  | ||||
|                 match result? { | ||||
|                     DocumentEdition::Deleted(docid) => { | ||||
|                         documents_to_remove.push(docid); | ||||
|                     } | ||||
|                     DocumentEdition::Edited(new_document) => { | ||||
|                         documents_batch_builder.append_json_object(&new_document)?; | ||||
|                     } | ||||
|                     DocumentEdition::Nothing => (), | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             Ok(()) | ||||
|         })?; | ||||
|  | ||||
|         let file = documents_batch_builder.into_inner()?; | ||||
|         let reader = DocumentsBatchReader::from_reader(file)?; | ||||
|  | ||||
|         let (this, removed) = self.remove_documents_from_db_no_batch(&documents_to_remove)?; | ||||
|         let (this, result) = this.add_documents(reader)?; | ||||
|  | ||||
|         Ok((this, result.map(|added| (removed, added)))) | ||||
|     } | ||||
|  | ||||
|     pub fn with_embedders(mut self, embedders: EmbeddingConfigs) -> Self { | ||||
|         self.embedders = embedders; | ||||
|         self | ||||
|   | ||||
							
								
								
									
										86
									
								
								milli/src/update/index_documents/parallel.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										86
									
								
								milli/src/update/index_documents/parallel.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,86 @@ | ||||
| use heed::types::Bytes; | ||||
| use heed::{Database, RoTxn}; | ||||
| use obkv::KvReaderU16; | ||||
| use roaring::RoaringBitmap; | ||||
|  | ||||
| use crate::{all_obkv_to_json, DocumentId, FieldsIdsMap, Object, ObkvCodec, Result, BEU32}; | ||||
|  | ||||
| pub struct ImmutableObkvs<'t> { | ||||
|     ids: RoaringBitmap, | ||||
|     fields_ids_map: FieldsIdsMap, | ||||
|     slices: Vec<&'t [u8]>, | ||||
| } | ||||
|  | ||||
| impl<'t> ImmutableObkvs<'t> { | ||||
|     /// Creates the structure by fetching all the OBKVs | ||||
|     /// and keeping the transaction making the pointers valid. | ||||
|     pub fn new( | ||||
|         rtxn: &'t RoTxn, | ||||
|         documents_database: Database<BEU32, ObkvCodec>, | ||||
|         fields_ids_map: FieldsIdsMap, | ||||
|         subset: RoaringBitmap, | ||||
|     ) -> heed::Result<Self> { | ||||
|         let mut slices = Vec::new(); | ||||
|         let documents_database = documents_database.remap_data_type::<Bytes>(); | ||||
|         for docid in &subset { | ||||
|             let slice = documents_database.get(rtxn, &docid)?.unwrap(); | ||||
|             slices.push(slice); | ||||
|         } | ||||
|  | ||||
|         Ok(ImmutableObkvs { ids: subset, fields_ids_map, slices }) | ||||
|     } | ||||
|  | ||||
|     /// Returns the OBKVs identified by the given ID. | ||||
|     pub fn obkv(&self, docid: DocumentId) -> heed::Result<Option<KvReaderU16<'t>>> { | ||||
|         match self | ||||
|             .ids | ||||
|             .rank(docid) | ||||
|             .checked_sub(1) | ||||
|             .and_then(|offset| self.slices.get(offset as usize)) | ||||
|         { | ||||
|             Some(bytes) => Ok(Some(KvReaderU16::new(bytes))), | ||||
|             None => Ok(None), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns the owned rhai::Map identified by the given ID. | ||||
|     pub fn rhai_map(&self, docid: DocumentId) -> Result<Option<rhai::Map>> { | ||||
|         let obkv = match self.obkv(docid) { | ||||
|             Ok(Some(obkv)) => obkv, | ||||
|             Ok(None) => return Ok(None), | ||||
|             Err(e) => return Err(e.into()), | ||||
|         }; | ||||
|  | ||||
|         let all_keys = obkv.iter().map(|(k, _v)| k).collect::<Vec<_>>(); | ||||
|         let map: Result<rhai::Map> = all_keys | ||||
|             .iter() | ||||
|             .copied() | ||||
|             .flat_map(|id| obkv.get(id).map(|value| (id, value))) | ||||
|             .map(|(id, value)| { | ||||
|                 let name = self.fields_ids_map.name(id).ok_or( | ||||
|                     crate::error::FieldIdMapMissingEntry::FieldId { | ||||
|                         field_id: id, | ||||
|                         process: "all_obkv_to_rhaimap", | ||||
|                     }, | ||||
|                 )?; | ||||
|                 let value = serde_json::from_slice(value) | ||||
|                     .map_err(crate::error::InternalError::SerdeJson)?; | ||||
|                 Ok((name.into(), value)) | ||||
|             }) | ||||
|             .collect(); | ||||
|  | ||||
|         map.map(Some) | ||||
|     } | ||||
|  | ||||
|     pub fn json_map(&self, docid: DocumentId) -> Result<Option<Object>> { | ||||
|         let obkv = match self.obkv(docid) { | ||||
|             Ok(Some(obkv)) => obkv, | ||||
|             Ok(None) => return Ok(None), | ||||
|             Err(e) => return Err(e.into()), | ||||
|         }; | ||||
|  | ||||
|         all_obkv_to_json(obkv, &self.fields_ids_map).map(Some) | ||||
|     } | ||||
| } | ||||
|  | ||||
| unsafe impl Sync for ImmutableObkvs<'_> {} | ||||
		Reference in New Issue
	
	Block a user