diff --git a/Cargo.lock b/Cargo.lock index a5f2b5149..c197ae7f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3266,6 +3266,7 @@ dependencies = [ "tempfile", "thiserror 2.0.16", "time", + "tokio", "tracing", "ureq", "uuid", diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index 20cc49686..a96e669d2 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -45,6 +45,7 @@ tracing = "0.1.41" ureq = "2.12.1" uuid = { version = "1.17.0", features = ["serde", "v4"] } backoff = "0.4.0" +tokio = { version = "1.47.1", features = ["full"] } [dev-dependencies] big_s = "1.0.2" diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index df043ad87..e6b7c6f9f 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -36,6 +36,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String { run_loop_iteration: _, embedders: _, chat_settings: _, + runtime: _, } = scheduler; let rtxn = env.read_txn().unwrap(); diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 5e31feab8..8b3e71d25 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -216,6 +216,8 @@ pub struct IndexScheduler { /// A counter that is incremented before every call to [`tick`](IndexScheduler::tick) #[cfg(test)] run_loop_iteration: Arc>, + + runtime: Option, } impl IndexScheduler { @@ -242,6 +244,7 @@ impl IndexScheduler { run_loop_iteration: self.run_loop_iteration.clone(), features: self.features.clone(), chat_settings: self.chat_settings, + runtime: self.runtime.clone(), } } @@ -260,6 +263,7 @@ impl IndexScheduler { options: IndexSchedulerOptions, auth_env: Env, from_db_version: (u32, u32, u32), + runtime: Option, #[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>, ) -> Result { @@ -341,6 +345,7 @@ impl IndexScheduler { run_loop_iteration: Arc::new(RwLock::new(0)), features, chat_settings, + runtime, }; this.run(); diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index 36de0ed9e..4fb92998d 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -126,7 +126,7 @@ impl IndexScheduler { std::fs::create_dir_all(&options.auth_path).unwrap(); let auth_env = open_auth_store_env(&options.auth_path).unwrap(); let index_scheduler = - Self::new(options, auth_env, version, sender, planned_failures).unwrap(); + Self::new(options, auth_env, version, None, sender, planned_failures).unwrap(); // To be 100% consistent between all test we're going to start the scheduler right now // and ensure it's in the expected starting state. diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 37c2cb458..ec507de7f 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -216,7 +216,10 @@ enum OnFailure { KeepDb, } -pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc)> { +pub fn setup_meilisearch( + opt: &Opt, + handle: tokio::runtime::Handle, +) -> anyhow::Result<(Arc, Arc)> { let index_scheduler_opt = IndexSchedulerOptions { version_file_path: opt.db_path.join(VERSION_FILE_NAME), auth_path: opt.db_path.join("auth"), @@ -256,6 +259,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< index_scheduler_opt, OnFailure::RemoveDb, binary_version, // the db is empty + handle, )?, Err(e) => { std::fs::remove_dir_all(&opt.db_path)?; @@ -273,7 +277,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< bail!("snapshot doesn't exist at {}", snapshot_path.display()) // the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag } else { - open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)? + open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)? } } else if let Some(ref path) = opt.import_dump { let src_path_exists = path.exists(); @@ -284,6 +288,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< index_scheduler_opt, OnFailure::RemoveDb, binary_version, // the db is empty + handle, )?; match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) { Ok(()) => (index_scheduler, auth_controller), @@ -304,10 +309,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc, Arc< // the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag // or, the dump is missing but we can ignore that because of the ignore_missing_dump flag } else { - open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)? + open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)? } } else { - open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)? + open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version, handle)? }; // We create a loop in a thread that registers snapshotCreation tasks @@ -338,6 +343,7 @@ fn open_or_create_database_unchecked( index_scheduler_opt: IndexSchedulerOptions, on_failure: OnFailure, version: (u32, u32, u32), + handle: tokio::runtime::Handle, ) -> anyhow::Result<(IndexScheduler, AuthController)> { // we don't want to create anything in the data.ms yet, thus we // wrap our two builders in a closure that'll be executed later. @@ -345,7 +351,7 @@ fn open_or_create_database_unchecked( let auth_env = open_auth_store_env(&index_scheduler_opt.auth_path).unwrap(); let auth_controller = AuthController::new(auth_env.clone(), &opt.master_key); let index_scheduler_builder = || -> anyhow::Result<_> { - Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version)?) + Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version, Some(handle))?) }; match ( @@ -452,6 +458,7 @@ fn open_or_create_database( index_scheduler_opt: IndexSchedulerOptions, empty_db: bool, binary_version: (u32, u32, u32), + handle: tokio::runtime::Handle, ) -> anyhow::Result<(IndexScheduler, AuthController)> { let version = if !empty_db { check_version(opt, &index_scheduler_opt, binary_version)? @@ -459,7 +466,7 @@ fn open_or_create_database( binary_version }; - open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version) + open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version, handle) } fn import_dump( diff --git a/crates/meilisearch/src/main.rs b/crates/meilisearch/src/main.rs index b16dda097..be0beb97f 100644 --- a/crates/meilisearch/src/main.rs +++ b/crates/meilisearch/src/main.rs @@ -76,7 +76,10 @@ fn on_panic(info: &std::panic::PanicHookInfo) { #[actix_web::main] async fn main() -> anyhow::Result<()> { - try_main().await.inspect_err(|error| { + // won't panic inside of tokio::main + let runtime = tokio::runtime::Handle::current(); + + try_main(runtime).await.inspect_err(|error| { tracing::error!(%error); let mut current = error.source(); let mut depth = 0; @@ -88,7 +91,7 @@ async fn main() -> anyhow::Result<()> { }) } -async fn try_main() -> anyhow::Result<()> { +async fn try_main(runtime: tokio::runtime::Handle) -> anyhow::Result<()> { let (opt, config_read_from) = Opt::try_build()?; std::panic::set_hook(Box::new(on_panic)); @@ -122,7 +125,7 @@ async fn try_main() -> anyhow::Result<()> { _ => (), } - let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?; + let (index_scheduler, auth_controller) = setup_meilisearch(&opt, runtime)?; let analytics = analytics::Analytics::new(&opt, index_scheduler.clone(), auth_controller.clone()).await; diff --git a/crates/meilisearch/tests/common/server.rs b/crates/meilisearch/tests/common/server.rs index b87dbe0ad..49c3f25cc 100644 --- a/crates/meilisearch/tests/common/server.rs +++ b/crates/meilisearch/tests/common/server.rs @@ -49,8 +49,8 @@ impl Server { } let options = default_settings(dir.path()); - - let (index_scheduler, auth) = setup_meilisearch(&options).unwrap(); + let handle = tokio::runtime::Handle::current(); + let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap(); let service = Service { index_scheduler, auth, options, api_key: None }; Server { service, _dir: Some(dir), _marker: PhantomData } @@ -65,7 +65,9 @@ impl Server { options.master_key = Some("MASTER_KEY".to_string()); - let (index_scheduler, auth) = setup_meilisearch(&options).unwrap(); + let handle = tokio::runtime::Handle::current(); + + let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap(); let service = Service { index_scheduler, auth, options, api_key: None }; Server { service, _dir: Some(dir), _marker: PhantomData } @@ -78,7 +80,9 @@ impl Server { } pub async fn new_with_options(options: Opt) -> Result { - let (index_scheduler, auth) = setup_meilisearch(&options)?; + let handle = tokio::runtime::Handle::current(); + + let (index_scheduler, auth) = setup_meilisearch(&options, handle)?; let service = Service { index_scheduler, auth, options, api_key: None }; Ok(Server { service, _dir: None, _marker: PhantomData }) @@ -217,8 +221,9 @@ impl Server { } let options = default_settings(dir.path()); + let handle = tokio::runtime::Handle::current(); - let (index_scheduler, auth) = setup_meilisearch(&options).unwrap(); + let (index_scheduler, auth) = setup_meilisearch(&options, handle).unwrap(); let service = Service { index_scheduler, auth, api_key: None, options }; Server { service, _dir: Some(dir), _marker: PhantomData }