mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-24 20:46:27 +00:00 
			
		
		
		
	create uuid on successful update addition
also change resolve to get in uuid resolver
This commit is contained in:
		
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -1786,7 +1786,7 @@ dependencies = [ | ||||
|  | ||||
| [[package]] | ||||
| name = "meilisearch-http" | ||||
| version = "0.21.0-alpha" | ||||
| version = "0.21.0-alpha.1" | ||||
| dependencies = [ | ||||
|  "actix-cors", | ||||
|  "actix-http 3.0.0-beta.4", | ||||
|   | ||||
| @@ -16,10 +16,12 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat}; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use tokio::sync::mpsc; | ||||
| use tokio::time::sleep; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use crate::index::{Document, SearchQuery, SearchResult}; | ||||
| use crate::index::{Facets, Settings, UpdateResult}; | ||||
| pub use updates::{Failed, Processed, Processing}; | ||||
| use uuid_resolver::UuidError; | ||||
|  | ||||
| pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>; | ||||
|  | ||||
| @@ -83,7 +85,7 @@ impl IndexController { | ||||
|         mut payload: Payload, | ||||
|         primary_key: Option<String>, | ||||
|     ) -> anyhow::Result<UpdateStatus> { | ||||
|         let uuid = self.uuid_resolver.get_or_create(uid).await?; | ||||
|         let perform_udpate = |uuid| async move { | ||||
|             let meta = UpdateMeta::DocumentsAddition { | ||||
|                 method, | ||||
|                 format, | ||||
| @@ -109,12 +111,23 @@ impl IndexController { | ||||
|             }); | ||||
|  | ||||
|             // This must be done *AFTER* spawning the task. | ||||
|         let status = self.update_handle.update(meta, receiver, uuid).await?; | ||||
|             self.update_handle.update(meta, receiver, uuid).await | ||||
|         }; | ||||
|  | ||||
|         match self.uuid_resolver.get(uid).await { | ||||
|             Ok(uuid) => Ok(perform_udpate(uuid).await?), | ||||
|             Err(UuidError::UnexistingIndex(name)) => { | ||||
|                 let uuid = Uuid::new_v4(); | ||||
|                 let status = perform_udpate(uuid).await?; | ||||
|                 self.uuid_resolver.insert(name, uuid).await?; | ||||
|                 Ok(status) | ||||
|             } | ||||
|             Err(e) => Err(e.into()), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub async fn clear_documents(&self, uid: String) -> anyhow::Result<UpdateStatus> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid).await?; | ||||
|         let meta = UpdateMeta::ClearDocuments; | ||||
|         let (_, receiver) = mpsc::channel(1); | ||||
|         let status = self.update_handle.update(meta, receiver, uuid).await?; | ||||
| @@ -126,7 +139,7 @@ impl IndexController { | ||||
|         uid: String, | ||||
|         document_ids: Vec<String>, | ||||
|     ) -> anyhow::Result<UpdateStatus> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid).await?; | ||||
|         let meta = UpdateMeta::DeleteDocuments; | ||||
|         let (sender, receiver) = mpsc::channel(10); | ||||
|  | ||||
| @@ -146,27 +159,24 @@ impl IndexController { | ||||
|         settings: Settings, | ||||
|         create: bool, | ||||
|     ) -> anyhow::Result<UpdateStatus> { | ||||
|         let uuid = if create { | ||||
|             let uuid = self.uuid_resolver.get_or_create(uid).await?; | ||||
|             // We need to create the index upfront, since it would otherwise only be created when | ||||
|             // the update is processed. This would make calls to GET index to fail until the update | ||||
|             // is complete. Since this is get or create, we ignore the error when the index already | ||||
|             // exists. | ||||
|             match self.index_handle.create_index(uuid, None).await { | ||||
|                 Ok(_) | Err(index_actor::IndexError::IndexAlreadyExists) => (), | ||||
|                 Err(e) => return Err(e.into()), | ||||
|             } | ||||
|             uuid | ||||
|         } else { | ||||
|             self.uuid_resolver.resolve(uid).await? | ||||
|         }; | ||||
|         let perform_udpate = |uuid| async move { | ||||
|             let meta = UpdateMeta::Settings(settings); | ||||
|             // Nothing so send, drop the sender right away, as not to block the update actor. | ||||
|             let (_, receiver) = mpsc::channel(1); | ||||
|             self.update_handle.update(meta, receiver, uuid).await | ||||
|         }; | ||||
|  | ||||
|         let status = self.update_handle.update(meta, receiver, uuid).await?; | ||||
|         match self.uuid_resolver.get(uid).await { | ||||
|             Ok(uuid) => Ok(perform_udpate(uuid).await?), | ||||
|             Err(UuidError::UnexistingIndex(name)) if create => { | ||||
|                 let uuid = Uuid::new_v4(); | ||||
|                 let status = perform_udpate(uuid).await?; | ||||
|                 self.uuid_resolver.insert(name, uuid).await?; | ||||
|                 Ok(status) | ||||
|             } | ||||
|             Err(e) => Err(e.into()), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub async fn create_index( | ||||
|         &self, | ||||
| @@ -190,13 +200,13 @@ impl IndexController { | ||||
|     } | ||||
|  | ||||
|     pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result<UpdateStatus> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid).await?; | ||||
|         let result = self.update_handle.update_status(uuid, id).await?; | ||||
|         Ok(result) | ||||
|     } | ||||
|  | ||||
|     pub async fn all_update_status(&self, uid: String) -> anyhow::Result<Vec<UpdateStatus>> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid).await?; | ||||
|         let result = self.update_handle.get_all_updates_status(uuid).await?; | ||||
|         Ok(result) | ||||
|     } | ||||
| @@ -216,7 +226,7 @@ impl IndexController { | ||||
|     } | ||||
|  | ||||
|     pub async fn settings(&self, uid: String) -> anyhow::Result<Settings> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid.clone()).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid.clone()).await?; | ||||
|         let settings = self.index_handle.settings(uuid).await?; | ||||
|         Ok(settings) | ||||
|     } | ||||
| @@ -228,7 +238,7 @@ impl IndexController { | ||||
|         limit: usize, | ||||
|         attributes_to_retrieve: Option<Vec<String>>, | ||||
|     ) -> anyhow::Result<Vec<Document>> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid.clone()).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid.clone()).await?; | ||||
|         let documents = self | ||||
|             .index_handle | ||||
|             .documents(uuid, offset, limit, attributes_to_retrieve) | ||||
| @@ -242,7 +252,7 @@ impl IndexController { | ||||
|         doc_id: String, | ||||
|         attributes_to_retrieve: Option<Vec<String>>, | ||||
|     ) -> anyhow::Result<Document> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid.clone()).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid.clone()).await?; | ||||
|         let document = self | ||||
|             .index_handle | ||||
|             .document(uuid, doc_id, attributes_to_retrieve) | ||||
| @@ -259,20 +269,20 @@ impl IndexController { | ||||
|             bail!("Can't change the index uid.") | ||||
|         } | ||||
|  | ||||
|         let uuid = self.uuid_resolver.resolve(uid.clone()).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid.clone()).await?; | ||||
|         let meta = self.index_handle.update_index(uuid, index_settings).await?; | ||||
|         let meta = IndexMetadata { name: uid.clone(), uid, meta }; | ||||
|         Ok(meta) | ||||
|     } | ||||
|  | ||||
|     pub async fn search(&self, uid: String, query: SearchQuery) -> anyhow::Result<SearchResult> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid).await?; | ||||
|         let result = self.index_handle.search(uuid, query).await?; | ||||
|         Ok(result) | ||||
|     } | ||||
|  | ||||
|     pub async fn get_index(&self, uid: String) -> anyhow::Result<IndexMetadata> { | ||||
|         let uuid = self.uuid_resolver.resolve(uid.clone()).await?; | ||||
|         let uuid = self.uuid_resolver.get(uid.clone()).await?; | ||||
|         let meta = self.index_handle.get_index_meta(uuid).await?; | ||||
|         let meta = IndexMetadata { name: uid.clone(), uid, meta }; | ||||
|         Ok(meta) | ||||
|   | ||||
| @@ -13,11 +13,7 @@ pub type Result<T> = std::result::Result<T, UuidError>; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| enum UuidResolveMsg { | ||||
|     Resolve { | ||||
|         uid: String, | ||||
|         ret: oneshot::Sender<Result<Uuid>>, | ||||
|     }, | ||||
|     GetOrCreate { | ||||
|     Get { | ||||
|         uid: String, | ||||
|         ret: oneshot::Sender<Result<Uuid>>, | ||||
|     }, | ||||
| @@ -32,6 +28,11 @@ enum UuidResolveMsg { | ||||
|     List { | ||||
|         ret: oneshot::Sender<Result<Vec<(String, Uuid)>>>, | ||||
|     }, | ||||
|     Insert { | ||||
|         uuid: Uuid, | ||||
|         name: String, | ||||
|         ret: oneshot::Sender<Result<()>>, | ||||
|     } | ||||
| } | ||||
|  | ||||
| struct UuidResolverActor<S> { | ||||
| @@ -54,11 +55,8 @@ impl<S: UuidStore> UuidResolverActor<S> { | ||||
|                 Some(Create { uid: name, ret }) => { | ||||
|                     let _ = ret.send(self.handle_create(name).await); | ||||
|                 } | ||||
|                 Some(GetOrCreate { uid: name, ret }) => { | ||||
|                     let _ = ret.send(self.handle_get_or_create(name).await); | ||||
|                 } | ||||
|                 Some(Resolve { uid: name, ret }) => { | ||||
|                     let _ = ret.send(self.handle_resolve(name).await); | ||||
|                 Some(Get { uid: name, ret }) => { | ||||
|                     let _ = ret.send(self.handle_get(name).await); | ||||
|                 } | ||||
|                 Some(Delete { uid: name, ret }) => { | ||||
|                     let _ = ret.send(self.handle_delete(name).await); | ||||
| @@ -66,6 +64,9 @@ impl<S: UuidStore> UuidResolverActor<S> { | ||||
|                 Some(List { ret }) => { | ||||
|                     let _ = ret.send(self.handle_list().await); | ||||
|                 } | ||||
|                 Some(Insert { ret, uuid, name }) => { | ||||
|                     let _ = ret.send(self.handle_insert(name, uuid).await); | ||||
|                 } | ||||
|                 // all senders have been dropped, need to quit. | ||||
|                 None => break, | ||||
|             } | ||||
| @@ -81,14 +82,7 @@ impl<S: UuidStore> UuidResolverActor<S> { | ||||
|         self.store.create_uuid(uid, true).await | ||||
|     } | ||||
|  | ||||
|     async fn handle_get_or_create(&self, uid: String) -> Result<Uuid> { | ||||
|         if !is_index_uid_valid(&uid) { | ||||
|             return Err(UuidError::BadlyFormatted(uid)); | ||||
|         } | ||||
|         self.store.create_uuid(uid, false).await | ||||
|     } | ||||
|  | ||||
|     async fn handle_resolve(&self, uid: String) -> Result<Uuid> { | ||||
|     async fn handle_get(&self, uid: String) -> Result<Uuid> { | ||||
|         self.store | ||||
|             .get_uuid(uid.clone()) | ||||
|             .await? | ||||
| @@ -106,6 +100,14 @@ impl<S: UuidStore> UuidResolverActor<S> { | ||||
|         let result = self.store.list().await?; | ||||
|         Ok(result) | ||||
|     } | ||||
|  | ||||
|     async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { | ||||
|         if !is_index_uid_valid(&uid) { | ||||
|             return Err(UuidError::BadlyFormatted(uid)); | ||||
|         } | ||||
|         self.store.insert(uid, uuid).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn is_index_uid_valid(uid: &str) -> bool { | ||||
| @@ -127,18 +129,9 @@ impl UuidResolverHandle { | ||||
|         Ok(Self { sender }) | ||||
|     } | ||||
|  | ||||
|     pub async fn resolve(&self, name: String) -> anyhow::Result<Uuid> { | ||||
|     pub async fn get(&self, name: String) -> Result<Uuid> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         let msg = UuidResolveMsg::Resolve { uid: name, ret }; | ||||
|         let _ = self.sender.send(msg).await; | ||||
|         Ok(receiver | ||||
|             .await | ||||
|             .expect("Uuid resolver actor has been killed")?) | ||||
|     } | ||||
|  | ||||
|     pub async fn get_or_create(&self, name: String) -> Result<Uuid> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         let msg = UuidResolveMsg::GetOrCreate { uid: name, ret }; | ||||
|         let msg = UuidResolveMsg::Get { uid: name, ret }; | ||||
|         let _ = self.sender.send(msg).await; | ||||
|         Ok(receiver | ||||
|             .await | ||||
| @@ -171,6 +164,15 @@ impl UuidResolverHandle { | ||||
|             .await | ||||
|             .expect("Uuid resolver actor has been killed")?) | ||||
|     } | ||||
|  | ||||
|     pub async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         let msg = UuidResolveMsg::Insert { ret, name, uuid }; | ||||
|         let _ = self.sender.send(msg).await; | ||||
|         Ok(receiver | ||||
|             .await | ||||
|             .expect("Uuid resolver actor has been killed")?) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Error)] | ||||
| @@ -197,6 +199,7 @@ trait UuidStore { | ||||
|     async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>; | ||||
|     async fn delete(&self, uid: String) -> Result<Option<Uuid>>; | ||||
|     async fn list(&self) -> Result<Vec<(String, Uuid)>>; | ||||
|     async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; | ||||
| } | ||||
|  | ||||
| struct HeedUuidStore { | ||||
| @@ -292,4 +295,16 @@ impl UuidStore for HeedUuidStore { | ||||
|         }) | ||||
|         .await? | ||||
|     } | ||||
|  | ||||
|     async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { | ||||
|         let env = self.env.clone(); | ||||
|         let db = self.db; | ||||
|         tokio::task::spawn_blocking(move || { | ||||
|             let mut txn = env.write_txn()?; | ||||
|             db.put(&mut txn, &name, uuid.as_bytes())?; | ||||
|             txn.commit()?; | ||||
|             Ok(()) | ||||
|         }) | ||||
|         .await? | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user