mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-31 16:06:31 +00:00 
			
		
		
		
	When canceling an upgrade task, execute the rollback code
This commit is contained in:
		| @@ -6,7 +6,8 @@ use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; | ||||
| use meilisearch_types::heed::{RoTxn, RwTxn}; | ||||
| use meilisearch_types::milli::progress::{Progress, VariableNameStep}; | ||||
| use meilisearch_types::milli::{self, ChannelCongestion}; | ||||
| use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task}; | ||||
| use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; | ||||
| use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; | ||||
| use milli::update::Settings as MilliSettings; | ||||
| use roaring::RoaringBitmap; | ||||
|  | ||||
| @@ -144,11 +145,22 @@ impl IndexScheduler { | ||||
|                     self.index_mapper.index(&rtxn, &index_uid)? | ||||
|                 }; | ||||
|  | ||||
|                 let mut index_wtxn = index.write_txn()?; | ||||
|  | ||||
|                 let index_version = index.get_version(&index_wtxn)?.unwrap_or((1, 12, 0)); | ||||
|                 let package_version = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH); | ||||
|                 if index_version != (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) { | ||||
|                     return Err(Error::IndexVersionMismatch { | ||||
|                         index: index_uid, | ||||
|                         index_version, | ||||
|                         package_version, | ||||
|                     }); | ||||
|                 } | ||||
|  | ||||
|                 // the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick | ||||
|                 self.index_mapper | ||||
|                     .set_currently_updating_index(Some((index_uid.clone(), index.clone()))); | ||||
|  | ||||
|                 let mut index_wtxn = index.write_txn()?; | ||||
|                 let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?; | ||||
|                 let (tasks, congestion) = | ||||
|                     self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?; | ||||
| @@ -353,9 +365,11 @@ impl IndexScheduler { | ||||
|                 let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { | ||||
|                     unreachable!(); | ||||
|                 }; | ||||
|  | ||||
|                 let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(from, progress))); | ||||
|                 match ret { | ||||
|                     Ok(Ok(())) => (), | ||||
|                     Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask), | ||||
|                     Ok(Err(e)) => return Err(Error::DatabaseUpgrade(Box::new(e))), | ||||
|                     Err(e) => { | ||||
|                         let msg = match e.downcast_ref::<&'static str>() { | ||||
| @@ -653,17 +667,79 @@ impl IndexScheduler { | ||||
|         progress: &Progress, | ||||
|     ) -> Result<Vec<Task>> { | ||||
|         progress.update_progress(TaskCancelationProgress::RetrievingTasks); | ||||
|         let mut tasks_to_cancel = RoaringBitmap::new(); | ||||
|  | ||||
|         let enqueued_tasks = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; | ||||
|  | ||||
|         // 0. Check if any upgrade task was matched. | ||||
|         //    If so, we cancel all the failed or enqueued upgrade tasks. | ||||
|         let upgrade_tasks = &self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)?; | ||||
|         let is_canceling_upgrade = !matched_tasks.is_disjoint(upgrade_tasks); | ||||
|         if is_canceling_upgrade { | ||||
|             let failed_tasks = self.queue.tasks.get_status(rtxn, Status::Failed)?; | ||||
|             tasks_to_cancel |= upgrade_tasks & (enqueued_tasks | failed_tasks); | ||||
|         } | ||||
|         // 1. Remove from this list the tasks that we are not allowed to cancel | ||||
|         //    Notice that only the _enqueued_ ones are cancelable and we should | ||||
|         //    have already aborted the indexation of the _processing_ ones | ||||
|         let cancelable_tasks = self.queue.tasks.get_status(rtxn, Status::Enqueued)?; | ||||
|         let tasks_to_cancel = cancelable_tasks & matched_tasks; | ||||
|         tasks_to_cancel |= enqueued_tasks & matched_tasks; | ||||
|  | ||||
|         // 2. If we're canceling an upgrade, attempt the rollback | ||||
|         if let Some(latest_upgrade_task) = (&tasks_to_cancel & upgrade_tasks).max() { | ||||
|             progress.update_progress(TaskCancelationProgress::CancelingUpgrade); | ||||
|  | ||||
|             let task = self.queue.tasks.get_task(rtxn, latest_upgrade_task)?.unwrap(); | ||||
|             let Some(Details::UpgradeDatabase { from, to }) = task.details else { | ||||
|                 unreachable!("wrong details for upgrade task {latest_upgrade_task}") | ||||
|             }; | ||||
|  | ||||
|             // check that we are rollbacking an upgrade to the current Meilisearch | ||||
|             let bin_major: u32 = meilisearch_types::versioning::VERSION_MAJOR; | ||||
|             let bin_minor: u32 = meilisearch_types::versioning::VERSION_MINOR; | ||||
|             let bin_patch: u32 = meilisearch_types::versioning::VERSION_PATCH; | ||||
|  | ||||
|             if to == (bin_major, bin_minor, bin_patch) { | ||||
|                 tracing::warn!( | ||||
|                     "Rollbacking from v{}.{}.{} to v{}.{}.{}", | ||||
|                     to.0, | ||||
|                     to.1, | ||||
|                     to.2, | ||||
|                     from.0, | ||||
|                     from.1, | ||||
|                     from.2 | ||||
|                 ); | ||||
|                 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { | ||||
|                     self.process_rollback(from, progress) | ||||
|                 })) { | ||||
|                     Ok(Ok(())) => {} | ||||
|                     Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))), | ||||
|                     Err(e) => { | ||||
|                         let msg = match e.downcast_ref::<&'static str>() { | ||||
|                             Some(s) => *s, | ||||
|                             None => match e.downcast_ref::<String>() { | ||||
|                                 Some(s) => &s[..], | ||||
|                                 None => "Box<dyn Any>", | ||||
|                             }, | ||||
|                         }; | ||||
|                         return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked( | ||||
|                             msg.to_string(), | ||||
|                         )))); | ||||
|                     } | ||||
|                 } | ||||
|             } else { | ||||
|                 tracing::debug!( | ||||
|                     "Not rollbacking an upgrade targetting the earlier version v{}.{}.{}", | ||||
|                     bin_major, | ||||
|                     bin_minor, | ||||
|                     bin_patch | ||||
|                 ) | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // 3. We now have a list of tasks to cancel, cancel them | ||||
|         let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32); | ||||
|         progress.update_progress(progress_obj); | ||||
|  | ||||
|         // 2. We now have a list of tasks to cancel, cancel them | ||||
|         let mut tasks = self.queue.tasks.get_existing_tasks( | ||||
|             rtxn, | ||||
|             tasks_to_cancel.iter().inspect(|_| { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user