mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 04:56:28 +00:00 
			
		
		
		
	handle errors when sendign payload to actor
This commit is contained in:
		| @@ -55,9 +55,16 @@ impl IndexController { | |||||||
|         // registered and the update_actor that waits for the the payload to be sent to it. |         // registered and the update_actor that waits for the the payload to be sent to it. | ||||||
|         tokio::task::spawn_local(async move { |         tokio::task::spawn_local(async move { | ||||||
|             while let Some(bytes) = payload.next().await { |             while let Some(bytes) = payload.next().await { | ||||||
|                 sender.send(bytes.unwrap()).await; |                 match bytes { | ||||||
|  |                     Ok(bytes) => { sender.send(Ok(bytes)).await; }, | ||||||
|  |                     Err(e) => { | ||||||
|  |                         let error: Box<dyn std::error::Error + Sync + Send + 'static> = Box::new(e); | ||||||
|  |                         sender.send(Err(error)).await; }, | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|  |         // This must be done *AFTER* spawning the task. | ||||||
|         let status = self.update_handle.update(meta, receiver, uuid).await?; |         let status = self.update_handle.update(meta, receiver, uuid).await?; | ||||||
|         Ok(status) |         Ok(status) | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -10,13 +10,17 @@ use uuid::Uuid; | |||||||
| use tokio::fs::File; | use tokio::fs::File; | ||||||
| use tokio::io::AsyncWriteExt; | use tokio::io::AsyncWriteExt; | ||||||
|  |  | ||||||
| use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult, updates::Pending}; | use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult}; | ||||||
|  |  | ||||||
| pub type Result<T> = std::result::Result<T, UpdateError>; | pub type Result<T> = std::result::Result<T, UpdateError>; | ||||||
| type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; | type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; | ||||||
|  | type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>; | ||||||
|  |  | ||||||
| #[derive(Debug, Error)] | #[derive(Debug, Error)] | ||||||
| pub enum UpdateError {} | pub enum UpdateError { | ||||||
|  |     #[error("error with update: {0}")] | ||||||
|  |     Error(Box<dyn std::error::Error + Sync + Send + 'static>), | ||||||
|  | } | ||||||
|  |  | ||||||
| enum UpdateMsg<D> { | enum UpdateMsg<D> { | ||||||
|     CreateIndex{ |     CreateIndex{ | ||||||
| @@ -26,7 +30,7 @@ enum UpdateMsg<D> { | |||||||
|     Update { |     Update { | ||||||
|         uuid: Uuid, |         uuid: Uuid, | ||||||
|         meta: UpdateMeta, |         meta: UpdateMeta, | ||||||
|         data: mpsc::Receiver<D>, |         data: mpsc::Receiver<PayloadData<D>>, | ||||||
|         ret: oneshot::Sender<Result<UpdateStatus>> |         ret: oneshot::Sender<Result<UpdateStatus>> | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -64,24 +68,35 @@ where D: AsRef<[u8]> + Sized + 'static, | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver<D>, ret: oneshot::Sender<Result<UpdateStatus>>) { |     async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver<PayloadData<D>>, ret: oneshot::Sender<Result<UpdateStatus>>) { | ||||||
|         let store = self.store.clone(); |         let store = self.store.clone(); | ||||||
|         let update_file_id = uuid::Uuid::new_v4(); |         let update_file_id = uuid::Uuid::new_v4(); | ||||||
|         let path = self.path.join(format!("update_{}", update_file_id)); |         let path = self.path.join(format!("update_{}", update_file_id)); | ||||||
|         let mut file = File::create(&path).await.unwrap(); |         let mut file = File::create(&path).await.unwrap(); | ||||||
|  |  | ||||||
|         while let Some(bytes) = payload.recv().await { |         while let Some(bytes) = payload.recv().await { | ||||||
|             file.write_all(bytes.as_ref()).await; |             match bytes { | ||||||
|  |                 Ok(bytes) => { | ||||||
|  |                     file.write_all(bytes.as_ref()).await; | ||||||
|  |                 } | ||||||
|  |                 Err(e) => { | ||||||
|  |                     ret.send(Err(UpdateError::Error(e))); | ||||||
|  |                     return | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         file.flush().await; |         file.flush().await; | ||||||
|  |  | ||||||
|         let file = file.into_std().await; |         let file = file.into_std().await; | ||||||
|  |  | ||||||
|         let result = tokio::task::spawn_blocking(move || -> anyhow::Result<Pending<UpdateMeta>> { |         let result = tokio::task::spawn_blocking(move || { | ||||||
|             Ok(store.register_update(meta, path, uuid)?) |             let result = store | ||||||
|         }).await.unwrap().unwrap(); |                 .register_update(meta, path, uuid) | ||||||
|         let _ = ret.send(Ok(UpdateStatus::Pending(result))); |                 .map(|pending| UpdateStatus::Pending(pending)) | ||||||
|  |                 .map_err(|e| UpdateError::Error(Box::new(e))); | ||||||
|  |             let _ = ret.send(result); | ||||||
|  |         }).await; | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -110,7 +125,7 @@ where D: AsRef<[u8]> + Sized + 'static, | |||||||
|         Self { sender } |         Self { sender } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver<D>, uuid: Uuid) -> Result<UpdateStatus> { |     pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver<PayloadData<D>>, uuid: Uuid) -> Result<UpdateStatus> { | ||||||
|         let (ret, receiver) = oneshot::channel(); |         let (ret, receiver) = oneshot::channel(); | ||||||
|         let msg = UpdateMsg::Update { |         let msg = UpdateMsg::Update { | ||||||
|             uuid, |             uuid, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user