From 4eeeccb9cdf53c30a1e81d29662ba41721ccce26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 21 Oct 2020 13:52:15 +0200 Subject: [PATCH] Change the UpdateStore to have different processed and pending meta types --- src/subcommand/serve.rs | 2 +- src/update_store.rs | 51 +++++++++++++++++++++++++---------------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 851691083..a6970431c 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -359,7 +359,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { }); async fn buf_stream( - update_store: Arc>, + update_store: Arc>, update_status_sender: broadcast::Sender>, mut stream: impl futures::Stream> + Unpin, ) -> Result diff --git a/src/update_store.rs b/src/update_store.rs index 1b0fe4620..304d1a1e6 100644 --- a/src/update_store.rs +++ b/src/update_store.rs @@ -9,24 +9,25 @@ use serde::{Serialize, Deserialize}; use crate::BEU64; #[derive(Clone)] -pub struct UpdateStore { +pub struct UpdateStore { env: Env, pending_meta: Database, SerdeBincode>, pending: Database, ByteSlice>, - processed_meta: Database, SerdeBincode>, + processed_meta: Database, SerdeBincode>, notification_sender: Sender<()>, } -impl UpdateStore { +impl UpdateStore { pub fn open( mut options: EnvOpenOptions, path: P, mut update_function: F, - ) -> heed::Result>> + ) -> heed::Result>> where P: AsRef, - F: FnMut(u64, M, &[u8]) -> heed::Result + Send + 'static, - M: for<'a> Deserialize<'a> + Serialize, + F: FnMut(u64, M, &[u8]) -> heed::Result + Send + 'static, + M: for<'a> Deserialize<'a>, + N: Serialize, { options.max_dbs(3); let env = options.open(path)?; @@ -111,10 +112,11 @@ impl UpdateStore { /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, mut f: F) -> heed::Result> + fn process_pending_update(&self, mut f: F) -> heed::Result> where - F: FnMut(u64, M, &[u8]) -> heed::Result, - M: for<'a> Deserialize<'a> + Serialize, + F: FnMut(u64, M, &[u8]) -> heed::Result, + M: for<'a> Deserialize<'a>, + N: Serialize, { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; @@ -152,8 +154,9 @@ impl UpdateStore { pub fn iter_metas(&self, mut f: F) -> heed::Result where M: for<'a> Deserialize<'a>, + N: for<'a> Deserialize<'a>, F: for<'a> FnMut( - heed::RoIter<'a, OwnedType, SerdeBincode>, + heed::RoIter<'a, OwnedType, SerdeBincode>, heed::RoIter<'a, OwnedType, SerdeBincode>, ) -> heed::Result, { @@ -168,23 +171,31 @@ impl UpdateStore { } /// Returns the update associated meta or `None` if the update deosn't exist. - pub fn meta(&self, update_id: u64) -> heed::Result> - where M: for<'a> Deserialize<'a>, + pub fn meta(&self, update_id: u64) -> heed::Result>> + where + M: for<'a> Deserialize<'a>, + N: for<'a> Deserialize<'a>, { let rtxn = self.env.read_txn()?; let key = BEU64::new(update_id); if let Some(meta) = self.pending_meta.get(&rtxn, &key)? { - return Ok(Some(meta)); + return Ok(Some(UpdateStatusMeta::Pending(meta))); } match self.processed_meta.get(&rtxn, &key)? { - Some(meta) => Ok(Some(meta)), + Some(meta) => Ok(Some(UpdateStatusMeta::Processed(meta))), None => Ok(None), } } } +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum UpdateStatusMeta { + Pending(M), + Processed(N), +} + #[cfg(test)] mod tests { use super::*; @@ -195,7 +206,7 @@ mod tests { fn simple() { let dir = tempfile::tempdir().unwrap(); let options = EnvOpenOptions::new(); - let update_store = UpdateStore::open(options, dir, |id, meta: String, content| { + let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content| { Ok(meta + " processed") }).unwrap(); @@ -205,14 +216,14 @@ mod tests { thread::sleep(Duration::from_millis(100)); let meta = update_store.meta(update_id).unwrap().unwrap(); - assert_eq!(meta, "kiki processed"); + assert_eq!(meta, UpdateStatusMeta::Processed(format!("kiki processed"))); } #[test] fn long_running_update() { let dir = tempfile::tempdir().unwrap(); let options = EnvOpenOptions::new(); - let update_store = UpdateStore::open(options, dir, |id, meta: String, content| { + let update_store = UpdateStore::open(options, dir, |_id, meta: String, _content| { thread::sleep(Duration::from_millis(400)); Ok(meta + " processed") }).unwrap(); @@ -234,12 +245,12 @@ mod tests { thread::sleep(Duration::from_millis(400 * 3 + 100)); let meta = update_store.meta(update_id_kiki).unwrap().unwrap(); - assert_eq!(meta, "kiki processed"); + assert_eq!(meta, UpdateStatusMeta::Processed(format!("kiki processed"))); let meta = update_store.meta(update_id_coco).unwrap().unwrap(); - assert_eq!(meta, "coco processed"); + assert_eq!(meta, UpdateStatusMeta::Processed(format!("coco processed"))); let meta = update_store.meta(update_id_cucu).unwrap().unwrap(); - assert_eq!(meta, "cucu processed"); + assert_eq!(meta, UpdateStatusMeta::Processed(format!("cucu processed"))); } }