fix snapshots

Marin Postma
2021-04-14 17:53:12 +02:00
parent 2b154524bb
commit 33830d5ecf
15 changed files with 109 additions and 101 deletions

View File

@@ -8,12 +8,12 @@ use tokio::fs;
 use tokio::io::{AsyncSeekExt, AsyncWriteExt};
 use tokio::sync::mpsc;
 use uuid::Uuid;
+use futures::StreamExt;

 use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore};
-use crate::index_controller::index_actor::IndexActorHandle;
+use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG};
 use crate::index_controller::{UpdateMeta, UpdateStatus};

 pub struct UpdateActor<D, I> {
     path: PathBuf,
     store: Arc<UpdateStore>,
@@ -32,10 +32,7 @@ where
         path: impl AsRef<Path>,
         index_handle: I,
     ) -> anyhow::Result<Self> {
-        let path = path
-            .as_ref()
-            .to_owned()
-            .join("updates");
+        let path = path.as_ref().to_owned().join("updates");
         std::fs::create_dir_all(&path)?;
@@ -81,11 +78,11 @@ where
                 Some(Delete { uuid, ret }) => {
                     let _ = ret.send(self.handle_delete(uuid).await);
                 }
-                Some(Snapshot { uuid, path, ret }) => {
-                    let _ = ret.send(self.handle_snapshot(uuid, path).await);
+                Some(Snapshot { uuids, path, ret }) => {
+                    let _ = ret.send(self.handle_snapshot(uuids, path).await);
                 }
-                Some(GetSize { uuid, ret }) => {
-                    let _ = ret.send(self.handle_get_size(uuid).await);
+                Some(GetSize { ret }) => {
+                    let _ = ret.send(self.handle_get_size().await);
                 }
                 None => break,
             }
@@ -200,7 +197,7 @@ where
         Ok(())
     }

-    async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
+    async fn handle_snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> {
         let index_handle = self.index_handle.clone();
         let update_store = self.store.clone();
         tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
@@ -210,32 +207,41 @@ where
             let mut txn = update_store.env.write_txn()?;
             // create db snapshot
-            update_store.snapshot(&mut txn, &path, uuid)?;
+            update_store.snapshot(&mut txn, &path)?;

-            futures::executor::block_on(
-                async move { index_handle.snapshot(uuid, path).await },
-            )?;
-            Ok(())
+            // Perform the snapshot of each index concurrently. Only a third of the
+            // index actor's capacity is used at a time, so as not to put too much
+            // pressure on the index actor.
+            let path = &path;
+            let handle = &index_handle;
+            let mut stream = futures::stream::iter(uuids.iter())
+                .map(|&uuid| handle.snapshot(uuid, path.clone()))
+                .buffer_unordered(CONCURRENT_INDEX_MSG / 3);
+
+            futures::executor::block_on(async {
+                while let Some(res) = stream.next().await {
+                    res?;
+                }
+                Ok(())
+            })
         })
         .await
-        .map_err(|e| UpdateError::Error(e.into()))?
-        .map_err(|e| UpdateError::Error(e.into()))?;
+        .map_err(|e| UpdateError::Error(e.into()))?
+        .map_err(|e| UpdateError::Error(e.into()))?;

         Ok(())
     }

-    async fn handle_get_size(&self, uuid: Uuid) -> Result<u64> {
-        let size = match self.store.get(uuid).await? {
-            Some(update_store) => tokio::task::spawn_blocking(move || -> anyhow::Result<u64> {
-                let txn = update_store.env.read_txn()?;
-                update_store.get_size(&txn)
-            })
-            .await
-            .map_err(|e| UpdateError::Error(e.into()))?
-            .map_err(|e| UpdateError::Error(e.into()))?,
-            None => 0,
-        };
+    async fn handle_get_size(&self) -> Result<u64> {
+        let update_store = self.store.clone();
+        let size = tokio::task::spawn_blocking(move || -> anyhow::Result<u64> {
+            let txn = update_store.env.read_txn()?;
+            update_store.get_size(&txn)
+        })
+        .await
+        .map_err(|e| UpdateError::Error(e.into()))?
+        .map_err(|e| UpdateError::Error(e.into()))?;

         Ok(size)
     }
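For context, the throttled fan-out in `handle_snapshot` is the standard `futures` pattern for bounded concurrency: turn the uuid list into a stream of snapshot futures and poll at most N of them at once. Below is a minimal, self-contained sketch of the same idea, assuming only the `futures` and `anyhow` crates; `snapshot_one` is a hypothetical stand-in for `IndexActorHandle::snapshot`, and the limit of 3 is illustrative.

```rust
use futures::stream::{self, StreamExt};

// Hypothetical stand-in for `IndexActorHandle::snapshot`; the real
// call messages an index actor.
async fn snapshot_one(uuid: u32) -> anyhow::Result<()> {
    println!("snapshotting index {}", uuid);
    Ok(())
}

async fn snapshot_all(uuids: Vec<u32>) -> anyhow::Result<()> {
    // At most 3 snapshot futures are polled concurrently; completions
    // may arrive in any order.
    let mut stream = stream::iter(uuids)
        .map(snapshot_one)
        .buffer_unordered(3);

    // Fail fast: the first error ends the loop and drops the futures
    // still in flight.
    while let Some(res) = stream.next().await {
        res?;
    }
    Ok(())
}

fn main() -> anyhow::Result<()> {
    futures::executor::block_on(snapshot_all(vec![1, 2, 3, 4, 5]))
}
```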

View File

@@ -6,8 +6,7 @@ use uuid::Uuid;
 use crate::index_controller::IndexActorHandle;

 use super::{
-    PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta,
-    UpdateMsg, UpdateStatus,
+    PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus,
 };

 #[derive(Clone)]
@@ -27,7 +26,7 @@ where
     where
         I: IndexActorHandle + Clone + Send + Sync + 'static,
     {
-        let path = path.as_ref().to_owned().join("updates");
+        let path = path.as_ref().to_owned();
         let (sender, receiver) = mpsc::channel(100);
         let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?;
@@ -64,16 +63,16 @@ where
         receiver.await.expect("update actor killed.")
     }

-    async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
+    async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> {
         let (ret, receiver) = oneshot::channel();
-        let msg = UpdateMsg::Snapshot { uuid, path, ret };
+        let msg = UpdateMsg::Snapshot { uuids, path, ret };
         let _ = self.sender.send(msg).await;
         receiver.await.expect("update actor killed.")
     }

-    async fn get_size(&self, uuid: Uuid) -> Result<u64> {
+    async fn get_size(&self) -> Result<u64> {
         let (ret, receiver) = oneshot::channel();
-        let msg = UpdateMsg::GetSize { uuid, ret };
+        let msg = UpdateMsg::GetSize { ret };
         let _ = self.sender.send(msg).await;
         receiver.await.expect("update actor killed.")
     }
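All of these handle methods follow the same request/response actor shape: pack a oneshot sender into the message, push it down the actor's mpsc channel, and await the reply. A stripped-down sketch of that pattern, with an illustrative `Msg` type standing in for `UpdateMsg`:

```rust
use tokio::sync::{mpsc, oneshot};

// Illustrative message type, mirroring the shape of `UpdateMsg::GetSize`.
enum Msg {
    GetSize { ret: oneshot::Sender<u64> },
}

async fn actor_loop(mut receiver: mpsc::Receiver<Msg>) {
    while let Some(msg) = receiver.recv().await {
        match msg {
            Msg::GetSize { ret } => {
                // The actor computes the answer and replies on the oneshot.
                let _ = ret.send(42);
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(100);
    tokio::spawn(actor_loop(receiver));

    // Handle side: create the reply channel, send the request, await it.
    let (ret, receiver) = oneshot::channel();
    let _ = sender.send(Msg::GetSize { ret }).await;
    let size = receiver.await.expect("actor killed.");
    assert_eq!(size, 42);
}
```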

View File

@@ -26,12 +26,11 @@ pub enum UpdateMsg<D> {
         ret: oneshot::Sender<Result<()>>,
     },
     Snapshot {
-        uuid: Uuid,
+        uuids: Vec<Uuid>,
         path: PathBuf,
         ret: oneshot::Sender<Result<()>>,
     },
     GetSize {
-        uuid: Uuid,
         ret: oneshot::Sender<Result<u64>>,
     },
 }

View File

@@ -40,8 +40,8 @@ pub trait UpdateActorHandle {
     async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>;
     async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
     async fn delete(&self, uuid: Uuid) -> Result<()>;
-    async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
-    async fn get_size(&self, uuid: Uuid) -> Result<u64>;
+    async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()>;
+    async fn get_size(&self) -> Result<u64>;
     async fn update(
         &self,
         meta: UpdateMeta,

View File

@@ -5,6 +5,7 @@ use std::mem::size_of;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;

+use anyhow::Context;
 use bytemuck::{Pod, Zeroable};
 use heed::types::{ByteSlice, DecodeIgnore, SerdeJson};
 use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
@@ -106,7 +107,7 @@ where
         mut options: EnvOpenOptions,
         path: P,
         update_handler: U,
-    ) -> heed::Result<Arc<Self>>
+    ) -> anyhow::Result<Arc<Self>>
     where
         P: AsRef<Path>,
         U: HandleUpdate<M, N, E> + Sync + Clone + Send + 'static,
@@ -127,6 +128,11 @@ where
         let update_lock = Arc::new(Mutex::new(()));

+        // Init the update loop to perform any pending updates at launch.
+        // Since we just launched the update store, and we still own the receiving end of the
+        // channel, this call is guaranteed to succeed.
+        notification_sender.try_send(()).expect("Failed to init update store");
+
         let update_store = Arc::new(UpdateStore {
             env,
             pending,
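For context on that `try_send`: the notification channel acts as a coalescing wake-up flag for the update loop, and the very first send at construction time cannot hit a full channel. A rough sketch of the mechanism; the capacity-1 channel here is an illustrative choice, not necessarily the store's real capacity:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // A small bounded channel used as a "work available" wake-up flag:
    // many notifications collapse into at most one pending wake-up.
    let (notifier, mut wakeups) = mpsc::channel::<()>(1);

    // At startup we still own the receiving end and nothing has been
    // queued yet, so this first try_send cannot fail: it schedules an
    // initial pass over any updates pending from a previous run.
    notifier.try_send(()).expect("failed to init update loop");

    // Later notifications are best-effort; a full channel means a
    // wake-up is already pending and the new work will be seen anyway.
    let _ = notifier.try_send(());

    // The update loop parks here until it is woken.
    wakeups.recv().await.expect("all senders dropped");
    println!("processing pending updates...");
}
```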
@@ -277,8 +283,11 @@ where
         // to the update handler. The processing store is non-persistent, to be able to
         // recover from a failure.
         let processing = pending.processing();
-        self.processing.write().replace((index_uuid, processing.clone()));
-        let file = File::open(&content_path)?;
+        self.processing
+            .write()
+            .replace((index_uuid, processing.clone()));
+        let file = File::open(&content_path)
+            .with_context(|| format!("file at path: {:?}", &content_path))?;

         // Process the pending update using the provided user function.
         let result = handler.handle_update(index_uuid, processing, file)?;
         drop(rtxn);
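`with_context` is the `anyhow::Context` extension trait imported at the top of the file; it attaches the failing path to the underlying `io::Error` so update failures are diagnosable. A minimal illustration of the resulting error chain:

```rust
use anyhow::Context;
use std::fs::File;
use std::path::Path;

fn open_update(path: &Path) -> anyhow::Result<File> {
    // On failure this yields an error chain like:
    //   file at path: "/tmp/does-not-exist"
    //   caused by: No such file or directory (os error 2)
    File::open(path).with_context(|| format!("file at path: {:?}", path))
}

fn main() {
    if let Err(e) = open_update(Path::new("/tmp/does-not-exist")) {
        // `{:#}` prints the whole chain on one line.
        eprintln!("{:#}", e);
    }
}
```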
@@ -521,9 +530,10 @@ where
     fn delete_all<A>(
         txn: &mut heed::RwTxn,
         uuid: Uuid,
-        db: Database<ByteSlice, A>
+        db: Database<ByteSlice, A>,
     ) -> anyhow::Result<()>
-    where A: for<'a> heed::BytesDecode<'a>
+    where
+        A: for<'a> heed::BytesDecode<'a>,
     {
         let mut iter = db.prefix_iter_mut(txn, uuid.as_bytes())?;
         while let Some(_) = iter.next() {
@@ -553,19 +563,17 @@ where
         &self,
         txn: &mut heed::RwTxn,
         path: impl AsRef<Path>,
-        uuid: Uuid,
     ) -> anyhow::Result<()> {
         let update_path = path.as_ref().join("updates");
-        create_dir_all(&update_path)?;
-        let mut snapshot_path = update_path.join(format!("update-{}", uuid));
-        // acquire write lock to prevent further writes during snapshot
-        create_dir_all(&snapshot_path)?;
-        snapshot_path.push("data.mdb");
+        create_dir_all(&update_path)?;
+        let db_path = update_path.join("data.mdb");

         // create db snapshot
         self.env
-            .copy_to_path(&snapshot_path, CompactionOption::Enabled)?;
+            .copy_to_path(&db_path, CompactionOption::Enabled)?;

         let update_files_path = update_path.join("update_files");
         create_dir_all(&update_files_path)?;
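The snapshot itself leans on LMDB's live-copy support: `Env::copy_to_path` with `CompactionOption::Enabled` writes a compacted, consistent copy of the whole environment while it stays open. A small end-to-end sketch of the layout built above, assuming the heed 0.x API used in this file:

```rust
use heed::{CompactionOption, EnvOpenOptions};
use std::fs::create_dir_all;
use std::path::Path;

fn main() -> anyhow::Result<()> {
    // Open (or create) a small LMDB environment.
    create_dir_all("source_db")?;
    let env = EnvOpenOptions::new()
        .map_size(10 * 1024 * 1024) // 10 MiB, illustrative
        .open("source_db")?;

    // Reproduce the snapshot layout built by `UpdateStore::snapshot`:
    // a compacted copy of the whole environment at `updates/data.mdb`.
    let update_path = Path::new("snapshot").join("updates");
    create_dir_all(&update_path)?;
    env.copy_to_path(update_path.join("data.mdb"), CompactionOption::Enabled)?;
    Ok(())
}
```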