mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 13:36:27 +00:00 
			
		
		
		
	feat: Database holds a DatabaseView and sync update ingestions
This commit is contained in:
		| @@ -1,12 +1,11 @@ | |||||||
|  | use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard}; | ||||||
| use std::error::Error; | use std::error::Error; | ||||||
| use std::path::Path; | use std::path::Path; | ||||||
| use std::ops::Deref; | use std::ops::Deref; | ||||||
| use std::sync::Arc; |  | ||||||
| use std::fmt; |  | ||||||
|  |  | ||||||
| use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions}; | use rocksdb::rocksdb_options::{DBOptions, IngestExternalFileOptions, ColumnFamilyOptions}; | ||||||
| use rocksdb::{DB, DBVector, MergeOperands, SeekKey}; |  | ||||||
| use rocksdb::rocksdb::{Writable, Snapshot}; | use rocksdb::rocksdb::{Writable, Snapshot}; | ||||||
|  | use rocksdb::{DB, DBVector, MergeOperands}; | ||||||
|  |  | ||||||
| pub use self::document_key::{DocumentKey, DocumentKeyAttr}; | pub use self::document_key::{DocumentKey, DocumentKeyAttr}; | ||||||
| pub use self::database_view::{DatabaseView, DocumentIter}; | pub use self::database_view::{DatabaseView, DocumentIter}; | ||||||
| @@ -43,8 +42,15 @@ where D: Deref<Target=DB> | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Clone)] | pub struct Database { | ||||||
| pub struct Database(Arc<DB>); |     // DB is under a Mutex to sync update ingestions and to keep DB update locking | ||||||
|  |     // separate from DatabaseView acquisition locking. In other words: | ||||||
|  |     // "Block readers the minimum possible amount of time" | ||||||
|  |     db: Mutex<Arc<DB>>, | ||||||
|  |  | ||||||
|  |     // This view is updated each time the DB ingests an update | ||||||
|  |     view: RwLock<DatabaseView<Arc<DB>>>, | ||||||
|  | } | ||||||
|  |  | ||||||
| impl Database { | impl Database { | ||||||
|     pub fn create<P: AsRef<Path>>(path: P, schema: Schema) -> Result<Database, Box<Error>> { |     pub fn create<P: AsRef<Path>>(path: P, schema: Schema) -> Result<Database, Box<Error>> { | ||||||
| @@ -68,7 +74,11 @@ impl Database { | |||||||
|         schema.write_to(&mut schema_bytes)?; |         schema.write_to(&mut schema_bytes)?; | ||||||
|         db.put(DATA_SCHEMA, &schema_bytes)?; |         db.put(DATA_SCHEMA, &schema_bytes)?; | ||||||
|  |  | ||||||
|         Ok(Database(Arc::new(db))) |         let db = Arc::new(db); | ||||||
|  |         let snapshot = Snapshot::new(db.clone()); | ||||||
|  |         let view = RwLock::new(DatabaseView::new(snapshot)?); | ||||||
|  |  | ||||||
|  |         Ok(Database { db: Mutex::new(db), view }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> { |     pub fn open<P: AsRef<Path>>(path: P) -> Result<Database, Box<Error>> { | ||||||
| @@ -88,58 +98,65 @@ impl Database { | |||||||
|             None => return Err(String::from("Database does not contain a schema").into()), |             None => return Err(String::from("Database does not contain a schema").into()), | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         Ok(Database(Arc::new(db))) |         let db = Arc::new(db); | ||||||
|  |         let snapshot = Snapshot::new(db.clone()); | ||||||
|  |         let view = RwLock::new(DatabaseView::new(snapshot)?); | ||||||
|  |  | ||||||
|  |         Ok(Database { db: Mutex::new(db), view }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn ingest_update_file(&self, update: Update) -> Result<(), Box<Error>> { |     pub fn ingest_update_file(&self, update: Update) -> Result<(), Box<Error>> { | ||||||
|         let move_update = update.can_be_moved(); |         let snapshot = { | ||||||
|         let path = update.into_path_buf(); |             // We must have a mutex here to ensure that update ingestions and compactions | ||||||
|         let path = path.to_string_lossy(); |             // are done atomically and in the right order. | ||||||
|  |             // This way update ingestions will block other update ingestions without blocking view | ||||||
|  |             // creations while doing the "data-index" compaction | ||||||
|  |             let db = match self.db.lock() { | ||||||
|  |                 Ok(db) => db, | ||||||
|  |                 Err(e) => return Err(e.to_string().into()), | ||||||
|  |             }; | ||||||
|  |  | ||||||
|         let mut options = IngestExternalFileOptions::new(); |             let move_update = update.can_be_moved(); | ||||||
|         options.move_files(move_update); |             let path = update.into_path_buf(); | ||||||
|  |             let path = path.to_string_lossy(); | ||||||
|  |  | ||||||
|         let cf_handle = self.0.cf_handle("default").unwrap(); |             let mut options = IngestExternalFileOptions::new(); | ||||||
|         self.0.ingest_external_file_optimized(&cf_handle, &options, &[&path])?; |             options.move_files(move_update); | ||||||
|  |  | ||||||
|         // compacting to avoid calling the merge operator |             let cf_handle = db.cf_handle("default").expect("\"default\" column family not found"); | ||||||
|         self.0.compact_range(Some(DATA_INDEX), Some(DATA_INDEX)); |             db.ingest_external_file_optimized(&cf_handle, &options, &[&path])?; | ||||||
|  |  | ||||||
|  |             // Compacting to trigger the merge operator only once, | ||||||
|  |             // while ingesting the update, rather than on every search | ||||||
|  |             db.compact_range(Some(DATA_INDEX), Some(DATA_INDEX)); | ||||||
|  |  | ||||||
|  |             Snapshot::new(db.clone()) | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         // Here we will block the view creation for the minimum amount of time: | ||||||
|  |         // updating the DatabaseView itself with the new database snapshot | ||||||
|  |         let view = DatabaseView::new(snapshot)?; | ||||||
|  |         match self.view.write() { | ||||||
|  |             Ok(mut lock) => *lock = view, | ||||||
|  |             Err(e) => return Err(e.to_string().into()), | ||||||
|  |         } | ||||||
|  |  | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> { |     pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Box<Error>> { | ||||||
|         Ok(self.0.get(key)?) |         self.view().get(key) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn flush(&self) -> Result<(), Box<Error>> { |     pub fn flush(&self) -> Result<(), Box<Error>> { | ||||||
|         Ok(self.0.flush(true)?) |         match self.db.lock() { | ||||||
|     } |             Ok(db) => Ok(db.flush(true)?), | ||||||
|  |             Err(e) => Err(e.to_string().into()), | ||||||
|     pub fn view(&self) -> Result<DatabaseView<&DB>, Box<Error>> { |  | ||||||
|         let snapshot = self.0.snapshot(); |  | ||||||
|         DatabaseView::new(snapshot) |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn view_arc(&self) -> Result<DatabaseView<Arc<DB>>, Box<Error>> { |  | ||||||
|         let snapshot = Snapshot::new(self.0.clone()); |  | ||||||
|         DatabaseView::new(snapshot) |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl fmt::Debug for Database { |  | ||||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |  | ||||||
|         write!(f, "Database([")?; |  | ||||||
|         let mut iter = self.0.iter(); |  | ||||||
|         iter.seek(SeekKey::Start); |  | ||||||
|         let mut first = true; |  | ||||||
|         for (key, _value) in &mut iter { |  | ||||||
|             if !first { write!(f, ", ")?; } |  | ||||||
|             first = false; |  | ||||||
|             let key = String::from_utf8_lossy(&key); |  | ||||||
|             write!(f, "{:?}", key)?; |  | ||||||
|         } |         } | ||||||
|         write!(f, "])") |     } | ||||||
|  |  | ||||||
|  |     pub fn view(&self) -> RwLockReadGuard<DatabaseView<Arc<DB>>> { | ||||||
|  |         self.view.read().unwrap() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -229,7 +246,7 @@ mod tests { | |||||||
|  |  | ||||||
|         update.set_move(true); |         update.set_move(true); | ||||||
|         database.ingest_update_file(update)?; |         database.ingest_update_file(update)?; | ||||||
|         let view = database.view()?; |         let view = database.view(); | ||||||
|  |  | ||||||
|         let de_doc0: SimpleDoc = view.retrieve_document(0)?; |         let de_doc0: SimpleDoc = view.retrieve_document(0)?; | ||||||
|         let de_doc1: SimpleDoc = view.retrieve_document(1)?; |         let de_doc1: SimpleDoc = view.retrieve_document(1)?; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user