mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 13:36:27 +00:00 
			
		
		
		
	expose the number of database in the index-scheduler and rewrite the lib.rs to use the value provided in the options instead of a magic number
This commit is contained in:
		| @@ -7,7 +7,12 @@ use meilisearch_types::heed::{Database, Env, RwTxn}; | ||||
| use crate::error::FeatureNotEnabledError; | ||||
| use crate::Result; | ||||
|  | ||||
| const EXPERIMENTAL_FEATURES: &str = "experimental-features"; | ||||
| /// The number of database used by features | ||||
| const NUMBER_OF_DATABASES: u32 = 1; | ||||
| /// Database const names for the `FeatureData`. | ||||
| mod db_name { | ||||
|     pub const EXPERIMENTAL_FEATURES: &str = "experimental-features"; | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub(crate) struct FeatureData { | ||||
| @@ -84,14 +89,19 @@ impl RoFeatures { | ||||
| } | ||||
|  | ||||
| impl FeatureData { | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         NUMBER_OF_DATABASES | ||||
|     } | ||||
|  | ||||
|     pub fn new(env: &Env, instance_features: InstanceTogglableFeatures) -> Result<Self> { | ||||
|         let mut wtxn = env.write_txn()?; | ||||
|         let runtime_features_db = env.create_database(&mut wtxn, Some(EXPERIMENTAL_FEATURES))?; | ||||
|         let runtime_features_db = | ||||
|             env.create_database(&mut wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?; | ||||
|         wtxn.commit()?; | ||||
|  | ||||
|         let txn = env.read_txn()?; | ||||
|         let persisted_features: RuntimeTogglableFeatures = | ||||
|             runtime_features_db.get(&txn, EXPERIMENTAL_FEATURES)?.unwrap_or_default(); | ||||
|             runtime_features_db.get(&txn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default(); | ||||
|         let InstanceTogglableFeatures { metrics, logs_route, contains_filter } = instance_features; | ||||
|         let runtime = Arc::new(RwLock::new(RuntimeTogglableFeatures { | ||||
|             metrics: metrics || persisted_features.metrics, | ||||
| @@ -108,7 +118,7 @@ impl FeatureData { | ||||
|         mut wtxn: RwTxn, | ||||
|         features: RuntimeTogglableFeatures, | ||||
|     ) -> Result<()> { | ||||
|         self.persisted.put(&mut wtxn, EXPERIMENTAL_FEATURES, &features)?; | ||||
|         self.persisted.put(&mut wtxn, db_name::EXPERIMENTAL_FEATURES, &features)?; | ||||
|         wtxn.commit()?; | ||||
|  | ||||
|         // safe to unwrap, the lock will only fail if: | ||||
|   | ||||
| @@ -20,8 +20,13 @@ use crate::{Error, IndexBudget, IndexSchedulerOptions, Result}; | ||||
|  | ||||
| mod index_map; | ||||
|  | ||||
| const INDEX_MAPPING: &str = "index-mapping"; | ||||
| const INDEX_STATS: &str = "index-stats"; | ||||
| /// The number of database used by index mapper | ||||
| const NUMBER_OF_DATABASES: u32 = 2; | ||||
| /// Database const names for the `IndexMapper`. | ||||
| mod db_name { | ||||
|     pub const INDEX_MAPPING: &str = "index-mapping"; | ||||
|     pub const INDEX_STATS: &str = "index-stats"; | ||||
| } | ||||
|  | ||||
| /// Structure managing meilisearch's indexes. | ||||
| /// | ||||
| @@ -138,6 +143,10 @@ impl IndexStats { | ||||
| } | ||||
|  | ||||
| impl IndexMapper { | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         NUMBER_OF_DATABASES | ||||
|     } | ||||
|  | ||||
|     pub fn new( | ||||
|         env: &Env, | ||||
|         wtxn: &mut RwTxn, | ||||
| @@ -146,8 +155,8 @@ impl IndexMapper { | ||||
|     ) -> Result<Self> { | ||||
|         Ok(Self { | ||||
|             index_map: Arc::new(RwLock::new(IndexMap::new(budget.index_count))), | ||||
|             index_mapping: env.create_database(wtxn, Some(INDEX_MAPPING))?, | ||||
|             index_stats: env.create_database(wtxn, Some(INDEX_STATS))?, | ||||
|             index_mapping: env.create_database(wtxn, Some(db_name::INDEX_MAPPING))?, | ||||
|             index_stats: env.create_database(wtxn, Some(db_name::INDEX_STATS))?, | ||||
|             base_path: options.indexes_path.clone(), | ||||
|             index_base_map_size: budget.map_size, | ||||
|             index_growth_amount: options.index_growth_amount, | ||||
|   | ||||
| @@ -197,6 +197,10 @@ impl IndexScheduler { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db() | ||||
|     } | ||||
|  | ||||
|     /// Create an index scheduler and start its run loop. | ||||
|     #[allow(private_interfaces)] // because test_utils is private | ||||
|     pub fn new( | ||||
| @@ -232,7 +236,7 @@ impl IndexScheduler { | ||||
|  | ||||
|         let env = unsafe { | ||||
|             heed::EnvOpenOptions::new() | ||||
|                 .max_dbs(19) | ||||
|                 .max_dbs(Self::nb_db()) | ||||
|                 .map_size(budget.task_db_size) | ||||
|                 .open(&options.tasks_path) | ||||
|         }?; | ||||
|   | ||||
| @@ -17,6 +17,8 @@ use crate::utils::{ | ||||
| }; | ||||
| use crate::{Error, Result, BEI128}; | ||||
|  | ||||
| /// The number of database used by the batch queue | ||||
| const NUMBER_OF_DATABASES: u32 = 7; | ||||
| /// Database const names for the `IndexScheduler`. | ||||
| mod db_name { | ||||
|     pub const ALL_BATCHES: &str = "all-batches"; | ||||
| @@ -60,6 +62,10 @@ impl BatchQueue { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         NUMBER_OF_DATABASES | ||||
|     } | ||||
|  | ||||
|     pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> { | ||||
|         Ok(Self { | ||||
|             all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?, | ||||
|   | ||||
| @@ -28,6 +28,8 @@ use crate::utils::{ | ||||
| }; | ||||
| use crate::{Error, IndexSchedulerOptions, Result, TaskId}; | ||||
|  | ||||
| /// The number of database used by queue itself | ||||
| const NUMBER_OF_DATABASES: u32 = 1; | ||||
| /// Database const names for the `IndexScheduler`. | ||||
| mod db_name { | ||||
|     pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping"; | ||||
| @@ -148,6 +150,10 @@ impl Queue { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         tasks::TaskQueue::nb_db() + batches::BatchQueue::nb_db() + NUMBER_OF_DATABASES | ||||
|     } | ||||
|  | ||||
|     /// Create an index scheduler and start its run loop. | ||||
|     pub(crate) fn new( | ||||
|         env: &Env, | ||||
|   | ||||
| @@ -14,9 +14,12 @@ use crate::utils::{ | ||||
| }; | ||||
| use crate::{Error, Result, TaskId, BEI128}; | ||||
|  | ||||
| /// The number of database used by the task queue | ||||
| const NUMBER_OF_DATABASES: u32 = 8; | ||||
| /// Database const names for the `IndexScheduler`. | ||||
| mod db_name { | ||||
|     pub const ALL_TASKS: &str = "all-tasks"; | ||||
|  | ||||
|     pub const STATUS: &str = "status"; | ||||
|     pub const KIND: &str = "kind"; | ||||
|     pub const INDEX_TASKS: &str = "index-tasks"; | ||||
| @@ -61,6 +64,10 @@ impl TaskQueue { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub(crate) const fn nb_db() -> u32 { | ||||
|         NUMBER_OF_DATABASES | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> { | ||||
|         Ok(Self { | ||||
|             all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?, | ||||
|   | ||||
| @@ -8,8 +8,12 @@ use time::OffsetDateTime; | ||||
| use tracing::info; | ||||
|  | ||||
| use crate::queue::TaskQueue; | ||||
| use crate::IndexSchedulerOptions; | ||||
|  | ||||
| pub fn upgrade_task_queue(tasks_path: &Path, from: (u32, u32, u32)) -> anyhow::Result<()> { | ||||
| pub fn upgrade_task_queue( | ||||
|     opt: &IndexSchedulerOptions, | ||||
|     from: (u32, u32, u32), | ||||
| ) -> anyhow::Result<()> { | ||||
|     let current_major: u32 = VERSION_MAJOR.parse().unwrap(); | ||||
|     let current_minor: u32 = VERSION_MINOR.parse().unwrap(); | ||||
|     let current_patch: u32 = VERSION_PATCH.parse().unwrap(); | ||||
| @@ -40,15 +44,14 @@ pub fn upgrade_task_queue(tasks_path: &Path, from: (u32, u32, u32)) -> anyhow::R | ||||
|     info!("Upgrading the task queue"); | ||||
|     for (upgrade, upgrade_name) in upgrade_functions[start..].iter() { | ||||
|         info!("{upgrade_name}"); | ||||
|         (upgrade)(tasks_path)?; | ||||
|         (upgrade)(&opt.tasks_path)?; | ||||
|     } | ||||
|  | ||||
|     let env = unsafe { | ||||
|         heed::EnvOpenOptions::new() | ||||
|             .max_dbs(19) | ||||
|             // Since that's the only database memory-mapped currently we don't need to check the budget yet | ||||
|             .map_size(100 * 1024 * 1024) | ||||
|             .open(tasks_path) | ||||
|             .max_dbs(TaskQueue::nb_db()) | ||||
|             .map_size(opt.task_db_size) | ||||
|             .open(&opt.tasks_path) | ||||
|     }?; | ||||
|     let mut wtxn = env.write_txn()?; | ||||
|     let queue = TaskQueue::new(&env, &mut wtxn)?; | ||||
|   | ||||
| @@ -210,13 +210,42 @@ enum OnFailure { | ||||
| } | ||||
|  | ||||
| pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> { | ||||
|     let index_scheduler_opt = IndexSchedulerOptions { | ||||
|         version_file_path: opt.db_path.join(VERSION_FILE_NAME), | ||||
|         auth_path: opt.db_path.join("auth"), | ||||
|         tasks_path: opt.db_path.join("tasks"), | ||||
|         update_file_path: opt.db_path.join("update_files"), | ||||
|         indexes_path: opt.db_path.join("indexes"), | ||||
|         snapshots_path: opt.snapshot_dir.clone(), | ||||
|         dumps_path: opt.dump_dir.clone(), | ||||
|         webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()), | ||||
|         webhook_authorization_header: opt.task_webhook_authorization_header.clone(), | ||||
|         task_db_size: opt.max_task_db_size.as_u64() as usize, | ||||
|         index_base_map_size: opt.max_index_size.as_u64() as usize, | ||||
|         enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, | ||||
|         indexer_config: Arc::new((&opt.indexer_options).try_into()?), | ||||
|         autobatching_enabled: true, | ||||
|         cleanup_enabled: !opt.experimental_replication_parameters, | ||||
|         max_number_of_tasks: 1_000_000, | ||||
|         max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, | ||||
|         batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size, | ||||
|         index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, | ||||
|         index_count: DEFAULT_INDEX_COUNT, | ||||
|         instance_features: opt.to_instance_features(), | ||||
|         auto_upgrade: opt.experimental_dumpless_upgrade, | ||||
|     }; | ||||
|  | ||||
|     let empty_db = is_empty_db(&opt.db_path); | ||||
|     let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot { | ||||
|         let snapshot_path_exists = snapshot_path.exists(); | ||||
|         // the db is empty and the snapshot exists, import it | ||||
|         if empty_db && snapshot_path_exists { | ||||
|             match compression::from_tar_gz(snapshot_path, &opt.db_path) { | ||||
|                 Ok(()) => open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?, | ||||
|                 Ok(()) => open_or_create_database_unchecked( | ||||
|                     opt, | ||||
|                     index_scheduler_opt, | ||||
|                     OnFailure::RemoveDb, | ||||
|                 )?, | ||||
|                 Err(e) => { | ||||
|                     std::fs::remove_dir_all(&opt.db_path)?; | ||||
|                     return Err(e); | ||||
| @@ -233,14 +262,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
|             bail!("snapshot doesn't exist at {}", snapshot_path.display()) | ||||
|         // the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag | ||||
|         } else { | ||||
|             open_or_create_database(opt, empty_db)? | ||||
|             open_or_create_database(opt, index_scheduler_opt, empty_db)? | ||||
|         } | ||||
|     } else if let Some(ref path) = opt.import_dump { | ||||
|         let src_path_exists = path.exists(); | ||||
|         // the db is empty and the dump exists, import it | ||||
|         if empty_db && src_path_exists { | ||||
|             let (mut index_scheduler, mut auth_controller) = | ||||
|                 open_or_create_database_unchecked(opt, OnFailure::RemoveDb)?; | ||||
|                 open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::RemoveDb)?; | ||||
|             match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) { | ||||
|                 Ok(()) => (index_scheduler, auth_controller), | ||||
|                 Err(e) => { | ||||
| @@ -260,10 +289,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
|         // the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag | ||||
|         // or, the dump is missing but we can ignore that because of the ignore_missing_dump flag | ||||
|         } else { | ||||
|             open_or_create_database(opt, empty_db)? | ||||
|             open_or_create_database(opt, index_scheduler_opt, empty_db)? | ||||
|         } | ||||
|     } else { | ||||
|         open_or_create_database(opt, empty_db)? | ||||
|         open_or_create_database(opt, index_scheduler_opt, empty_db)? | ||||
|     }; | ||||
|  | ||||
|     // We create a loop in a thread that registers snapshotCreation tasks | ||||
| @@ -291,38 +320,14 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc< | ||||
| /// Try to start the IndexScheduler and AuthController without checking the VERSION file or anything. | ||||
| fn open_or_create_database_unchecked( | ||||
|     opt: &Opt, | ||||
|     index_scheduler_opt: IndexSchedulerOptions, | ||||
|     on_failure: OnFailure, | ||||
| ) -> anyhow::Result<(IndexScheduler, AuthController)> { | ||||
|     // we don't want to create anything in the data.ms yet, thus we | ||||
|     // wrap our two builders in a closure that'll be executed later. | ||||
|     let auth_controller = AuthController::new(&opt.db_path, &opt.master_key); | ||||
|     let instance_features = opt.to_instance_features(); | ||||
|     let index_scheduler_builder = || -> anyhow::Result<_> { | ||||
|         Ok(IndexScheduler::new(IndexSchedulerOptions { | ||||
|             version_file_path: opt.db_path.join(VERSION_FILE_NAME), | ||||
|             auth_path: opt.db_path.join("auth"), | ||||
|             tasks_path: opt.db_path.join("tasks"), | ||||
|             update_file_path: opt.db_path.join("update_files"), | ||||
|             indexes_path: opt.db_path.join("indexes"), | ||||
|             snapshots_path: opt.snapshot_dir.clone(), | ||||
|             dumps_path: opt.dump_dir.clone(), | ||||
|             webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()), | ||||
|             webhook_authorization_header: opt.task_webhook_authorization_header.clone(), | ||||
|             task_db_size: opt.max_task_db_size.as_u64() as usize, | ||||
|             index_base_map_size: opt.max_index_size.as_u64() as usize, | ||||
|             enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, | ||||
|             indexer_config: Arc::new((&opt.indexer_options).try_into()?), | ||||
|             autobatching_enabled: true, | ||||
|             cleanup_enabled: !opt.experimental_replication_parameters, | ||||
|             max_number_of_tasks: 1_000_000, | ||||
|             max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, | ||||
|             batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size, | ||||
|             index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize, | ||||
|             index_count: DEFAULT_INDEX_COUNT, | ||||
|             instance_features, | ||||
|             auto_upgrade: opt.experimental_dumpless_upgrade, | ||||
|         })?) | ||||
|     }; | ||||
|     let index_scheduler_builder = | ||||
|         || -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt)?) }; | ||||
|  | ||||
|     match ( | ||||
|         index_scheduler_builder(), | ||||
| @@ -341,18 +346,18 @@ fn open_or_create_database_unchecked( | ||||
|  | ||||
| /// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch. | ||||
| fn check_version_and_update_task_queue( | ||||
|     db_path: &Path, | ||||
|     experimental_dumpless_upgrade: bool, | ||||
|     opt: &Opt, | ||||
|     index_scheduler_opt: &IndexSchedulerOptions, | ||||
| ) -> anyhow::Result<()> { | ||||
|     let (major, minor, patch) = get_version(db_path)?; | ||||
|     let (major, minor, patch) = get_version(&opt.db_path)?; | ||||
|  | ||||
|     let version_major: u32 = VERSION_MAJOR.parse().unwrap(); | ||||
|     let version_minor: u32 = VERSION_MINOR.parse().unwrap(); | ||||
|     let version_patch: u32 = VERSION_PATCH.parse().unwrap(); | ||||
|  | ||||
|     if major != version_major || minor != version_minor || patch > version_patch { | ||||
|         if experimental_dumpless_upgrade { | ||||
|             return upgrade_task_queue(&db_path.join("tasks"), (major, minor, patch)); | ||||
|         if opt.experimental_dumpless_upgrade { | ||||
|             return upgrade_task_queue(index_scheduler_opt, (major, minor, patch)); | ||||
|         } else { | ||||
|             return Err(VersionFileError::VersionMismatch { major, minor, patch }.into()); | ||||
|         } | ||||
| @@ -364,13 +369,14 @@ fn check_version_and_update_task_queue( | ||||
| /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. | ||||
| fn open_or_create_database( | ||||
|     opt: &Opt, | ||||
|     index_scheduler_opt: IndexSchedulerOptions, | ||||
|     empty_db: bool, | ||||
| ) -> anyhow::Result<(IndexScheduler, AuthController)> { | ||||
|     if !empty_db { | ||||
|         check_version_and_update_task_queue(&opt.db_path, opt.experimental_dumpless_upgrade)?; | ||||
|         check_version_and_update_task_queue(opt, &index_scheduler_opt)?; | ||||
|     } | ||||
|  | ||||
|     open_or_create_database_unchecked(opt, OnFailure::KeepDb) | ||||
|     open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb) | ||||
| } | ||||
|  | ||||
| fn import_dump( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user