fixes after review

This commit is contained in:
Alexey Shekhirin
2021-04-08 16:35:28 +03:00
parent 698a1ea582
commit ae1655586c
9 changed files with 56 additions and 89 deletions

View File

@ -44,7 +44,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
read_receiver,
write_receiver,
update_handler,
processing: RwLock::new(Default::default()),
processing: RwLock::new(None),
store,
})
}
@ -183,23 +183,22 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
meta: Processing<UpdateMeta>,
data: File,
) -> Result<UpdateResult> {
let uuid = meta.index_uuid().clone();
*self.processing.write().await = Some(uuid);
let result = {
async fn get_result<S: IndexStore>(actor: &IndexActor<S>, meta: Processing<UpdateMeta>, data: File) -> Result<UpdateResult> {
debug!("Processing update {}", meta.id());
let update_handler = self.update_handler.clone();
let index = match self.store.get(uuid).await? {
let uuid = *meta.index_uuid();
let update_handler = actor.update_handler.clone();
let index = match actor.store.get(uuid).await? {
Some(index) => index,
None => self.store.create(uuid, None).await?,
None => actor.store.create(uuid, None).await?,
};
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
};
}
*self.processing.write().await = Some(meta.index_uuid().clone());
let result = get_result(self, meta, data).await;
*self.processing.write().await = None;
result

View File

@ -72,9 +72,6 @@ where
Some(Snapshot { uuid, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Some(IsLocked { uuid, ret }) => {
let _ = ret.send(self.handle_is_locked(uuid).await);
}
None => break,
}
}
@ -226,14 +223,4 @@ where
Ok(())
}
async fn handle_is_locked(&self, uuid: Uuid) -> Result<bool> {
let store = self
.store
.get(uuid)
.await?
.ok_or(UpdateError::UnexistingIndex(uuid))?;
Ok(store.update_lock.is_locked())
}
}

View File

@ -95,11 +95,4 @@ where
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn is_locked(&self, uuid: Uuid) -> Result<bool> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::IsLocked { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
}

View File

@ -34,8 +34,4 @@ pub enum UpdateMsg<D> {
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
IsLocked {
uuid: Uuid,
ret: oneshot::Sender<Result<bool>>,
},
}

View File

@ -52,5 +52,4 @@ pub trait UpdateActorHandle {
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid,
) -> Result<UpdateStatus>;
async fn is_locked(&self, uuid: Uuid) -> Result<bool>;
}