mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-31 07:56:28 +00:00 
			
		
		
		
	update store tests
This commit is contained in:
		| @@ -103,20 +103,31 @@ pub mod test { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub struct StubBuilder<'a> { | ||||
|     pub struct StubBuilder<'a, A, R> { | ||||
|         name: String, | ||||
|         store: &'a StubStore, | ||||
|         times: Option<usize>, | ||||
|         _f: std::marker::PhantomData<fn(A) -> R> | ||||
|     } | ||||
|  | ||||
|     impl<'a> StubBuilder<'a> { | ||||
|     impl<'a, A: 'static, R: 'static> StubBuilder<'a, A, R> { | ||||
|         /// Asserts the stub has been called exactly `times` times. | ||||
|        #[must_use] | ||||
|         pub fn times(mut self, times: usize) -> Self { | ||||
|             self.times = Some(times); | ||||
|             self | ||||
|         } | ||||
|  | ||||
|         pub fn then<A: 'static, R: 'static>(self, f: impl Fn(A) -> R + Sync + Send + 'static) { | ||||
|         /// Asserts the stub has been called exactly once. | ||||
|        #[must_use] | ||||
|         pub fn once(mut self) -> Self { | ||||
|             self.times = Some(1); | ||||
|             self | ||||
|         } | ||||
|  | ||||
|         /// The function that will be called when the stub is called. This needs to be called to | ||||
|         /// actually build the stub and register it to the stub store. | ||||
|         pub fn then(self, f: impl Fn(A) -> R + Sync + Send + 'static) { | ||||
|             let stub = Stub { | ||||
|                 stub: Box::new(f), | ||||
|                 times: self.times, | ||||
| @@ -130,21 +141,18 @@ pub mod test { | ||||
|  | ||||
|     /// Mocker allows to stub metod call on any struct. you can register stubs by calling | ||||
|     /// `Mocker::when` and retrieve it in the proxy implementation when with `Mocker::get`. | ||||
|     /// | ||||
|     /// Mocker uses unsafe code to erase function types, because `Any` is too restrictive with it's | ||||
|     /// requirement for all stub arguments to be static. Because of that panic inside a stub is UB, | ||||
|     /// and it has been observed to crash with an illegal hardware instruction. Use with caution. | ||||
|     #[derive(Debug, Default)] | ||||
|     pub struct Mocker { | ||||
|         store: StubStore, | ||||
|     } | ||||
|  | ||||
|     impl Mocker { | ||||
|         pub fn when(&self, name: &str) -> StubBuilder { | ||||
|         pub fn when<A, R>(&self, name: &str) -> StubBuilder<A, R> { | ||||
|             StubBuilder { | ||||
|                 name: name.to_string(), | ||||
|                 store: &self.store, | ||||
|                 times: None, | ||||
|                 _f: std::marker::PhantomData, | ||||
|             } | ||||
|         } | ||||
|  | ||||
| @@ -191,7 +199,7 @@ pub mod test { | ||||
|         pub fn handle_update(&self, update: Processing) -> std::result::Result<Processed, Failed> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.handle_update(update), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|                 MockIndex::Faux(faux) => faux.get("handle_update").call(update), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|   | ||||
| @@ -569,149 +569,221 @@ impl UpdateStore { | ||||
|     } | ||||
| } | ||||
|  | ||||
| //#[cfg(test)] | ||||
| //mod test { | ||||
| //use super::*; | ||||
| //use crate::index_controller::{ | ||||
| //index_actor::{error::IndexActorError, MockIndexActorHandle}, | ||||
| //UpdateResult, | ||||
| //}; | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use futures::future::ok; | ||||
|     use mockall::predicate::eq; | ||||
|  | ||||
| //use futures::future::ok; | ||||
|     use crate::index::error::IndexError; | ||||
|     use crate::index::test::Mocker; | ||||
|     use crate::index_controller::index_resolver::uuid_store::MockUuidStore; | ||||
|     use crate::index_controller::index_resolver::index_store::MockIndexStore; | ||||
|     use crate::index_controller::updates::status::{Failed, Processed}; | ||||
|  | ||||
| //#[actix_rt::test] | ||||
| //async fn test_next_id() { | ||||
| //let dir = tempfile::tempdir_in(".").unwrap(); | ||||
| //let mut options = EnvOpenOptions::new(); | ||||
| //let handle = Arc::new(MockIndexActorHandle::new()); | ||||
| //options.map_size(4096 * 100); | ||||
| //let update_store = UpdateStore::open( | ||||
| //options, | ||||
| //dir.path(), | ||||
| //handle, | ||||
| //Arc::new(AtomicBool::new(false)), | ||||
| //) | ||||
| //.unwrap(); | ||||
|     use super::*; | ||||
|  | ||||
| //let index1_uuid = Uuid::new_v4(); | ||||
| //let index2_uuid = Uuid::new_v4(); | ||||
|     #[actix_rt::test] | ||||
|     async fn test_next_id() { | ||||
|         let dir = tempfile::tempdir_in(".").unwrap(); | ||||
|         let mut options = EnvOpenOptions::new(); | ||||
|         let index_store = MockIndexStore::new(); | ||||
|         let uuid_store = MockUuidStore::new(); | ||||
|         let index_resolver = IndexResolver::new(uuid_store, index_store); | ||||
|         let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); | ||||
|         options.map_size(4096 * 100); | ||||
|         let update_store = UpdateStore::open( | ||||
|             options, | ||||
|             dir.path(), | ||||
|             Arc::new(index_resolver), | ||||
|             Arc::new(AtomicBool::new(false)), | ||||
|             update_file_store, | ||||
|         ) | ||||
|             .unwrap(); | ||||
|  | ||||
| //let mut txn = update_store.env.write_txn().unwrap(); | ||||
| //let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); | ||||
| //txn.commit().unwrap(); | ||||
| //assert_eq!((0, 0), ids); | ||||
|         let index1_uuid = Uuid::new_v4(); | ||||
|         let index2_uuid = Uuid::new_v4(); | ||||
|  | ||||
| //let mut txn = update_store.env.write_txn().unwrap(); | ||||
| //let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); | ||||
| //txn.commit().unwrap(); | ||||
| //assert_eq!((1, 0), ids); | ||||
|         let mut txn = update_store.env.write_txn().unwrap(); | ||||
|         let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); | ||||
|         txn.commit().unwrap(); | ||||
|         assert_eq!((0, 0), ids); | ||||
|  | ||||
| //let mut txn = update_store.env.write_txn().unwrap(); | ||||
| //let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); | ||||
| //txn.commit().unwrap(); | ||||
| //assert_eq!((2, 1), ids); | ||||
| //} | ||||
|         let mut txn = update_store.env.write_txn().unwrap(); | ||||
|         let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); | ||||
|         txn.commit().unwrap(); | ||||
|         assert_eq!((1, 0), ids); | ||||
|  | ||||
| //#[actix_rt::test] | ||||
| //async fn test_register_update() { | ||||
| //let dir = tempfile::tempdir_in(".").unwrap(); | ||||
| //let mut options = EnvOpenOptions::new(); | ||||
| //let handle = Arc::new(MockIndexActorHandle::new()); | ||||
| //options.map_size(4096 * 100); | ||||
| //let update_store = UpdateStore::open( | ||||
| //options, | ||||
| //dir.path(), | ||||
| //handle, | ||||
| //Arc::new(AtomicBool::new(false)), | ||||
| //) | ||||
| //.unwrap(); | ||||
| //let meta = UpdateMeta::ClearDocuments; | ||||
| //let uuid = Uuid::new_v4(); | ||||
| //let store_clone = update_store.clone(); | ||||
| //tokio::task::spawn_blocking(move || { | ||||
| //store_clone.register_update(meta, None, uuid).unwrap(); | ||||
| //}) | ||||
| //.await | ||||
| //.unwrap(); | ||||
|         let mut txn = update_store.env.write_txn().unwrap(); | ||||
|         let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); | ||||
|         txn.commit().unwrap(); | ||||
|         assert_eq!((2, 1), ids); | ||||
|     } | ||||
|  | ||||
| //let txn = update_store.env.read_txn().unwrap(); | ||||
| //assert!(update_store | ||||
| //.pending_queue | ||||
| //.get(&txn, &(0, uuid, 0)) | ||||
| //.unwrap() | ||||
| //.is_some()); | ||||
| //} | ||||
|     #[actix_rt::test] | ||||
|     async fn test_register_update() { | ||||
|         let dir = tempfile::tempdir_in(".").unwrap(); | ||||
|         let index_store = MockIndexStore::new(); | ||||
|         let uuid_store = MockUuidStore::new(); | ||||
|         let index_resolver = IndexResolver::new(uuid_store, index_store); | ||||
|         let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); | ||||
|         let mut options = EnvOpenOptions::new(); | ||||
|         options.map_size(4096 * 100); | ||||
|         let update_store = UpdateStore::open( | ||||
|             options, | ||||
|             dir.path(), | ||||
|             Arc::new(index_resolver), | ||||
|             Arc::new(AtomicBool::new(false)), | ||||
|             update_file_store, | ||||
|         ) | ||||
|             .unwrap(); | ||||
|         let update = Update::ClearDocuments; | ||||
|         let uuid = Uuid::new_v4(); | ||||
|         let store_clone = update_store.clone(); | ||||
|         tokio::task::spawn_blocking(move || { | ||||
|             store_clone.register_update(uuid, update).unwrap(); | ||||
|         }) | ||||
|         .await | ||||
|             .unwrap(); | ||||
|  | ||||
| //#[actix_rt::test] | ||||
| //async fn test_process_update() { | ||||
| //let dir = tempfile::tempdir_in(".").unwrap(); | ||||
| //let mut handle = MockIndexActorHandle::new(); | ||||
|         let txn = update_store.env.read_txn().unwrap(); | ||||
|         assert!(update_store | ||||
|             .pending_queue | ||||
|             .get(&txn, &(0, uuid, 0)) | ||||
|             .unwrap() | ||||
|             .is_some()); | ||||
|     } | ||||
|  | ||||
| //handle | ||||
| //.expect_update() | ||||
| //.times(2) | ||||
| //.returning(|_index_uuid, processing, _file| { | ||||
| //if processing.id() == 0 { | ||||
| //Box::pin(ok(Ok(processing.process(UpdateResult::Other)))) | ||||
| //} else { | ||||
| //Box::pin(ok(Err( | ||||
| //processing.fail(IndexActorError::ExistingPrimaryKey.into()) | ||||
| //))) | ||||
| //} | ||||
| //}); | ||||
|     #[actix_rt::test] | ||||
|     async fn test_process_update_success() { | ||||
|         let dir = tempfile::tempdir_in(".").unwrap(); | ||||
|         let index_uuid = Uuid::new_v4(); | ||||
|  | ||||
| //let handle = Arc::new(handle); | ||||
|         let mut index_store = MockIndexStore::new(); | ||||
|         index_store | ||||
|             .expect_get() | ||||
|             .with(eq(index_uuid)) | ||||
|             .returning(|_uuid| { | ||||
|                 let mocker = Mocker::default(); | ||||
|                 mocker | ||||
|                     .when::<Processing, std::result::Result<Processed, Failed>>("handle_update") | ||||
|                     .once() | ||||
|                     .then(|update| Ok(update.process(status::UpdateResult::Other))); | ||||
|  | ||||
| //let mut options = EnvOpenOptions::new(); | ||||
| //options.map_size(4096 * 100); | ||||
| //let store = UpdateStore::open( | ||||
| //options, | ||||
| //dir.path(), | ||||
| //handle.clone(), | ||||
| //Arc::new(AtomicBool::new(false)), | ||||
| //) | ||||
| //.unwrap(); | ||||
|                 Box::pin(ok(Some(Index::faux(mocker)))) | ||||
|             }); | ||||
|  | ||||
| //// wait a bit for the event loop exit. | ||||
| //tokio::time::sleep(std::time::Duration::from_millis(50)).await; | ||||
|         let uuid_store = MockUuidStore::new(); | ||||
|         let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); | ||||
|  | ||||
| //let mut txn = store.env.write_txn().unwrap(); | ||||
|  | ||||
| //let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); | ||||
| //let uuid = Uuid::new_v4(); | ||||
|         let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); | ||||
|         let mut options = EnvOpenOptions::new(); | ||||
|         options.map_size(4096 * 100); | ||||
|         let store = UpdateStore::open( | ||||
|             options, | ||||
|             dir.path(), | ||||
|             index_resolver.clone(), | ||||
|             Arc::new(AtomicBool::new(false)), | ||||
|             update_file_store, | ||||
|         ) | ||||
|             .unwrap(); | ||||
|  | ||||
| //store | ||||
| //.pending_queue | ||||
| //.put(&mut txn, &(0, uuid, 0), &update) | ||||
| //.unwrap(); | ||||
|         // wait a bit for the event loop exit. | ||||
|         tokio::time::sleep(std::time::Duration::from_millis(50)).await; | ||||
|  | ||||
| //let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); | ||||
|         let mut txn = store.env.write_txn().unwrap(); | ||||
|  | ||||
| //store | ||||
| //.pending_queue | ||||
| //.put(&mut txn, &(1, uuid, 1), &update) | ||||
| //.unwrap(); | ||||
|         let update = Enqueued::new(Update::ClearDocuments, 0); | ||||
|  | ||||
| //txn.commit().unwrap(); | ||||
|         store | ||||
|             .pending_queue | ||||
|             .put(&mut txn, &(0, index_uuid, 0), &update) | ||||
|             .unwrap(); | ||||
|  | ||||
| //// Process the pending, and check that it has been moved to the update databases, and | ||||
| //// removed from the pending database. | ||||
| //let store_clone = store.clone(); | ||||
| //tokio::task::spawn_blocking(move || { | ||||
| //store_clone.process_pending_update(handle.clone()).unwrap(); | ||||
| //store_clone.process_pending_update(handle).unwrap(); | ||||
| //}) | ||||
| //.await | ||||
| //.unwrap(); | ||||
|  | ||||
| //let txn = store.env.read_txn().unwrap(); | ||||
|         txn.commit().unwrap(); | ||||
|  | ||||
| //assert!(store.pending_queue.first(&txn).unwrap().is_none()); | ||||
| //let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); | ||||
|         // Process the pending, and check that it has been moved to the update databases, and | ||||
|         // removed from the pending database. | ||||
|         let store_clone = store.clone(); | ||||
|         tokio::task::spawn_blocking(move || { | ||||
|             store_clone.process_pending_update(index_resolver).unwrap(); | ||||
|         }) | ||||
|         .await | ||||
|             .unwrap(); | ||||
|  | ||||
| //assert!(matches!(update, UpdateStatus::Processed(_))); | ||||
| //let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); | ||||
|         let txn = store.env.read_txn().unwrap(); | ||||
|  | ||||
| //assert!(matches!(update, UpdateStatus::Failed(_))); | ||||
| //} | ||||
| //} | ||||
|         assert!(store.pending_queue.first(&txn).unwrap().is_none()); | ||||
|         let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap(); | ||||
|  | ||||
|         assert!(matches!(update, UpdateStatus::Processed(_))); | ||||
|     } | ||||
|  | ||||
|     #[actix_rt::test] | ||||
|     async fn test_process_update_failure() { | ||||
|         let dir = tempfile::tempdir_in(".").unwrap(); | ||||
|         let index_uuid = Uuid::new_v4(); | ||||
|  | ||||
|         let mut index_store = MockIndexStore::new(); | ||||
|         index_store | ||||
|             .expect_get() | ||||
|             .with(eq(index_uuid)) | ||||
|             .returning(|_uuid| { | ||||
|                 let mocker = Mocker::default(); | ||||
|                 mocker | ||||
|                     .when::<Processing, std::result::Result<Processed, Failed>>("handle_update") | ||||
|                     .once() | ||||
|                     .then(|update| Err(update.fail(IndexError::ExistingPrimaryKey))); | ||||
|  | ||||
|                 Box::pin(ok(Some(Index::faux(mocker)))) | ||||
|             }); | ||||
|  | ||||
|         let uuid_store = MockUuidStore::new(); | ||||
|         let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); | ||||
|  | ||||
|  | ||||
|         let update_file_store = UpdateFileStore::new(dir.path()).unwrap(); | ||||
|         let mut options = EnvOpenOptions::new(); | ||||
|         options.map_size(4096 * 100); | ||||
|         let store = UpdateStore::open( | ||||
|             options, | ||||
|             dir.path(), | ||||
|             index_resolver.clone(), | ||||
|             Arc::new(AtomicBool::new(false)), | ||||
|             update_file_store, | ||||
|         ) | ||||
|             .unwrap(); | ||||
|  | ||||
|         // wait a bit for the event loop exit. | ||||
|         tokio::time::sleep(std::time::Duration::from_millis(50)).await; | ||||
|  | ||||
|         let mut txn = store.env.write_txn().unwrap(); | ||||
|  | ||||
|         let update = Enqueued::new(Update::ClearDocuments, 0); | ||||
|  | ||||
|         store | ||||
|             .pending_queue | ||||
|             .put(&mut txn, &(0, index_uuid, 0), &update) | ||||
|             .unwrap(); | ||||
|  | ||||
|  | ||||
|         txn.commit().unwrap(); | ||||
|  | ||||
|         // Process the pending, and check that it has been moved to the update databases, and | ||||
|         // removed from the pending database. | ||||
|         let store_clone = store.clone(); | ||||
|         tokio::task::spawn_blocking(move || { | ||||
|             store_clone.process_pending_update(index_resolver).unwrap(); | ||||
|         }) | ||||
|         .await | ||||
|             .unwrap(); | ||||
|  | ||||
|         let txn = store.env.read_txn().unwrap(); | ||||
|  | ||||
|         assert!(store.pending_queue.first(&txn).unwrap().is_none()); | ||||
|         let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap(); | ||||
|  | ||||
|         assert!(matches!(update, UpdateStatus::Failed(_))); | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user