update created at when updating index

This commit is contained in:
mpostma
2021-03-08 10:20:53 +01:00
parent ced32afd9f
commit ac4d795eff

View File

@@ -94,11 +94,16 @@ pub enum IndexError {
Error(#[from] anyhow::Error), Error(#[from] anyhow::Error),
#[error("index already exists")] #[error("index already exists")]
IndexAlreadyExists, IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
trait IndexStore { trait IndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>; async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
where F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static;
async fn get_or_create(&self, uuid: Uuid) -> Result<Index>; async fn get_or_create(&self, uuid: Uuid) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>; async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>>; async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>>;
@@ -238,13 +243,16 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
) { ) {
info!("Processing update {}", meta.id()); info!("Processing update {}", meta.id());
let uuid = meta.index_uuid().clone(); let uuid = meta.index_uuid().clone();
let index = self.store.get_or_create(uuid).await.unwrap();
let update_handler = self.update_handler.clone(); let update_handler = self.update_handler.clone();
tokio::task::spawn_blocking(move || { let handle = self.store.update_index(uuid, |index| {
let handle = tokio::task::spawn_blocking(move || {
let result = update_handler.handle_update(meta, data, index); let result = update_handler.handle_update(meta, data, index);
let _ = ret.send(result); let _ = ret.send(result);
}) });
.await; Ok(handle)
});
handle.await;
} }
async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender<Result<Settings>>) { async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender<Result<Settings>>) {
@@ -432,10 +440,11 @@ impl IndexStore for MapIndexStore {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> { async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let meta = match self.meta_store.write().await.entry(uuid.clone()) { let meta = match self.meta_store.write().await.entry(uuid.clone()) {
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let now = Utc::now();
let meta = IndexMeta{ let meta = IndexMeta{
uuid, uuid,
created_at: Utc::now(), created_at: now.clone(),
updated_at: Utc::now(), updated_at: now,
primary_key, primary_key,
}; };
entry.insert(meta).clone() entry.insert(meta).clone()
@@ -494,6 +503,23 @@ impl IndexStore for MapIndexStore {
async fn get_meta(&self, uuid: &Uuid) -> Result<Option<IndexMeta>> { async fn get_meta(&self, uuid: &Uuid) -> Result<Option<IndexMeta>> {
Ok(self.meta_store.read().await.get(uuid).cloned()) Ok(self.meta_store.read().await.get(uuid).cloned())
} }
async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
where F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
R: Sync + Send + 'static,
{
let index = self.get_or_create(uuid.clone()).await?;
let mut meta = self.get_meta(&uuid).await?
.ok_or(IndexError::UnexistingIndex)?;
match f(index) {
Ok(r) => {
meta.updated_at = Utc::now();
self.meta_store.write().await.insert(uuid, meta);
Ok(r)
}
Err(e) => Err(e)
}
}
} }
impl MapIndexStore { impl MapIndexStore {