mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	implement index mock
This commit is contained in:
		| @@ -13,7 +13,7 @@ use crate::index::update_handler::UpdateHandler; | ||||
| use crate::index::updates::apply_settings_to_builder; | ||||
|  | ||||
| use super::error::Result; | ||||
| use super::{Index, Settings, Unchecked}; | ||||
| use super::{index::Index, Settings, Unchecked}; | ||||
|  | ||||
| #[derive(Serialize, Deserialize)] | ||||
| struct DumpMeta { | ||||
|   | ||||
| @@ -1,287 +1,294 @@ | ||||
| use std::collections::{BTreeSet, HashSet}; | ||||
| use std::fs::create_dir_all; | ||||
| use std::marker::PhantomData; | ||||
| use std::ops::Deref; | ||||
| use std::path::Path; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use chrono::{DateTime, Utc}; | ||||
| use heed::{EnvOpenOptions, RoTxn}; | ||||
| use milli::update::Setting; | ||||
| use milli::{obkv_to_json, FieldDistribution, FieldId}; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use serde_json::{Map, Value}; | ||||
|  | ||||
| use error::Result; | ||||
| pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; | ||||
| pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked}; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use crate::index_controller::update_file_store::UpdateFileStore; | ||||
| use crate::EnvSizer; | ||||
|  | ||||
| use self::error::IndexError; | ||||
| use self::update_handler::UpdateHandler; | ||||
|  | ||||
| pub mod error; | ||||
| pub mod update_handler; | ||||
|  | ||||
| mod dump; | ||||
| mod search; | ||||
| mod updates; | ||||
| mod index; | ||||
|  | ||||
| pub type Document = Map<String, Value>; | ||||
| pub use index::{Document, IndexMeta, IndexStats}; | ||||
|  | ||||
| #[derive(Debug, Serialize, Deserialize, Clone)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| pub struct IndexMeta { | ||||
|     created_at: DateTime<Utc>, | ||||
|     pub updated_at: DateTime<Utc>, | ||||
|     pub primary_key: Option<String>, | ||||
| } | ||||
| #[cfg(not(test))] | ||||
| pub use index::Index; | ||||
|  | ||||
| #[derive(Serialize, Debug)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| pub struct IndexStats { | ||||
|     #[serde(skip)] | ||||
|     pub size: u64, | ||||
|     pub number_of_documents: u64, | ||||
|     /// Whether the current index is performing an update. It is initially `None` when the | ||||
|     /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is | ||||
|     /// later set to either true or false, we we retrieve the information from the `UpdateStore` | ||||
|     pub is_indexing: Option<bool>, | ||||
|     pub field_distribution: FieldDistribution, | ||||
| } | ||||
| #[cfg(test)] | ||||
| pub use test::MockIndex as Index; | ||||
|  | ||||
| impl IndexMeta { | ||||
|     pub fn new(index: &Index) -> Result<Self> { | ||||
|         let txn = index.read_txn()?; | ||||
|         Self::new_txn(index, &txn) | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     use std::any::Any; | ||||
|     use std::collections::HashMap; | ||||
|     use std::path::PathBuf; | ||||
|     use std::sync::Mutex; | ||||
|     use std::{path::Path, sync::Arc}; | ||||
|  | ||||
|     use serde_json::{Map, Value}; | ||||
|     use uuid::Uuid; | ||||
|  | ||||
|     use crate::index_controller::update_file_store::UpdateFileStore; | ||||
|     use crate::index_controller::updates::status::{Failed, Processed, Processing}; | ||||
|  | ||||
|     use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings}; | ||||
|     use super::index::Index; | ||||
|     use super::error::Result; | ||||
|     use super::update_handler::UpdateHandler; | ||||
|  | ||||
|     #[derive(Debug, Clone)] | ||||
|     pub enum MockIndex { | ||||
|         Vrai(Index), | ||||
|         Faux(Arc<FauxIndex>), | ||||
|     } | ||||
|  | ||||
|     fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> { | ||||
|         let created_at = index.created_at(txn)?; | ||||
|         let updated_at = index.updated_at(txn)?; | ||||
|         let primary_key = index.primary_key(txn)?.map(String::from); | ||||
|         Ok(Self { | ||||
|             created_at, | ||||
|             updated_at, | ||||
|             primary_key, | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone, derivative::Derivative)] | ||||
| #[derivative(Debug)] | ||||
| pub struct Index { | ||||
|     pub uuid: Uuid, | ||||
|     #[derivative(Debug = "ignore")] | ||||
|     pub inner: Arc<milli::Index>, | ||||
|     #[derivative(Debug = "ignore")] | ||||
|     update_file_store: Arc<UpdateFileStore>, | ||||
|     #[derivative(Debug = "ignore")] | ||||
|     update_handler: Arc<UpdateHandler>, | ||||
| } | ||||
|  | ||||
| impl Deref for Index { | ||||
|     type Target = milli::Index; | ||||
|  | ||||
|     fn deref(&self) -> &Self::Target { | ||||
|         self.inner.as_ref() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Index { | ||||
|     pub fn open( | ||||
|         path: impl AsRef<Path>, | ||||
|         size: usize, | ||||
|         update_file_store: Arc<UpdateFileStore>, | ||||
|         uuid: Uuid, | ||||
|         update_handler: Arc<UpdateHandler>, | ||||
|     ) -> Result<Self> { | ||||
|         create_dir_all(&path)?; | ||||
|         let mut options = EnvOpenOptions::new(); | ||||
|         options.map_size(size); | ||||
|         let inner = Arc::new(milli::Index::new(options, &path)?); | ||||
|         Ok(Index { | ||||
|             inner, | ||||
|             update_file_store, | ||||
|             uuid, | ||||
|             update_handler, | ||||
|         }) | ||||
|     pub struct Stub<A, R> { | ||||
|         name: String, | ||||
|         times: Option<usize>, | ||||
|         stub: Box<dyn Fn(A) -> R + Sync + Send>, | ||||
|         exact: bool, | ||||
|     } | ||||
|  | ||||
|     pub fn stats(&self) -> Result<IndexStats> { | ||||
|         let rtxn = self.read_txn()?; | ||||
|  | ||||
|         Ok(IndexStats { | ||||
|             size: self.size(), | ||||
|             number_of_documents: self.number_of_documents(&rtxn)?, | ||||
|             is_indexing: None, | ||||
|             field_distribution: self.field_distribution(&rtxn)?, | ||||
|         }) | ||||
|     impl<A, R> Drop for Stub<A, R> { | ||||
|         fn drop(&mut self) { | ||||
|             if self.exact { | ||||
|                 if !matches!(self.times, Some(0)) { | ||||
|                     panic!("{} not called the correct amount of times", self.name); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn meta(&self) -> Result<IndexMeta> { | ||||
|         IndexMeta::new(self) | ||||
|     } | ||||
|     pub fn settings(&self) -> Result<Settings<Checked>> { | ||||
|         let txn = self.read_txn()?; | ||||
|         self.settings_txn(&txn) | ||||
|     impl<A, R> Stub<A, R> { | ||||
|         fn call(&mut self, args: A) -> R { | ||||
|             match self.times { | ||||
|                 Some(0) => panic!("{} called to many times", self.name), | ||||
|                 Some(ref mut times) => { *times -= 1; }, | ||||
|                 None => (), | ||||
|             } | ||||
|  | ||||
|             (self.stub)(args) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> { | ||||
|         let displayed_attributes = self | ||||
|             .displayed_fields(txn)? | ||||
|             .map(|fields| fields.into_iter().map(String::from).collect()); | ||||
|  | ||||
|         let searchable_attributes = self | ||||
|             .searchable_fields(txn)? | ||||
|             .map(|fields| fields.into_iter().map(String::from).collect()); | ||||
|  | ||||
|         let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect(); | ||||
|  | ||||
|         let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect(); | ||||
|  | ||||
|         let criteria = self | ||||
|             .criteria(txn)? | ||||
|             .into_iter() | ||||
|             .map(|c| c.to_string()) | ||||
|             .collect(); | ||||
|  | ||||
|         let stop_words = self | ||||
|             .stop_words(txn)? | ||||
|             .map(|stop_words| -> Result<BTreeSet<_>> { | ||||
|                 Ok(stop_words.stream().into_strs()?.into_iter().collect()) | ||||
|             }) | ||||
|             .transpose()? | ||||
|             .unwrap_or_else(BTreeSet::new); | ||||
|         let distinct_field = self.distinct_field(txn)?.map(String::from); | ||||
|  | ||||
|         // in milli each word in the synonyms map were split on their separator. Since we lost | ||||
|         // this information we are going to put space between words. | ||||
|         let synonyms = self | ||||
|             .synonyms(txn)? | ||||
|             .iter() | ||||
|             .map(|(key, values)| { | ||||
|                 ( | ||||
|                     key.join(" "), | ||||
|                     values.iter().map(|value| value.join(" ")).collect(), | ||||
|                 ) | ||||
|             }) | ||||
|             .collect(); | ||||
|  | ||||
|         Ok(Settings { | ||||
|             displayed_attributes: match displayed_attributes { | ||||
|                 Some(attrs) => Setting::Set(attrs), | ||||
|                 None => Setting::Reset, | ||||
|             }, | ||||
|             searchable_attributes: match searchable_attributes { | ||||
|                 Some(attrs) => Setting::Set(attrs), | ||||
|                 None => Setting::Reset, | ||||
|             }, | ||||
|             filterable_attributes: Setting::Set(filterable_attributes), | ||||
|             sortable_attributes: Setting::Set(sortable_attributes), | ||||
|             ranking_rules: Setting::Set(criteria), | ||||
|             stop_words: Setting::Set(stop_words), | ||||
|             distinct_attribute: match distinct_field { | ||||
|                 Some(field) => Setting::Set(field), | ||||
|                 None => Setting::Reset, | ||||
|             }, | ||||
|             synonyms: Setting::Set(synonyms), | ||||
|             _kind: PhantomData, | ||||
|         }) | ||||
|     #[derive(Debug, Default)] | ||||
|     struct StubStore { | ||||
|         inner: Arc<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>> | ||||
|     } | ||||
|  | ||||
|     pub fn retrieve_documents<S: AsRef<str>>( | ||||
|         &self, | ||||
|         offset: usize, | ||||
|         limit: usize, | ||||
|         attributes_to_retrieve: Option<Vec<S>>, | ||||
|     ) -> Result<Vec<Map<String, Value>>> { | ||||
|         let txn = self.read_txn()?; | ||||
|     #[derive(Debug, Default)] | ||||
|     pub struct FauxIndex { | ||||
|         store: StubStore, | ||||
|     } | ||||
|  | ||||
|         let fields_ids_map = self.fields_ids_map(&txn)?; | ||||
|         let fields_to_display = | ||||
|             self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; | ||||
|  | ||||
|         let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit); | ||||
|  | ||||
|         let mut documents = Vec::new(); | ||||
|  | ||||
|         for entry in iter { | ||||
|             let (_id, obkv) = entry?; | ||||
|             let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?; | ||||
|             documents.push(object); | ||||
|     impl StubStore { | ||||
|         pub fn insert<A: 'static, R: 'static>(&self, name: String, stub: Stub<A, R>) { | ||||
|             let mut lock = self.inner.lock().unwrap(); | ||||
|             lock.insert(name, Box::new(stub)); | ||||
|         } | ||||
|  | ||||
|         Ok(documents) | ||||
|         pub fn get_mut<A, B>(&self, name: &str) -> Option<&mut Stub<A, B>> { | ||||
|             let mut lock = self.inner.lock().unwrap(); | ||||
|             match lock.get_mut(name) { | ||||
|                 Some(s) => { | ||||
|                     let s = s.as_mut() as *mut dyn Any as *mut Stub<A, B>; | ||||
|                     Some(unsafe { &mut *s }) | ||||
|                 } | ||||
|                 None => None, | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn retrieve_document<S: AsRef<str>>( | ||||
|         &self, | ||||
|         doc_id: String, | ||||
|         attributes_to_retrieve: Option<Vec<S>>, | ||||
|     ) -> Result<Map<String, Value>> { | ||||
|         let txn = self.read_txn()?; | ||||
|  | ||||
|         let fields_ids_map = self.fields_ids_map(&txn)?; | ||||
|  | ||||
|         let fields_to_display = | ||||
|             self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; | ||||
|  | ||||
|         let internal_id = self | ||||
|             .external_documents_ids(&txn)? | ||||
|             .get(doc_id.as_bytes()) | ||||
|             .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; | ||||
|  | ||||
|         let document = self | ||||
|             .documents(&txn, std::iter::once(internal_id))? | ||||
|             .into_iter() | ||||
|             .next() | ||||
|             .map(|(_, d)| d) | ||||
|             .ok_or(IndexError::DocumentNotFound(doc_id))?; | ||||
|  | ||||
|         let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?; | ||||
|  | ||||
|         Ok(document) | ||||
|     pub struct StubBuilder<'a> { | ||||
|         name: String, | ||||
|         store: &'a StubStore, | ||||
|         times: Option<usize>, | ||||
|         exact: bool, | ||||
|     } | ||||
|  | ||||
|     pub fn size(&self) -> u64 { | ||||
|         self.env.size() | ||||
|     impl<'a> StubBuilder<'a> { | ||||
|        #[must_use] | ||||
|         pub fn times(mut self, times: usize) -> Self { | ||||
|             self.times = Some(times); | ||||
|             self | ||||
|         } | ||||
|  | ||||
|        #[must_use] | ||||
|         pub fn exact(mut self, times: usize) -> Self { | ||||
|             self.times = Some(times); | ||||
|             self.exact = true; | ||||
|             self | ||||
|         } | ||||
|  | ||||
|         pub fn then<A: 'static, R: 'static>(self, f: impl Fn(A) -> R + Sync + Send + 'static) { | ||||
|             let stub = Stub { | ||||
|                 stub: Box::new(f), | ||||
|                 times: self.times, | ||||
|                 exact: self.exact, | ||||
|                 name: self.name.clone(), | ||||
|             }; | ||||
|  | ||||
|             self.store.insert(self.name, stub); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn fields_to_display<S: AsRef<str>>( | ||||
|         &self, | ||||
|         txn: &heed::RoTxn, | ||||
|         attributes_to_retrieve: &Option<Vec<S>>, | ||||
|         fields_ids_map: &milli::FieldsIdsMap, | ||||
|     ) -> Result<Vec<FieldId>> { | ||||
|         let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? { | ||||
|             Some(ids) => ids.into_iter().collect::<Vec<_>>(), | ||||
|             None => fields_ids_map.iter().map(|(id, _)| id).collect(), | ||||
|         }; | ||||
|     impl FauxIndex { | ||||
|         pub fn when(&self, name: &str) -> StubBuilder { | ||||
|             StubBuilder { | ||||
|                 name: name.to_string(), | ||||
|                 store: &self.store, | ||||
|                 times: None, | ||||
|                 exact: false, | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         let attributes_to_retrieve_ids = match attributes_to_retrieve { | ||||
|             Some(attrs) => attrs | ||||
|                 .iter() | ||||
|                 .filter_map(|f| fields_ids_map.id(f.as_ref())) | ||||
|                 .collect::<HashSet<_>>(), | ||||
|             None => fields_ids_map.iter().map(|(id, _)| id).collect(), | ||||
|         }; | ||||
|  | ||||
|         displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid)); | ||||
|         Ok(displayed_fields_ids) | ||||
|         pub fn get<'a, A, R>(&'a self, name: &str) -> &'a mut Stub<A, R> { | ||||
|             match self.store.get_mut(name) { | ||||
|                 Some(stub) => stub, | ||||
|                 None => panic!("unexpected call to {}", name), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> { | ||||
|         let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); | ||||
|         create_dir_all(&dst)?; | ||||
|         dst.push("data.mdb"); | ||||
|         let _txn = self.write_txn()?; | ||||
|         self.inner | ||||
|             .env | ||||
|             .copy_to_path(dst, heed::CompactionOption::Enabled)?; | ||||
|         Ok(()) | ||||
|     impl MockIndex { | ||||
|         pub fn faux(faux: FauxIndex) -> Self { | ||||
|             Self::Faux(Arc::new(faux)) | ||||
|         } | ||||
|  | ||||
|         pub fn open( | ||||
|             path: impl AsRef<Path>, | ||||
|             size: usize, | ||||
|             update_file_store: Arc<UpdateFileStore>, | ||||
|             uuid: Uuid, | ||||
|             update_handler: Arc<UpdateHandler>, | ||||
|         ) -> Result<Self> { | ||||
|             let index = Index::open(path, size, update_file_store, uuid, update_handler)?; | ||||
|             Ok(Self::Vrai(index)) | ||||
|         } | ||||
|  | ||||
|         pub fn load_dump( | ||||
|             src: impl AsRef<Path>, | ||||
|             dst: impl AsRef<Path>, | ||||
|             size: usize, | ||||
|             update_handler: &UpdateHandler, | ||||
|         ) -> anyhow::Result<()> { | ||||
|             Index::load_dump(src, dst, size, update_handler)?; | ||||
|             Ok(()) | ||||
|         } | ||||
|  | ||||
|         pub fn handle_update(&self, update: Processing) -> std::result::Result<Processed, Failed> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.handle_update(update), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn uuid(&self) -> Uuid { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.uuid(), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn stats(&self) -> Result<IndexStats> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.stats(), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn meta(&self) -> Result<IndexMeta> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.meta(), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|         pub fn settings(&self) -> Result<Settings<Checked>> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.settings(), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn retrieve_documents<S: AsRef<str>>( | ||||
|             &self, | ||||
|             offset: usize, | ||||
|             limit: usize, | ||||
|             attributes_to_retrieve: Option<Vec<S>>, | ||||
|         ) -> Result<Vec<Map<String, Value>>> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.retrieve_documents(offset, limit, attributes_to_retrieve), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn retrieve_document<S: AsRef<str>>( | ||||
|             &self, | ||||
|             doc_id: String, | ||||
|             attributes_to_retrieve: Option<Vec<S>>, | ||||
|         ) -> Result<Map<String, Value>> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.retrieve_document(doc_id, attributes_to_retrieve), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn size(&self) -> u64 { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.size(), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.snapshot(path), | ||||
|                 MockIndex::Faux(faux) => faux.get("snapshot").call(path.as_ref()) | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn inner(&self) -> &milli::Index { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.inner(), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn update_primary_key(&self, primary_key: Option<String>) -> Result<IndexMeta> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.update_primary_key(primary_key), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|         pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.perform_search(query), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> { | ||||
|             match self { | ||||
|                 MockIndex::Vrai(index) => index.dump(path), | ||||
|                 MockIndex::Faux(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_faux_index() { | ||||
|         let faux = FauxIndex::default(); | ||||
|         faux | ||||
|             .when("snapshot") | ||||
|             .exact(2) | ||||
|             .then(|path: &Path| -> Result<()> { | ||||
|                 println!("path: {}", path.display()); | ||||
|                 Ok(()) | ||||
|             }); | ||||
|  | ||||
|         let index = MockIndex::faux(faux); | ||||
|  | ||||
|         let path = PathBuf::from("hello"); | ||||
|         index.snapshot(&path).unwrap(); | ||||
|         index.snapshot(&path).unwrap(); | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -12,10 +12,9 @@ use serde::{Deserialize, Serialize}; | ||||
| use serde_json::{json, Value}; | ||||
|  | ||||
| use crate::index::error::FacetError; | ||||
| use crate::index::IndexError; | ||||
|  | ||||
| use super::error::Result; | ||||
| use super::Index; | ||||
| use super::error::{Result, IndexError}; | ||||
| use super::index::Index; | ||||
|  | ||||
| pub type Document = IndexMap<String, Value>; | ||||
| type MatchesInfo = BTreeMap<String, Vec<MatchInfo>>; | ||||
|   | ||||
| @@ -12,7 +12,7 @@ use crate::index_controller::updates::status::{Failed, Processed, Processing, Up | ||||
| use crate::Update; | ||||
|  | ||||
| use super::error::{IndexError, Result}; | ||||
| use super::{Index, IndexMeta}; | ||||
| use super::index::{Index, IndexMeta}; | ||||
|  | ||||
| fn serialize_with_wildcard<S>( | ||||
|     field: &Setting<Vec<String>>, | ||||
|   | ||||
| @@ -17,6 +17,7 @@ use crate::options::IndexerOpts; | ||||
| type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; | ||||
|  | ||||
| #[async_trait::async_trait] | ||||
| #[cfg_attr(test, mockall::automock)] | ||||
| pub trait IndexStore { | ||||
|     async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>; | ||||
|     async fn get(&self, uuid: Uuid) -> Result<Option<Index>>; | ||||
| @@ -72,9 +73,10 @@ impl IndexStore for MapIndexStore { | ||||
|         let index = spawn_blocking(move || -> Result<Index> { | ||||
|             let index = Index::open(path, index_size, file_store, uuid, update_handler)?; | ||||
|             if let Some(primary_key) = primary_key { | ||||
|                 let mut txn = index.write_txn()?; | ||||
|                 let inner = index.inner(); | ||||
|                 let mut txn = inner.write_txn()?; | ||||
|  | ||||
|                 let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); | ||||
|                 let mut builder = UpdateBuilder::new(0).settings(&mut txn, index.inner()); | ||||
|                 builder.set_primary_key(primary_key); | ||||
|                 builder.execute(|_, _| ())?; | ||||
|  | ||||
|   | ||||
| @@ -22,6 +22,7 @@ struct DumpEntry { | ||||
| const UUIDS_DB_PATH: &str = "index_uuids"; | ||||
|  | ||||
| #[async_trait::async_trait] | ||||
| #[cfg_attr(test, mockall::automock)] | ||||
| pub trait UuidStore: Sized { | ||||
|     // Create a new entry for `name`. Return an error if `err` and the entry already exists, return | ||||
|     // the uuid otherwise. | ||||
|   | ||||
| @@ -314,7 +314,7 @@ impl IndexController { | ||||
|         for (uid, index) in indexes { | ||||
|             let meta = index.meta()?; | ||||
|             let meta = IndexMetadata { | ||||
|                 uuid: index.uuid, | ||||
|                 uuid: index.uuid(), | ||||
|                 name: uid.clone(), | ||||
|                 uid, | ||||
|                 meta, | ||||
| @@ -366,7 +366,7 @@ impl IndexController { | ||||
|         index_settings.uid.take(); | ||||
|  | ||||
|         let index = self.index_resolver.get_index(uid.clone()).await?; | ||||
|         let uuid = index.uuid; | ||||
|         let uuid = index.uuid(); | ||||
|         let meta = | ||||
|             spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??; | ||||
|         let meta = IndexMetadata { | ||||
| @@ -386,7 +386,7 @@ impl IndexController { | ||||
|  | ||||
|     pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> { | ||||
|         let index = self.index_resolver.get_index(uid.clone()).await?; | ||||
|         let uuid = index.uuid; | ||||
|         let uuid = index.uuid(); | ||||
|         let meta = spawn_blocking(move || index.meta()).await??; | ||||
|         let meta = IndexMetadata { | ||||
|             uuid, | ||||
| @@ -400,7 +400,7 @@ impl IndexController { | ||||
|     pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> { | ||||
|         let update_infos = UpdateMsg::get_info(&self.update_sender).await?; | ||||
|         let index = self.index_resolver.get_index(uid).await?; | ||||
|         let uuid = index.uuid; | ||||
|         let uuid = index.uuid(); | ||||
|         let mut stats = spawn_blocking(move || index.stats()).await??; | ||||
|         // Check if the currently indexing update is from our index. | ||||
|         stats.is_indexing = Some(Some(uuid) == update_infos.processing); | ||||
| @@ -414,7 +414,7 @@ impl IndexController { | ||||
|         let mut indexes = BTreeMap::new(); | ||||
|  | ||||
|         for (index_uid, index) in self.index_resolver.list().await? { | ||||
|             let uuid = index.uuid; | ||||
|             let uuid = index.uuid(); | ||||
|             let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || { | ||||
|                 let stats = index.stats()?; | ||||
|                 let meta = index.meta()?; | ||||
| @@ -461,7 +461,7 @@ impl IndexController { | ||||
|         let meta = spawn_blocking(move || -> IndexResult<_> { | ||||
|             let meta = index.meta()?; | ||||
|             let meta = IndexMetadata { | ||||
|                 uuid: index.uuid, | ||||
|                 uuid: index.uuid(), | ||||
|                 uid: uid.clone(), | ||||
|                 name: uid, | ||||
|                 meta, | ||||
|   | ||||
| @@ -125,133 +125,133 @@ pub fn load_snapshot( | ||||
|     } | ||||
| } | ||||
|  | ||||
| //#[cfg(test)] | ||||
| //mod test { | ||||
| //use std::iter::FromIterator; | ||||
| //use std::{collections::HashSet, sync::Arc}; | ||||
| #[cfg(test)] | ||||
| mod test { | ||||
|     //use std::iter::FromIterator; | ||||
|     //use std::{collections::HashSet, sync::Arc}; | ||||
|  | ||||
| //use futures::future::{err, ok}; | ||||
| //use rand::Rng; | ||||
| //use tokio::time::timeout; | ||||
| //use uuid::Uuid; | ||||
|     //use futures::future::{err, ok}; | ||||
|     //use rand::Rng; | ||||
|     //use tokio::time::timeout; | ||||
|     //use uuid::Uuid; | ||||
|  | ||||
| //use super::*; | ||||
|     //use super::*; | ||||
|  | ||||
| //#[actix_rt::test] | ||||
| //async fn test_normal() { | ||||
| //let mut rng = rand::thread_rng(); | ||||
| //let uuids_num: usize = rng.gen_range(5..10); | ||||
| //let uuids = (0..uuids_num) | ||||
| //.map(|_| Uuid::new_v4()) | ||||
| //.collect::<HashSet<_>>(); | ||||
|     //#[actix_rt::test] | ||||
|     //async fn test_normal() { | ||||
|         //let mut rng = rand::thread_rng(); | ||||
|         //let uuids_num: usize = rng.gen_range(5..10); | ||||
|         //let uuids = (0..uuids_num) | ||||
|             //.map(|_| Uuid::new_v4()) | ||||
|             //.collect::<HashSet<_>>(); | ||||
|  | ||||
| //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
| //let uuids_clone = uuids.clone(); | ||||
| //uuid_resolver | ||||
| //.expect_snapshot() | ||||
| //.times(1) | ||||
| //.returning(move |_| Box::pin(ok(uuids_clone.clone()))); | ||||
|         //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
|         //let uuids_clone = uuids.clone(); | ||||
|         //uuid_resolver | ||||
|             //.expect_snapshot() | ||||
|             //.times(1) | ||||
|             //.returning(move |_| Box::pin(ok(uuids_clone.clone()))); | ||||
|  | ||||
| //let uuids_clone = uuids.clone(); | ||||
| //let mut index_handle = MockIndexActorHandle::new(); | ||||
| //index_handle | ||||
| //.expect_snapshot() | ||||
| //.withf(move |uuid, _path| uuids_clone.contains(uuid)) | ||||
| //.times(uuids_num) | ||||
| //.returning(move |_, _| Box::pin(ok(()))); | ||||
|         //let uuids_clone = uuids.clone(); | ||||
|         //let mut index_handle = MockIndexActorHandle::new(); | ||||
|         //index_handle | ||||
|             //.expect_snapshot() | ||||
|             //.withf(move |uuid, _path| uuids_clone.contains(uuid)) | ||||
|             //.times(uuids_num) | ||||
|             //.returning(move |_, _| Box::pin(ok(()))); | ||||
|  | ||||
| //let dir = tempfile::tempdir_in(".").unwrap(); | ||||
| //let handle = Arc::new(index_handle); | ||||
| //let update_handle = | ||||
| //UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); | ||||
|         //let dir = tempfile::tempdir_in(".").unwrap(); | ||||
|         //let handle = Arc::new(index_handle); | ||||
|         //let update_handle = | ||||
|             //UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); | ||||
|  | ||||
| //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
| //let snapshot_service = SnapshotService::new( | ||||
| //uuid_resolver, | ||||
| //update_handle, | ||||
| //Duration::from_millis(100), | ||||
| //snapshot_path.path().to_owned(), | ||||
| //"data.ms".to_string(), | ||||
| //); | ||||
|         //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
|         //let snapshot_service = SnapshotService::new( | ||||
|             //uuid_resolver, | ||||
|             //update_handle, | ||||
|             //Duration::from_millis(100), | ||||
|             //snapshot_path.path().to_owned(), | ||||
|             //"data.ms".to_string(), | ||||
|         //); | ||||
|  | ||||
| //snapshot_service.perform_snapshot().await.unwrap(); | ||||
| //} | ||||
|         //snapshot_service.perform_snapshot().await.unwrap(); | ||||
|     //} | ||||
|  | ||||
| //#[actix_rt::test] | ||||
| //async fn error_performing_uuid_snapshot() { | ||||
| //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
| //uuid_resolver | ||||
| //.expect_snapshot() | ||||
| //.times(1) | ||||
| ////abitrary error | ||||
| //.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); | ||||
|     //#[actix_rt::test] | ||||
|     //async fn error_performing_uuid_snapshot() { | ||||
|     //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
|     //uuid_resolver | ||||
|     //.expect_snapshot() | ||||
|     //.times(1) | ||||
|     ////abitrary error | ||||
|     //.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); | ||||
|  | ||||
| //let update_handle = MockUpdateActorHandle::new(); | ||||
|     //let update_handle = MockUpdateActorHandle::new(); | ||||
|  | ||||
| //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
| //let snapshot_service = SnapshotService::new( | ||||
| //uuid_resolver, | ||||
| //update_handle, | ||||
| //Duration::from_millis(100), | ||||
| //snapshot_path.path().to_owned(), | ||||
| //"data.ms".to_string(), | ||||
| //); | ||||
|     //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
|     //let snapshot_service = SnapshotService::new( | ||||
|     //uuid_resolver, | ||||
|     //update_handle, | ||||
|     //Duration::from_millis(100), | ||||
|     //snapshot_path.path().to_owned(), | ||||
|     //"data.ms".to_string(), | ||||
|     //); | ||||
|  | ||||
| //assert!(snapshot_service.perform_snapshot().await.is_err()); | ||||
| ////Nothing was written to the file | ||||
| //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); | ||||
| //} | ||||
|     //assert!(snapshot_service.perform_snapshot().await.is_err()); | ||||
|     ////Nothing was written to the file | ||||
|     //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); | ||||
|     //} | ||||
|  | ||||
| //#[actix_rt::test] | ||||
| //async fn error_performing_index_snapshot() { | ||||
| //let uuid = Uuid::new_v4(); | ||||
| //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
| //uuid_resolver | ||||
| //.expect_snapshot() | ||||
| //.times(1) | ||||
| //.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); | ||||
|     //#[actix_rt::test] | ||||
|     //async fn error_performing_index_snapshot() { | ||||
|     //let uuid = Uuid::new_v4(); | ||||
|     //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
|     //uuid_resolver | ||||
|     //.expect_snapshot() | ||||
|     //.times(1) | ||||
|     //.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); | ||||
|  | ||||
| //let mut update_handle = MockUpdateActorHandle::new(); | ||||
| //update_handle | ||||
| //.expect_snapshot() | ||||
| ////abitrary error | ||||
| //.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); | ||||
|     //let mut update_handle = MockUpdateActorHandle::new(); | ||||
|     //update_handle | ||||
|     //.expect_snapshot() | ||||
|     ////abitrary error | ||||
|     //.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); | ||||
|  | ||||
| //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
| //let snapshot_service = SnapshotService::new( | ||||
| //uuid_resolver, | ||||
| //update_handle, | ||||
| //Duration::from_millis(100), | ||||
| //snapshot_path.path().to_owned(), | ||||
| //"data.ms".to_string(), | ||||
| //); | ||||
|     //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
|     //let snapshot_service = SnapshotService::new( | ||||
|     //uuid_resolver, | ||||
|     //update_handle, | ||||
|     //Duration::from_millis(100), | ||||
|     //snapshot_path.path().to_owned(), | ||||
|     //"data.ms".to_string(), | ||||
|     //); | ||||
|  | ||||
| //assert!(snapshot_service.perform_snapshot().await.is_err()); | ||||
| ////Nothing was written to the file | ||||
| //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); | ||||
| //} | ||||
|     //assert!(snapshot_service.perform_snapshot().await.is_err()); | ||||
|     ////Nothing was written to the file | ||||
|     //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); | ||||
|     //} | ||||
|  | ||||
| //#[actix_rt::test] | ||||
| //async fn test_loop() { | ||||
| //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
| //uuid_resolver | ||||
| //.expect_snapshot() | ||||
| ////we expect the funtion to be called between 2 and 3 time in the given interval. | ||||
| //.times(2..4) | ||||
| ////abitrary error, to short-circuit the function | ||||
| //.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); | ||||
|     //#[actix_rt::test] | ||||
|     //async fn test_loop() { | ||||
|     //let mut uuid_resolver = MockUuidResolverHandle::new(); | ||||
|     //uuid_resolver | ||||
|     //.expect_snapshot() | ||||
|     ////we expect the funtion to be called between 2 and 3 time in the given interval. | ||||
|     //.times(2..4) | ||||
|     ////abitrary error, to short-circuit the function | ||||
|     //.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); | ||||
|  | ||||
| //let update_handle = MockUpdateActorHandle::new(); | ||||
|     //let update_handle = MockUpdateActorHandle::new(); | ||||
|  | ||||
| //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
| //let snapshot_service = SnapshotService::new( | ||||
| //uuid_resolver, | ||||
| //update_handle, | ||||
| //Duration::from_millis(100), | ||||
| //snapshot_path.path().to_owned(), | ||||
| //"data.ms".to_string(), | ||||
| //); | ||||
|     //let snapshot_path = tempfile::tempdir_in(".").unwrap(); | ||||
|     //let snapshot_service = SnapshotService::new( | ||||
|     //uuid_resolver, | ||||
|     //update_handle, | ||||
|     //Duration::from_millis(100), | ||||
|     //snapshot_path.path().to_owned(), | ||||
|     //"data.ms".to_string(), | ||||
|     //); | ||||
|  | ||||
| //let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; | ||||
| //} | ||||
| //} | ||||
|     //let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; | ||||
|     //} | ||||
| } | ||||
|   | ||||
| @@ -34,7 +34,7 @@ impl UpdateStore { | ||||
|         // txn must *always* be acquired after state lock, or it will dead lock. | ||||
|         let txn = self.env.write_txn()?; | ||||
|  | ||||
|         let uuids = indexes.iter().map(|i| i.uuid).collect(); | ||||
|         let uuids = indexes.iter().map(|i| i.uuid()).collect(); | ||||
|  | ||||
|         self.dump_updates(&txn, &uuids, &path)?; | ||||
|  | ||||
|   | ||||
| @@ -509,7 +509,7 @@ impl UpdateStore { | ||||
|  | ||||
|         let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); | ||||
|  | ||||
|         let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid).collect(); | ||||
|         let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid()).collect(); | ||||
|         for entry in pendings { | ||||
|             let ((_, uuid, _), pending) = entry?; | ||||
|             if uuids.contains(&uuid) { | ||||
| @@ -528,7 +528,7 @@ impl UpdateStore { | ||||
|         let path = path.as_ref().to_owned(); | ||||
|         indexes | ||||
|             .par_iter() | ||||
|             .try_for_each(|index| index.snapshot(path.clone())) | ||||
|             .try_for_each(|index| index.snapshot(&path)) | ||||
|             .unwrap(); | ||||
|  | ||||
|         Ok(()) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user