mirror of https://github.com/meilisearch/meilisearch.git
synced 2025-10-31 07:56:28 +00:00

	test dump handler
@@ -1,18 +1,10 @@
use std::{fs::File, path::PathBuf, sync::Arc};
#[cfg(not(test))]
pub use real::DumpHandler;

#[cfg(test)]
pub use test::MockDumpHandler as DumpHandler;

use log::{info, trace};
use meilisearch_auth::AuthController;
use milli::heed::Env;
use time::{macros::format_description, OffsetDateTime};
use tokio::fs::create_dir_all;

use crate::analytics;
use crate::compression::to_tar_gz;
use crate::dump::error::{DumpError, Result};
use crate::dump::{MetadataVersion, META_FILE_NAME};
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore, IndexResolver};
use crate::tasks::TaskStore;
use crate::update_file_store::UpdateFileStore;

/// Generate uid from creation date
pub fn generate_uid() -> String {
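
The hunk above replaces the concrete `DumpHandler` export with a cfg-gated re-export, so the rest of the crate keeps referring to a single `DumpHandler` name while test builds transparently receive the mock. Below is a minimal, self-contained sketch of that pattern; every name in it is invented for illustration, and only the `#[cfg]` re-export trick itself mirrors the diff.

```rust
// Sketch of the cfg-gated alias pattern (hypothetical names throughout).
mod real {
    pub struct Engine;

    impl Engine {
        pub fn run(&self) -> &'static str {
            "real"
        }
    }
}

#[cfg(test)]
mod test_mock {
    pub struct MockEngine;

    impl MockEngine {
        pub fn run(&self) -> &'static str {
            "mock"
        }
    }
}

// Callers always name `Engine`; which type that resolves to depends on the build.
#[cfg(not(test))]
pub use real::Engine;
#[cfg(test)]
pub use test_mock::MockEngine as Engine;

fn main() {
    // Outside of `cargo test` this prints "real"; under test it would print "mock".
    println!("{}", Engine.run());
}
```
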
@@ -23,67 +15,166 @@ pub fn generate_uid() -> String {
        .unwrap()
}

pub struct DumpHandler<U, I> {
    pub dump_path: PathBuf,
    pub db_path: PathBuf,
    pub update_file_store: UpdateFileStore,
    pub task_store_size: usize,
    pub index_db_size: usize,
    pub env: Arc<Env>,
    pub index_resolver: Arc<IndexResolver<U, I>>,
}
mod real {
    use std::{fs::File, path::PathBuf, sync::Arc};

impl<U, I> DumpHandler<U, I>
where
    U: IndexMetaStore + Sync + Send + 'static,
    I: IndexStore + Sync + Send + 'static,
{
    pub async fn run(&self, uid: String) -> Result<()> {
        trace!("Performing dump.");
    use log::{info, trace};
    use meilisearch_auth::AuthController;
    use milli::heed::Env;
    use tokio::fs::create_dir_all;

        create_dir_all(&self.dump_path).await?;
    use crate::analytics;
    use crate::compression::to_tar_gz;
    use crate::dump::error::{DumpError, Result};
    use crate::dump::{MetadataVersion, META_FILE_NAME};
    use crate::index_resolver::{
        index_store::IndexStore, meta_store::IndexMetaStore, IndexResolver,
    };
    use crate::tasks::TaskStore;
    use crate::update_file_store::UpdateFileStore;

        let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
        let temp_dump_path = temp_dump_dir.path().to_owned();
    pub struct DumpHandler<U, I> {
        dump_path: PathBuf,
        db_path: PathBuf,
        update_file_store: UpdateFileStore,
        task_store_size: usize,
        index_db_size: usize,
        env: Arc<Env>,
        index_resolver: Arc<IndexResolver<U, I>>,
    }

        let meta = MetadataVersion::new_v5(self.index_db_size, self.task_store_size);
        let meta_path = temp_dump_path.join(META_FILE_NAME);
        // TODO: blocking
        let mut meta_file = File::create(&meta_path)?;
        serde_json::to_writer(&mut meta_file, &meta)?;
        analytics::copy_user_id(&self.db_path, &temp_dump_path);
    impl<U, I> DumpHandler<U, I>
    where
        U: IndexMetaStore + Sync + Send + 'static,
        I: IndexStore + Sync + Send + 'static,
    {
        pub fn new(
            dump_path: PathBuf,
            db_path: PathBuf,
            update_file_store: UpdateFileStore,
            task_store_size: usize,
            index_db_size: usize,
            env: Arc<Env>,
            index_resolver: Arc<IndexResolver<U, I>>,
        ) -> Self {
            Self {
                dump_path,
                db_path,
                update_file_store,
                task_store_size,
                index_db_size,
                env,
                index_resolver,
            }
        }

        create_dir_all(&temp_dump_path.join("indexes")).await?;
        pub async fn run(&self, uid: String) -> Result<()> {
            trace!("Performing dump.");

        // TODO: this is blocking!!
        AuthController::dump(&self.db_path, &temp_dump_path)?;
        TaskStore::dump(
            self.env.clone(),
            &temp_dump_path,
            self.update_file_store.clone(),
        )
        .await?;
        self.index_resolver.dump(&temp_dump_path).await?;
            create_dir_all(&self.dump_path).await?;

        let dump_path = self.dump_path.clone();
        let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
            // for now we simply copy the updates/updates_files
            // FIXME: We may copy more files than necessary, if new files are added while we are
            // performing the dump. We need a way to filter them out.
            let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
            let temp_dump_path = temp_dump_dir.path().to_owned();

            let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?;
            to_tar_gz(temp_dump_path, temp_dump_file.path())
                .map_err(|e| DumpError::Internal(e.into()))?;
            let meta = MetadataVersion::new_v5(self.index_db_size, self.task_store_size);
            let meta_path = temp_dump_path.join(META_FILE_NAME);
            // TODO: blocking
            let mut meta_file = File::create(&meta_path)?;
            serde_json::to_writer(&mut meta_file, &meta)?;
            analytics::copy_user_id(&self.db_path, &temp_dump_path);

            let dump_path = dump_path.join(uid).with_extension("dump");
            temp_dump_file.persist(&dump_path)?;
            create_dir_all(&temp_dump_path.join("indexes")).await?;

            Ok(dump_path)
        })
        .await??;
            // TODO: this is blocking!!
            AuthController::dump(&self.db_path, &temp_dump_path)?;
            TaskStore::dump(
                self.env.clone(),
                &temp_dump_path,
                self.update_file_store.clone(),
            )
            .await?;
            self.index_resolver.dump(&temp_dump_path).await?;

        info!("Created dump in {:?}.", dump_path);
            let dump_path = self.dump_path.clone();
            let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
                // for now we simply copy the updates/updates_files
                // FIXME: We may copy more files than necessary, if new files are added while we are
                // performing the dump. We need a way to filter them out.

        Ok(())
                let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?;
                to_tar_gz(temp_dump_path, temp_dump_file.path())
                    .map_err(|e| DumpError::Internal(e.into()))?;

                let dump_path = dump_path.join(uid).with_extension("dump");
                temp_dump_file.persist(&dump_path)?;

                Ok(dump_path)
            })
            .await??;

            info!("Created dump in {:?}.", dump_path);

            Ok(())
        }
    }
}

#[cfg(test)]
mod test {
    use std::marker::PhantomData;
    use std::path::PathBuf;
    use std::sync::Arc;

    use milli::heed::Env;
    use nelson::Mocker;

    use crate::dump::error::Result;
    use crate::index_resolver::IndexResolver;
    use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
    use crate::update_file_store::UpdateFileStore;

    use super::*;

    pub enum MockDumpHandler<U, I> {
        Real(super::real::DumpHandler<U, I>),
        Mock(Mocker, PhantomData<(U, I)>),
    }

    impl<U, I> MockDumpHandler<U, I> {
        pub fn mock(mocker: Mocker) -> Self {
            Self::Mock(mocker, PhantomData)
        }
    }

    impl<U, I> MockDumpHandler<U, I>
    where
        U: IndexMetaStore + Sync + Send + 'static,
        I: IndexStore + Sync + Send + 'static,
    {
        pub fn new(
            dump_path: PathBuf,
            db_path: PathBuf,
            update_file_store: UpdateFileStore,
            task_store_size: usize,
            index_db_size: usize,
            env: Arc<Env>,
            index_resolver: Arc<IndexResolver<U, I>>,
        ) -> Self {
            Self::Real(super::real::DumpHandler::new(
                dump_path,
                db_path,
                update_file_store,
                task_store_size,
                index_db_size,
                env,
                index_resolver,
            ))
        }
        pub async fn run(&self, uid: String) -> Result<()> {
            match self {
                DumpHandler::Real(real) => real.run(uid).await,
                DumpHandler::Mock(mocker, _) => unsafe { mocker.get("run").call(uid) },
            }
        }
    }
}

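
Two details in the file above are worth a standalone note. First, `run` writes the archive into a `tempfile::NamedTempFile` created inside the destination directory and only afterwards `persist`s it under its final `<uid>.dump` name, presumably so that the last step is a same-filesystem rename and a partially written dump never appears under the final name. A reduced sketch of that write-then-persist pattern, with a made-up helper and paths:

```rust
use std::io::Write;
use std::path::Path;

// Hypothetical helper: write `bytes` to `dir/final_name` without ever exposing
// a half-written file under that name.
fn write_then_persist(dir: &Path, final_name: &str, bytes: &[u8]) -> std::io::Result<()> {
    // Create the temp file in the same directory as the target, so persist()
    // below is a rename within one filesystem rather than a copy.
    let mut tmp = tempfile::NamedTempFile::new_in(dir)?;
    tmp.write_all(bytes)?;
    // Move the temp file onto its final name; PersistError wraps an io::Error.
    tmp.persist(dir.join(final_name)).map_err(|e| e.error)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    write_then_persist(&std::env::temp_dir(), "example.dump", b"example dump payload")
}
```

Second, the new `MockDumpHandler<U, I>` stores only a nelson `Mocker` in its `Mock` variant, together with `PhantomData<(U, I)>`; `PhantomData` is the standard way for a variant or struct to mention type parameters it holds no values of. A reduced, hypothetical example where the marker is what makes the definition compile at all:

```rust
use std::marker::PhantomData;

// `MockHandler` and `S` are invented; only the PhantomData idiom is the point.
struct MockHandler<S> {
    canned: &'static str,
    // `S` appears nowhere else in the struct; without this zero-sized marker
    // the definition is rejected with error[E0392] ("parameter `S` is never used").
    _marker: PhantomData<S>,
}

impl<S> MockHandler<S> {
    fn new(canned: &'static str) -> Self {
        Self { canned, _marker: PhantomData }
    }

    // Same shape as the real handler's method, so generic callers cannot tell
    // the mock and the real implementation apart.
    fn run(&self) -> &'static str {
        self.canned
    }
}

fn main() {
    let handler: MockHandler<Vec<u8>> = MockHandler::new("dump ok");
    assert_eq!(handler.run(), "dump ok");
}
```
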
@@ -222,15 +222,15 @@ impl IndexControllerBuilder {
            .dump_dst
            .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?;

        let dump_handler = Arc::new(DumpHandler {
        let dump_handler = Arc::new(DumpHandler::new(
            dump_path,
            db_path: db_path.as_ref().into(),
            update_file_store: update_file_store.clone(),
            db_path.as_ref().into(),
            update_file_store.clone(),
            task_store_size,
            index_db_size: index_size,
            env: meta_env.clone(),
            index_resolver: index_resolver.clone(),
        });
            index_size,
            meta_env.clone(),
            index_resolver.clone(),
        ));
        let task_store = TaskStore::new(meta_env)?;

        // register all the batch handlers for use with the scheduler.

@@ -39,3 +39,96 @@ where
        ()
    }
}

#[cfg(test)]
mod test {
    use crate::dump::error::{DumpError, Result as DumpResult};
    use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore};
    use crate::tasks::handlers::test::task_to_batch;

    use super::*;

    use nelson::Mocker;
    use proptest::prelude::*;

    proptest! {
        #[test]
        fn finish_does_nothing(
            task in any::<Task>(),
        ) {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let handle = rt.spawn(async {
                let batch = task_to_batch(task);

                let mocker = Mocker::default();
                let dump_handler = DumpHandler::<MockIndexMetaStore, MockIndexStore>::mock(mocker);

                dump_handler.finish(&batch).await;
            });

            rt.block_on(handle).unwrap();
        }

        #[test]
        fn test_handle_dump_success(
            task in any::<Task>(),
        ) {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let handle = rt.spawn(async {
                let batch = task_to_batch(task);
                let should_accept = matches!(batch.content, BatchContent::Dump { .. });

                let mocker = Mocker::default();
                if should_accept {
                    mocker.when::<String, DumpResult<()>>("run")
                    .once()
                    .then(|_| Ok(()));
                }

                let dump_handler = DumpHandler::<MockIndexMetaStore, MockIndexStore>::mock(mocker);

                let accept = dump_handler.accept(&batch);
                assert_eq!(accept, should_accept);

                if accept {
                    let batch = dump_handler.process_batch(batch).await;
                    let last_event = batch.content.first().unwrap().events.last().unwrap();
                    assert!(matches!(last_event, TaskEvent::Succeded { .. }));
                }
            });

            rt.block_on(handle).unwrap();
        }

        #[test]
        fn test_handle_dump_error(
            task in any::<Task>(),
        ) {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let handle = rt.spawn(async {
                let batch = task_to_batch(task);
                let should_accept = matches!(batch.content, BatchContent::Dump { .. });

                let mocker = Mocker::default();
                if should_accept {
                    mocker.when::<String, DumpResult<()>>("run")
                    .once()
                    .then(|_| Err(DumpError::Internal("error".into())));
                }

                let dump_handler = DumpHandler::<MockIndexMetaStore, MockIndexStore>::mock(mocker);

                let accept = dump_handler.accept(&batch);
                assert_eq!(accept, should_accept);

                if accept {
                    let batch = dump_handler.process_batch(batch).await;
                    let last_event = batch.content.first().unwrap().events.last().unwrap();
                    assert!(matches!(last_event, TaskEvent::Failed { .. }));
                }
            });

            rt.block_on(handle).unwrap();
        }
    }
}

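
The property tests above all drive the handler through the same nelson stubbing pattern: register a closure on the `Mocker` under the `"run"` key, hand the mocker to the handler, and let the handler dispatch to it through `get("run").call(..)`. Below is a stripped-down sketch of just that pattern outside the Meilisearch types; `Greeter` is invented, and the only `Mocker` calls used are the ones that appear in the diff.

```rust
use nelson::Mocker;

// Hypothetical component that delegates its behaviour to a Mocker.
struct Greeter {
    mocker: Mocker,
}

impl Greeter {
    fn greet(&self, name: String) -> String {
        // Dispatch to whatever closure was registered under the "greet" key.
        unsafe { self.mocker.get("greet").call(name) }
    }
}

fn main() {
    let mocker = Mocker::default();
    // Register a stub that takes a String and returns a String, mirroring the
    // when::<A, R>(..).once().then(..) chain used in the tests above.
    mocker
        .when::<String, String>("greet")
        .once()
        .then(|name| format!("hello {}", name));

    let greeter = Greeter { mocker };
    assert_eq!(greeter.greet("tests".to_string()), "hello tests");
}
```
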