mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	Merge #5147
5147: Batch progress r=dureuill a=irevoire # Pull Request ## Related issue Fixes https://github.com/meilisearch/meilisearch/issues/5068 ## What does this PR do? - ... ## PR checklist Please check if your PR fulfills the following requirements: - [ ] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)? - [ ] Have you read the contributing guidelines? - [ ] Have you made sure that the title is accurate and descriptive of the changes? Thank you so much for contributing to Meilisearch! Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
		| @@ -22,8 +22,7 @@ use std::ffi::OsStr; | ||||
| use std::fmt; | ||||
| use std::fs::{self, File}; | ||||
| use std::io::BufWriter; | ||||
| use std::sync::atomic::{self, AtomicU64}; | ||||
| use std::time::Duration; | ||||
| use std::sync::atomic::Ordering; | ||||
|  | ||||
| use bumpalo::collections::CollectIn; | ||||
| use bumpalo::Bump; | ||||
| @@ -32,6 +31,7 @@ use meilisearch_types::batches::BatchId; | ||||
| use meilisearch_types::heed::{RoTxn, RwTxn}; | ||||
| use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey}; | ||||
| use meilisearch_types::milli::heed::CompactionOption; | ||||
| use meilisearch_types::milli::progress::Progress; | ||||
| use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; | ||||
| use meilisearch_types::milli::update::{ | ||||
|     DocumentAdditionResult, IndexDocumentsMethod, Settings as MilliSettings, | ||||
| @@ -41,9 +41,7 @@ use meilisearch_types::milli::vector::parsed_vectors::{ | ||||
| }; | ||||
| use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder}; | ||||
| use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked}; | ||||
| use meilisearch_types::tasks::{ | ||||
|     Details, IndexSwap, Kind, KindWithContent, Status, Task, TaskProgress, | ||||
| }; | ||||
| use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; | ||||
| use meilisearch_types::{compression, Index, VERSION_FILE_NAME}; | ||||
| use roaring::RoaringBitmap; | ||||
| use time::macros::format_description; | ||||
| @@ -51,6 +49,13 @@ use time::OffsetDateTime; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use crate::autobatcher::{self, BatchKind}; | ||||
| use crate::processing::{ | ||||
|     AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, AtomicUpdateFileStep, CreateIndexProgress, | ||||
|     DeleteIndexProgress, DocumentDeletionProgress, DocumentEditionProgress, | ||||
|     DocumentOperationProgress, DumpCreationProgress, InnerSwappingTwoIndexes, SettingsProgress, | ||||
|     SnapshotCreationProgress, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, | ||||
|     UpdateIndexProgress, VariableNameStep, | ||||
| }; | ||||
| use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; | ||||
| use crate::{Error, IndexScheduler, Result, TaskId}; | ||||
|  | ||||
| @@ -561,11 +566,12 @@ impl IndexScheduler { | ||||
|     /// The list of tasks that were processed. The metadata of each task in the returned | ||||
|     /// list is updated accordingly, with the exception of its date fields | ||||
|     /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at). | ||||
|     #[tracing::instrument(level = "trace", skip(self, batch), target = "indexing::scheduler", fields(batch=batch.to_string()))] | ||||
|     #[tracing::instrument(level = "trace", skip(self, batch, progress), target = "indexing::scheduler", fields(batch=batch.to_string()))] | ||||
|     pub(crate) fn process_batch( | ||||
|         &self, | ||||
|         batch: Batch, | ||||
|         current_batch: &mut ProcessingBatch, | ||||
|         progress: Progress, | ||||
|     ) -> Result<Vec<Task>> { | ||||
|         #[cfg(test)] | ||||
|         { | ||||
| @@ -585,8 +591,13 @@ impl IndexScheduler { | ||||
|                     }; | ||||
|  | ||||
|                 let rtxn = self.env.read_txn()?; | ||||
|                 let mut canceled_tasks = | ||||
|                     self.cancel_matched_tasks(&rtxn, task.uid, current_batch, matched_tasks)?; | ||||
|                 let mut canceled_tasks = self.cancel_matched_tasks( | ||||
|                     &rtxn, | ||||
|                     task.uid, | ||||
|                     current_batch, | ||||
|                     matched_tasks, | ||||
|                     &progress, | ||||
|                 )?; | ||||
|  | ||||
|                 task.status = Status::Succeeded; | ||||
|                 match &mut task.details { | ||||
| @@ -617,7 +628,8 @@ impl IndexScheduler { | ||||
|                 } | ||||
|  | ||||
|                 let mut wtxn = self.env.write_txn()?; | ||||
|                 let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?; | ||||
|                 let mut deleted_tasks = | ||||
|                     self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?; | ||||
|                 wtxn.commit()?; | ||||
|  | ||||
|                 for task in tasks.iter_mut() { | ||||
| @@ -643,6 +655,8 @@ impl IndexScheduler { | ||||
|                 Ok(tasks) | ||||
|             } | ||||
|             Batch::SnapshotCreation(mut tasks) => { | ||||
|                 progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation); | ||||
|  | ||||
|                 fs::create_dir_all(&self.snapshots_path)?; | ||||
|                 let temp_snapshot_dir = tempfile::tempdir()?; | ||||
|  | ||||
| @@ -663,6 +677,7 @@ impl IndexScheduler { | ||||
|                 // two read operations as the task processing is synchronous. | ||||
|  | ||||
|                 // 2.1 First copy the LMDB env of the index-scheduler | ||||
|                 progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler); | ||||
|                 let dst = temp_snapshot_dir.path().join("tasks"); | ||||
|                 fs::create_dir_all(&dst)?; | ||||
|                 self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; | ||||
| @@ -675,18 +690,29 @@ impl IndexScheduler { | ||||
|                 fs::create_dir_all(&update_files_dir)?; | ||||
|  | ||||
|                 // 2.4 Only copy the update files of the enqueued tasks | ||||
|                 for task_id in self.get_status(&rtxn, Status::Enqueued)? { | ||||
|                 progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles); | ||||
|                 let enqueued = self.get_status(&rtxn, Status::Enqueued)?; | ||||
|                 let (atomic, update_file_progress) = | ||||
|                     AtomicUpdateFileStep::new(enqueued.len() as u32); | ||||
|                 progress.update_progress(update_file_progress); | ||||
|                 for task_id in enqueued { | ||||
|                     let task = self.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; | ||||
|                     if let Some(content_uuid) = task.content_uuid() { | ||||
|                         let src = self.file_store.get_update_path(content_uuid); | ||||
|                         let dst = update_files_dir.join(content_uuid.to_string()); | ||||
|                         fs::copy(src, dst)?; | ||||
|                     } | ||||
|                     atomic.fetch_add(1, Ordering::Relaxed); | ||||
|                 } | ||||
|  | ||||
|                 // 3. Snapshot every index | ||||
|                 for result in self.index_mapper.index_mapping.iter(&rtxn)? { | ||||
|                 progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes); | ||||
|                 let index_mapping = self.index_mapper.index_mapping; | ||||
|                 let nb_indexes = index_mapping.len(&rtxn)? as u32; | ||||
|  | ||||
|                 for (i, result) in index_mapping.iter(&rtxn)?.enumerate() { | ||||
|                     let (name, uuid) = result?; | ||||
|                     progress.update_progress(VariableNameStep::new(name, i as u32, nb_indexes)); | ||||
|                     let index = self.index_mapper.index(&rtxn, name)?; | ||||
|                     let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); | ||||
|                     fs::create_dir_all(&dst)?; | ||||
| @@ -698,6 +724,7 @@ impl IndexScheduler { | ||||
|                 drop(rtxn); | ||||
|  | ||||
|                 // 4. Snapshot the auth LMDB env | ||||
|                 progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys); | ||||
|                 let dst = temp_snapshot_dir.path().join("auth"); | ||||
|                 fs::create_dir_all(&dst)?; | ||||
|                 // TODO We can't use the open_auth_store_env function here but we should | ||||
| @@ -710,6 +737,7 @@ impl IndexScheduler { | ||||
|                 auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; | ||||
|  | ||||
|                 // 5. Copy and tarball the flat snapshot | ||||
|                 progress.update_progress(SnapshotCreationProgress::CreateTheTarball); | ||||
|                 // 5.1 Find the original name of the database | ||||
|                 // TODO find a better way to get this path | ||||
|                 let mut base_path = self.env.path().to_owned(); | ||||
| @@ -742,6 +770,7 @@ impl IndexScheduler { | ||||
|                 Ok(tasks) | ||||
|             } | ||||
|             Batch::Dump(mut task) => { | ||||
|                 progress.update_progress(DumpCreationProgress::StartTheDumpCreation); | ||||
|                 let started_at = OffsetDateTime::now_utc(); | ||||
|                 let (keys, instance_uid) = | ||||
|                     if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind { | ||||
| @@ -752,6 +781,7 @@ impl IndexScheduler { | ||||
|                 let dump = dump::DumpWriter::new(*instance_uid)?; | ||||
|  | ||||
|                 // 1. dump the keys | ||||
|                 progress.update_progress(DumpCreationProgress::DumpTheApiKeys); | ||||
|                 let mut dump_keys = dump.create_keys()?; | ||||
|                 for key in keys { | ||||
|                     dump_keys.push_key(key)?; | ||||
| @@ -761,7 +791,13 @@ impl IndexScheduler { | ||||
|                 let rtxn = self.env.read_txn()?; | ||||
|  | ||||
|                 // 2. dump the tasks | ||||
|                 progress.update_progress(DumpCreationProgress::DumpTheTasks); | ||||
|                 let mut dump_tasks = dump.create_tasks_queue()?; | ||||
|  | ||||
|                 let (atomic, update_task_progress) = | ||||
|                     AtomicTaskStep::new(self.all_tasks.len(&rtxn)? as u32); | ||||
|                 progress.update_progress(update_task_progress); | ||||
|  | ||||
|                 for ret in self.all_tasks.iter(&rtxn)? { | ||||
|                     if self.must_stop_processing.get() { | ||||
|                         return Err(Error::AbortedTask); | ||||
| @@ -811,11 +847,22 @@ impl IndexScheduler { | ||||
|                             dump_content_file.flush()?; | ||||
|                         } | ||||
|                     } | ||||
|                     atomic.fetch_add(1, Ordering::Relaxed); | ||||
|                 } | ||||
|                 dump_tasks.flush()?; | ||||
|  | ||||
|                 // 3. Dump the indexes | ||||
|                 progress.update_progress(DumpCreationProgress::DumpTheIndexes); | ||||
|                 let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32; | ||||
|                 let mut count = 0; | ||||
|                 self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> { | ||||
|                     progress.update_progress(VariableNameStep::new( | ||||
|                         uid.to_string(), | ||||
|                         count, | ||||
|                         nb_indexes, | ||||
|                     )); | ||||
|                     count += 1; | ||||
|  | ||||
|                     let rtxn = index.read_txn()?; | ||||
|                     let metadata = IndexMetadata { | ||||
|                         uid: uid.to_owned(), | ||||
| @@ -835,6 +882,12 @@ impl IndexScheduler { | ||||
|                         .embedding_configs(&rtxn) | ||||
|                         .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; | ||||
|  | ||||
|                     let nb_documents = index | ||||
|                         .number_of_documents(&rtxn) | ||||
|                         .map_err(|e| Error::from_milli(e, Some(uid.to_string())))? | ||||
|                         as u32; | ||||
|                     let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents); | ||||
|                     progress.update_progress(update_document_progress); | ||||
|                     let documents = index | ||||
|                         .all_documents(&rtxn) | ||||
|                         .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; | ||||
| @@ -904,6 +957,7 @@ impl IndexScheduler { | ||||
|                         } | ||||
|  | ||||
|                         index_dumper.push_document(&document)?; | ||||
|                         atomic.fetch_add(1, Ordering::Relaxed); | ||||
|                     } | ||||
|  | ||||
|                     // 3.2. Dump the settings | ||||
| @@ -918,6 +972,7 @@ impl IndexScheduler { | ||||
|                 })?; | ||||
|  | ||||
|                 // 4. Dump experimental feature settings | ||||
|                 progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures); | ||||
|                 let features = self.features().runtime_features(); | ||||
|                 dump.create_experimental_features(features)?; | ||||
|  | ||||
| @@ -928,6 +983,7 @@ impl IndexScheduler { | ||||
|                 if self.must_stop_processing.get() { | ||||
|                     return Err(Error::AbortedTask); | ||||
|                 } | ||||
|                 progress.update_progress(DumpCreationProgress::CompressTheDump); | ||||
|                 let path = self.dumps_path.join(format!("{}.dump", dump_uid)); | ||||
|                 let file = File::create(path)?; | ||||
|                 dump.persist_to(BufWriter::new(file))?; | ||||
| @@ -953,7 +1009,7 @@ impl IndexScheduler { | ||||
|                     .set_currently_updating_index(Some((index_uid.clone(), index.clone()))); | ||||
|  | ||||
|                 let mut index_wtxn = index.write_txn()?; | ||||
|                 let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?; | ||||
|                 let tasks = self.apply_index_operation(&mut index_wtxn, &index, op, progress)?; | ||||
|  | ||||
|                 { | ||||
|                     let span = tracing::trace_span!(target: "indexing::scheduler", "commit"); | ||||
| @@ -987,6 +1043,8 @@ impl IndexScheduler { | ||||
|                 Ok(tasks) | ||||
|             } | ||||
|             Batch::IndexCreation { index_uid, primary_key, task } => { | ||||
|                 progress.update_progress(CreateIndexProgress::CreatingTheIndex); | ||||
|  | ||||
|                 let wtxn = self.env.write_txn()?; | ||||
|                 if self.index_mapper.exists(&wtxn, &index_uid)? { | ||||
|                     return Err(Error::IndexAlreadyExists(index_uid)); | ||||
| @@ -996,9 +1054,11 @@ impl IndexScheduler { | ||||
|                 self.process_batch( | ||||
|                     Batch::IndexUpdate { index_uid, primary_key, task }, | ||||
|                     current_batch, | ||||
|                     progress, | ||||
|                 ) | ||||
|             } | ||||
|             Batch::IndexUpdate { index_uid, primary_key, mut task } => { | ||||
|                 progress.update_progress(UpdateIndexProgress::UpdatingTheIndex); | ||||
|                 let rtxn = self.env.read_txn()?; | ||||
|                 let index = self.index_mapper.index(&rtxn, &index_uid)?; | ||||
|  | ||||
| @@ -1051,6 +1111,7 @@ impl IndexScheduler { | ||||
|                 Ok(vec![task]) | ||||
|             } | ||||
|             Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => { | ||||
|                 progress.update_progress(DeleteIndexProgress::DeletingTheIndex); | ||||
|                 let wtxn = self.env.write_txn()?; | ||||
|  | ||||
|                 // it's possible that the index doesn't exist | ||||
| @@ -1084,6 +1145,8 @@ impl IndexScheduler { | ||||
|                 Ok(tasks) | ||||
|             } | ||||
|             Batch::IndexSwap { mut task } => { | ||||
|                 progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap); | ||||
|  | ||||
|                 let mut wtxn = self.env.write_txn()?; | ||||
|                 let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind { | ||||
|                     swaps | ||||
| @@ -1110,8 +1173,20 @@ impl IndexScheduler { | ||||
|                         )); | ||||
|                     } | ||||
|                 } | ||||
|                 for swap in swaps { | ||||
|                     self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?; | ||||
|                 progress.update_progress(SwappingTheIndexes::SwappingTheIndexes); | ||||
|                 for (step, swap) in swaps.iter().enumerate() { | ||||
|                     progress.update_progress(VariableNameStep::new( | ||||
|                         format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1), | ||||
|                         step as u32, | ||||
|                         swaps.len() as u32, | ||||
|                     )); | ||||
|                     self.apply_index_swap( | ||||
|                         &mut wtxn, | ||||
|                         &progress, | ||||
|                         task.uid, | ||||
|                         &swap.indexes.0, | ||||
|                         &swap.indexes.1, | ||||
|                     )?; | ||||
|                 } | ||||
|                 wtxn.commit()?; | ||||
|                 task.status = Status::Succeeded; | ||||
| @@ -1121,7 +1196,15 @@ impl IndexScheduler { | ||||
|     } | ||||
|  | ||||
|     /// Swap the index `lhs` with the index `rhs`. | ||||
|     fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> { | ||||
|     fn apply_index_swap( | ||||
|         &self, | ||||
|         wtxn: &mut RwTxn, | ||||
|         progress: &Progress, | ||||
|         task_id: u32, | ||||
|         lhs: &str, | ||||
|         rhs: &str, | ||||
|     ) -> Result<()> { | ||||
|         progress.update_progress(InnerSwappingTwoIndexes::RetrieveTheTasks); | ||||
|         // 1. Verify that both lhs and rhs are existing indexes | ||||
|         let index_lhs_exists = self.index_mapper.index_exists(wtxn, lhs)?; | ||||
|         if !index_lhs_exists { | ||||
| @@ -1139,14 +1222,21 @@ impl IndexScheduler { | ||||
|         index_rhs_task_ids.remove_range(task_id..); | ||||
|  | ||||
|         // 3. before_name -> new_name in the task's KindWithContent | ||||
|         for task_id in &index_lhs_task_ids | &index_rhs_task_ids { | ||||
|         progress.update_progress(InnerSwappingTwoIndexes::UpdateTheTasks); | ||||
|         let tasks_to_update = &index_lhs_task_ids | &index_rhs_task_ids; | ||||
|         let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u32); | ||||
|         progress.update_progress(task_progress); | ||||
|  | ||||
|         for task_id in tasks_to_update { | ||||
|             let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; | ||||
|             swap_index_uid_in_task(&mut task, (lhs, rhs)); | ||||
|             self.all_tasks.put(wtxn, &task_id, &task)?; | ||||
|             atomic.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|  | ||||
|         // 4. remove the task from indexuid = before_name | ||||
|         // 5. add the task to indexuid = after_name | ||||
|         progress.update_progress(InnerSwappingTwoIndexes::UpdateTheIndexesMetadata); | ||||
|         self.update_index(wtxn, lhs, |lhs_tasks| { | ||||
|             *lhs_tasks -= &index_lhs_task_ids; | ||||
|             *lhs_tasks |= &index_rhs_task_ids; | ||||
| @@ -1168,7 +1258,7 @@ impl IndexScheduler { | ||||
|     /// The list of processed tasks. | ||||
|     #[tracing::instrument( | ||||
|         level = "trace", | ||||
|         skip(self, index_wtxn, index), | ||||
|         skip(self, index_wtxn, index, progress), | ||||
|         target = "indexing::scheduler" | ||||
|     )] | ||||
|     fn apply_index_operation<'i>( | ||||
| @@ -1176,44 +1266,12 @@ impl IndexScheduler { | ||||
|         index_wtxn: &mut RwTxn<'i>, | ||||
|         index: &'i Index, | ||||
|         operation: IndexOperation, | ||||
|         progress: Progress, | ||||
|     ) -> Result<Vec<Task>> { | ||||
|         let indexer_alloc = Bump::new(); | ||||
|  | ||||
|         let started_processing_at = std::time::Instant::now(); | ||||
|         let secs_since_started_processing_at = AtomicU64::new(0); | ||||
|         const PRINT_SECS_DELTA: u64 = 5; | ||||
|  | ||||
|         let processing_tasks = self.processing_tasks.clone(); | ||||
|         let must_stop_processing = self.must_stop_processing.clone(); | ||||
|         let send_progress = |progress| { | ||||
|             let now = std::time::Instant::now(); | ||||
|             let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed); | ||||
|             let previous = started_processing_at + Duration::from_secs(elapsed); | ||||
|             let elapsed = now - previous; | ||||
|  | ||||
|             if elapsed.as_secs() < PRINT_SECS_DELTA { | ||||
|                 return; | ||||
|             } | ||||
|  | ||||
|             secs_since_started_processing_at | ||||
|                 .store((now - started_processing_at).as_secs(), atomic::Ordering::Relaxed); | ||||
|  | ||||
|             let TaskProgress { | ||||
|                 current_step, | ||||
|                 finished_steps, | ||||
|                 total_steps, | ||||
|                 finished_substeps, | ||||
|                 total_substeps, | ||||
|             } = processing_tasks.write().unwrap().update_progress(progress); | ||||
|  | ||||
|             tracing::info!( | ||||
|                 current_step, | ||||
|                 finished_steps, | ||||
|                 total_steps, | ||||
|                 finished_substeps, | ||||
|                 total_substeps | ||||
|             ); | ||||
|         }; | ||||
|  | ||||
|         match operation { | ||||
|             IndexOperation::DocumentClear { index_uid, mut tasks } => { | ||||
| @@ -1245,6 +1303,7 @@ impl IndexScheduler { | ||||
|                 operations, | ||||
|                 mut tasks, | ||||
|             } => { | ||||
|                 progress.update_progress(DocumentOperationProgress::RetrievingConfig); | ||||
|                 // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. | ||||
|                 // this is made difficult by the fact we're doing private clones of the index scheduler and sending it | ||||
|                 // to a fresh thread. | ||||
| @@ -1300,6 +1359,7 @@ impl IndexScheduler { | ||||
|                     } | ||||
|                 }; | ||||
|  | ||||
|                 progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges); | ||||
|                 let (document_changes, operation_stats, primary_key) = indexer | ||||
|                     .into_changes( | ||||
|                         &indexer_alloc, | ||||
| @@ -1308,7 +1368,7 @@ impl IndexScheduler { | ||||
|                         primary_key.as_deref(), | ||||
|                         &mut new_fields_ids_map, | ||||
|                         &|| must_stop_processing.get(), | ||||
|                         &send_progress, | ||||
|                         progress.clone(), | ||||
|                     ) | ||||
|                     .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; | ||||
|  | ||||
| @@ -1344,6 +1404,7 @@ impl IndexScheduler { | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 progress.update_progress(DocumentOperationProgress::Indexing); | ||||
|                 if tasks.iter().any(|res| res.error.is_none()) { | ||||
|                     indexer::index( | ||||
|                         index_wtxn, | ||||
| @@ -1356,7 +1417,7 @@ impl IndexScheduler { | ||||
|                         &document_changes, | ||||
|                         embedders, | ||||
|                         &|| must_stop_processing.get(), | ||||
|                         &send_progress, | ||||
|                         &progress, | ||||
|                     ) | ||||
|                     .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; | ||||
|  | ||||
| @@ -1373,6 +1434,8 @@ impl IndexScheduler { | ||||
|                 Ok(tasks) | ||||
|             } | ||||
|             IndexOperation::DocumentEdition { index_uid, mut task } => { | ||||
|                 progress.update_progress(DocumentEditionProgress::RetrievingConfig); | ||||
|  | ||||
|                 let (filter, code) = if let KindWithContent::DocumentEdition { | ||||
|                     filter_expr, | ||||
|                     context: _, | ||||
| @@ -1446,6 +1509,7 @@ impl IndexScheduler { | ||||
|                     }; | ||||
|  | ||||
|                     let candidates_count = candidates.len(); | ||||
|                     progress.update_progress(DocumentEditionProgress::ComputingDocumentChanges); | ||||
|                     let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone()); | ||||
|                     let document_changes = pool | ||||
|                         .install(|| { | ||||
| @@ -1459,6 +1523,7 @@ impl IndexScheduler { | ||||
|                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; | ||||
|                     let embedders = self.embedders(index_uid.clone(), embedders)?; | ||||
|  | ||||
|                     progress.update_progress(DocumentEditionProgress::Indexing); | ||||
|                     indexer::index( | ||||
|                         index_wtxn, | ||||
|                         index, | ||||
| @@ -1470,7 +1535,7 @@ impl IndexScheduler { | ||||
|                         &document_changes, | ||||
|                         embedders, | ||||
|                         &|| must_stop_processing.get(), | ||||
|                         &send_progress, | ||||
|                         &progress, | ||||
|                     ) | ||||
|                     .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; | ||||
|  | ||||
| @@ -1511,6 +1576,8 @@ impl IndexScheduler { | ||||
|                 Ok(vec![task]) | ||||
|             } | ||||
|             IndexOperation::DocumentDeletion { mut tasks, index_uid } => { | ||||
|                 progress.update_progress(DocumentDeletionProgress::RetrievingConfig); | ||||
|  | ||||
|                 let mut to_delete = RoaringBitmap::new(); | ||||
|                 let external_documents_ids = index.external_documents_ids(); | ||||
|  | ||||
| @@ -1601,6 +1668,7 @@ impl IndexScheduler { | ||||
|                         } | ||||
|                     }; | ||||
|  | ||||
|                     progress.update_progress(DocumentDeletionProgress::DeleteDocuments); | ||||
|                     let mut indexer = indexer::DocumentDeletion::new(); | ||||
|                     let candidates_count = to_delete.len(); | ||||
|                     indexer.delete_documents_by_docids(to_delete); | ||||
| @@ -1610,6 +1678,7 @@ impl IndexScheduler { | ||||
|                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; | ||||
|                     let embedders = self.embedders(index_uid.clone(), embedders)?; | ||||
|  | ||||
|                     progress.update_progress(DocumentDeletionProgress::Indexing); | ||||
|                     indexer::index( | ||||
|                         index_wtxn, | ||||
|                         index, | ||||
| @@ -1621,7 +1690,7 @@ impl IndexScheduler { | ||||
|                         &document_changes, | ||||
|                         embedders, | ||||
|                         &|| must_stop_processing.get(), | ||||
|                         &send_progress, | ||||
|                         &progress, | ||||
|                     ) | ||||
|                     .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; | ||||
|  | ||||
| @@ -1638,6 +1707,7 @@ impl IndexScheduler { | ||||
|                 Ok(tasks) | ||||
|             } | ||||
|             IndexOperation::Settings { index_uid, settings, mut tasks } => { | ||||
|                 progress.update_progress(SettingsProgress::RetrievingAndMergingTheSettings); | ||||
|                 let indexer_config = self.index_mapper.indexer_config(); | ||||
|                 let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); | ||||
|  | ||||
| @@ -1651,6 +1721,7 @@ impl IndexScheduler { | ||||
|                     task.status = Status::Succeeded; | ||||
|                 } | ||||
|  | ||||
|                 progress.update_progress(SettingsProgress::ApplyTheSettings); | ||||
|                 builder | ||||
|                     .execute( | ||||
|                         |indexing_step| tracing::debug!(update = ?indexing_step), | ||||
| @@ -1673,12 +1744,14 @@ impl IndexScheduler { | ||||
|                         index_uid: index_uid.clone(), | ||||
|                         tasks: cleared_tasks, | ||||
|                     }, | ||||
|                     progress.clone(), | ||||
|                 )?; | ||||
|  | ||||
|                 let settings_tasks = self.apply_index_operation( | ||||
|                     index_wtxn, | ||||
|                     index, | ||||
|                     IndexOperation::Settings { index_uid, settings, tasks: settings_tasks }, | ||||
|                     progress, | ||||
|                 )?; | ||||
|  | ||||
|                 let mut tasks = settings_tasks; | ||||
| @@ -1695,15 +1768,18 @@ impl IndexScheduler { | ||||
|         &self, | ||||
|         wtxn: &mut RwTxn, | ||||
|         matched_tasks: &RoaringBitmap, | ||||
|         progress: &Progress, | ||||
|     ) -> Result<RoaringBitmap> { | ||||
|         progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime); | ||||
|  | ||||
|         // 1. Remove from this list the tasks that we are not allowed to delete | ||||
|         let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?; | ||||
|         let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone(); | ||||
|  | ||||
|         let all_task_ids = self.all_task_ids(wtxn)?; | ||||
|         let mut to_delete_tasks = all_task_ids & matched_tasks; | ||||
|         to_delete_tasks -= processing_tasks; | ||||
|         to_delete_tasks -= enqueued_tasks; | ||||
|         to_delete_tasks -= &**processing_tasks; | ||||
|         to_delete_tasks -= &enqueued_tasks; | ||||
|  | ||||
|         // 2. We now have a list of tasks to delete, delete them | ||||
|  | ||||
| @@ -1714,6 +1790,8 @@ impl IndexScheduler { | ||||
|         // The tasks that have been removed *per batches*. | ||||
|         let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new(); | ||||
|  | ||||
|         let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); | ||||
|         progress.update_progress(task_progress); | ||||
|         for task_id in to_delete_tasks.iter() { | ||||
|             let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; | ||||
|  | ||||
| @@ -1737,22 +1815,35 @@ impl IndexScheduler { | ||||
|             if let Some(batch_uid) = task.batch_uid { | ||||
|                 affected_batches.entry(batch_uid).or_default().insert(task_id); | ||||
|             } | ||||
|             atomic_progress.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|  | ||||
|         progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata); | ||||
|         let (atomic_progress, task_progress) = AtomicTaskStep::new( | ||||
|             (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32, | ||||
|         ); | ||||
|         progress.update_progress(task_progress); | ||||
|         for index in affected_indexes.iter() { | ||||
|             self.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|             atomic_progress.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|  | ||||
|         for status in affected_statuses.iter() { | ||||
|             self.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|             atomic_progress.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|  | ||||
|         for kind in affected_kinds.iter() { | ||||
|             self.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?; | ||||
|             atomic_progress.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|  | ||||
|         progress.update_progress(TaskDeletionProgress::DeletingTasks); | ||||
|         let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32); | ||||
|         progress.update_progress(task_progress); | ||||
|         for task in to_delete_tasks.iter() { | ||||
|             self.all_tasks.delete(wtxn, &task)?; | ||||
|             atomic_progress.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|         for canceled_by in affected_canceled_by { | ||||
|             if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? { | ||||
| @@ -1764,6 +1855,9 @@ impl IndexScheduler { | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         progress.update_progress(TaskDeletionProgress::DeletingBatches); | ||||
|         let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32); | ||||
|         progress.update_progress(batch_progress); | ||||
|         for (batch_id, to_delete_tasks) in affected_batches { | ||||
|             if let Some(mut tasks) = self.batch_to_tasks_mapping.get(wtxn, &batch_id)? { | ||||
|                 tasks -= &to_delete_tasks; | ||||
| @@ -1805,6 +1899,7 @@ impl IndexScheduler { | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             atomic_progress.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|  | ||||
|         Ok(to_delete_tasks) | ||||
| @@ -1819,21 +1914,36 @@ impl IndexScheduler { | ||||
|         cancel_task_id: TaskId, | ||||
|         current_batch: &mut ProcessingBatch, | ||||
|         matched_tasks: &RoaringBitmap, | ||||
|         progress: &Progress, | ||||
|     ) -> Result<Vec<Task>> { | ||||
|         progress.update_progress(TaskCancelationProgress::RetrievingTasks); | ||||
|  | ||||
|         // 1. Remove from this list the tasks that we are not allowed to cancel | ||||
|         //    Notice that only the _enqueued_ ones are cancelable and we should | ||||
|         //    have already aborted the indexation of the _processing_ ones | ||||
|         let cancelable_tasks = self.get_status(rtxn, Status::Enqueued)?; | ||||
|         let tasks_to_cancel = cancelable_tasks & matched_tasks; | ||||
|  | ||||
|         // 2. We now have a list of tasks to cancel, cancel them | ||||
|         let mut tasks = self.get_existing_tasks(rtxn, tasks_to_cancel.iter())?; | ||||
|         let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32); | ||||
|         progress.update_progress(progress_obj); | ||||
|  | ||||
|         // 2. We now have a list of tasks to cancel, cancel them | ||||
|         let mut tasks = self.get_existing_tasks( | ||||
|             rtxn, | ||||
|             tasks_to_cancel.iter().inspect(|_| { | ||||
|                 task_progress.fetch_add(1, Ordering::Relaxed); | ||||
|             }), | ||||
|         )?; | ||||
|  | ||||
|         progress.update_progress(TaskCancelationProgress::UpdatingTasks); | ||||
|         let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32); | ||||
|         progress.update_progress(progress_obj); | ||||
|         for task in tasks.iter_mut() { | ||||
|             task.status = Status::Canceled; | ||||
|             task.canceled_by = Some(cancel_task_id); | ||||
|             task.details = task.details.as_ref().map(|d| d.to_failed()); | ||||
|             current_batch.processing(Some(task)); | ||||
|             task_progress.fetch_add(1, Ordering::Relaxed); | ||||
|         } | ||||
|  | ||||
|         Ok(tasks) | ||||
|   | ||||
| @@ -3,10 +3,6 @@ use std::sync::{Arc, RwLock}; | ||||
| use std::time::Duration; | ||||
| use std::{fs, thread}; | ||||
|  | ||||
| use self::index_map::IndexMap; | ||||
| use self::IndexStatus::{Available, BeingDeleted, Closing, Missing}; | ||||
| use crate::uuid_codec::UuidCodec; | ||||
| use crate::{Error, Result}; | ||||
| use meilisearch_types::heed::types::{SerdeJson, Str}; | ||||
| use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn}; | ||||
| use meilisearch_types::milli; | ||||
| @@ -17,6 +13,11 @@ use time::OffsetDateTime; | ||||
| use tracing::error; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use self::index_map::IndexMap; | ||||
| use self::IndexStatus::{Available, BeingDeleted, Closing, Missing}; | ||||
| use crate::uuid_codec::UuidCodec; | ||||
| use crate::{Error, Result}; | ||||
|  | ||||
| mod index_map; | ||||
|  | ||||
| const INDEX_MAPPING: &str = "index-mapping"; | ||||
|   | ||||
| @@ -353,7 +353,7 @@ pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database<BEU32, RoaringBitmapCodec | ||||
|  | ||||
| pub fn snapshot_batch(batch: &Batch) -> String { | ||||
|     let mut snap = String::new(); | ||||
|     let Batch { uid, details, stats, started_at, finished_at } = batch; | ||||
|     let Batch { uid, details, stats, started_at, finished_at, progress: _ } = batch; | ||||
|     if let Some(finished_at) = finished_at { | ||||
|         assert!(finished_at > started_at); | ||||
|     } | ||||
|   | ||||
| @@ -26,6 +26,7 @@ mod index_mapper; | ||||
| #[cfg(test)] | ||||
| mod insta_snapshot; | ||||
| mod lru; | ||||
| mod processing; | ||||
| mod utils; | ||||
| pub mod uuid_codec; | ||||
|  | ||||
| @@ -56,12 +57,12 @@ use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128}; | ||||
| use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn}; | ||||
| use meilisearch_types::milli::documents::DocumentsBatchBuilder; | ||||
| use meilisearch_types::milli::index::IndexEmbeddingConfig; | ||||
| use meilisearch_types::milli::update::new::indexer::document_changes::Progress; | ||||
| use meilisearch_types::milli::update::IndexerConfig; | ||||
| use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; | ||||
| use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32}; | ||||
| use meilisearch_types::task_view::TaskView; | ||||
| use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task, TaskProgress}; | ||||
| use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; | ||||
| use processing::ProcessingTasks; | ||||
| use rayon::current_num_threads; | ||||
| use rayon::prelude::{IntoParallelIterator, ParallelIterator}; | ||||
| use roaring::RoaringBitmap; | ||||
| @@ -72,7 +73,8 @@ use utils::{filter_out_references_to_newer_tasks, keep_ids_within_datetimes, map | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use crate::index_mapper::IndexMapper; | ||||
| use crate::utils::{check_index_swap_validity, clamp_to_page_size, ProcessingBatch}; | ||||
| use crate::processing::{AtomicTaskStep, BatchProgress}; | ||||
| use crate::utils::{check_index_swap_validity, clamp_to_page_size}; | ||||
|  | ||||
| pub(crate) type BEI128 = I128<BE>; | ||||
|  | ||||
| @@ -163,48 +165,6 @@ impl Query { | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct ProcessingTasks { | ||||
|     batch: Option<ProcessingBatch>, | ||||
|     /// The list of tasks ids that are currently running. | ||||
|     processing: RoaringBitmap, | ||||
|     /// The progress on processing tasks | ||||
|     progress: Option<TaskProgress>, | ||||
| } | ||||
|  | ||||
| impl ProcessingTasks { | ||||
|     /// Creates an empty `ProcessingAt` struct. | ||||
|     fn new() -> ProcessingTasks { | ||||
|         ProcessingTasks { batch: None, processing: RoaringBitmap::new(), progress: None } | ||||
|     } | ||||
|  | ||||
|     /// Stores the currently processing tasks, and the date time at which it started. | ||||
|     fn start_processing(&mut self, processing_batch: ProcessingBatch, processing: RoaringBitmap) { | ||||
|         self.batch = Some(processing_batch); | ||||
|         self.processing = processing; | ||||
|     } | ||||
|  | ||||
|     fn update_progress(&mut self, progress: Progress) -> TaskProgress { | ||||
|         self.progress.get_or_insert_with(TaskProgress::default).update(progress) | ||||
|     } | ||||
|  | ||||
|     /// Set the processing tasks to an empty list | ||||
|     fn stop_processing(&mut self) -> Self { | ||||
|         self.progress = None; | ||||
|  | ||||
|         Self { | ||||
|             batch: std::mem::take(&mut self.batch), | ||||
|             processing: std::mem::take(&mut self.processing), | ||||
|             progress: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns `true` if there, at least, is one task that is currently processing that we must stop. | ||||
|     fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { | ||||
|         !self.processing.is_disjoint(canceled_tasks) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Default, Clone, Debug)] | ||||
| struct MustStopProcessing(Arc<AtomicBool>); | ||||
|  | ||||
| @@ -813,7 +773,7 @@ impl IndexScheduler { | ||||
|             let mut batch_tasks = RoaringBitmap::new(); | ||||
|             for batch_uid in batch_uids { | ||||
|                 if processing_batch.as_ref().map_or(false, |batch| batch.uid == *batch_uid) { | ||||
|                     batch_tasks |= &processing_tasks; | ||||
|                     batch_tasks |= &*processing_tasks; | ||||
|                 } else { | ||||
|                     batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?; | ||||
|                 } | ||||
| @@ -827,13 +787,13 @@ impl IndexScheduler { | ||||
|                 match status { | ||||
|                     // special case for Processing tasks | ||||
|                     Status::Processing => { | ||||
|                         status_tasks |= &processing_tasks; | ||||
|                         status_tasks |= &*processing_tasks; | ||||
|                     } | ||||
|                     status => status_tasks |= &self.get_status(rtxn, *status)?, | ||||
|                 }; | ||||
|             } | ||||
|             if !status.contains(&Status::Processing) { | ||||
|                 tasks -= &processing_tasks; | ||||
|                 tasks -= &*processing_tasks; | ||||
|             } | ||||
|             tasks &= status_tasks; | ||||
|         } | ||||
| @@ -882,7 +842,7 @@ impl IndexScheduler { | ||||
|         // Once we have filtered the two subsets, we put them back together and assign it back to `tasks`. | ||||
|         tasks = { | ||||
|             let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) = | ||||
|                 (&tasks - &processing_tasks, &tasks & &processing_tasks); | ||||
|                 (&tasks - &*processing_tasks, &tasks & &*processing_tasks); | ||||
|  | ||||
|             // special case for Processing tasks | ||||
|             // A closure that clears the filtered_processing_tasks if their started_at date falls outside the given bounds | ||||
| @@ -1090,7 +1050,7 @@ impl IndexScheduler { | ||||
|         // Once we have filtered the two subsets, we put them back together and assign it back to `batches`. | ||||
|         batches = { | ||||
|             let (mut filtered_non_processing_batches, mut filtered_processing_batches) = | ||||
|                 (&batches - &processing.processing, &batches & &processing.processing); | ||||
|                 (&batches - &*processing.processing, &batches & &*processing.processing); | ||||
|  | ||||
|             // special case for Processing batches | ||||
|             // A closure that clears the filtered_processing_batches if their started_at date falls outside the given bounds | ||||
| @@ -1606,7 +1566,8 @@ impl IndexScheduler { | ||||
|  | ||||
|         // We reset the must_stop flag to be sure that we don't stop processing tasks | ||||
|         self.must_stop_processing.reset(); | ||||
|         self.processing_tasks | ||||
|         let progress = self | ||||
|             .processing_tasks | ||||
|             .write() | ||||
|             .unwrap() | ||||
|             // We can clone the processing batch here because we don't want its modification to affect the view of the processing batches | ||||
| @@ -1619,11 +1580,12 @@ impl IndexScheduler { | ||||
|         let res = { | ||||
|             let cloned_index_scheduler = self.private_clone(); | ||||
|             let processing_batch = &mut processing_batch; | ||||
|             let progress = progress.clone(); | ||||
|             std::thread::scope(|s| { | ||||
|                 let handle = std::thread::Builder::new() | ||||
|                     .name(String::from("batch-operation")) | ||||
|                     .spawn_scoped(s, move || { | ||||
|                         cloned_index_scheduler.process_batch(batch, processing_batch) | ||||
|                         cloned_index_scheduler.process_batch(batch, processing_batch, progress) | ||||
|                     }) | ||||
|                     .unwrap(); | ||||
|                 handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) | ||||
| @@ -1636,6 +1598,7 @@ impl IndexScheduler { | ||||
|         #[cfg(test)] | ||||
|         self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?; | ||||
|  | ||||
|         progress.update_progress(BatchProgress::WritingTasksToDisk); | ||||
|         processing_batch.finished(); | ||||
|         let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; | ||||
|         let mut canceled = RoaringBitmap::new(); | ||||
| @@ -1645,12 +1608,15 @@ impl IndexScheduler { | ||||
|                 #[cfg(test)] | ||||
|                 self.breakpoint(Breakpoint::ProcessBatchSucceeded); | ||||
|  | ||||
|                 let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32); | ||||
|                 progress.update_progress(task_progress_obj); | ||||
|                 let mut success = 0; | ||||
|                 let mut failure = 0; | ||||
|                 let mut canceled_by = None; | ||||
|  | ||||
|                 #[allow(unused_variables)] | ||||
|                 for (i, mut task) in tasks.into_iter().enumerate() { | ||||
|                     task_progress.fetch_add(1, Ordering::Relaxed); | ||||
|                     processing_batch.update(&mut task); | ||||
|                     if task.status == Status::Canceled { | ||||
|                         canceled.insert(task.uid); | ||||
| @@ -1718,8 +1684,12 @@ impl IndexScheduler { | ||||
|             Err(err) => { | ||||
|                 #[cfg(test)] | ||||
|                 self.breakpoint(Breakpoint::ProcessBatchFailed); | ||||
|                 let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32); | ||||
|                 progress.update_progress(task_progress_obj); | ||||
|  | ||||
|                 let error: ResponseError = err.into(); | ||||
|                 for id in ids.iter() { | ||||
|                     task_progress.fetch_add(1, Ordering::Relaxed); | ||||
|                     let mut task = self | ||||
|                         .get_task(&wtxn, id) | ||||
|                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? | ||||
|   | ||||
							
								
								
									
										316
									
								
								crates/index-scheduler/src/processing.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										316
									
								
								crates/index-scheduler/src/processing.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,316 @@ | ||||
| use std::borrow::Cow; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use enum_iterator::Sequence; | ||||
| use meilisearch_types::milli::progress::{AtomicSubStep, NamedStep, Progress, ProgressView, Step}; | ||||
| use meilisearch_types::milli::{make_atomic_progress, make_enum_progress}; | ||||
| use roaring::RoaringBitmap; | ||||
|  | ||||
| use crate::utils::ProcessingBatch; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct ProcessingTasks { | ||||
|     pub batch: Option<Arc<ProcessingBatch>>, | ||||
|     /// The ids of the tasks that are currently running. | ||||
|     pub processing: Arc<RoaringBitmap>, | ||||
|     /// The progress on processing tasks | ||||
|     pub progress: Option<Progress>, | ||||
| } | ||||
|  | ||||
| impl ProcessingTasks { | ||||
|     /// Creates an empty `ProcessingTasks` struct. | ||||
|     pub fn new() -> ProcessingTasks { | ||||
|         ProcessingTasks { batch: None, processing: Arc::new(RoaringBitmap::new()), progress: None } | ||||
|     } | ||||
|  | ||||
|     pub fn get_progress_view(&self) -> Option<ProgressView> { | ||||
|         Some(self.progress.as_ref()?.as_progress_view()) | ||||
|     } | ||||
|  | ||||
|     /// Stores the currently processing batch and the ids of its tasks, and returns a new `Progress` initialised to `BatchProgress::ProcessingTasks`. | ||||
|     pub fn start_processing( | ||||
|         &mut self, | ||||
|         processing_batch: ProcessingBatch, | ||||
|         processing: RoaringBitmap, | ||||
|     ) -> Progress { | ||||
|         self.batch = Some(Arc::new(processing_batch)); | ||||
|         self.processing = Arc::new(processing); | ||||
|         let progress = Progress::default(); | ||||
|         progress.update_progress(BatchProgress::ProcessingTasks); | ||||
|         self.progress = Some(progress.clone()); | ||||
|  | ||||
|         progress | ||||
|     } | ||||
|  | ||||
|     /// Clears the processing state (batch, task ids and progress) and returns the previous batch and task ids. | ||||
|     pub fn stop_processing(&mut self) -> Self { | ||||
|         self.progress = None; | ||||
|  | ||||
|         Self { | ||||
|             batch: std::mem::take(&mut self.batch), | ||||
|             processing: std::mem::take(&mut self.processing), | ||||
|             progress: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns `true` if at least one of the currently processing tasks is in `canceled_tasks` and must be stopped. | ||||
|     pub fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool { | ||||
|         !self.processing.is_disjoint(canceled_tasks) | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum BatchProgress { | ||||
|         ProcessingTasks, | ||||
|         WritingTasksToDisk, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum TaskCancelationProgress { | ||||
|         RetrievingTasks, | ||||
|         UpdatingTasks, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum TaskDeletionProgress { | ||||
|         DeletingTasksDateTime, | ||||
|         DeletingTasksMetadata, | ||||
|         DeletingTasks, | ||||
|         DeletingBatches, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum SnapshotCreationProgress { | ||||
|         StartTheSnapshotCreation, | ||||
|         SnapshotTheIndexScheduler, | ||||
|         SnapshotTheUpdateFiles, | ||||
|         SnapshotTheIndexes, | ||||
|         SnapshotTheApiKeys, | ||||
|         CreateTheTarball, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum DumpCreationProgress { | ||||
|         StartTheDumpCreation, | ||||
|         DumpTheApiKeys, | ||||
|         DumpTheTasks, | ||||
|         DumpTheIndexes, | ||||
|         DumpTheExperimentalFeatures, | ||||
|         CompressTheDump, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum CreateIndexProgress { | ||||
|         CreatingTheIndex, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum UpdateIndexProgress { | ||||
|         UpdatingTheIndex, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum DeleteIndexProgress { | ||||
|         DeletingTheIndex, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum SwappingTheIndexes { | ||||
|         EnsuringCorrectnessOfTheSwap, | ||||
|         SwappingTheIndexes, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum InnerSwappingTwoIndexes { | ||||
|         RetrieveTheTasks, | ||||
|         UpdateTheTasks, | ||||
|         UpdateTheIndexesMetadata, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum DocumentOperationProgress { | ||||
|         RetrievingConfig, | ||||
|         ComputingDocumentChanges, | ||||
|         Indexing, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum DocumentEditionProgress { | ||||
|         RetrievingConfig, | ||||
|         ComputingDocumentChanges, | ||||
|         Indexing, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum DocumentDeletionProgress { | ||||
|         RetrievingConfig, | ||||
|         DeleteDocuments, | ||||
|         Indexing, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_enum_progress! { | ||||
|     pub enum SettingsProgress { | ||||
|         RetrievingAndMergingTheSettings, | ||||
|         ApplyTheSettings, | ||||
|     } | ||||
| } | ||||
|  | ||||
| make_atomic_progress!(Task alias AtomicTaskStep => "task" ); | ||||
| make_atomic_progress!(Document alias AtomicDocumentStep => "document" ); | ||||
| make_atomic_progress!(Batch alias AtomicBatchStep => "batch" ); | ||||
| make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" ); | ||||
|  | ||||
| pub struct VariableNameStep { | ||||
|     name: String, | ||||
|     current: u32, | ||||
|     total: u32, | ||||
| } | ||||
|  | ||||
| impl VariableNameStep { | ||||
|     pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self { | ||||
|         Self { name: name.into(), current, total } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Step for VariableNameStep { | ||||
|     fn name(&self) -> Cow<'static, str> { | ||||
|         self.name.clone().into() | ||||
|     } | ||||
|  | ||||
|     fn current(&self) -> u32 { | ||||
|         self.current | ||||
|     } | ||||
|  | ||||
|     fn total(&self) -> u32 { | ||||
|         self.total | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use std::sync::atomic::Ordering; | ||||
|  | ||||
|     use meili_snap::{json_string, snapshot}; | ||||
|  | ||||
|     use super::*; | ||||
|  | ||||
|     #[test] | ||||
|     fn one_level() { | ||||
|         let mut processing = ProcessingTasks::new(); | ||||
|         processing.start_processing(ProcessingBatch::new(0), RoaringBitmap::new()); | ||||
|         snapshot!(json_string!(processing.get_progress_view()), @r#" | ||||
|         { | ||||
|           "steps": [ | ||||
|             { | ||||
|               "currentStep": "processing tasks", | ||||
|               "finished": 0, | ||||
|               "total": 2 | ||||
|             } | ||||
|           ], | ||||
|           "percentage": 0.0 | ||||
|         } | ||||
|         "#); | ||||
|         processing.progress.as_ref().unwrap().update_progress(BatchProgress::WritingTasksToDisk); | ||||
|         snapshot!(json_string!(processing.get_progress_view()), @r#" | ||||
|         { | ||||
|           "steps": [ | ||||
|             { | ||||
|               "currentStep": "writing tasks to disk", | ||||
|               "finished": 1, | ||||
|               "total": 2 | ||||
|             } | ||||
|           ], | ||||
|           "percentage": 50.0 | ||||
|         } | ||||
|         "#); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn task_progress() { | ||||
|         let mut processing = ProcessingTasks::new(); | ||||
|         processing.start_processing(ProcessingBatch::new(0), RoaringBitmap::new()); | ||||
|         let (atomic, tasks) = AtomicTaskStep::new(10); | ||||
|         processing.progress.as_ref().unwrap().update_progress(tasks); | ||||
|         snapshot!(json_string!(processing.get_progress_view()), @r#" | ||||
|         { | ||||
|           "steps": [ | ||||
|             { | ||||
|               "currentStep": "processing tasks", | ||||
|               "finished": 0, | ||||
|               "total": 2 | ||||
|             }, | ||||
|             { | ||||
|               "currentStep": "task", | ||||
|               "finished": 0, | ||||
|               "total": 10 | ||||
|             } | ||||
|           ], | ||||
|           "percentage": 0.0 | ||||
|         } | ||||
|         "#); | ||||
|         atomic.fetch_add(6, Ordering::Relaxed); | ||||
|         snapshot!(json_string!(processing.get_progress_view()), @r#" | ||||
|         { | ||||
|           "steps": [ | ||||
|             { | ||||
|               "currentStep": "processing tasks", | ||||
|               "finished": 0, | ||||
|               "total": 2 | ||||
|             }, | ||||
|             { | ||||
|               "currentStep": "task", | ||||
|               "finished": 6, | ||||
|               "total": 10 | ||||
|             } | ||||
|           ], | ||||
|           "percentage": 30.000002 | ||||
|         } | ||||
|         "#); | ||||
|         processing.progress.as_ref().unwrap().update_progress(BatchProgress::WritingTasksToDisk); | ||||
|         snapshot!(json_string!(processing.get_progress_view()), @r#" | ||||
|         { | ||||
|           "steps": [ | ||||
|             { | ||||
|               "currentStep": "writing tasks to disk", | ||||
|               "finished": 1, | ||||
|               "total": 2 | ||||
|             } | ||||
|           ], | ||||
|           "percentage": 50.0 | ||||
|         } | ||||
|         "#); | ||||
|         let (atomic, tasks) = AtomicTaskStep::new(5); | ||||
|         processing.progress.as_ref().unwrap().update_progress(tasks); | ||||
|         atomic.fetch_add(4, Ordering::Relaxed); | ||||
|         snapshot!(json_string!(processing.get_progress_view()), @r#" | ||||
|         { | ||||
|           "steps": [ | ||||
|             { | ||||
|               "currentStep": "writing tasks to disk", | ||||
|               "finished": 1, | ||||
|               "total": 2 | ||||
|             }, | ||||
|             { | ||||
|               "currentStep": "task", | ||||
|               "finished": 4, | ||||
|               "total": 5 | ||||
|             } | ||||
|           ], | ||||
|           "percentage": 90.0 | ||||
|         } | ||||
|         "#); | ||||
|     } | ||||
| } | ||||
| @@ -134,6 +134,7 @@ impl ProcessingBatch { | ||||
|     pub fn to_batch(&self) -> Batch { | ||||
|         Batch { | ||||
|             uid: self.uid, | ||||
|             progress: None, | ||||
|             details: self.details.clone(), | ||||
|             stats: self.stats.clone(), | ||||
|             started_at: self.started_at, | ||||
| @@ -187,6 +188,7 @@ impl IndexScheduler { | ||||
|             &batch.uid, | ||||
|             &Batch { | ||||
|                 uid: batch.uid, | ||||
|                 progress: None, | ||||
|                 details: batch.details, | ||||
|                 stats: batch.stats, | ||||
|                 started_at: batch.started_at, | ||||
| @@ -273,7 +275,9 @@ impl IndexScheduler { | ||||
|             .into_iter() | ||||
|             .map(|batch_id| { | ||||
|                 if Some(batch_id) == processing.batch.as_ref().map(|batch| batch.uid) { | ||||
|                     Ok(processing.batch.as_ref().unwrap().to_batch()) | ||||
|                     let mut batch = processing.batch.as_ref().unwrap().to_batch(); | ||||
|                     batch.progress = processing.get_progress_view(); | ||||
|                     Ok(batch) | ||||
|                 } else { | ||||
|                     self.get_batch(rtxn, batch_id) | ||||
|                         .and_then(|task| task.ok_or(Error::CorruptedTaskQueue)) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user