Separate the update and main databases

We used the heed typed transaction to make it safe (https://github.com/Kerollmops/heed/pull/27).
This commit is contained in:
Clément Renault
2019-11-26 16:12:06 +01:00
parent 86a87d6032
commit d08b76a323
41 changed files with 498 additions and 414 deletions

View File

@ -14,8 +14,12 @@ use crate::{store, update, Index, MResult};
pub type BoxUpdateFn = Box<dyn Fn(&str, update::ProcessedUpdateResult) + Send + Sync + 'static>;
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
pub struct MainT;
pub struct UpdateT;
pub struct Database {
pub env: heed::Env,
env: heed::Env,
update_env: heed::Env,
common_store: heed::PolyDatabase,
indexes_store: heed::Database<Str, Unit>,
indexes: RwLock<HashMap<String, (Index, thread::JoinHandle<MResult<()>>)>>,
@ -45,6 +49,7 @@ pub type UpdateEventsEmitter = Sender<UpdateEvent>;
fn update_awaiter(
receiver: UpdateEvents,
env: heed::Env,
update_env: heed::Env,
index_uid: &str,
update_fn: Arc<ArcSwapFn>,
index: Index,
@ -52,42 +57,54 @@ fn update_awaiter(
let mut receiver = receiver.into_iter();
while let Some(UpdateEvent::NewUpdate) = receiver.next() {
loop {
// instantiate a main/parent transaction
let mut writer = break_try!(env.write_txn(), "LMDB write transaction begin failed");
// We instantiate a *write* transaction to *block* the thread
// until the *other*, notifiying, thread commits
let result = update_env.typed_write_txn::<UpdateT>();
let update_reader = break_try!(result, "LMDB read transaction (update) begin failed");
// retrieve the update that needs to be processed
let result = index.updates.pop_front(&mut writer);
let result = index.updates.first_update(&update_reader);
let (update_id, update) = match break_try!(result, "pop front update failed") {
Some(value) => value,
None => {
debug!("no more updates");
writer.abort();
break;
}
};
// instantiate a nested transaction
let result = env.nested_write_txn(&mut writer);
let mut nested_writer = break_try!(result, "LMDB nested write transaction failed");
// do not keep the reader for too long
update_reader.abort();
// try to apply the update to the database using the nested transaction
let result = update::update_task(&mut nested_writer, index.clone(), update_id, update);
// instantiate a transaction to touch to the main env
let result = env.typed_write_txn::<MainT>();
let mut main_writer = break_try!(result, "LMDB nested write transaction failed");
// try to apply the update to the database using the main transaction
let result = update::update_task(&mut main_writer, &index, update_id, update);
let status = break_try!(result, "update task failed");
// commit the nested transaction if the update was successful, abort it otherwise
// commit the main transaction if the update was successful, abort it otherwise
if status.error.is_none() {
break_try!(nested_writer.commit(), "commit nested transaction failed");
break_try!(main_writer.commit(), "commit nested transaction failed");
} else {
nested_writer.abort()
main_writer.abort()
}
// write the result of the update in the updates-results store
let updates_results = index.updates_results;
let result = updates_results.put_update_result(&mut writer, update_id, &status);
// now that the update has been processed we can instantiate
// a transaction to move the result to the updates-results store
let result = update_env.typed_write_txn::<UpdateT>();
let mut update_writer = break_try!(result, "LMDB write transaction begin failed");
// always commit the main/parent transaction, even if the update was unsuccessful
// definitely remove the update from the updates store
index.updates.del_update(&mut update_writer, update_id)?;
// write the result of the updates-results store
let updates_results = index.updates_results;
let result = updates_results.put_update_result(&mut update_writer, update_id, &status);
// always commit the main transaction, even if the update was unsuccessful
break_try!(result, "update result store commit failed");
break_try!(writer.commit(), "update parent transaction failed");
break_try!(update_writer.commit(), "update transaction commit failed");
// call the user callback when the update and the result are written consistently
if let Some(ref callback) = *update_fn.load() {
@ -98,9 +115,11 @@ fn update_awaiter(
debug!("update loop system stopped");
let mut writer = env.write_txn()?;
store::clear(&mut writer, &index)?;
let mut writer = env.typed_write_txn::<MainT>()?;
let mut update_writer = update_env.typed_write_txn::<UpdateT>()?;
store::clear(&mut writer, &mut update_writer, &index)?;
writer.commit()?;
update_writer.commit()?;
debug!("store {} cleared", index_uid);
@ -109,12 +128,20 @@ fn update_awaiter(
impl Database {
pub fn open_or_create(path: impl AsRef<Path>) -> MResult<Database> {
fs::create_dir_all(path.as_ref())?;
let main_path = path.as_ref().join("main");
let update_path = path.as_ref().join("update");
fs::create_dir_all(&main_path)?;
let env = heed::EnvOpenOptions::new()
.map_size(10 * 1024 * 1024 * 1024) // 10GB
.max_dbs(3000)
.open(path)?;
.open(main_path)?;
fs::create_dir_all(&update_path)?;
let update_env = heed::EnvOpenOptions::new()
.map_size(10 * 1024 * 1024 * 1024) // 10GB
.max_dbs(3000)
.open(update_path)?;
let common_store = env.create_poly_database(Some("common"))?;
let indexes_store = env.create_database::<Str, Unit>(Some("indexes"))?;
@ -134,7 +161,7 @@ impl Database {
let mut indexes = HashMap::new();
for index_uid in must_open {
let (sender, receiver) = crossbeam_channel::bounded(100);
let index = match store::open(&env, &index_uid, sender.clone())? {
let index = match store::open(&env, &update_env, &index_uid, sender.clone())? {
Some(index) => index,
None => {
log::warn!(
@ -146,6 +173,7 @@ impl Database {
};
let env_clone = env.clone();
let update_env_clone = update_env.clone();
let index_clone = index.clone();
let name_clone = index_uid.clone();
let update_fn_clone = update_fn.clone();
@ -154,6 +182,7 @@ impl Database {
update_awaiter(
receiver,
env_clone,
update_env_clone,
&name_clone,
update_fn_clone,
index_clone,
@ -173,6 +202,7 @@ impl Database {
Ok(Database {
env,
update_env,
common_store,
indexes_store,
indexes: RwLock::new(indexes),
@ -196,12 +226,13 @@ impl Database {
Entry::Occupied(_) => Err(crate::Error::IndexAlreadyExists),
Entry::Vacant(entry) => {
let (sender, receiver) = crossbeam_channel::bounded(100);
let index = store::create(&self.env, name, sender)?;
let index = store::create(&self.env, &self.update_env, name, sender)?;
let mut writer = self.env.write_txn()?;
self.indexes_store.put(&mut writer, name, &())?;
let env_clone = self.env.clone();
let update_env_clone = self.update_env.clone();
let index_clone = index.clone();
let name_clone = name.to_owned();
let update_fn_clone = self.update_fn.clone();
@ -210,6 +241,7 @@ impl Database {
update_awaiter(
receiver,
env_clone,
update_env_clone,
&name_clone,
update_fn_clone,
index_clone,
@ -259,6 +291,22 @@ impl Database {
self.update_fn.swap(None);
}
pub fn main_read_txn(&self) -> heed::Result<heed::RoTxn<MainT>> {
self.env.typed_read_txn::<MainT>()
}
pub fn main_write_txn(&self) -> heed::Result<heed::RwTxn<MainT>> {
self.env.typed_write_txn::<MainT>()
}
pub fn update_read_txn(&self) -> heed::Result<heed::RoTxn<UpdateT>> {
self.update_env.typed_read_txn::<UpdateT>()
}
pub fn update_write_txn(&self) -> heed::Result<heed::RwTxn<UpdateT>> {
self.update_env.typed_write_txn::<UpdateT>()
}
pub fn copy_and_compact_to_path<P: AsRef<Path>>(&self, path: P) -> ZResult<File> {
self.env.copy_to_path(path, CompactionOption::Enabled)
}
@ -288,7 +336,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -313,9 +361,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -334,15 +382,15 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
}
@ -351,7 +399,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -376,9 +424,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -396,15 +444,15 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some());
}
@ -413,7 +461,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -434,9 +482,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -447,15 +495,15 @@ mod tests {
additions.update_document(doc1);
let mut writer = env.write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
}
@ -464,7 +512,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -489,9 +537,9 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut update_writer, schema).unwrap();
update_writer.commit().unwrap();
let mut additions = index.documents_addition();
@ -510,9 +558,9 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let _update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
let mut update_writer = db.update_write_txn().unwrap();
let _update_id = additions.finalize(&mut update_writer).unwrap();
update_writer.commit().unwrap();
let schema = {
let data = r#"
@ -537,7 +585,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -545,10 +593,10 @@ mod tests {
let _ = receiver.iter().find(|id| *id == update_id);
// check if it has been accepted
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
reader.abort();
update_reader.abort();
let mut additions = index.documents_addition();
@ -571,7 +619,7 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
@ -579,11 +627,13 @@ mod tests {
let _ = receiver.iter().find(|id| *id == update_id);
// check if it has been accepted
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
// even try to search for a document
let reader = db.main_read_txn().unwrap();
let results = index.query_builder().query(&reader, "21 ", 0..20).unwrap();
assert_matches!(results.len(), 1);
@ -617,7 +667,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -625,8 +675,8 @@ mod tests {
let _ = receiver.iter().find(|id| *id == update_id);
// check if it has been accepted
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_some());
}
@ -635,7 +685,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -660,7 +710,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -683,17 +733,19 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
let reader = db.main_read_txn().unwrap();
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
assert!(document.is_none());
@ -713,7 +765,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -741,7 +793,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -764,17 +816,19 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
let reader = db.main_read_txn().unwrap();
let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
assert!(document.is_none());
@ -807,17 +861,19 @@ mod tests {
partial_additions.update_document(partial_doc1);
partial_additions.update_document(partial_doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = partial_additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let result = index.update_status(&reader, update_id).unwrap();
let update_reader = db.update_read_txn().unwrap();
let result = index.update_status(&update_reader, update_id).unwrap();
assert_matches!(result, Some(UpdateStatus::Processed { content }) if content.error.is_none());
update_reader.abort();
let reader = db.main_read_txn().unwrap();
let document: Option<serde_json::Value> = index
.document(&reader, None, DocumentId(7900334843754999545))
.unwrap();
@ -846,7 +902,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Arc::new(Database::open_or_create(dir.path()).unwrap());
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let db_cloned = database.clone();
@ -877,7 +933,7 @@ mod tests {
};
// add a schema to the index
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -899,7 +955,7 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
@ -919,7 +975,7 @@ mod tests {
let dir = tempfile::tempdir().unwrap();
let database = Database::open_or_create(dir.path()).unwrap();
let env = &database.env;
let db = &database;
let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |_name: &str, update: ProcessedUpdateResult| {
@ -944,7 +1000,7 @@ mod tests {
toml::from_str(data).unwrap()
};
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let _update_id = index.schema_update(&mut writer, schema).unwrap();
writer.commit().unwrap();
@ -967,15 +1023,14 @@ mod tests {
additions.update_document(doc1);
additions.update_document(doc2);
let mut writer = env.write_txn().unwrap();
let mut writer = db.update_write_txn().unwrap();
let update_id = additions.finalize(&mut writer).unwrap();
writer.commit().unwrap();
// block until the transaction is processed
let _ = receiver.into_iter().find(|id| *id == update_id);
let reader = env.read_txn().unwrap();
let reader = db.main_read_txn().unwrap();
let schema = index.main.schema(&reader).unwrap().unwrap();
let ranked_map = index.main.ranked_map(&reader).unwrap().unwrap();