mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	Introduce a background thread that manage updates to do
This commit is contained in:
		| @@ -7,8 +7,11 @@ edition = "2018" | |||||||
| [dependencies] | [dependencies] | ||||||
| bincode = "1.1.4" | bincode = "1.1.4" | ||||||
| byteorder = "1.3.2" | byteorder = "1.3.2" | ||||||
|  | crossbeam-channel = "0.3.9" | ||||||
| deunicode = "1.0.0" | deunicode = "1.0.0" | ||||||
|  | env_logger = "0.7.0" | ||||||
| hashbrown = { version = "0.6.0", features = ["serde"] } | hashbrown = { version = "0.6.0", features = ["serde"] } | ||||||
|  | log = "0.4.8" | ||||||
| meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } | meilidb-schema = { path = "../meilidb-schema", version = "0.1.0" } | ||||||
| meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } | meilidb-tokenizer = { path = "../meilidb-tokenizer", version = "0.1.0" } | ||||||
| once_cell = "1.2.0" | once_cell = "1.2.0" | ||||||
|   | |||||||
| @@ -1,13 +1,41 @@ | |||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use std::io; |  | ||||||
| use std::path::Path; | use std::path::Path; | ||||||
| use std::sync::{Arc, RwLock}; | use std::sync::{Arc, RwLock}; | ||||||
| use crate::{store, Index, MResult}; | use std::{fs, thread}; | ||||||
|  |  | ||||||
|  | use crossbeam_channel::Receiver; | ||||||
|  | use log::error; | ||||||
|  |  | ||||||
|  | use crate::{store, update, Index, MResult}; | ||||||
|  |  | ||||||
| pub struct Database { | pub struct Database { | ||||||
|     pub rkv: Arc<RwLock<rkv::Rkv>>, |     pub rkv: Arc<RwLock<rkv::Rkv>>, | ||||||
|     main_store: rkv::SingleStore, |     main_store: rkv::SingleStore, | ||||||
|     indexes: RwLock<HashMap<String, Index>>, |     indexes_store: rkv::SingleStore, | ||||||
|  |     indexes: RwLock<HashMap<String, (Index, thread::JoinHandle<()>)>>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn update_awaiter(receiver: Receiver<()>, rkv: Arc<RwLock<rkv::Rkv>>, index: Index) { | ||||||
|  |     for () in receiver { | ||||||
|  |         // consume all updates in order (oldest first) | ||||||
|  |         loop { | ||||||
|  |             let rkv = match rkv.read() { | ||||||
|  |                 Ok(rkv) => rkv, | ||||||
|  |                 Err(e) => { error!("rkv RwLock read failed: {}", e); break } | ||||||
|  |             }; | ||||||
|  |             let mut writer = match rkv.write() { | ||||||
|  |                 Ok(writer) => writer, | ||||||
|  |                 Err(e) => { error!("LMDB writer transaction begin failed: {}", e); break } | ||||||
|  |             }; | ||||||
|  |  | ||||||
|  |             match update::update_task(&mut writer, index.clone(), None as Option::<fn(_)>) { | ||||||
|  |                 Ok(true) => if let Err(e) = writer.commit() { error!("update transaction failed: {}", e) }, | ||||||
|  |                 // no more updates to handle for now | ||||||
|  |                 Ok(false) => { writer.abort(); break }, | ||||||
|  |                 Err(e) => { error!("update task failed: {}", e); writer.abort() }, | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Database { | impl Database { | ||||||
| @@ -15,6 +43,8 @@ impl Database { | |||||||
|         let manager = rkv::Manager::singleton(); |         let manager = rkv::Manager::singleton(); | ||||||
|         let mut rkv_write = manager.write().unwrap(); |         let mut rkv_write = manager.write().unwrap(); | ||||||
|  |  | ||||||
|  |         fs::create_dir_all(path.as_ref())?; | ||||||
|  |  | ||||||
|         let rkv = rkv_write |         let rkv = rkv_write | ||||||
|             .get_or_create(path.as_ref(), |path| { |             .get_or_create(path.as_ref(), |path| { | ||||||
|                 let mut builder = rkv::Rkv::environment_builder(); |                 let mut builder = rkv::Rkv::environment_builder(); | ||||||
| @@ -26,12 +56,13 @@ impl Database { | |||||||
|  |  | ||||||
|         let rkv_read = rkv.read().unwrap(); |         let rkv_read = rkv.read().unwrap(); | ||||||
|         let create_options = rkv::store::Options::create(); |         let create_options = rkv::store::Options::create(); | ||||||
|         let main_store = rkv_read.open_single("indexes", create_options)?; |         let main_store = rkv_read.open_single("main", create_options)?; | ||||||
|  |         let indexes_store = rkv_read.open_single("indexes", create_options)?; | ||||||
|  |  | ||||||
|         // list all indexes that needs to be opened |         // list all indexes that needs to be opened | ||||||
|         let mut must_open = Vec::new(); |         let mut must_open = Vec::new(); | ||||||
|         let reader = rkv_read.read()?; |         let reader = rkv_read.read()?; | ||||||
|         for result in main_store.iter_start(&reader)? { |         for result in indexes_store.iter_start(&reader)? { | ||||||
|             let (key, _) = result?; |             let (key, _) = result?; | ||||||
|             if let Ok(index_name) = std::str::from_utf8(key) { |             if let Ok(index_name) = std::str::from_utf8(key) { | ||||||
|                 must_open.push(index_name.to_owned()); |                 must_open.push(index_name.to_owned()); | ||||||
| @@ -43,13 +74,24 @@ impl Database { | |||||||
|         // open the previously aggregated indexes |         // open the previously aggregated indexes | ||||||
|         let mut indexes = HashMap::new(); |         let mut indexes = HashMap::new(); | ||||||
|         for index_name in must_open { |         for index_name in must_open { | ||||||
|             let index = store::open(&rkv_read, &index_name)?; |  | ||||||
|             indexes.insert(index_name, index); |             let (sender, receiver) = crossbeam_channel::bounded(100); | ||||||
|  |             let index = store::open(&rkv_read, &index_name, sender.clone())?; | ||||||
|  |             let rkv_clone = rkv.clone(); | ||||||
|  |             let index_clone = index.clone(); | ||||||
|  |             let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone)); | ||||||
|  |  | ||||||
|  |             // send an update notification to make sure that | ||||||
|  |             // possible previous boot updates are consumed | ||||||
|  |             sender.send(()).unwrap(); | ||||||
|  |  | ||||||
|  |             let result = indexes.insert(index_name, (index, handle)); | ||||||
|  |             assert!(result.is_none(), "The index should not have been already open"); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         drop(rkv_read); |         drop(rkv_read); | ||||||
|  |  | ||||||
|         Ok(Database { rkv, main_store, indexes: RwLock::new(indexes) }) |         Ok(Database { rkv, main_store, indexes_store, indexes: RwLock::new(indexes) }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn open_index(&self, name: impl Into<String>) -> MResult<Index> { |     pub fn open_index(&self, name: impl Into<String>) -> MResult<Index> { | ||||||
| @@ -57,20 +99,26 @@ impl Database { | |||||||
|         let name = name.into(); |         let name = name.into(); | ||||||
|  |  | ||||||
|         match indexes_lock.get(&name) { |         match indexes_lock.get(&name) { | ||||||
|             Some(index) => Ok(*index), |             Some((index, _)) => Ok(index.clone()), | ||||||
|             None => { |             None => { | ||||||
|                 drop(indexes_lock); |                 drop(indexes_lock); | ||||||
|  |  | ||||||
|                 let rkv_lock = self.rkv.read().unwrap(); |                 let rkv_lock = self.rkv.read().unwrap(); | ||||||
|                 let index = store::create(&rkv_lock, &name)?; |                 let (sender, receiver) = crossbeam_channel::bounded(100); | ||||||
|  |                 let index = store::create(&rkv_lock, &name, sender)?; | ||||||
|  |  | ||||||
|                 let mut writer = rkv_lock.write()?; |                 let mut writer = rkv_lock.write()?; | ||||||
|                 let value = rkv::Value::Blob(&[]); |                 let value = rkv::Value::Blob(&[]); | ||||||
|                 self.main_store.put(&mut writer, &name, &value)?; |                 self.indexes_store.put(&mut writer, &name, &value)?; | ||||||
|  |  | ||||||
|                 { |                 { | ||||||
|                     let mut indexes_write = self.indexes.write().unwrap(); |                     let mut indexes_write = self.indexes.write().unwrap(); | ||||||
|                     indexes_write.entry(name).or_insert(index); |                     indexes_write.entry(name).or_insert_with(|| { | ||||||
|  |                         let rkv_clone = self.rkv.clone(); | ||||||
|  |                         let index_clone = index.clone(); | ||||||
|  |                         let handle = thread::spawn(move || update_awaiter(receiver, rkv_clone, index_clone)); | ||||||
|  |                         (index.clone(), handle) | ||||||
|  |                     }); | ||||||
|                 } |                 } | ||||||
|  |  | ||||||
|                 writer.commit()?; |                 writer.commit()?; | ||||||
| @@ -84,4 +132,8 @@ impl Database { | |||||||
|         let indexes = self.indexes.read().unwrap(); |         let indexes = self.indexes.read().unwrap(); | ||||||
|         Ok(indexes.keys().cloned().collect()) |         Ok(indexes.keys().cloned().collect()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn main_store(&self) -> rkv::SingleStore { | ||||||
|  |         self.main_store | ||||||
|  |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -1,55 +1,62 @@ | |||||||
| use std::{error, fmt}; | use std::{error, fmt, io}; | ||||||
| use crate::serde::SerializerError; | use crate::serde::SerializerError; | ||||||
|  |  | ||||||
| pub type MResult<T> = Result<T, Error>; | pub type MResult<T> = Result<T, Error>; | ||||||
|  |  | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub enum Error { | pub enum Error { | ||||||
|  |     Io(io::Error), | ||||||
|     SchemaDiffer, |     SchemaDiffer, | ||||||
|     SchemaMissing, |     SchemaMissing, | ||||||
|     WordIndexMissing, |     WordIndexMissing, | ||||||
|     MissingDocumentId, |     MissingDocumentId, | ||||||
|     RkvError(rkv::StoreError), |     Rkv(rkv::StoreError), | ||||||
|     FstError(fst::Error), |     Fst(fst::Error), | ||||||
|     RmpDecodeError(rmp_serde::decode::Error), |     RmpDecode(rmp_serde::decode::Error), | ||||||
|     RmpEncodeError(rmp_serde::encode::Error), |     RmpEncode(rmp_serde::encode::Error), | ||||||
|     BincodeError(bincode::Error), |     Bincode(bincode::Error), | ||||||
|     SerializerError(SerializerError), |     Serializer(SerializerError), | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl From<io::Error> for Error { | ||||||
|  |     fn from(error: io::Error) -> Error { | ||||||
|  |         Error::Io(error) | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl From<rkv::StoreError> for Error { | impl From<rkv::StoreError> for Error { | ||||||
|     fn from(error: rkv::StoreError) -> Error { |     fn from(error: rkv::StoreError) -> Error { | ||||||
|         Error::RkvError(error) |         Error::Rkv(error) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl From<fst::Error> for Error { | impl From<fst::Error> for Error { | ||||||
|     fn from(error: fst::Error) -> Error { |     fn from(error: fst::Error) -> Error { | ||||||
|         Error::FstError(error) |         Error::Fst(error) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl From<rmp_serde::decode::Error> for Error { | impl From<rmp_serde::decode::Error> for Error { | ||||||
|     fn from(error: rmp_serde::decode::Error) -> Error { |     fn from(error: rmp_serde::decode::Error) -> Error { | ||||||
|         Error::RmpDecodeError(error) |         Error::RmpDecode(error) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl From<rmp_serde::encode::Error> for Error { | impl From<rmp_serde::encode::Error> for Error { | ||||||
|     fn from(error: rmp_serde::encode::Error) -> Error { |     fn from(error: rmp_serde::encode::Error) -> Error { | ||||||
|         Error::RmpEncodeError(error) |         Error::RmpEncode(error) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl From<bincode::Error> for Error { | impl From<bincode::Error> for Error { | ||||||
|     fn from(error: bincode::Error) -> Error { |     fn from(error: bincode::Error) -> Error { | ||||||
|         Error::BincodeError(error) |         Error::Bincode(error) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl From<SerializerError> for Error { | impl From<SerializerError> for Error { | ||||||
|     fn from(error: SerializerError) -> Error { |     fn from(error: SerializerError) -> Error { | ||||||
|         Error::SerializerError(error) |         Error::Serializer(error) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -57,16 +64,17 @@ impl fmt::Display for Error { | |||||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|         use self::Error::*; |         use self::Error::*; | ||||||
|         match self { |         match self { | ||||||
|  |             Io(e) => write!(f, "{}", e), | ||||||
|             SchemaDiffer => write!(f, "schemas differ"), |             SchemaDiffer => write!(f, "schemas differ"), | ||||||
|             SchemaMissing => write!(f, "this index does not have a schema"), |             SchemaMissing => write!(f, "this index does not have a schema"), | ||||||
|             WordIndexMissing => write!(f, "this index does not have a word index"), |             WordIndexMissing => write!(f, "this index does not have a word index"), | ||||||
|             MissingDocumentId => write!(f, "document id is missing"), |             MissingDocumentId => write!(f, "document id is missing"), | ||||||
|             RkvError(e) => write!(f, "rkv error; {}", e), |             Rkv(e) => write!(f, "rkv error; {}", e), | ||||||
|             FstError(e) => write!(f, "fst error; {}", e), |             Fst(e) => write!(f, "fst error; {}", e), | ||||||
|             RmpDecodeError(e) => write!(f, "rmp decode error; {}", e), |             RmpDecode(e) => write!(f, "rmp decode error; {}", e), | ||||||
|             RmpEncodeError(e) => write!(f, "rmp encode error; {}", e), |             RmpEncode(e) => write!(f, "rmp encode error; {}", e), | ||||||
|             BincodeError(e) => write!(f, "bincode error; {}", e), |             Bincode(e) => write!(f, "bincode error; {}", e), | ||||||
|             SerializerError(e) => write!(f, "serializer error; {}", e), |             Serializer(e) => write!(f, "serializer error; {}", e), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -1,17 +1,25 @@ | |||||||
| use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions}; | use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions}; | ||||||
| use std::{fs, path::Path}; | use std::{fs, path::Path}; | ||||||
| use meilidb_core::{Database, QueryBuilder}; | use meilidb_core::{Database, MResult, QueryBuilder}; | ||||||
|  |  | ||||||
|  | fn main() -> MResult<()> { | ||||||
|  |     env_logger::init(); | ||||||
|  |  | ||||||
| fn main() { |  | ||||||
|     let path = Path::new("test.rkv"); |     let path = Path::new("test.rkv"); | ||||||
|     fs::create_dir_all(path).unwrap(); |     let database = Database::open_or_create(path)?; | ||||||
|  |  | ||||||
|     let database = Database::open_or_create(path).unwrap(); |  | ||||||
|     println!("{:?}", database.indexes_names()); |     println!("{:?}", database.indexes_names()); | ||||||
|  |  | ||||||
|     let hello = database.open_index("hello").unwrap(); |     let hello = database.open_index("hello")?; | ||||||
|     let hello1 = database.open_index("hello1").unwrap(); |     let hello1 = database.open_index("hello1")?; | ||||||
|     let hello2 = database.open_index("hello2").unwrap(); |     let hello2 = database.open_index("hello2")?; | ||||||
|  |  | ||||||
|  |     let mut additions = hello.documents_addition(); | ||||||
|  |     additions.extend(vec![()]); | ||||||
|  |  | ||||||
|  |     let rkv = database.rkv.read().unwrap(); | ||||||
|  |     let writer = rkv.write()?; | ||||||
|  |  | ||||||
|  |     additions.finalize(writer)?; | ||||||
|  |  | ||||||
|     // { |     // { | ||||||
|     //     let mut writer = env.write().unwrap(); |     //     let mut writer = env.write().unwrap(); | ||||||
| @@ -44,4 +52,8 @@ fn main() { | |||||||
|     // let documents = builder.query(&reader, "oubli", 0..20).unwrap(); |     // let documents = builder.query(&reader, "oubli", 0..20).unwrap(); | ||||||
|  |  | ||||||
|     // println!("{:?}", documents); |     // println!("{:?}", documents); | ||||||
|  |  | ||||||
|  |     std::thread::sleep(std::time::Duration::from_secs(10)); | ||||||
|  |  | ||||||
|  |     Ok(()) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -14,6 +14,8 @@ pub use self::synonyms::Synonyms; | |||||||
| pub use self::updates::Updates; | pub use self::updates::Updates; | ||||||
| pub use self::updates_results::UpdatesResults; | pub use self::updates_results::UpdatesResults; | ||||||
|  |  | ||||||
|  | use crate::update; | ||||||
|  |  | ||||||
| fn aligned_to(bytes: &[u8], align: usize) -> bool { | fn aligned_to(bytes: &[u8], align: usize) -> bool { | ||||||
|     (bytes as *const _ as *const () as usize) % align == 0 |     (bytes as *const _ as *const () as usize) % align == 0 | ||||||
| } | } | ||||||
| @@ -46,31 +48,62 @@ fn updates_results_name(name: &str) -> String { | |||||||
|     format!("store-{}-updates-results", name) |     format!("store-{}-updates-results", name) | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Copy, Clone)] | #[derive(Clone)] | ||||||
| pub struct Index { | pub struct Index { | ||||||
|     pub main: Main, |     pub main: Main, | ||||||
|     pub postings_lists: PostingsLists, |     pub postings_lists: PostingsLists, | ||||||
|     pub documents_fields: DocumentsFields, |     pub documents_fields: DocumentsFields, | ||||||
|     pub synonyms: Synonyms, |     pub synonyms: Synonyms, | ||||||
|     pub docs_words: DocsWords, |     pub docs_words: DocsWords, | ||||||
|  |  | ||||||
|     pub updates: Updates, |     pub updates: Updates, | ||||||
|     pub updates_results: UpdatesResults, |     pub updates_results: UpdatesResults, | ||||||
|  |     updates_notifier: crossbeam_channel::Sender<()>, | ||||||
| } | } | ||||||
|  |  | ||||||
| pub fn create(env: &rkv::Rkv, name: &str) -> Result<Index, rkv::StoreError> { | impl Index { | ||||||
|     open_options(env, name, rkv::StoreOptions::create()) |     pub fn documents_addition<D>(&self) -> update::DocumentsAddition<D> { | ||||||
|  |         update::DocumentsAddition::new( | ||||||
|  |             self.updates, | ||||||
|  |             self.updates_results, | ||||||
|  |             self.updates_notifier.clone(), | ||||||
|  |         ) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| pub fn open(env: &rkv::Rkv, name: &str) -> Result<Index, rkv::StoreError> { |     pub fn documents_deletion<D>(&self) -> update::DocumentsDeletion { | ||||||
|  |         update::DocumentsDeletion::new( | ||||||
|  |             self.updates, | ||||||
|  |             self.updates_results, | ||||||
|  |             self.updates_notifier.clone(), | ||||||
|  |         ) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub fn create( | ||||||
|  |     env: &rkv::Rkv, | ||||||
|  |     name: &str, | ||||||
|  |     updates_notifier: crossbeam_channel::Sender<()>, | ||||||
|  | ) -> Result<Index, rkv::StoreError> | ||||||
|  | { | ||||||
|  |     open_options(env, name, rkv::StoreOptions::create(), updates_notifier) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub fn open( | ||||||
|  |     env: &rkv::Rkv, | ||||||
|  |     name: &str, | ||||||
|  |     updates_notifier: crossbeam_channel::Sender<()>, | ||||||
|  | ) -> Result<Index, rkv::StoreError> | ||||||
|  | { | ||||||
|     let mut options = rkv::StoreOptions::default(); |     let mut options = rkv::StoreOptions::default(); | ||||||
|     options.create = false; |     options.create = false; | ||||||
|     open_options(env, name, options) |     open_options(env, name, options, updates_notifier) | ||||||
| } | } | ||||||
|  |  | ||||||
| fn open_options( | fn open_options( | ||||||
|     env: &rkv::Rkv, |     env: &rkv::Rkv, | ||||||
|     name: &str, |     name: &str, | ||||||
|     options: rkv::StoreOptions, |     options: rkv::StoreOptions, | ||||||
|  |     updates_notifier: crossbeam_channel::Sender<()>, | ||||||
| ) -> Result<Index, rkv::StoreError> | ) -> Result<Index, rkv::StoreError> | ||||||
| { | { | ||||||
|     // create all the store names |     // create all the store names | ||||||
| @@ -99,5 +132,6 @@ fn open_options( | |||||||
|         docs_words: DocsWords { docs_words }, |         docs_words: DocsWords { docs_words }, | ||||||
|         updates: Updates { updates }, |         updates: Updates { updates }, | ||||||
|         updates_results: UpdatesResults { updates_results }, |         updates_results: UpdatesResults { updates_results }, | ||||||
|  |         updates_notifier, | ||||||
|     }) |     }) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -10,7 +10,7 @@ pub struct Updates { | |||||||
| impl Updates { | impl Updates { | ||||||
|     // TODO we should use the MDB_LAST op but |     // TODO we should use the MDB_LAST op but | ||||||
|     //      it is not exposed by the rkv library |     //      it is not exposed by the rkv library | ||||||
|     fn last_update_id<'a>( |     pub fn last_update_id<'a>( | ||||||
|         &self, |         &self, | ||||||
|         reader: &'a impl rkv::Readable, |         reader: &'a impl rkv::Readable, | ||||||
|     ) -> Result<Option<(u64, Option<Value<'a>>)>, rkv::StoreError> |     ) -> Result<Option<(u64, Option<Value<'a>>)>, rkv::StoreError> | ||||||
| @@ -60,21 +60,18 @@ impl Updates { | |||||||
|         self.updates.get(reader, update_id_bytes).map(|v| v.is_some()) |         self.updates.get(reader, update_id_bytes).map(|v| v.is_some()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn push_back( |     pub fn put_update( | ||||||
|         &self, |         &self, | ||||||
|         writer: &mut rkv::Writer, |         writer: &mut rkv::Writer, | ||||||
|  |         update_id: u64, | ||||||
|         update: &Update, |         update: &Update, | ||||||
|     ) -> MResult<u64> |     ) -> MResult<()> | ||||||
|     { |     { | ||||||
|         let last_update_id = self.last_update_id(writer)?; |         let update_id_bytes = update_id.to_be_bytes(); | ||||||
|         let last_update_id = last_update_id.map_or(0, |(n, _)| n + 1); |  | ||||||
|         let last_update_id_bytes = last_update_id.to_be_bytes(); |  | ||||||
|  |  | ||||||
|         let update = rmp_serde::to_vec_named(&update)?; |         let update = rmp_serde::to_vec_named(&update)?; | ||||||
|         let blob = Value::Blob(&update); |         let blob = Value::Blob(&update); | ||||||
|         self.updates.put(writer, last_update_id_bytes, &blob)?; |         self.updates.put(writer, update_id_bytes, &blob)?; | ||||||
|  |         Ok(()) | ||||||
|         Ok(last_update_id) |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn pop_front( |     pub fn pop_front( | ||||||
| @@ -82,20 +79,20 @@ impl Updates { | |||||||
|         writer: &mut rkv::Writer, |         writer: &mut rkv::Writer, | ||||||
|     ) -> MResult<Option<(u64, Update)>> |     ) -> MResult<Option<(u64, Update)>> | ||||||
|     { |     { | ||||||
|         let (last_id, last_data) = match self.first_update_id(writer)? { |         let (first_id, first_data) = match self.first_update_id(writer)? { | ||||||
|             Some(entry) => entry, |             Some(entry) => entry, | ||||||
|             None => return Ok(None), |             None => return Ok(None), | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         match last_data { |         match first_data { | ||||||
|             Some(Value::Blob(bytes)) => { |             Some(Value::Blob(bytes)) => { | ||||||
|                 let update = rmp_serde::from_read_ref(&bytes)?; |                 let update = rmp_serde::from_read_ref(&bytes)?; | ||||||
|  |  | ||||||
|                 // remove it from the database now |                 // remove it from the database now | ||||||
|                 let last_id_bytes = last_id.to_be_bytes(); |                 let first_id_bytes = first_id.to_be_bytes(); | ||||||
|                 self.updates.delete(writer, last_id_bytes)?; |                 self.updates.delete(writer, first_id_bytes)?; | ||||||
|  |  | ||||||
|                 Ok(Some((last_id, update))) |                 Ok(Some((first_id, update))) | ||||||
|             }, |             }, | ||||||
|             Some(value) => panic!("invalid type {:?}", value), |             Some(value) => panic!("invalid type {:?}", value), | ||||||
|             None => Ok(None), |             None => Ok(None), | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  | use std::convert::TryInto; | ||||||
| use rkv::Value; | use rkv::Value; | ||||||
| use crate::{update::UpdateResult, MResult}; | use crate::{update::UpdateResult, MResult}; | ||||||
|  |  | ||||||
| @@ -7,6 +8,31 @@ pub struct UpdatesResults { | |||||||
| } | } | ||||||
|  |  | ||||||
| impl UpdatesResults { | impl UpdatesResults { | ||||||
|  |     // TODO we should use the MDB_LAST op but | ||||||
|  |     //      it is not exposed by the rkv library | ||||||
|  |     pub fn last_update_id<'a>( | ||||||
|  |         &self, | ||||||
|  |         reader: &'a impl rkv::Readable, | ||||||
|  |     ) -> Result<Option<(u64, Option<Value<'a>>)>, rkv::StoreError> | ||||||
|  |     { | ||||||
|  |         let mut last = None; | ||||||
|  |         let iter = self.updates_results.iter_start(reader)?; | ||||||
|  |         for result in iter { | ||||||
|  |             let (key, data) = result?; | ||||||
|  |             last = Some((key, data)); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         let (last_key, last_data) = match last { | ||||||
|  |             Some(entry) => entry, | ||||||
|  |             None => return Ok(None), | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         let array = last_key.try_into().unwrap(); | ||||||
|  |         let number = u64::from_be_bytes(array); | ||||||
|  |  | ||||||
|  |         Ok(Some((number, last_data))) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn put_update_result( |     pub fn put_update_result( | ||||||
|         &self, |         &self, | ||||||
|         writer: &mut rkv::Writer, |         writer: &mut rkv::Writer, | ||||||
|   | |||||||
| @@ -13,22 +13,49 @@ use crate::{Error, RankedMap}; | |||||||
|  |  | ||||||
| pub struct DocumentsAddition<D> { | pub struct DocumentsAddition<D> { | ||||||
|     updates_store: store::Updates, |     updates_store: store::Updates, | ||||||
|  |     updates_results_store: store::UpdatesResults, | ||||||
|  |     updates_notifier: crossbeam_channel::Sender<()>, | ||||||
|     documents: Vec<D>, |     documents: Vec<D>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<D> DocumentsAddition<D> { | impl<D> DocumentsAddition<D> { | ||||||
|     pub fn new(updates_store: store::Updates) -> DocumentsAddition<D> { |     pub fn new( | ||||||
|         DocumentsAddition { updates_store, documents: Vec::new() } |         updates_store: store::Updates, | ||||||
|  |         updates_results_store: store::UpdatesResults, | ||||||
|  |         updates_notifier: crossbeam_channel::Sender<()>, | ||||||
|  |     ) -> DocumentsAddition<D> | ||||||
|  |     { | ||||||
|  |         DocumentsAddition { | ||||||
|  |             updates_store, | ||||||
|  |             updates_results_store, | ||||||
|  |             updates_notifier, | ||||||
|  |             documents: Vec::new(), | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn update_document(&mut self, document: D) { |     pub fn update_document(&mut self, document: D) { | ||||||
|         self.documents.push(document); |         self.documents.push(document); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn finalize(self, writer: &mut rkv::Writer) -> Result<u64, Error> |     pub fn finalize(self, mut writer: rkv::Writer) -> Result<u64, Error> | ||||||
|     where D: serde::Serialize |     where D: serde::Serialize | ||||||
|     { |     { | ||||||
|         push_documents_addition(writer, self.updates_store, self.documents) |         let update_id = push_documents_addition( | ||||||
|  |             &mut writer, | ||||||
|  |             self.updates_store, | ||||||
|  |             self.updates_results_store, | ||||||
|  |             self.documents, | ||||||
|  |         )?; | ||||||
|  |         writer.commit()?; | ||||||
|  |         let _ = self.updates_notifier.send(()); | ||||||
|  |  | ||||||
|  |         Ok(update_id) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<D> Extend<D> for DocumentsAddition<D> { | ||||||
|  |     fn extend<T: IntoIterator<Item=D>>(&mut self, iter: T) { | ||||||
|  |         self.documents.extend(iter) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -11,12 +11,24 @@ use crate::store; | |||||||
|  |  | ||||||
| pub struct DocumentsDeletion { | pub struct DocumentsDeletion { | ||||||
|     updates_store: store::Updates, |     updates_store: store::Updates, | ||||||
|  |     updates_results_store: store::UpdatesResults, | ||||||
|  |     updates_notifier: crossbeam_channel::Sender<()>, | ||||||
|     documents: Vec<DocumentId>, |     documents: Vec<DocumentId>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl DocumentsDeletion { | impl DocumentsDeletion { | ||||||
|     pub fn new(updates_store: store::Updates) -> DocumentsDeletion { |     pub fn new( | ||||||
|         DocumentsDeletion { updates_store, documents: Vec::new() } |         updates_store: store::Updates, | ||||||
|  |         updates_results_store: store::UpdatesResults, | ||||||
|  |         updates_notifier: crossbeam_channel::Sender<()>, | ||||||
|  |     ) -> DocumentsDeletion | ||||||
|  |     { | ||||||
|  |         DocumentsDeletion { | ||||||
|  |             updates_store, | ||||||
|  |             updates_results_store, | ||||||
|  |             updates_notifier, | ||||||
|  |             documents: Vec::new(), | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn delete_document_by_id(&mut self, document_id: DocumentId) { |     pub fn delete_document_by_id(&mut self, document_id: DocumentId) { | ||||||
| @@ -37,8 +49,17 @@ impl DocumentsDeletion { | |||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn finalize(self, writer: &mut rkv::Writer) -> Result<u64, Error> { |     pub fn finalize(self, mut writer: rkv::Writer) -> Result<u64, Error> { | ||||||
|         push_documents_deletion(writer, self.updates_store, self.documents) |         let update_id = push_documents_deletion( | ||||||
|  |             &mut writer, | ||||||
|  |             self.updates_store, | ||||||
|  |             self.updates_results_store, | ||||||
|  |             self.documents, | ||||||
|  |         )?; | ||||||
|  |         writer.commit()?; | ||||||
|  |         let _ = self.updates_notifier.send(()); | ||||||
|  |  | ||||||
|  |         Ok(update_id) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -5,16 +5,17 @@ pub use self::documents_addition::{DocumentsAddition, apply_documents_addition}; | |||||||
| pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; | pub use self::documents_deletion::{DocumentsDeletion, apply_documents_deletion}; | ||||||
|  |  | ||||||
| use std::time::{Duration, Instant}; | use std::time::{Duration, Instant}; | ||||||
|  | use log::debug; | ||||||
| use serde::{Serialize, Deserialize}; | use serde::{Serialize, Deserialize}; | ||||||
| use crate::{store, Error, MResult, DocumentId, RankedMap}; | use crate::{store, Error, MResult, DocumentId, RankedMap}; | ||||||
|  |  | ||||||
| #[derive(Serialize, Deserialize)] | #[derive(Debug, Serialize, Deserialize)] | ||||||
| pub enum Update { | pub enum Update { | ||||||
|     DocumentsAddition(Vec<rmpv::Value>), |     DocumentsAddition(Vec<rmpv::Value>), | ||||||
|     DocumentsDeletion(Vec<DocumentId>), |     DocumentsDeletion(Vec<DocumentId>), | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Clone, Serialize, Deserialize)] | #[derive(Debug, Clone, Serialize, Deserialize)] | ||||||
| pub enum UpdateType { | pub enum UpdateType { | ||||||
|     DocumentsAddition { number: usize }, |     DocumentsAddition { number: usize }, | ||||||
|     DocumentsDeletion { number: usize }, |     DocumentsDeletion { number: usize }, | ||||||
| @@ -59,11 +60,29 @@ pub fn update_status<T: rkv::Readable>( | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | pub fn biggest_update_id( | ||||||
|  |     writer: &mut rkv::Writer, | ||||||
|  |     updates_store: store::Updates, | ||||||
|  |     updates_results_store: store::UpdatesResults, | ||||||
|  | ) -> MResult<Option<u64>> | ||||||
|  | { | ||||||
|  |     let last_update_id = updates_store.last_update_id(writer)?; | ||||||
|  |     let last_update_id = last_update_id.map(|(n, _)| n); | ||||||
|  |  | ||||||
|  |     let last_update_results_id = updates_results_store.last_update_id(writer)?; | ||||||
|  |     let last_update_results_id = last_update_results_id.map(|(n, _)| n); | ||||||
|  |  | ||||||
|  |     let max = last_update_id.max(last_update_results_id); | ||||||
|  |  | ||||||
|  |     Ok(max) | ||||||
|  | } | ||||||
|  |  | ||||||
| pub fn push_documents_addition<D: serde::Serialize>( | pub fn push_documents_addition<D: serde::Serialize>( | ||||||
|     writer: &mut rkv::Writer, |     writer: &mut rkv::Writer, | ||||||
|     updates_store: store::Updates, |     updates_store: store::Updates, | ||||||
|  |     updates_results_store: store::UpdatesResults, | ||||||
|     addition: Vec<D>, |     addition: Vec<D>, | ||||||
| ) -> Result<u64, Error> | ) -> MResult<u64> | ||||||
| { | { | ||||||
|     let mut values = Vec::with_capacity(addition.len()); |     let mut values = Vec::with_capacity(addition.len()); | ||||||
|     for add in addition { |     for add in addition { | ||||||
| @@ -72,22 +91,29 @@ pub fn push_documents_addition<D: serde::Serialize>( | |||||||
|         values.push(add); |         values.push(add); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     let update = Update::DocumentsAddition(values); |     let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; | ||||||
|     let update_id = updates_store.push_back(writer, &update)?; |     let last_update_id = last_update_id.map_or(0, |n| n + 1); | ||||||
|  |  | ||||||
|     Ok(update_id) |     let update = Update::DocumentsAddition(values); | ||||||
|  |     let update_id = updates_store.put_update(writer, last_update_id, &update)?; | ||||||
|  |  | ||||||
|  |     Ok(last_update_id) | ||||||
| } | } | ||||||
|  |  | ||||||
| pub fn push_documents_deletion( | pub fn push_documents_deletion( | ||||||
|     writer: &mut rkv::Writer, |     writer: &mut rkv::Writer, | ||||||
|     updates_store: store::Updates, |     updates_store: store::Updates, | ||||||
|  |     updates_results_store: store::UpdatesResults, | ||||||
|     deletion: Vec<DocumentId>, |     deletion: Vec<DocumentId>, | ||||||
| ) -> Result<u64, Error> | ) -> MResult<u64> | ||||||
| { | { | ||||||
|     let update = Update::DocumentsDeletion(deletion); |     let last_update_id = biggest_update_id(writer, updates_store, updates_results_store)?; | ||||||
|     let update_id = updates_store.push_back(writer, &update)?; |     let last_update_id = last_update_id.map_or(0, |n| n + 1); | ||||||
|  |  | ||||||
|     Ok(update_id) |     let update = Update::DocumentsDeletion(deletion); | ||||||
|  |     let update_id = updates_store.put_update(writer, last_update_id, &update)?; | ||||||
|  |  | ||||||
|  |     Ok(last_update_id) | ||||||
| } | } | ||||||
|  |  | ||||||
| pub fn update_task( | pub fn update_task( | ||||||
| @@ -101,21 +127,21 @@ pub fn update_task( | |||||||
|         None => return Ok(false), |         None => return Ok(false), | ||||||
|     }; |     }; | ||||||
|  |  | ||||||
|  |     debug!("Processing update number {}", update_id); | ||||||
|  |  | ||||||
|     let (update_type, result, duration) = match update { |     let (update_type, result, duration) = match update { | ||||||
|         Update::DocumentsAddition(documents) => { |         Update::DocumentsAddition(documents) => { | ||||||
|             let update_type = UpdateType::DocumentsAddition { number: documents.len() }; |             let start = Instant::now(); | ||||||
|  |  | ||||||
|             let schema = match index.main.schema(writer)? { |  | ||||||
|                 Some(schema) => schema, |  | ||||||
|                 None => return Err(Error::SchemaMissing), |  | ||||||
|             }; |  | ||||||
|             let ranked_map = match index.main.ranked_map(writer)? { |             let ranked_map = match index.main.ranked_map(writer)? { | ||||||
|                 Some(ranked_map) => ranked_map, |                 Some(ranked_map) => ranked_map, | ||||||
|                 None => RankedMap::default(), |                 None => RankedMap::default(), | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|             let start = Instant::now(); |             let update_type = UpdateType::DocumentsAddition { number: documents.len() }; | ||||||
|             let result = apply_documents_addition( |  | ||||||
|  |             let result = match index.main.schema(writer)? { | ||||||
|  |                 Some(schema) => apply_documents_addition( | ||||||
|                     writer, |                     writer, | ||||||
|                     index.main, |                     index.main, | ||||||
|                     index.documents_fields, |                     index.documents_fields, | ||||||
| @@ -124,24 +150,24 @@ pub fn update_task( | |||||||
|                     &schema, |                     &schema, | ||||||
|                     ranked_map, |                     ranked_map, | ||||||
|                     documents, |                     documents, | ||||||
|             ); |                 ), | ||||||
|  |                 None => Err(Error::SchemaMissing), | ||||||
|  |             }; | ||||||
|  |  | ||||||
|             (update_type, result, start.elapsed()) |             (update_type, result, start.elapsed()) | ||||||
|         }, |         }, | ||||||
|         Update::DocumentsDeletion(documents) => { |         Update::DocumentsDeletion(documents) => { | ||||||
|             let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; |             let start = Instant::now(); | ||||||
|  |  | ||||||
|             let schema = match index.main.schema(writer)? { |  | ||||||
|                 Some(schema) => schema, |  | ||||||
|                 None => return Err(Error::SchemaMissing), |  | ||||||
|             }; |  | ||||||
|             let ranked_map = match index.main.ranked_map(writer)? { |             let ranked_map = match index.main.ranked_map(writer)? { | ||||||
|                 Some(ranked_map) => ranked_map, |                 Some(ranked_map) => ranked_map, | ||||||
|                 None => RankedMap::default(), |                 None => RankedMap::default(), | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|             let start = Instant::now(); |             let update_type = UpdateType::DocumentsDeletion { number: documents.len() }; | ||||||
|             let result = apply_documents_deletion( |  | ||||||
|  |             let result = match index.main.schema(writer)? { | ||||||
|  |                 Some(schema) => apply_documents_deletion( | ||||||
|                     writer, |                     writer, | ||||||
|                     index.main, |                     index.main, | ||||||
|                     index.documents_fields, |                     index.documents_fields, | ||||||
| @@ -150,7 +176,9 @@ pub fn update_task( | |||||||
|                     &schema, |                     &schema, | ||||||
|                     ranked_map, |                     ranked_map, | ||||||
|                     documents, |                     documents, | ||||||
|             ); |                 ), | ||||||
|  |                 None => Err(Error::SchemaMissing), | ||||||
|  |             }; | ||||||
|  |  | ||||||
|             (update_type, result, start.elapsed()) |             (update_type, result, start.elapsed()) | ||||||
|         }, |         }, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user