mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	add the version in the index-scheduler
This commit is contained in:
		| @@ -93,15 +93,16 @@ impl FeatureData { | ||||
|         NUMBER_OF_DATABASES | ||||
|     } | ||||
|  | ||||
|     pub fn new(env: &Env, instance_features: InstanceTogglableFeatures) -> Result<Self> { | ||||
|         let mut wtxn = env.write_txn()?; | ||||
|     pub fn new( | ||||
|         env: &Env, | ||||
|         wtxn: &mut RwTxn, | ||||
|         instance_features: InstanceTogglableFeatures, | ||||
|     ) -> Result<Self> { | ||||
|         let runtime_features_db = | ||||
|             env.create_database(&mut wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?; | ||||
|         wtxn.commit()?; | ||||
|             env.create_database(wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?; | ||||
|  | ||||
|         let txn = env.read_txn()?; | ||||
|         let persisted_features: RuntimeTogglableFeatures = | ||||
|             runtime_features_db.get(&txn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default(); | ||||
|             runtime_features_db.get(wtxn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default(); | ||||
|         let InstanceTogglableFeatures { metrics, logs_route, contains_filter } = instance_features; | ||||
|         let runtime = Arc::new(RwLock::new(RuntimeTogglableFeatures { | ||||
|             metrics: metrics || persisted_features.metrics, | ||||
|   | ||||
| @@ -21,6 +21,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { | ||||
|         cleanup_enabled: _, | ||||
|         processing_tasks, | ||||
|         env, | ||||
|         version: _, | ||||
|         queue, | ||||
|         scheduler, | ||||
|  | ||||
|   | ||||
| @@ -33,6 +33,7 @@ mod test_utils; | ||||
| pub mod upgrade; | ||||
| mod utils; | ||||
| pub mod uuid_codec; | ||||
| mod versioning; | ||||
|  | ||||
| pub type Result<T, E = Error> = std::result::Result<T, E>; | ||||
| pub type TaskId = u32; | ||||
| @@ -66,6 +67,7 @@ use queue::Queue; | ||||
| use roaring::RoaringBitmap; | ||||
| use scheduler::Scheduler; | ||||
| use time::OffsetDateTime; | ||||
| use versioning::Versioning; | ||||
|  | ||||
| use crate::index_mapper::IndexMapper; | ||||
| use crate::utils::clamp_to_page_size; | ||||
| @@ -134,17 +136,18 @@ pub struct IndexScheduler { | ||||
|     /// The list of tasks currently processing | ||||
|     pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>, | ||||
|  | ||||
|     /// A database containing only the version of the index-scheduler | ||||
|     pub version: versioning::Versioning, | ||||
|     /// The queue containing both the tasks and the batches. | ||||
|     pub queue: queue::Queue, | ||||
|  | ||||
|     pub scheduler: scheduler::Scheduler, | ||||
|  | ||||
|     /// In charge of creating, opening, storing and returning indexes. | ||||
|     pub(crate) index_mapper: IndexMapper, | ||||
|  | ||||
|     /// In charge of fetching and setting the status of experimental features. | ||||
|     features: features::FeatureData, | ||||
|  | ||||
|     /// Everything related to the processing of the tasks | ||||
|     pub scheduler: scheduler::Scheduler, | ||||
|  | ||||
|     /// Whether we should automatically cleanup the task queue or not. | ||||
|     pub(crate) cleanup_enabled: bool, | ||||
|  | ||||
| @@ -179,6 +182,7 @@ impl IndexScheduler { | ||||
|         IndexScheduler { | ||||
|             env: self.env.clone(), | ||||
|             processing_tasks: self.processing_tasks.clone(), | ||||
|             version: self.version.clone(), | ||||
|             queue: self.queue.private_clone(), | ||||
|             scheduler: self.scheduler.private_clone(), | ||||
|  | ||||
| @@ -198,13 +202,14 @@ impl IndexScheduler { | ||||
|     } | ||||
|  | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db() | ||||
|         Versioning::nb_db() + Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db() | ||||
|     } | ||||
|  | ||||
|     /// Create an index scheduler and start its run loop. | ||||
|     #[allow(private_interfaces)] // because test_utils is private | ||||
|     pub fn new( | ||||
|         options: IndexSchedulerOptions, | ||||
|         from_db_version: (u32, u32, u32), | ||||
|         #[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>, | ||||
|         #[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>, | ||||
|     ) -> Result<Self> { | ||||
| @@ -241,9 +246,11 @@ impl IndexScheduler { | ||||
|                 .open(&options.tasks_path) | ||||
|         }?; | ||||
|  | ||||
|         let features = features::FeatureData::new(&env, options.instance_features)?; | ||||
|         // We **must** starts by upgrading the version because it'll also upgrade the required database before we can open them | ||||
|         let version = versioning::Versioning::new(&env, from_db_version)?; | ||||
|  | ||||
|         let mut wtxn = env.write_txn()?; | ||||
|         let features = features::FeatureData::new(&env, &mut wtxn, options.instance_features)?; | ||||
|         let queue = Queue::new(&env, &mut wtxn, &options)?; | ||||
|         let index_mapper = IndexMapper::new(&env, &mut wtxn, &options, budget)?; | ||||
|         wtxn.commit()?; | ||||
| @@ -251,6 +258,7 @@ impl IndexScheduler { | ||||
|         // allow unreachable_code to get rids of the warning in the case of a test build. | ||||
|         let this = Self { | ||||
|             processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())), | ||||
|             version, | ||||
|             queue, | ||||
|             scheduler: Scheduler::new(&options), | ||||
|  | ||||
|   | ||||
| @@ -9,7 +9,7 @@ use meilisearch_types::document_formats::DocumentFormatError; | ||||
| use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments; | ||||
| use meilisearch_types::milli::update::IndexerConfig; | ||||
| use meilisearch_types::tasks::KindWithContent; | ||||
| use meilisearch_types::VERSION_FILE_NAME; | ||||
| use meilisearch_types::{versioning, VERSION_FILE_NAME}; | ||||
| use tempfile::{NamedTempFile, TempDir}; | ||||
| use uuid::Uuid; | ||||
| use Breakpoint::*; | ||||
| @@ -113,7 +113,13 @@ impl IndexScheduler { | ||||
|         }; | ||||
|         configuration(&mut options); | ||||
|  | ||||
|         let index_scheduler = Self::new(options, sender, planned_failures).unwrap(); | ||||
|         let version = ( | ||||
|             versioning::VERSION_MAJOR.parse().unwrap(), | ||||
|             versioning::VERSION_MINOR.parse().unwrap(), | ||||
|             versioning::VERSION_PATCH.parse().unwrap(), | ||||
|         ); | ||||
|  | ||||
|         let index_scheduler = Self::new(options, version, sender, planned_failures).unwrap(); | ||||
|  | ||||
|         // To be 100% consistent between all test we're going to start the scheduler right now | ||||
|         // and ensure it's in the expected starting state. | ||||
| @@ -406,8 +412,7 @@ impl IndexSchedulerHandle { | ||||
|             .recv_timeout(std::time::Duration::from_secs(1)) { | ||||
|                 Ok((_, true)) => continue, | ||||
|                 Ok((b, false)) => panic!("The scheduler was supposed to be down but successfully moved to the next breakpoint: {b:?}"), | ||||
|                 Err(RecvTimeoutError::Timeout) => panic!(), | ||||
|                 Err(RecvTimeoutError::Disconnected) => break, | ||||
|                 Err(RecvTimeoutError::Timeout | RecvTimeoutError::Disconnected) => break, | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|   | ||||
| @@ -1,25 +1,28 @@ | ||||
| use std::path::Path; | ||||
|  | ||||
| use anyhow::bail; | ||||
| use meilisearch_types::heed; | ||||
| use meilisearch_types::heed::{Env, RwTxn}; | ||||
| use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; | ||||
| use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; | ||||
| use time::OffsetDateTime; | ||||
| use tracing::info; | ||||
|  | ||||
| use crate::queue::TaskQueue; | ||||
| use crate::IndexSchedulerOptions; | ||||
|  | ||||
| pub fn upgrade_task_queue( | ||||
|     opt: &IndexSchedulerOptions, | ||||
| trait UpgradeIndexScheduler { | ||||
|     fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32)) | ||||
|         -> anyhow::Result<()>; | ||||
|     fn target_version(&self) -> (u32, u32, u32); | ||||
| } | ||||
|  | ||||
| pub fn upgrade_index_scheduler( | ||||
|     env: &Env, | ||||
|     from: (u32, u32, u32), | ||||
|     to: (u32, u32, u32), | ||||
| ) -> anyhow::Result<()> { | ||||
|     let current_major: u32 = VERSION_MAJOR.parse().unwrap(); | ||||
|     let current_minor: u32 = VERSION_MINOR.parse().unwrap(); | ||||
|     let current_patch: u32 = VERSION_PATCH.parse().unwrap(); | ||||
|     let current_major = to.0; | ||||
|     let current_minor = to.1; | ||||
|     let current_patch = to.2; | ||||
|  | ||||
|     let upgrade_functions = | ||||
|         [(v1_12_to_current as fn(&Path) -> anyhow::Result<()>, "Upgrading from v1.12 to v1.13")]; | ||||
|     let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[&V1_12_ToCurrent {}]; | ||||
|  | ||||
|     let start = match from { | ||||
|         (1, 12, _) => 0, | ||||
| @@ -41,20 +44,23 @@ pub fn upgrade_task_queue( | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     let mut current_version = from; | ||||
|  | ||||
|     info!("Upgrading the task queue"); | ||||
|     for (upgrade, upgrade_name) in upgrade_functions[start..].iter() { | ||||
|         info!("{upgrade_name}"); | ||||
|         (upgrade)(&opt.tasks_path)?; | ||||
|     for upgrade in upgrade_functions[start..].iter() { | ||||
|         let target = upgrade.target_version(); | ||||
|         info!( | ||||
|             "Upgrading from v{}.{}.{} to v{}.{}.{}", | ||||
|             from.0, from.1, from.2, current_version.0, current_version.1, current_version.2 | ||||
|         ); | ||||
|         let mut wtxn = env.write_txn()?; | ||||
|         upgrade.upgrade(env, &mut wtxn, from)?; | ||||
|         wtxn.commit()?; | ||||
|         current_version = target; | ||||
|     } | ||||
|  | ||||
|     let env = unsafe { | ||||
|         heed::EnvOpenOptions::new() | ||||
|             .max_dbs(TaskQueue::nb_db()) | ||||
|             .map_size(opt.task_db_size) | ||||
|             .open(&opt.tasks_path) | ||||
|     }?; | ||||
|     let mut wtxn = env.write_txn()?; | ||||
|     let queue = TaskQueue::new(&env, &mut wtxn)?; | ||||
|     let queue = TaskQueue::new(env, &mut wtxn)?; | ||||
|     let uid = queue.next_task_id(&wtxn)?; | ||||
|     queue.register( | ||||
|         &mut wtxn, | ||||
| @@ -72,12 +78,28 @@ pub fn upgrade_task_queue( | ||||
|         }, | ||||
|     )?; | ||||
|     wtxn.commit()?; | ||||
|     // Should be pretty much instantaneous since we're the only one reading this env | ||||
|     env.prepare_for_closing().wait(); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// The task queue is 100% compatible with the previous versions | ||||
| fn v1_12_to_current(_path: &Path) -> anyhow::Result<()> { | ||||
|     Ok(()) | ||||
| #[allow(non_camel_case_types)] | ||||
| struct V1_12_ToCurrent {} | ||||
|  | ||||
| impl UpgradeIndexScheduler for V1_12_ToCurrent { | ||||
|     fn upgrade( | ||||
|         &self, | ||||
|         _env: &Env, | ||||
|         _wtxn: &mut RwTxn, | ||||
|         _original: (u32, u32, u32), | ||||
|     ) -> anyhow::Result<()> { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn target_version(&self) -> (u32, u32, u32) { | ||||
|         ( | ||||
|             VERSION_MAJOR.parse().unwrap(), | ||||
|             VERSION_MINOR.parse().unwrap(), | ||||
|             VERSION_PATCH.parse().unwrap(), | ||||
|         ) | ||||
|     } | ||||
| } | ||||
|   | ||||
							
								
								
									
										61
									
								
								crates/index-scheduler/src/versioning.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										61
									
								
								crates/index-scheduler/src/versioning.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,61 @@ | ||||
| use crate::{upgrade::upgrade_index_scheduler, Result}; | ||||
| use meilisearch_types::{ | ||||
|     heed::{types::Str, Database, Env, RoTxn, RwTxn}, | ||||
|     milli::heed_codec::version::VersionCodec, | ||||
|     versioning, | ||||
| }; | ||||
|  | ||||
| /// The number of database used by queue itself | ||||
| const NUMBER_OF_DATABASES: u32 = 1; | ||||
| /// Database const names for the `IndexScheduler`. | ||||
| mod db_name { | ||||
|     pub const VERSION: &str = "version"; | ||||
| } | ||||
| mod entry_name { | ||||
|     pub const MAIN: &str = "main"; | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct Versioning { | ||||
|     pub version: Database<Str, VersionCodec>, | ||||
| } | ||||
|  | ||||
| impl Versioning { | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         NUMBER_OF_DATABASES | ||||
|     } | ||||
|  | ||||
|     pub fn get_version(&self, rtxn: &RoTxn) -> Result<Option<(u32, u32, u32)>> { | ||||
|         Ok(self.version.get(rtxn, entry_name::MAIN)?) | ||||
|     } | ||||
|  | ||||
|     pub fn set_version(&self, wtxn: &mut RwTxn, version: (u32, u32, u32)) -> Result<()> { | ||||
|         Ok(self.version.put(wtxn, entry_name::MAIN, &version)?) | ||||
|     } | ||||
|  | ||||
|     pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<()> { | ||||
|         let major = versioning::VERSION_MAJOR.parse().unwrap(); | ||||
|         let minor = versioning::VERSION_MINOR.parse().unwrap(); | ||||
|         let patch = versioning::VERSION_PATCH.parse().unwrap(); | ||||
|         self.set_version(wtxn, (major, minor, patch)) | ||||
|     } | ||||
|  | ||||
|     /// Create an index scheduler and start its run loop. | ||||
|     pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> { | ||||
|         let mut wtxn = env.write_txn()?; | ||||
|         let version = env.create_database(&mut wtxn, Some(db_name::VERSION))?; | ||||
|         let this = Self { version }; | ||||
|         let from = this.get_version(&wtxn)?.unwrap_or(db_version); | ||||
|         wtxn.commit()?; | ||||
|  | ||||
|         let bin_major: u32 = versioning::VERSION_MAJOR.parse().unwrap(); | ||||
|         let bin_minor: u32 = versioning::VERSION_MINOR.parse().unwrap(); | ||||
|         let bin_patch: u32 = versioning::VERSION_PATCH.parse().unwrap(); | ||||
|         let to = (bin_major, bin_minor, bin_patch); | ||||
|  | ||||
|         if from != to { | ||||
|             upgrade_index_scheduler(env, from, to)?; | ||||
|         } | ||||
|         Ok(this) | ||||
|     } | ||||
| } | ||||
| @@ -9,6 +9,36 @@ pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR"); | ||||
| pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR"); | ||||
| pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); | ||||
|  | ||||
| /// Persists the version of the current Meilisearch binary to a VERSION file | ||||
| pub fn update_version_file_for_dumpless_upgrade( | ||||
|     db_path: &Path, | ||||
|     from: (u32, u32, u32), | ||||
|     to: (u32, u32, u32), | ||||
| ) -> Result<(), VersionFileError> { | ||||
|     let (from_major, from_minor, from_patch) = from; | ||||
|     let (to_major, to_minor, to_patch) = to; | ||||
|  | ||||
|     if from_major > to_major | ||||
|         || (from_major == to_major && from_minor > to_minor) | ||||
|         || (from_major == to_major && from_minor == to_minor && from_patch > to_patch) | ||||
|     { | ||||
|         Err(VersionFileError::DowngradeNotSupported { | ||||
|             major: from_major, | ||||
|             minor: from_minor, | ||||
|             patch: from_patch, | ||||
|         }) | ||||
|     } else if from_major < 1 || (from_major == to_major && from_minor < 12) { | ||||
|         Err(VersionFileError::TooOldForAutomaticUpgrade { | ||||
|             major: from_major, | ||||
|             minor: from_minor, | ||||
|             patch: from_patch, | ||||
|         }) | ||||
|     } else { | ||||
|         create_current_version_file(db_path)?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Persists the version of the current Meilisearch binary to a VERSION file | ||||
| pub fn create_current_version_file(db_path: &Path) -> io::Result<()> { | ||||
|     create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) | ||||
| @@ -78,6 +108,10 @@ pub enum VersionFileError { | ||||
|         env!("CARGO_PKG_VERSION").to_string() | ||||
|     )] | ||||
|     VersionMismatch { major: u32, minor: u32, patch: u32 }, | ||||
|     #[error("Database version {major}.{minor}.{patch} is higher than the Meilisearch version {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}. Downgrade is not supported")] | ||||
|     DowngradeNotSupported { major: u32, minor: u32, patch: u32 }, | ||||
|     #[error("Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and import it in the v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}")] | ||||
|     TooOldForAutomaticUpgrade { major: u32, minor: u32, patch: u32 }, | ||||
|  | ||||
|     #[error(transparent)] | ||||
|     IoError(#[from] std::io::Error), | ||||
|   | ||||
| @@ -32,7 +32,6 @@ use analytics::Analytics; | ||||
| use anyhow::bail; | ||||
| use error::PayloadError; | ||||
| use extractors::payload::PayloadConfig; | ||||
| use index_scheduler::upgrade::upgrade_task_queue; | ||||
| use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; | ||||
| use meilisearch_auth::AuthController; | ||||
| use meilisearch_types::milli::constants::VERSION_MAJOR; | ||||
| @@ -41,7 +40,8 @@ use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMetho | ||||
| use meilisearch_types::settings::apply_settings_to_builder; | ||||
| use meilisearch_types::tasks::KindWithContent; | ||||
| use meilisearch_types::versioning::{ | ||||
|     create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH, | ||||
|     create_current_version_file, get_version, update_version_file_for_dumpless_upgrade, | ||||
|     VersionFileError, VERSION_MINOR, VERSION_PATCH, | ||||
| }; | ||||
| use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; | ||||
| pub use option::Opt; | ||||
| @@ -234,6 +234,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
|         instance_features: opt.to_instance_features(), | ||||
|         auto_upgrade: opt.experimental_dumpless_upgrade, | ||||
|     }; | ||||
|     let bin_major: u32 = VERSION_MAJOR.parse().unwrap(); | ||||
|     let bin_minor: u32 = VERSION_MINOR.parse().unwrap(); | ||||
|     let bin_patch: u32 = VERSION_PATCH.parse().unwrap(); | ||||
|     let binary_version = (bin_major, bin_minor, bin_patch); | ||||
|  | ||||
|     let empty_db = is_empty_db(&opt.db_path); | ||||
|     let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot { | ||||
| @@ -245,6 +249,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
|                     opt, | ||||
|                     index_scheduler_opt, | ||||
|                     OnFailure::RemoveDb, | ||||
|                     binary_version, // the db is empty | ||||
|                 )?, | ||||
|                 Err(e) => { | ||||
|                     std::fs::remove_dir_all(&opt.db_path)?; | ||||
| @@ -262,14 +267,18 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
|             bail!("snapshot doesn't exist at {}", snapshot_path.display()) | ||||
|         // the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag | ||||
|         } else { | ||||
|             open_or_create_database(opt, index_scheduler_opt, empty_db)? | ||||
|             open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)? | ||||
|         } | ||||
|     } else if let Some(ref path) = opt.import_dump { | ||||
|         let src_path_exists = path.exists(); | ||||
|         // the db is empty and the dump exists, import it | ||||
|         if empty_db && src_path_exists { | ||||
|             let (mut index_scheduler, mut auth_controller) = | ||||
|                 open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::RemoveDb)?; | ||||
|             let (mut index_scheduler, mut auth_controller) = open_or_create_database_unchecked( | ||||
|                 opt, | ||||
|                 index_scheduler_opt, | ||||
|                 OnFailure::RemoveDb, | ||||
|                 binary_version, // the db is empty | ||||
|             )?; | ||||
|             match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) { | ||||
|                 Ok(()) => (index_scheduler, auth_controller), | ||||
|                 Err(e) => { | ||||
| @@ -289,10 +298,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
|         // the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag | ||||
|         // or, the dump is missing but we can ignore that because of the ignore_missing_dump flag | ||||
|         } else { | ||||
|             open_or_create_database(opt, index_scheduler_opt, empty_db)? | ||||
|             open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)? | ||||
|         } | ||||
|     } else { | ||||
|         open_or_create_database(opt, index_scheduler_opt, empty_db)? | ||||
|         open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)? | ||||
|     }; | ||||
|  | ||||
|     // We create a loop in a thread that registers snapshotCreation tasks | ||||
| @@ -322,12 +331,13 @@ fn open_or_create_database_unchecked( | ||||
|     opt: &Opt, | ||||
|     index_scheduler_opt: IndexSchedulerOptions, | ||||
|     on_failure: OnFailure, | ||||
|     version: (u32, u32, u32), | ||||
| ) -> anyhow::Result<(IndexScheduler, AuthController)> { | ||||
|     // we don't want to create anything in the data.ms yet, thus we | ||||
|     // wrap our two builders in a closure that'll be executed later. | ||||
|     let auth_controller = AuthController::new(&opt.db_path, &opt.master_key); | ||||
|     let index_scheduler_builder = | ||||
|         || -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt)?) }; | ||||
|         || -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt, version)?) }; | ||||
|  | ||||
|     match ( | ||||
|         index_scheduler_builder(), | ||||
| @@ -345,25 +355,29 @@ fn open_or_create_database_unchecked( | ||||
| } | ||||
|  | ||||
| /// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch. | ||||
| fn check_version_and_update_task_queue( | ||||
|     opt: &Opt, | ||||
|     index_scheduler_opt: &IndexSchedulerOptions, | ||||
| ) -> anyhow::Result<()> { | ||||
|     let (major, minor, patch) = get_version(&opt.db_path)?; | ||||
| /// Returns the version that was contained in the version file | ||||
| fn check_version(opt: &Opt, binary_version: (u32, u32, u32)) -> anyhow::Result<(u32, u32, u32)> { | ||||
|     let (bin_major, bin_minor, bin_patch) = binary_version; | ||||
|     let (db_major, db_minor, db_patch) = get_version(&opt.db_path)?; | ||||
|  | ||||
|     let version_major: u32 = VERSION_MAJOR.parse().unwrap(); | ||||
|     let version_minor: u32 = VERSION_MINOR.parse().unwrap(); | ||||
|     let version_patch: u32 = VERSION_PATCH.parse().unwrap(); | ||||
|  | ||||
|     if major != version_major || minor != version_minor || patch > version_patch { | ||||
|     if db_major != bin_major || db_minor != bin_minor || db_patch > bin_patch { | ||||
|         if opt.experimental_dumpless_upgrade { | ||||
|             return upgrade_task_queue(index_scheduler_opt, (major, minor, patch)); | ||||
|             update_version_file_for_dumpless_upgrade( | ||||
|                 &opt.db_path, | ||||
|                 (db_major, db_minor, db_patch), | ||||
|                 (bin_major, bin_minor, bin_patch), | ||||
|             )?; | ||||
|         } else { | ||||
|             return Err(VersionFileError::VersionMismatch { major, minor, patch }.into()); | ||||
|             return Err(VersionFileError::VersionMismatch { | ||||
|                 major: db_major, | ||||
|                 minor: db_minor, | ||||
|                 patch: db_patch, | ||||
|             } | ||||
|             .into()); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
|     Ok((db_major, db_minor, db_patch)) | ||||
| } | ||||
|  | ||||
| /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. | ||||
| @@ -371,12 +385,11 @@ fn open_or_create_database( | ||||
|     opt: &Opt, | ||||
|     index_scheduler_opt: IndexSchedulerOptions, | ||||
|     empty_db: bool, | ||||
|     binary_version: (u32, u32, u32), | ||||
| ) -> anyhow::Result<(IndexScheduler, AuthController)> { | ||||
|     if !empty_db { | ||||
|         check_version_and_update_task_queue(opt, &index_scheduler_opt)?; | ||||
|     } | ||||
|     let version = if !empty_db { check_version(opt, binary_version)? } else { binary_version }; | ||||
|  | ||||
|     open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb) | ||||
|     open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version) | ||||
| } | ||||
|  | ||||
| fn import_dump( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user