mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 04:56:28 +00:00 
			
		
		
		
	Change the UpdateStore to have different processed and pending meta types
This commit is contained in:
		| @@ -359,7 +359,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { | |||||||
|         }); |         }); | ||||||
|  |  | ||||||
|     async fn buf_stream( |     async fn buf_stream( | ||||||
|         update_store: Arc<UpdateStore<String>>, |         update_store: Arc<UpdateStore<String, String>>, | ||||||
|         update_status_sender: broadcast::Sender<UpdateStatus<String>>, |         update_status_sender: broadcast::Sender<UpdateStatus<String>>, | ||||||
|         mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin, |         mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin, | ||||||
|     ) -> Result<impl warp::Reply, warp::Rejection> |     ) -> Result<impl warp::Reply, warp::Rejection> | ||||||
|   | |||||||
| @@ -9,24 +9,25 @@ use serde::{Serialize, Deserialize}; | |||||||
| use crate::BEU64; | use crate::BEU64; | ||||||
|  |  | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| pub struct UpdateStore<M> { | pub struct UpdateStore<M, N> { | ||||||
|     env: Env, |     env: Env, | ||||||
|     pending_meta: Database<OwnedType<BEU64>, SerdeBincode<M>>, |     pending_meta: Database<OwnedType<BEU64>, SerdeBincode<M>>, | ||||||
|     pending: Database<OwnedType<BEU64>, ByteSlice>, |     pending: Database<OwnedType<BEU64>, ByteSlice>, | ||||||
|     processed_meta: Database<OwnedType<BEU64>, SerdeBincode<M>>, |     processed_meta: Database<OwnedType<BEU64>, SerdeBincode<N>>, | ||||||
|     notification_sender: Sender<()>, |     notification_sender: Sender<()>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<M: 'static> UpdateStore<M> { | impl<M: 'static, N: 'static> UpdateStore<M, N> { | ||||||
|     pub fn open<P, F>( |     pub fn open<P, F>( | ||||||
|         mut options: EnvOpenOptions, |         mut options: EnvOpenOptions, | ||||||
|         path: P, |         path: P, | ||||||
|         mut update_function: F, |         mut update_function: F, | ||||||
|     ) -> heed::Result<Arc<UpdateStore<M>>> |     ) -> heed::Result<Arc<UpdateStore<M, N>>> | ||||||
|     where |     where | ||||||
|         P: AsRef<Path>, |         P: AsRef<Path>, | ||||||
|         F: FnMut(u64, M, &[u8]) -> heed::Result<M> + Send + 'static, |         F: FnMut(u64, M, &[u8]) -> heed::Result<N> + Send + 'static, | ||||||
|         M: for<'a> Deserialize<'a> + Serialize, |         M: for<'a> Deserialize<'a>, | ||||||
|  |         N: Serialize, | ||||||
|     { |     { | ||||||
|         options.max_dbs(3); |         options.max_dbs(3); | ||||||
|         let env = options.open(path)?; |         let env = options.open(path)?; | ||||||
| @@ -111,10 +112,11 @@ impl<M: 'static> UpdateStore<M> { | |||||||
|     /// Executes the user provided function on the next pending update (the one with the lowest id). |     /// Executes the user provided function on the next pending update (the one with the lowest id). | ||||||
|     /// This is asynchronous as it let the user process the update with a read-only txn and |     /// This is asynchronous as it let the user process the update with a read-only txn and | ||||||
|     /// only writing the result meta to the processed-meta store *after* it has been processed. |     /// only writing the result meta to the processed-meta store *after* it has been processed. | ||||||
|     fn process_pending_update<F>(&self, mut f: F) -> heed::Result<Option<(u64, M)>> |     fn process_pending_update<F>(&self, mut f: F) -> heed::Result<Option<(u64, N)>> | ||||||
|     where |     where | ||||||
|         F: FnMut(u64, M, &[u8]) -> heed::Result<M>, |         F: FnMut(u64, M, &[u8]) -> heed::Result<N>, | ||||||
|         M: for<'a> Deserialize<'a> + Serialize, |         M: for<'a> Deserialize<'a>, | ||||||
|  |         N: Serialize, | ||||||
|     { |     { | ||||||
|         // Create a read transaction to be able to retrieve the pending update in order. |         // Create a read transaction to be able to retrieve the pending update in order. | ||||||
|         let rtxn = self.env.read_txn()?; |         let rtxn = self.env.read_txn()?; | ||||||
| @@ -152,8 +154,9 @@ impl<M: 'static> UpdateStore<M> { | |||||||
|     pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T> |     pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T> | ||||||
|     where |     where | ||||||
|         M: for<'a> Deserialize<'a>, |         M: for<'a> Deserialize<'a>, | ||||||
|  |         N: for<'a> Deserialize<'a>, | ||||||
|         F: for<'a> FnMut( |         F: for<'a> FnMut( | ||||||
|             heed::RoIter<'a, OwnedType<BEU64>, SerdeBincode<M>>, |             heed::RoIter<'a, OwnedType<BEU64>, SerdeBincode<N>>, | ||||||
|             heed::RoIter<'a, OwnedType<BEU64>, SerdeBincode<M>>, |             heed::RoIter<'a, OwnedType<BEU64>, SerdeBincode<M>>, | ||||||
|         ) -> heed::Result<T>, |         ) -> heed::Result<T>, | ||||||
|     { |     { | ||||||
| @@ -168,23 +171,31 @@ impl<M: 'static> UpdateStore<M> { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Returns the update associated meta or `None` if the update deosn't exist. |     /// Returns the update associated meta or `None` if the update deosn't exist. | ||||||
|     pub fn meta(&self, update_id: u64) -> heed::Result<Option<M>> |     pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatusMeta<M, N>>> | ||||||
|     where M: for<'a> Deserialize<'a>, |     where | ||||||
|  |         M: for<'a> Deserialize<'a>, | ||||||
|  |         N: for<'a> Deserialize<'a>, | ||||||
|     { |     { | ||||||
|         let rtxn = self.env.read_txn()?; |         let rtxn = self.env.read_txn()?; | ||||||
|         let key = BEU64::new(update_id); |         let key = BEU64::new(update_id); | ||||||
|  |  | ||||||
|         if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { |         if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { | ||||||
|             return Ok(Some(meta)); |             return Ok(Some(UpdateStatusMeta::Pending(meta))); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         match self.processed_meta.get(&rtxn, &key)? { |         match self.processed_meta.get(&rtxn, &key)? { | ||||||
|             Some(meta) => Ok(Some(meta)), |             Some(meta) => Ok(Some(UpdateStatusMeta::Processed(meta))), | ||||||
|             None => Ok(None), |             None => Ok(None), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[derive(Debug, PartialEq, Eq, Hash)] | ||||||
|  | pub enum UpdateStatusMeta<M, N> { | ||||||
|  |     Pending(M), | ||||||
|  |     Processed(N), | ||||||
|  | } | ||||||
|  |  | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod tests { | mod tests { | ||||||
|     use super::*; |     use super::*; | ||||||
| @@ -195,7 +206,7 @@ mod tests { | |||||||
|     fn simple() { |     fn simple() { | ||||||
|         let dir = tempfile::tempdir().unwrap(); |         let dir = tempfile::tempdir().unwrap(); | ||||||
|         let options = EnvOpenOptions::new(); |         let options = EnvOpenOptions::new(); | ||||||
|         let update_store = UpdateStore::open(options, dir, |id, meta: String, content| { |         let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content| { | ||||||
|             Ok(meta + " processed") |             Ok(meta + " processed") | ||||||
|         }).unwrap(); |         }).unwrap(); | ||||||
|  |  | ||||||
| @@ -205,14 +216,14 @@ mod tests { | |||||||
|         thread::sleep(Duration::from_millis(100)); |         thread::sleep(Duration::from_millis(100)); | ||||||
|  |  | ||||||
|         let meta = update_store.meta(update_id).unwrap().unwrap(); |         let meta = update_store.meta(update_id).unwrap().unwrap(); | ||||||
|         assert_eq!(meta, "kiki processed"); |         assert_eq!(meta, UpdateStatusMeta::Processed(format!("kiki processed"))); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn long_running_update() { |     fn long_running_update() { | ||||||
|         let dir = tempfile::tempdir().unwrap(); |         let dir = tempfile::tempdir().unwrap(); | ||||||
|         let options = EnvOpenOptions::new(); |         let options = EnvOpenOptions::new(); | ||||||
|         let update_store = UpdateStore::open(options, dir, |id, meta: String, content| { |         let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content| { | ||||||
|             thread::sleep(Duration::from_millis(400)); |             thread::sleep(Duration::from_millis(400)); | ||||||
|             Ok(meta + " processed") |             Ok(meta + " processed") | ||||||
|         }).unwrap(); |         }).unwrap(); | ||||||
| @@ -234,12 +245,12 @@ mod tests { | |||||||
|         thread::sleep(Duration::from_millis(400 * 3 + 100)); |         thread::sleep(Duration::from_millis(400 * 3 + 100)); | ||||||
|  |  | ||||||
|         let meta = update_store.meta(update_id_kiki).unwrap().unwrap(); |         let meta = update_store.meta(update_id_kiki).unwrap().unwrap(); | ||||||
|         assert_eq!(meta, "kiki processed"); |         assert_eq!(meta, UpdateStatusMeta::Processed(format!("kiki processed"))); | ||||||
|  |  | ||||||
|         let meta = update_store.meta(update_id_coco).unwrap().unwrap(); |         let meta = update_store.meta(update_id_coco).unwrap().unwrap(); | ||||||
|         assert_eq!(meta, "coco processed"); |         assert_eq!(meta, UpdateStatusMeta::Processed(format!("coco processed"))); | ||||||
|  |  | ||||||
|         let meta = update_store.meta(update_id_cucu).unwrap().unwrap(); |         let meta = update_store.meta(update_id_cucu).unwrap().unwrap(); | ||||||
|         assert_eq!(meta, "cucu processed"); |         assert_eq!(meta, UpdateStatusMeta::Processed(format!("cucu processed"))); | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user