mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	bug(lib): drop env on last use
fixes the `too many open files` error when running tests by closing the environment on last drop
This commit is contained in:
		| @@ -1,4 +1,5 @@ | ||||
| use std::path::Path; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use heed::EnvOpenOptions; | ||||
| use log::info; | ||||
| @@ -26,7 +27,7 @@ pub fn load_dump( | ||||
|     let mut options = EnvOpenOptions::new(); | ||||
|     options.map_size(meta_env_size); | ||||
|     options.max_dbs(100); | ||||
|     let env = options.open(&dst)?; | ||||
|     let env = Arc::new(options.open(&dst)?); | ||||
|  | ||||
|     IndexResolver::load_dump( | ||||
|         src.as_ref(), | ||||
|   | ||||
| @@ -192,7 +192,7 @@ impl IndexControllerBuilder { | ||||
|         options.map_size(task_store_size); | ||||
|         options.max_dbs(20); | ||||
|  | ||||
|         let meta_env = options.open(&db_path)?; | ||||
|         let meta_env = Arc::new(options.open(&db_path)?); | ||||
|  | ||||
|         let update_file_store = UpdateFileStore::new(&db_path)?; | ||||
|         // Create or overwrite the version file for this DB | ||||
|   | ||||
| @@ -2,6 +2,7 @@ use std::collections::HashSet; | ||||
| use std::fs::{create_dir_all, File}; | ||||
| use std::io::{BufRead, BufReader, Write}; | ||||
| use std::path::{Path, PathBuf}; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use heed::types::{SerdeBincode, Str}; | ||||
| use heed::{CompactionOption, Database, Env}; | ||||
| @@ -42,12 +43,20 @@ pub struct IndexMeta { | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct HeedMetaStore { | ||||
|     env: Env, | ||||
|     env: Arc<Env>, | ||||
|     db: Database<Str, SerdeBincode<IndexMeta>>, | ||||
| } | ||||
|  | ||||
| impl Drop for HeedMetaStore { | ||||
|     fn drop(&mut self) { | ||||
|         if Arc::strong_count(&self.env) == 1 { | ||||
|             self.env.as_ref().clone().prepare_for_closing(); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl HeedMetaStore { | ||||
|     pub fn new(env: heed::Env) -> Result<Self> { | ||||
|     pub fn new(env: Arc<heed::Env>) -> Result<Self> { | ||||
|         let db = env.create_database(Some("uuids"))?; | ||||
|         Ok(Self { env, db }) | ||||
|     } | ||||
| @@ -144,7 +153,7 @@ impl HeedMetaStore { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn load_dump(src: impl AsRef<Path>, env: heed::Env) -> Result<()> { | ||||
|     pub fn load_dump(src: impl AsRef<Path>, env: Arc<heed::Env>) -> Result<()> { | ||||
|         let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl"); | ||||
|         let indexes = File::open(&src_indexes)?; | ||||
|         let mut indexes = BufReader::new(indexes); | ||||
|   | ||||
| @@ -4,6 +4,7 @@ pub mod meta_store; | ||||
|  | ||||
| use std::convert::TryInto; | ||||
| use std::path::Path; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use chrono::Utc; | ||||
| use error::{IndexResolverError, Result}; | ||||
| @@ -16,13 +17,11 @@ use serde::{Deserialize, Serialize}; | ||||
| use tokio::task::spawn_blocking; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use crate::index::update_handler::UpdateHandler; | ||||
| use crate::index::{error::Result as IndexResult, Index}; | ||||
| use crate::index::{error::Result as IndexResult, update_handler::UpdateHandler, Index}; | ||||
| use crate::options::IndexerOpts; | ||||
| use crate::tasks::batch::Batch; | ||||
| use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult}; | ||||
| use crate::tasks::Pending; | ||||
| use crate::tasks::TaskPerformer; | ||||
| use crate::tasks::{Pending, TaskPerformer}; | ||||
| use crate::update_file_store::UpdateFileStore; | ||||
|  | ||||
| use self::meta_store::IndexMeta; | ||||
| @@ -39,7 +38,7 @@ pub fn create_index_resolver( | ||||
|     path: impl AsRef<Path>, | ||||
|     index_size: usize, | ||||
|     indexer_opts: &IndexerOpts, | ||||
|     meta_env: heed::Env, | ||||
|     meta_env: Arc<heed::Env>, | ||||
|     file_store: UpdateFileStore, | ||||
| ) -> anyhow::Result<HardStateIndexResolver> { | ||||
|     let uuid_store = HeedMetaStore::new(meta_env)?; | ||||
| @@ -153,7 +152,7 @@ impl IndexResolver<HeedMetaStore, MapIndexStore> { | ||||
|         src: impl AsRef<Path>, | ||||
|         dst: impl AsRef<Path>, | ||||
|         index_db_size: usize, | ||||
|         env: Env, | ||||
|         env: Arc<Env>, | ||||
|         indexer_opts: &IndexerOpts, | ||||
|     ) -> anyhow::Result<()> { | ||||
|         HeedMetaStore::load_dump(&src, env)?; | ||||
|   | ||||
| @@ -32,7 +32,7 @@ pub trait TaskPerformer: Sync + Send + 'static { | ||||
|     async fn finish(&self, batch: &Batch); | ||||
| } | ||||
|  | ||||
| pub fn create_task_store<P>(env: heed::Env, performer: Arc<P>) -> Result<TaskStore> | ||||
| pub fn create_task_store<P>(env: Arc<heed::Env>, performer: Arc<P>) -> Result<TaskStore> | ||||
| where | ||||
|     P: TaskPerformer, | ||||
| { | ||||
|   | ||||
| @@ -114,7 +114,7 @@ impl Clone for TaskStore { | ||||
| } | ||||
|  | ||||
| impl TaskStore { | ||||
|     pub fn new(env: heed::Env) -> Result<Self> { | ||||
|     pub fn new(env: Arc<heed::Env>) -> Result<Self> { | ||||
|         let mut store = Store::new(env)?; | ||||
|         let unfinished_tasks = store.reset_and_return_unfinished_tasks()?; | ||||
|         let store = Arc::new(store); | ||||
| @@ -293,7 +293,7 @@ impl TaskStore { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn load_dump(src: impl AsRef<Path>, env: Env) -> anyhow::Result<()> { | ||||
|     pub fn load_dump(src: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> { | ||||
|         // create a dummy update field store, since it is not needed right now. | ||||
|         let store = Self::new(env.clone())?; | ||||
|  | ||||
| @@ -340,7 +340,7 @@ pub mod test { | ||||
|     } | ||||
|  | ||||
|     impl MockTaskStore { | ||||
|         pub fn new(env: heed::Env) -> Result<Self> { | ||||
|         pub fn new(env: Arc<heed::Env>) -> Result<Self> { | ||||
|             Ok(Self::Real(TaskStore::new(env)?)) | ||||
|         } | ||||
|  | ||||
| @@ -432,7 +432,7 @@ pub mod test { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn load_dump(path: impl AsRef<Path>, env: Env) -> anyhow::Result<()> { | ||||
|         pub fn load_dump(path: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> { | ||||
|             TaskStore::load_dump(path, env) | ||||
|         } | ||||
|     } | ||||
|   | ||||
| @@ -10,6 +10,7 @@ use std::convert::TryInto; | ||||
| use std::mem::size_of; | ||||
| use std::ops::Range; | ||||
| use std::result::Result as StdResult; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use heed::types::{ByteSlice, OwnedType, SerdeJson, Unit}; | ||||
| use heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn}; | ||||
| @@ -53,18 +54,26 @@ impl<'a> BytesDecode<'a> for IndexUidTaskIdCodec { | ||||
| } | ||||
|  | ||||
| pub struct Store { | ||||
|     env: Env, | ||||
|     env: Arc<Env>, | ||||
|     uids_task_ids: Database<IndexUidTaskIdCodec, Unit>, | ||||
|     tasks: Database<OwnedType<BEU64>, SerdeJson<Task>>, | ||||
| } | ||||
|  | ||||
| impl Drop for Store { | ||||
|     fn drop(&mut self) { | ||||
|         if Arc::strong_count(&self.env) == 1 { | ||||
|             self.env.as_ref().clone().prepare_for_closing(); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Store { | ||||
|     /// Create a new store from the specified `Path`. | ||||
|     /// Be really cautious when calling this function, the returned `Store` may | ||||
|     /// be in an invalid state, with dangling processing tasks. | ||||
|     /// You want to patch  all un-finished tasks and put them in your pending | ||||
|     /// queue with the `reset_and_return_unfinished_update` method. | ||||
|     pub fn new(env: heed::Env) -> Result<Self> { | ||||
|     pub fn new(env: Arc<heed::Env>) -> Result<Self> { | ||||
|         let uids_task_ids = env.create_database(Some(UID_TASK_IDS))?; | ||||
|         let tasks = env.create_database(Some(TASKS))?; | ||||
|  | ||||
| @@ -257,10 +266,10 @@ pub mod test { | ||||
|         Fake(Mocker), | ||||
|     } | ||||
|  | ||||
|     pub struct TmpEnv(TempDir, heed::Env); | ||||
|     pub struct TmpEnv(TempDir, Arc<heed::Env>); | ||||
|  | ||||
|     impl TmpEnv { | ||||
|         pub fn env(&self) -> heed::Env { | ||||
|         pub fn env(&self) -> Arc<heed::Env> { | ||||
|             self.1.clone() | ||||
|         } | ||||
|     } | ||||
| @@ -271,13 +280,13 @@ pub mod test { | ||||
|         let mut options = EnvOpenOptions::new(); | ||||
|         options.map_size(4096 * 100000); | ||||
|         options.max_dbs(1000); | ||||
|         let env = options.open(tmp.path()).unwrap(); | ||||
|         let env = Arc::new(options.open(tmp.path()).unwrap()); | ||||
|  | ||||
|         TmpEnv(tmp, env) | ||||
|     } | ||||
|  | ||||
|     impl MockStore { | ||||
|         pub fn new(env: heed::Env) -> Result<Self> { | ||||
|         pub fn new(env: Arc<heed::Env>) -> Result<Self> { | ||||
|             Ok(Self::Real(Store::new(env)?)) | ||||
|         } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user