	Merge #4900
4900: Indexer edition 2024 r=Kerollmops a=dureuill

This PR implements the indexer edition 2024, largely inspired by [the ideas from this blog post](https://blog.kerollmops.com/meilisearch-is-too-slow).

Fixes https://github.com/meilisearch/meilisearch/issues/4985

## Features

- Stream-first approach to reading documents (sketched below).
- Minimal disk write operations.
- RAM-first approach: common bitmaps are modified in memory rather than on disk.
- Reduced LMDB fragmentation by writing entries only once...
- ...computing the final version of the entries in parallel...
- ...and storing them in write-optimized data structures before sending them to the B-tree (LMDB).
- Indexing in multiple transactions to improve support for large datasets (dumps).

Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
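To make the stream-first idea concrete, here is a minimal sketch, outside of Meilisearch's actual code: the update payload is memory-mapped and documents are deserialized lazily straight out of the mapping, so no intermediate copy is written to disk. The file name, the newline-delimited JSON format, and the `process_document` helper are illustrative assumptions, not code from this PR.

```rust
use std::fs::File;

use memmap2::Mmap;
use serde_json::{Deserializer, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("update.ndjson")?; // hypothetical payload file
    // SAFETY: the file must not be truncated or mutated while it is mapped.
    let mmap = unsafe { Mmap::map(&file)? };

    // Deserialize documents lazily, straight out of the mapping: the bytes
    // are never copied into an intermediate buffer or spilled back to disk.
    for document in Deserializer::from_slice(&mmap).into_iter::<Value>() {
        process_document(&document?);
    }
    Ok(())
}

// Stand-in for the real extraction pipeline.
fn process_document(doc: &Value) {
    println!("{doc}");
}
```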
		| @@ -22,6 +22,7 @@ flate2 = "1.0.30" | ||||
| meilisearch-auth = { path = "../meilisearch-auth" } | ||||
| meilisearch-types = { path = "../meilisearch-types" } | ||||
| page_size = "0.6.0" | ||||
| raw-collections = { git = "https://github.com/meilisearch/raw-collections.git", version = "0.1.0" } | ||||
| rayon = "1.10.0" | ||||
| roaring = { version = "0.10.6", features = ["serde"] } | ||||
| serde = { version = "1.0.204", features = ["derive"] } | ||||
| @@ -29,6 +30,7 @@ serde_json = { version = "1.0.120", features = ["preserve_order"] } | ||||
| synchronoise = "1.0.1" | ||||
| tempfile = "3.10.1" | ||||
| thiserror = "1.0.61" | ||||
| memmap2 = "0.9.4" | ||||
| time = { version = "0.3.36", features = [ | ||||
|     "serde-well-known", | ||||
|     "formatting", | ||||
| @@ -38,6 +40,7 @@ time = { version = "0.3.36", features = [ | ||||
| tracing = "0.1.40" | ||||
| ureq = "2.10.0" | ||||
| uuid = { version = "1.10.0", features = ["serde", "v4"] } | ||||
| bumpalo = "3.16.0" | ||||
|  | ||||
| [dev-dependencies] | ||||
| arroy = "0.5.0" | ||||
|   | ||||
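The new `bumpalo` dependency (together with `raw-collections`) backs the arena-allocation strategy used throughout the new indexer: short-lived per-batch data is bump-allocated and freed all at once, with no per-item `Drop`. A minimal sketch of the pattern, assuming bumpalo's `collections` feature is enabled; the document ids are made up:

```rust
use bumpalo::collections::{CollectIn, Vec as BumpVec};
use bumpalo::Bump;

fn main() {
    let arena = Bump::new();

    // Copy borrowed strings into the arena; they all live as long as `arena`
    // and are released together when the arena is dropped.
    let ids: BumpVec<&str> = ["doc-1", "doc-2", "doc-3"]
        .iter()
        .map(|s| &*arena.alloc_str(s))
        .collect_in(&arena);

    assert_eq!(&ids[..], &["doc-1", "doc-2", "doc-3"][..]);
}
```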
| @@ -22,22 +22,27 @@ use std::ffi::OsStr; | ||||
| use std::fmt; | ||||
| use std::fs::{self, File}; | ||||
| use std::io::BufWriter; | ||||
| use std::sync::atomic::{self, AtomicU64}; | ||||
| use std::time::Duration; | ||||
|  | ||||
| use bumpalo::collections::CollectIn; | ||||
| use bumpalo::Bump; | ||||
| use dump::IndexMetadata; | ||||
| use meilisearch_types::batches::BatchId; | ||||
| use meilisearch_types::error::Code; | ||||
| use meilisearch_types::heed::{RoTxn, RwTxn}; | ||||
| use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; | ||||
| use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; | ||||
| use meilisearch_types::milli::heed::CompactionOption; | ||||
| use meilisearch_types::milli::update::{ | ||||
|     IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings, | ||||
| }; | ||||
| use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; | ||||
| use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSettings}; | ||||
| use meilisearch_types::milli::vector::parsed_vectors::{ | ||||
|     ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME, | ||||
| }; | ||||
| use meilisearch_types::milli::{self, Filter, Object}; | ||||
| use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder}; | ||||
| use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; | ||||
| use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; | ||||
| use meilisearch_types::tasks::{ | ||||
|     Details, IndexSwap, Kind, KindWithContent, Status, Task, TaskProgress, | ||||
| }; | ||||
| use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; | ||||
| use roaring::RoaringBitmap; | ||||
| use time::macros::format_description; | ||||
| @@ -46,7 +51,7 @@ use uuid::Uuid; | ||||
|  | ||||
| use crate::autobatcher::{self, BatchKind}; | ||||
| use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; | ||||
| use crate::{Error, IndexScheduler, MustStopProcessing, Result, TaskId}; | ||||
| use crate::{Error, IndexScheduler, Result, TaskId}; | ||||
|  | ||||
| /// Represents a combination of tasks that can all be processed at the same time. | ||||
| /// | ||||
| @@ -886,10 +891,8 @@ impl IndexScheduler { | ||||
|                             while let Some(doc) = | ||||
|                                 cursor.next_document().map_err(milli::Error::from)? | ||||
|                             { | ||||
|                                 dump_content_file.push_document(&obkv_to_object( | ||||
|                                     &doc, | ||||
|                                     &documents_batch_index, | ||||
|                                 )?)?; | ||||
|                                 dump_content_file | ||||
|                                     .push_document(&obkv_to_object(doc, &documents_batch_index)?)?; | ||||
|                             } | ||||
|                             dump_content_file.flush()?; | ||||
|                         } | ||||
| @@ -1232,6 +1235,44 @@ impl IndexScheduler { | ||||
|         index: &'i Index, | ||||
|         operation: IndexOperation, | ||||
|     ) -> Result<Vec<Task>> { | ||||
|         let indexer_alloc = Bump::new(); | ||||
|  | ||||
|         let started_processing_at = std::time::Instant::now(); | ||||
|         let secs_since_started_processing_at = AtomicU64::new(0); | ||||
|         const PRINT_SECS_DELTA: u64 = 5; | ||||
|  | ||||
|         let processing_tasks = self.processing_tasks.clone(); | ||||
|         let must_stop_processing = self.must_stop_processing.clone(); | ||||
|         let send_progress = |progress| { | ||||
|             let now = std::time::Instant::now(); | ||||
|             let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed); | ||||
|             let previous = started_processing_at + Duration::from_secs(elapsed); | ||||
|             let elapsed = now - previous; | ||||
|  | ||||
|             if elapsed.as_secs() < PRINT_SECS_DELTA { | ||||
|                 return; | ||||
|             } | ||||
|  | ||||
|             secs_since_started_processing_at | ||||
|                 .store((now - started_processing_at).as_secs(), atomic::Ordering::Relaxed); | ||||
|  | ||||
|             let TaskProgress { | ||||
|                 current_step, | ||||
|                 finished_steps, | ||||
|                 total_steps, | ||||
|                 finished_substeps, | ||||
|                 total_substeps, | ||||
|             } = processing_tasks.write().unwrap().update_progress(progress); | ||||
|  | ||||
|             tracing::info!( | ||||
|                 current_step, | ||||
|                 finished_steps, | ||||
|                 total_steps, | ||||
|                 finished_substeps, | ||||
|                 total_substeps | ||||
|             ); | ||||
|         }; | ||||
|  | ||||
|         match operation { | ||||
|             IndexOperation::DocumentClear { mut tasks, .. } => { | ||||
|                 let count = milli::update::ClearDocuments::new(index_wtxn, index).execute()?; | ||||
| @@ -1261,155 +1302,144 @@ impl IndexScheduler { | ||||
|                 operations, | ||||
|                 mut tasks, | ||||
|             } => { | ||||
|                 let started_processing_at = std::time::Instant::now(); | ||||
|                 let mut primary_key_has_been_set = false; | ||||
|                 let must_stop_processing = self.must_stop_processing.clone(); | ||||
|                 let indexer_config = self.index_mapper.indexer_config(); | ||||
|  | ||||
|                 if let Some(primary_key) = primary_key { | ||||
|                     match index.primary_key(index_wtxn)? { | ||||
|                         // if a primary key was set AND had already been defined in the index | ||||
|                         // but to a different value, we can make the whole batch fail. | ||||
|                         Some(pk) => { | ||||
|                             if primary_key != pk { | ||||
|                                 return Err(milli::Error::from( | ||||
|                                     milli::UserError::PrimaryKeyCannotBeChanged(pk.to_string()), | ||||
|                                 ) | ||||
|                                 .into()); | ||||
|                             } | ||||
|                         } | ||||
|                         // if the primary key was set and there was no primary key set for this index | ||||
|                         // we set it to the received value before starting the indexing process. | ||||
|                         None => { | ||||
|                             let mut builder = | ||||
|                                 milli::update::Settings::new(index_wtxn, index, indexer_config); | ||||
|                             builder.set_primary_key(primary_key); | ||||
|                             builder.execute( | ||||
|                                 |indexing_step| tracing::debug!(update = ?indexing_step), | ||||
|                                 || must_stop_processing.clone().get(), | ||||
|                             )?; | ||||
|                             primary_key_has_been_set = true; | ||||
|                 // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. | ||||
|                 // this is made difficult by the fact we're doing private clones of the index scheduler and sending it | ||||
|                 // to a fresh thread. | ||||
|                 let mut content_files = Vec::new(); | ||||
|                 for operation in &operations { | ||||
|                     if let DocumentOperation::Add(content_uuid) = operation { | ||||
|                         let content_file = self.file_store.get_update(*content_uuid)?; | ||||
|                         let mmap = unsafe { memmap2::Mmap::map(&content_file)? }; | ||||
|                         if !mmap.is_empty() { | ||||
|                             content_files.push(mmap); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 let config = IndexDocumentsConfig { update_method: method, ..Default::default() }; | ||||
|                 let rtxn = index.read_txn()?; | ||||
|                 let db_fields_ids_map = index.fields_ids_map(&rtxn)?; | ||||
|                 let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
|  | ||||
|                 let embedder_configs = index.embedding_configs(index_wtxn)?; | ||||
|                 // TODO: consider Arc'ing the map too (we only need read access + we'll be cloning it multiple times, so really makes sense) | ||||
|                 let embedders = self.embedders(embedder_configs)?; | ||||
|  | ||||
|                 let mut builder = milli::update::IndexDocuments::new( | ||||
|                     index_wtxn, | ||||
|                     index, | ||||
|                     indexer_config, | ||||
|                     config, | ||||
|                     |indexing_step| tracing::trace!(?indexing_step, "Update"), | ||||
|                     || must_stop_processing.get(), | ||||
|                 )?; | ||||
|  | ||||
|                 for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) { | ||||
|                 let mut content_files_iter = content_files.iter(); | ||||
|                 let mut indexer = indexer::DocumentOperation::new(method); | ||||
|                 let embedders = index.embedding_configs(index_wtxn)?; | ||||
|                 let embedders = self.embedders(embedders)?; | ||||
|                 for operation in operations { | ||||
|                     match operation { | ||||
|                         DocumentOperation::Add(content_uuid) => { | ||||
|                             let content_file = self.file_store.get_update(content_uuid)?; | ||||
|                             let reader = DocumentsBatchReader::from_reader(content_file) | ||||
|                                 .map_err(milli::Error::from)?; | ||||
|                             let (new_builder, user_result) = builder.add_documents(reader)?; | ||||
|                             builder = new_builder; | ||||
|  | ||||
|                             builder = builder.with_embedders(embedders.clone()); | ||||
|  | ||||
|                             let received_documents = | ||||
|                                 if let Some(Details::DocumentAdditionOrUpdate { | ||||
|                                     received_documents, | ||||
|                                     .. | ||||
|                                 }) = task.details | ||||
|                                 { | ||||
|                                     received_documents | ||||
|                                 } else { | ||||
|                                     // In the case of a `documentAdditionOrUpdate` the details MUST be set | ||||
|                                     unreachable!(); | ||||
|                                 }; | ||||
|  | ||||
|                             match user_result { | ||||
|                                 Ok(count) => { | ||||
|                                     task.status = Status::Succeeded; | ||||
|                                     task.details = Some(Details::DocumentAdditionOrUpdate { | ||||
|                                         received_documents, | ||||
|                                         indexed_documents: Some(count), | ||||
|                                     }) | ||||
|                                 } | ||||
|                                 Err(e) => { | ||||
|                                     task.status = Status::Failed; | ||||
|                                     task.details = Some(Details::DocumentAdditionOrUpdate { | ||||
|                                         received_documents, | ||||
|                                         indexed_documents: Some(0), | ||||
|                                     }); | ||||
|                                     task.error = Some(milli::Error::from(e).into()); | ||||
|                                 } | ||||
|                             } | ||||
|                         DocumentOperation::Add(_content_uuid) => { | ||||
|                             let mmap = content_files_iter.next().unwrap(); | ||||
|                             indexer.add_documents(mmap)?; | ||||
|                         } | ||||
|                         DocumentOperation::Delete(document_ids) => { | ||||
|                             let (new_builder, user_result) = | ||||
|                                 builder.remove_documents(document_ids)?; | ||||
|                             builder = new_builder; | ||||
|                             // Invariant: removing documents always returns Ok for the inner result | ||||
|                             let count = user_result.unwrap(); | ||||
|                             let provided_ids = | ||||
|                                 if let Some(Details::DocumentDeletion { provided_ids, .. }) = | ||||
|                                     task.details | ||||
|                                 { | ||||
|                                     provided_ids | ||||
|                                 } else { | ||||
|                                     // In the case of a `documentDeletion` the details MUST be set | ||||
|                                     unreachable!(); | ||||
|                                 }; | ||||
|  | ||||
|                             task.status = Status::Succeeded; | ||||
|                             task.details = Some(Details::DocumentDeletion { | ||||
|                                 provided_ids, | ||||
|                                 deleted_documents: Some(count), | ||||
|                             }); | ||||
|                             let document_ids: bumpalo::collections::vec::Vec<_> = document_ids | ||||
|                                 .iter() | ||||
|                                 .map(|s| &*indexer_alloc.alloc_str(s)) | ||||
|                                 .collect_in(&indexer_alloc); | ||||
|                             indexer.delete_documents(document_ids.into_bump_slice()); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 if !tasks.iter().all(|res| res.error.is_some()) { | ||||
|                     let addition = builder.execute()?; | ||||
|                 let local_pool; | ||||
|                 let indexer_config = self.index_mapper.indexer_config(); | ||||
|                 let pool = match &indexer_config.thread_pool { | ||||
|                     Some(pool) => pool, | ||||
|                     None => { | ||||
|                         local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                         &local_pool | ||||
|                     } | ||||
|                 }; | ||||
|  | ||||
|                 let (document_changes, operation_stats, primary_key) = indexer.into_changes( | ||||
|                     &indexer_alloc, | ||||
|                     index, | ||||
|                     &rtxn, | ||||
|                     primary_key.as_deref(), | ||||
|                     &mut new_fields_ids_map, | ||||
|                     &|| must_stop_processing.get(), | ||||
|                     &send_progress, | ||||
|                 )?; | ||||
|  | ||||
|                 let mut addition = 0; | ||||
|                 for (stats, task) in operation_stats.into_iter().zip(&mut tasks) { | ||||
|                     addition += stats.document_count; | ||||
|                     match stats.error { | ||||
|                         Some(error) => { | ||||
|                             task.status = Status::Failed; | ||||
|                             task.error = Some(milli::Error::UserError(error).into()); | ||||
|                         } | ||||
|                         None => task.status = Status::Succeeded, | ||||
|                     } | ||||
|  | ||||
|                     task.details = match task.details { | ||||
|                         Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) => { | ||||
|                             Some(Details::DocumentAdditionOrUpdate { | ||||
|                                 received_documents, | ||||
|                                 indexed_documents: Some(stats.document_count), | ||||
|                             }) | ||||
|                         } | ||||
|                         Some(Details::DocumentDeletion { provided_ids, .. }) => { | ||||
|                             Some(Details::DocumentDeletion { | ||||
|                                 provided_ids, | ||||
|                                 deleted_documents: Some(stats.document_count), | ||||
|                             }) | ||||
|                         } | ||||
|                         _ => { | ||||
|                             // In the case of a `documentAdditionOrUpdate` or `DocumentDeletion` | ||||
|                             // the details MUST be set to either addition or deletion | ||||
|                             unreachable!(); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 if tasks.iter().any(|res| res.error.is_none()) { | ||||
|                     pool.install(|| { | ||||
|                         indexer::index( | ||||
|                             index_wtxn, | ||||
|                             index, | ||||
|                             indexer_config.grenad_parameters(), | ||||
|                             &db_fields_ids_map, | ||||
|                             new_fields_ids_map, | ||||
|                             primary_key, | ||||
|                             &document_changes, | ||||
|                             embedders, | ||||
|                             &|| must_stop_processing.get(), | ||||
|                             &send_progress, | ||||
|                         ) | ||||
|                     }) | ||||
|                     .unwrap()?; | ||||
|  | ||||
|                     tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); | ||||
|                 } else if primary_key_has_been_set { | ||||
|                     // Everything failed but we've set a primary key. | ||||
|                     // We need to remove it. | ||||
|                     let mut builder = | ||||
|                         milli::update::Settings::new(index_wtxn, index, indexer_config); | ||||
|                     builder.reset_primary_key(); | ||||
|                     builder.execute( | ||||
|                         |indexing_step| tracing::trace!(update = ?indexing_step), | ||||
|                         || must_stop_processing.clone().get(), | ||||
|                     )?; | ||||
|                 } | ||||
|  | ||||
|                 Ok(tasks) | ||||
|             } | ||||
|             IndexOperation::DocumentEdition { mut task, .. } => { | ||||
|                 let (filter, context, function) = | ||||
|                     if let KindWithContent::DocumentEdition { | ||||
|                         filter_expr, context, function, .. | ||||
|                     } = &task.kind | ||||
|                     { | ||||
|                         (filter_expr, context, function) | ||||
|                     } else { | ||||
|                         unreachable!() | ||||
|                     }; | ||||
|                 let result_count = edit_documents_by_function( | ||||
|                     index_wtxn, | ||||
|                     filter, | ||||
|                     context.clone(), | ||||
|                     function, | ||||
|                     self.index_mapper.indexer_config(), | ||||
|                     self.must_stop_processing.clone(), | ||||
|                     index, | ||||
|                 ); | ||||
|                 let (filter, code) = if let KindWithContent::DocumentEdition { | ||||
|                     filter_expr, | ||||
|                     context: _, | ||||
|                     function, | ||||
|                     .. | ||||
|                 } = &task.kind | ||||
|                 { | ||||
|                     (filter_expr, function) | ||||
|                 } else { | ||||
|                     unreachable!() | ||||
|                 }; | ||||
|  | ||||
|                 let candidates = match filter.as_ref().map(Filter::from_json) { | ||||
|                     Some(Ok(Some(filter))) => { | ||||
|                         filter.evaluate(index_wtxn, index).map_err(|err| match err { | ||||
|                             milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { | ||||
|                                 Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) | ||||
|                             } | ||||
|                             e => e.into(), | ||||
|                         })? | ||||
|                     } | ||||
|                     None | Some(Ok(None)) => index.documents_ids(index_wtxn)?, | ||||
|                     Some(Err(e)) => return Err(e.into()), | ||||
|                 }; | ||||
|  | ||||
|                 let (original_filter, context, function) = if let Some(Details::DocumentEdition { | ||||
|                     original_filter, | ||||
|                     context, | ||||
| @@ -1423,6 +1453,68 @@ impl IndexScheduler { | ||||
|                     unreachable!(); | ||||
|                 }; | ||||
|  | ||||
|                 if candidates.is_empty() { | ||||
|                     task.status = Status::Succeeded; | ||||
|                     task.details = Some(Details::DocumentEdition { | ||||
|                         original_filter, | ||||
|                         context, | ||||
|                         function, | ||||
|                         deleted_documents: Some(0), | ||||
|                         edited_documents: Some(0), | ||||
|                     }); | ||||
|  | ||||
|                     return Ok(vec![task]); | ||||
|                 } | ||||
|  | ||||
|                 let rtxn = index.read_txn()?; | ||||
|                 let db_fields_ids_map = index.fields_ids_map(&rtxn)?; | ||||
|                 let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
|                 // candidates not empty => index not empty => a primary key is set | ||||
|                 let primary_key = index.primary_key(&rtxn)?.unwrap(); | ||||
|  | ||||
|                 let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map) | ||||
|                     .map_err(milli::Error::from)?; | ||||
|  | ||||
|                 let result_count = Ok((candidates.len(), candidates.len())) as Result<_>; | ||||
|  | ||||
|                 if task.error.is_none() { | ||||
|                     let local_pool; | ||||
|                     let indexer_config = self.index_mapper.indexer_config(); | ||||
|                     let pool = match &indexer_config.thread_pool { | ||||
|                         Some(pool) => pool, | ||||
|                         None => { | ||||
|                             local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                             &local_pool | ||||
|                         } | ||||
|                     }; | ||||
|  | ||||
|                     pool.install(|| { | ||||
|                         let indexer = | ||||
|                             UpdateByFunction::new(candidates, context.clone(), code.clone()); | ||||
|                         let document_changes = indexer.into_changes(&primary_key)?; | ||||
|                         let embedders = index.embedding_configs(index_wtxn)?; | ||||
|                         let embedders = self.embedders(embedders)?; | ||||
|  | ||||
|                         indexer::index( | ||||
|                             index_wtxn, | ||||
|                             index, | ||||
|                             indexer_config.grenad_parameters(), | ||||
|                             &db_fields_ids_map, | ||||
|                             new_fields_ids_map, | ||||
|                             None, // cannot change primary key in DocumentEdition | ||||
|                             &document_changes, | ||||
|                             embedders, | ||||
|                             &|| must_stop_processing.get(), | ||||
|                             &send_progress, | ||||
|                         )?; | ||||
|  | ||||
|                         Result::Ok(()) | ||||
|                     }) | ||||
|                     .unwrap()?; | ||||
|  | ||||
|                     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); | ||||
|                 } | ||||
|  | ||||
|                 match result_count { | ||||
|                     Ok((deleted_documents, edited_documents)) => { | ||||
|                         task.status = Status::Succeeded; | ||||
| @@ -1523,26 +1615,55 @@ impl IndexScheduler { | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 let config = IndexDocumentsConfig { | ||||
|                     update_method: IndexDocumentsMethod::ReplaceDocuments, | ||||
|                     ..Default::default() | ||||
|                 }; | ||||
|                 if to_delete.is_empty() { | ||||
|                     return Ok(tasks); | ||||
|                 } | ||||
|  | ||||
|                 let must_stop_processing = self.must_stop_processing.clone(); | ||||
|                 let mut builder = milli::update::IndexDocuments::new( | ||||
|                     index_wtxn, | ||||
|                     index, | ||||
|                     self.index_mapper.indexer_config(), | ||||
|                     config, | ||||
|                     |indexing_step| tracing::debug!(update = ?indexing_step), | ||||
|                     || must_stop_processing.get(), | ||||
|                 )?; | ||||
|                 let rtxn = index.read_txn()?; | ||||
|                 let db_fields_ids_map = index.fields_ids_map(&rtxn)?; | ||||
|                 let mut new_fields_ids_map = db_fields_ids_map.clone(); | ||||
|  | ||||
|                 let (new_builder, _count) = | ||||
|                     builder.remove_documents_from_db_no_batch(&to_delete)?; | ||||
|                 builder = new_builder; | ||||
|                 // to_delete not empty => index not empty => primary key set | ||||
|                 let primary_key = index.primary_key(&rtxn)?.unwrap(); | ||||
|  | ||||
|                 let _ = builder.execute()?; | ||||
|                 let primary_key = PrimaryKey::new_or_insert(primary_key, &mut new_fields_ids_map) | ||||
|                     .map_err(milli::Error::from)?; | ||||
|  | ||||
|                 if !tasks.iter().all(|res| res.error.is_some()) { | ||||
|                     let local_pool; | ||||
|                     let indexer_config = self.index_mapper.indexer_config(); | ||||
|                     let pool = match &indexer_config.thread_pool { | ||||
|                         Some(pool) => pool, | ||||
|                         None => { | ||||
|                             local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); | ||||
|                             &local_pool | ||||
|                         } | ||||
|                     }; | ||||
|  | ||||
|                     let mut indexer = indexer::DocumentDeletion::new(); | ||||
|                     indexer.delete_documents_by_docids(to_delete); | ||||
|                     let document_changes = indexer.into_changes(&indexer_alloc, primary_key); | ||||
|                     let embedders = index.embedding_configs(index_wtxn)?; | ||||
|                     let embedders = self.embedders(embedders)?; | ||||
|  | ||||
|                     pool.install(|| { | ||||
|                         indexer::index( | ||||
|                             index_wtxn, | ||||
|                             index, | ||||
|                             indexer_config.grenad_parameters(), | ||||
|                             &db_fields_ids_map, | ||||
|                             new_fields_ids_map, | ||||
|                             None, // document deletion never changes primary key | ||||
|                             &document_changes, | ||||
|                             embedders, | ||||
|                             &|| must_stop_processing.get(), | ||||
|                             &send_progress, | ||||
|                         ) | ||||
|                     }) | ||||
|                     .unwrap()?; | ||||
|  | ||||
|                     // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); | ||||
|                 } | ||||
|  | ||||
|                 Ok(tasks) | ||||
|             } | ||||
| @@ -1560,7 +1681,6 @@ impl IndexScheduler { | ||||
|                     task.status = Status::Succeeded; | ||||
|                 } | ||||
|  | ||||
|                 let must_stop_processing = self.must_stop_processing.clone(); | ||||
|                 builder.execute( | ||||
|                     |indexing_step| tracing::debug!(update = ?indexing_step), | ||||
|                     || must_stop_processing.get(), | ||||
| @@ -1784,44 +1904,3 @@ impl IndexScheduler { | ||||
|         Ok(tasks) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn edit_documents_by_function<'a>( | ||||
|     wtxn: &mut RwTxn<'a>, | ||||
|     filter: &Option<serde_json::Value>, | ||||
|     context: Option<Object>, | ||||
|     code: &str, | ||||
|     indexer_config: &IndexerConfig, | ||||
|     must_stop_processing: MustStopProcessing, | ||||
|     index: &'a Index, | ||||
| ) -> Result<(u64, u64)> { | ||||
|     let candidates = match filter.as_ref().map(Filter::from_json) { | ||||
|         Some(Ok(Some(filter))) => filter.evaluate(wtxn, index).map_err(|err| match err { | ||||
|             milli::Error::UserError(milli::UserError::InvalidFilter(_)) => { | ||||
|                 Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter) | ||||
|             } | ||||
|             e => e.into(), | ||||
|         })?, | ||||
|         None | Some(Ok(None)) => index.documents_ids(wtxn)?, | ||||
|         Some(Err(e)) => return Err(e.into()), | ||||
|     }; | ||||
|  | ||||
|     let config = IndexDocumentsConfig { | ||||
|         update_method: IndexDocumentsMethod::ReplaceDocuments, | ||||
|         ..Default::default() | ||||
|     }; | ||||
|  | ||||
|     let mut builder = milli::update::IndexDocuments::new( | ||||
|         wtxn, | ||||
|         index, | ||||
|         indexer_config, | ||||
|         config, | ||||
|         |indexing_step| tracing::debug!(update = ?indexing_step), | ||||
|         || must_stop_processing.get(), | ||||
|     )?; | ||||
|  | ||||
|     let (new_builder, count) = builder.edit_documents(&candidates, context, code)?; | ||||
|     builder = new_builder; | ||||
|  | ||||
|     let _ = builder.execute()?; | ||||
|     Ok(count.unwrap()) | ||||
| } | ||||
|   | ||||
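A pattern that recurs in the new code paths above is "use the configured thread pool, or build a local fallback so the parallel section can still run". Here is a standalone sketch of that shape using a plain rayon pool rather than Meilisearch's `ThreadPoolNoAbort` wrapper; `run_indexing` and its argument are hypothetical:

```rust
use rayon::{ThreadPool, ThreadPoolBuilder};

fn run_indexing(configured_pool: Option<&ThreadPool>) {
    let local_pool;
    let pool = match configured_pool {
        Some(pool) => pool,
        None => {
            // No pool was configured: build a process-local one and keep it
            // alive through `local_pool` for the duration of this call.
            local_pool = ThreadPoolBuilder::new().build().unwrap();
            &local_pool
        }
    };

    pool.install(|| {
        // The parallel indexing work would run inside this scope.
        println!("running on {} threads", rayon::current_num_threads());
    });
}

fn main() {
    run_indexing(None);
}
```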
| @@ -56,11 +56,12 @@ use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128}; | ||||
| use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn}; | ||||
| use meilisearch_types::milli::documents::DocumentsBatchBuilder; | ||||
| use meilisearch_types::milli::index::IndexEmbeddingConfig; | ||||
| use meilisearch_types::milli::update::new::indexer::document_changes::Progress; | ||||
| use meilisearch_types::milli::update::IndexerConfig; | ||||
| use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; | ||||
| use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; | ||||
| use meilisearch_types::task_view::TaskView; | ||||
| use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; | ||||
| use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task, TaskProgress}; | ||||
| use rayon::current_num_threads; | ||||
| use rayon::prelude::{IntoParallelIterator, ParallelIterator}; | ||||
| use roaring::RoaringBitmap; | ||||
| @@ -167,12 +168,14 @@ pub struct ProcessingTasks { | ||||
|     batch: Option<ProcessingBatch>, | ||||
|     /// The list of tasks ids that are currently running. | ||||
|     processing: RoaringBitmap, | ||||
|     /// The progress on processing tasks | ||||
|     progress: Option<TaskProgress>, | ||||
| } | ||||
|  | ||||
| impl ProcessingTasks { | ||||
|     /// Creates an empty `ProcessingTasks` struct. | ||||
|     fn new() -> ProcessingTasks { | ||||
|         ProcessingTasks { batch: None, processing: RoaringBitmap::new() } | ||||
|         ProcessingTasks { batch: None, processing: RoaringBitmap::new(), progress: None } | ||||
|     } | ||||
|  | ||||
|     /// Stores the currently processing tasks, and the date time at which it started. | ||||
| @@ -181,11 +184,18 @@ impl ProcessingTasks { | ||||
|         self.processing = processing; | ||||
|     } | ||||
|  | ||||
|     fn update_progress(&mut self, progress: Progress) -> TaskProgress { | ||||
|         self.progress.get_or_insert_with(TaskProgress::default).update(progress) | ||||
|     } | ||||
|  | ||||
|     /// Set the processing tasks to an empty list | ||||
|     fn stop_processing(&mut self) -> Self { | ||||
|         self.progress = None; | ||||
|  | ||||
|         Self { | ||||
|             batch: std::mem::take(&mut self.batch), | ||||
|             processing: std::mem::take(&mut self.processing), | ||||
|             progress: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -768,7 +778,7 @@ impl IndexScheduler { | ||||
|  | ||||
|     /// Return the task ids matched by the given query from the index scheduler's point of view. | ||||
|     pub(crate) fn get_task_ids(&self, rtxn: &RoTxn, query: &Query) -> Result<RoaringBitmap> { | ||||
|         let ProcessingTasks { batch: processing_batch, processing: processing_tasks } = | ||||
|         let ProcessingTasks { batch: processing_batch, processing: processing_tasks, progress: _ } = | ||||
|             self.processing_tasks.read().unwrap().clone(); | ||||
|         let Query { | ||||
|             limit, | ||||
| @@ -1352,9 +1362,12 @@ impl IndexScheduler { | ||||
|         let tasks = | ||||
|             self.get_existing_tasks(&rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?; | ||||
|  | ||||
|         let ProcessingTasks { batch, processing } = | ||||
|         let ProcessingTasks { batch, processing, progress } = | ||||
|             self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone(); | ||||
|  | ||||
|         // ignored for now, might be added to batch details later | ||||
|         let _ = progress; | ||||
|  | ||||
|         let ret = tasks.into_iter(); | ||||
|         if processing.is_empty() || batch.is_none() { | ||||
|             Ok((ret.collect(), total)) | ||||
| @@ -2290,7 +2303,7 @@ mod tests { | ||||
|                 dumps_path: tempdir.path().join("dumps"), | ||||
|                 webhook_url: None, | ||||
|                 webhook_authorization_header: None, | ||||
|                 task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. | ||||
|                 task_db_size: 1000 * 1000 * 10, // 10 MB, we don't use MiB on purpose. | ||||
|                 index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. | ||||
|                 enable_mdb_writemap: false, | ||||
|                 index_growth_amount: 1000 * 1000 * 1000 * 1000, // 1 TB | ||||
| @@ -5186,12 +5199,10 @@ mod tests { | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed"); | ||||
|  | ||||
|         // The second batch should fail. | ||||
|         handle.advance_one_failed_batch(); | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails"); | ||||
|  | ||||
|         // The second batch should fail. | ||||
|         handle.advance_one_failed_batch(); | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_fails"); | ||||
|  | ||||
|         // Is the primary key still what we expect? | ||||
| @@ -5251,8 +5262,7 @@ mod tests { | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed"); | ||||
|  | ||||
|         // The second batch should fail and contains two tasks. | ||||
|         handle.advance_one_failed_batch(); | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_and_third_tasks_fails"); | ||||
|  | ||||
|         // Is the primary key still what we expect? | ||||
| @@ -5331,7 +5341,8 @@ mod tests { | ||||
|         snapshot!(primary_key, @"id"); | ||||
|  | ||||
|         // We're trying to `bork` again, but now there is already a primary key set for this index. | ||||
|         handle.advance_one_failed_batch(); | ||||
|         // NOTE: it's marked as successful because the batch didn't fail; it's the individual tasks that failed. | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fourth_task_fails"); | ||||
|  | ||||
|         // Finally the last task should succeed since its primary key is the same as the valid one. | ||||
| @@ -5491,7 +5502,7 @@ mod tests { | ||||
|         snapshot!(primary_key.is_none(), @"false"); | ||||
|  | ||||
|         // The second batch should contain only one task, which fails because it tries to update the primary key to `bork`. | ||||
|         handle.advance_one_failed_batch(); | ||||
|         handle.advance_one_successful_batch(); | ||||
|         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails"); | ||||
|  | ||||
|         // The third batch should succeed and only contains one task. | ||||
| @@ -6136,9 +6147,10 @@ mod tests { | ||||
|  | ||||
|             let configs = index_scheduler.embedders(configs).unwrap(); | ||||
|             let (hf_embedder, _, _) = configs.get(&simple_hf_name).unwrap(); | ||||
|             let beagle_embed = hf_embedder.embed_one(S("Intel the beagle best doggo")).unwrap(); | ||||
|             let lab_embed = hf_embedder.embed_one(S("Max the lab best doggo")).unwrap(); | ||||
|             let patou_embed = hf_embedder.embed_one(S("kefir the patou best doggo")).unwrap(); | ||||
|             let beagle_embed = | ||||
|                 hf_embedder.embed_one(S("Intel the beagle best doggo"), None).unwrap(); | ||||
|             let lab_embed = hf_embedder.embed_one(S("Max the lab best doggo"), None).unwrap(); | ||||
|             let patou_embed = hf_embedder.embed_one(S("kefir the patou best doggo"), None).unwrap(); | ||||
|             (fakerest_name, simple_hf_name, beagle_embed, lab_embed, patou_embed) | ||||
|         }; | ||||
|  | ||||
|   | ||||
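On the scheduler side, the `send_progress` closure added in batch.rs rate-limits progress reports with a single `AtomicU64` rather than a lock: the atomic stores the number of seconds between the start of processing and the last report. A self-contained sketch of the same trick (the five-second delta mirrors the diff; the `report` closure and its argument are illustrative):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

const PRINT_SECS_DELTA: u64 = 5;

fn main() {
    let started_at = Instant::now();
    let secs_at_last_report = AtomicU64::new(0);

    let report = |step: &str| {
        let now = Instant::now();
        let last =
            started_at + Duration::from_secs(secs_at_last_report.load(Ordering::Relaxed));
        if (now - last).as_secs() < PRINT_SECS_DELTA {
            return; // Too soon since the last report: drop this update.
        }
        secs_at_last_report.store((now - started_at).as_secs(), Ordering::Relaxed);
        println!("progress: {step}");
    };

    report("extracting documents"); // Suppressed: less than 5s have elapsed.
}
```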
| @@ -1,5 +1,5 @@ | ||||
| --- | ||||
| source: crates/index-scheduler/src/lib.rs | ||||
| source: crates/crates/index-scheduler/src/lib.rs | ||||
| snapshot_kind: text | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| @@ -23,7 +23,7 @@ succeeded [0,1,2,] | ||||
| doggos [0,1,2,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
| doggos: { number_of_documents: 1, field_distribution: {"_vectors": 1, "breed": 1, "doggo": 1, "id": 1} } | ||||
| doggos: { number_of_documents: 1, field_distribution: {"breed": 1, "doggo": 1, "id": 1} } | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| --- | ||||
| source: crates/index-scheduler/src/lib.rs | ||||
| source: crates/crates/index-scheduler/src/lib.rs | ||||
| snapshot_kind: text | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| @@ -23,7 +23,7 @@ succeeded [0,1,] | ||||
| doggos [0,1,2,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
| doggos: { number_of_documents: 1, field_distribution: {"_vectors": 1, "breed": 1, "doggo": 1, "id": 1} } | ||||
| doggos: { number_of_documents: 1, field_distribution: {"breed": 1, "doggo": 1, "id": 1} } | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
| --- | ||||
| source: crates/index-scheduler/src/lib.rs | ||||
| source: crates/crates/index-scheduler/src/lib.rs | ||||
| snapshot_kind: text | ||||
| --- | ||||
| ### Autobatching Enabled = true | ||||
| @@ -22,7 +22,7 @@ succeeded [0,1,] | ||||
| doggos [0,1,] | ||||
| ---------------------------------------------------------------------- | ||||
| ### Index Mapper: | ||||
| doggos: { number_of_documents: 1, field_distribution: {"_vectors": 1, "breed": 1, "doggo": 1, "id": 1} } | ||||
| doggos: { number_of_documents: 1, field_distribution: {"breed": 1, "doggo": 1, "id": 1} } | ||||
|  | ||||
| ---------------------------------------------------------------------- | ||||
| ### Canceled By: | ||||
|   | ||||
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							