WIP: rebasing on master

tamo
2021-04-28 16:43:49 +02:00
parent ceb8d6e1c9
commit e389c088eb
20 changed files with 540 additions and 433 deletions

View File

@ -0,0 +1,258 @@
use std::{
fs::File,
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::bail;
use heed::EnvOpenOptions;
use log::{error, info};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use tokio::fs;
use tokio::task::spawn_blocking;
use super::update_actor::UpdateActorHandle;
use super::uuid_resolver::UuidResolverHandle;
use super::IndexMetadata;
use crate::index::Index;
use crate::index_controller::uuid_resolver;
use crate::{helpers::compression, index::Settings};
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum DumpVersion {
V1,
}
impl DumpVersion {
const CURRENT: Self = Self::V1;
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DumpMetadata {
indexes: Vec<IndexMetadata>,
db_version: String,
dump_version: DumpVersion,
}
impl DumpMetadata {
/// Create a DumpMetadata with the current dump version of meilisearch.
pub fn new(indexes: Vec<IndexMetadata>, db_version: String) -> Self {
DumpMetadata {
indexes,
db_version,
dump_version: DumpVersion::CURRENT,
}
}
/// Extract DumpMetadata from the `metadata.json` file present at the provided `dir_path`
fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
let path = dir_path.join("metadata.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write DumpMetadata to the `metadata.json` file at the provided `dir_path`
fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> {
let path = dir_path.join("metadata.json");
let file = File::create(path)?;
serde_json::to_writer(file, &self)?;
Ok(())
}
}
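With the `camelCase` rename on `DumpMetadata` and the plain derive on `DumpVersion`, the `metadata.json` produced by `to_path` should look roughly like the sketch below; the `dbVersion` value is illustrative, and the `indexes` array is shown empty because the shape of `IndexMetadata` is defined elsewhere.

{
  "indexes": [],
  "dbVersion": "0.21.0",
  "dumpVersion": "V1"
}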
pub struct DumpService<U, R> {
uuid_resolver_handle: R,
update_handle: U,
dump_path: PathBuf,
db_name: String,
}
impl<U, R> DumpService<U, R>
where
U: UpdateActorHandle,
R: UuidResolverHandle,
{
pub fn new(
uuid_resolver_handle: R,
update_handle: U,
dump_path: PathBuf,
db_name: String,
) -> Self {
Self {
uuid_resolver_handle,
update_handle,
dump_path,
db_name,
}
}
pub async fn run(self) {
if let Err(e) = self.perform_dump().await {
error!("{}", e);
}
}
async fn perform_dump(&self) -> anyhow::Result<()> {
info!("Performing dump.");
let dump_dir = self.dump_path.clone();
fs::create_dir_all(&dump_dir).await?;
let temp_dump_dir = spawn_blocking(move || tempfile::tempdir_in(dump_dir)).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let uuids = self
.uuid_resolver_handle
.dump(temp_dump_path.clone())
.await?;
if uuids.is_empty() {
return Ok(());
}
let tasks = uuids
.iter()
.map(|&uuid| self.update_handle.dump(uuid, temp_dump_path.clone()))
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await?;
let dump_dir = self.dump_path.clone();
let dump_path = self.dump_path.join(format!("{}.dump", self.db_name));
let dump_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(dump_dir)?;
let temp_dump_file_path = temp_dump_file.path().to_owned();
compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(())
}
}
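`perform_dump` relies on a `compression::to_tar_gz(src, dest)` helper that is not part of this diff. As a rough idea of what such a helper does, here is a minimal sketch using the `flate2` and `tar` crates; the real `helpers::compression` module may differ.

use std::{fs::File, path::Path};
use flate2::{write::GzEncoder, Compression};

fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
    // gzip-compress a tar archive streamed straight into the destination file
    let file = File::create(dest)?;
    let gz = GzEncoder::new(file, Compression::default());
    let mut tar = tar::Builder::new(gz);
    // put the whole temporary dump directory at the root of the archive
    tar.append_dir_all(".", src)?;
    tar.into_inner()?.finish()?;
    Ok(())
}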
/// Extract Settings from the `settings.json` file present at the provided `dir_path`
fn settings_from_path(dir_path: &Path) -> anyhow::Result<Settings> {
let path = dir_path.join("settings.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write Settings to the `settings.json` file at the provided `dir_path`
fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> {
let path = dir_path.join("settings.json");
let file = File::create(path)?;
serde_json::to_writer(file, settings)?;
Ok(())
}
fn import_index_v1(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
std::fs::create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
// extract `settings.json` file and import content
let settings = settings_from_path(&dump_path)?;
let update_builder = UpdateBuilder::new(0);
index.update_settings(&settings, update_builder)?;
let update_builder = UpdateBuilder::new(1);
let file = File::open(&dump_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
index.update_documents(
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
reader,
update_builder,
None,
)?;
// the last step: we extract the milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
.unwrap()
.prepare_for_closing()
.wait();
Ok(())
}
pub fn load_dump(
db_path: impl AsRef<Path>,
dump_path: impl AsRef<Path>,
size: usize,
) -> anyhow::Result<()> {
info!("Importing dump from {}...", dump_path.as_ref().display());
let db_path = db_path.as_ref();
let dump_path = dump_path.as_ref();
let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&db_path)?;
// extract the dump in a temporary directory
let tmp_dir = TempDir::new()?;
let tmp_dir_path = tmp_dir.path();
compression::from_tar_gz(dump_path, tmp_dir_path)?;
// read dump metadata
let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
// choose the import function according to the DumpVersion in the metadata
let import_index = match metadata.dump_version {
DumpVersion::V1 => import_index_v1,
};
// remove indexes that have the same `uid` as the indexes to import, and create the missing empty indexes
let existing_index_uids = futures::executor::block_on(uuid_resolver.list())?;
info!("Deleting indexes provided in the dump...");
for idx in &metadata.indexes {
if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) {
// if we find the index in the `uuid_resolver` it's supposed to exist on the file system
// and we want to delete it
let path = db_path.join(&format!("indexes/index-{}", uuid));
info!("Deleting {}", path.display());
use std::io::ErrorKind::*;
match std::fs::remove_dir_all(path) {
Ok(()) => (),
// if an index was present in the metadata but is missing from the fs we can ignore the
// problem because we are going to create it later
Err(e) if e.kind() == NotFound => (),
Err(e) => bail!(e),
}
} else {
// if the index does not exist in the `uuid_resolver` we create it
futures::executor::block_on(uuid_resolver.create(idx.uid.clone()))?;
}
}
// import each index's content
for idx in metadata.indexes {
let dump_path = tmp_dir_path.join(&idx.uid);
let uuid = futures::executor::block_on(uuid_resolver.get(idx.uid))?;
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
info!("Importing dump from {} into {}...", dump_path.display(), index_path.display());
import_index(size, &dump_path, &index_path)?;
info!("Dump importation from {} succeed", dump_path.display());
}
info!("Dump importation from {} succeed", dump_path.display());
Ok(())
}

View File

@ -36,6 +36,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(Self { receiver, update_handler, store })
}
/// `run` polls the write_receiver and read_receiver concurrently, but while messages sent
/// through the read channel are processed concurrently, the messages sent through the write
/// channel are processed one at a time.
pub async fn run(mut self) {
let mut receiver = self
.receiver
@ -119,6 +122,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Snapshot { uuid, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Dump { uuid, path, ret } => {
let _ = ret.send(self.handle_dump(uuid, path).await);
}
GetStats { uuid, ret } => {
let _ = ret.send(self.handle_get_stats(uuid).await);
}
@ -306,7 +312,35 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
Ok(())
}
async fn handle_get_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
async fn handle_dump(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> {
use tokio::fs::create_dir_all;
path.push("indexes");
create_dir_all(&path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
if let Some(index) = self.store.get(uuid).await? {
let mut index_path = path.join(format!("index-{}", uuid));
create_dir_all(&index_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
index_path.push("data.mdb");
spawn_blocking(move || -> anyhow::Result<()> {
// Acquire a write txn to wait for any ongoing write transaction to finish before dumping.
let _txn = index.write_txn()?;
index.env.copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(())
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.map_err(IndexError::Error)?;
}
Ok(())
}
async fn handle_get_stats(&self, uuid: Uuid) -> Result<IndexStats> {
let index = self
.store
.get(uuid)

View File

@ -136,7 +136,14 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Dump { uuid, path, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetStats { uuid, ret };
let _ = self.sender.send(msg).await;

View File

@ -60,6 +60,11 @@ pub enum IndexMsg {
path: PathBuf,
ret: oneshot::Sender<IndexResult<()>>,
},
Dump {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
GetStats {
uuid: Uuid,
ret: oneshot::Sender<IndexResult<IndexStats>>,

View File

@ -180,5 +180,4 @@ mod test {
async fn get_index_stats(&self, uuid: Uuid) -> IndexResult<IndexStats> {
self.as_ref().get_index_stats(uuid).await
}
}
}
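Since `dump` is added to the index actor handle elsewhere in this commit, the test wrapper in this module would presumably need a matching delegation; a minimal sketch, assuming it keeps the same `as_ref()` forwarding pattern as `get_index_stats`:

async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
    // forward to the wrapped handle, exactly like the other trait methods
    self.as_ref().dump(uuid, path).await
}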

View File

@ -5,7 +5,6 @@ use std::time::Duration;
use actix_web::web::{Bytes, Payload};
use anyhow::bail;
use chrono::{DateTime, Utc};
use futures::stream::StreamExt;
use log::info;
use milli::FieldsDistribution;
@ -25,6 +24,7 @@ use crate::option::Opt;
mod index_actor;
mod snapshot;
mod dump;
mod update_actor;
mod update_handler;
mod updates;
@ -87,6 +87,13 @@ impl IndexController {
options.ignore_snapshot_if_db_exists,
options.ignore_missing_snapshot,
)?;
} else if let Some(ref path) = options.import_dump {
load_dump(
&options.db_path,
path,
index_size,
)?;
}
std::fs::create_dir_all(&path)?;

View File

@ -71,11 +71,16 @@ where
Some(Delete { uuid, ret }) => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Some(Snapshot { uuids, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuids, path).await);
Some(Snapshot { uuid, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
Some(Dump { uuid, path, ret }) => {
let _ = ret.send(self.handle_dump(uuid, path).await);
}
Some(GetInfo { ret }) => {
let _ = ret.send(self.handle_get_info().await);
Some(GetSize { uuid, ret }) => {
let _ = ret.send(self.handle_get_size(uuid).await);
}
None => break,
}
@ -194,9 +199,51 @@ where
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.clone();
let store = self.store.delete(uuid).await?;
tokio::task::spawn_blocking(move || store.delete_all(uuid))
if let Some(store) = store {
tokio::task::spawn(async move {
let store = get_arc_ownership_blocking(store).await;
tokio::task::spawn_blocking(move || {
store.prepare_for_closing().wait();
info!("Update store {} was closed.", uuid);
});
});
}
Ok(())
}
async fn handle_create(&self, uuid: Uuid) -> Result<()> {
let _ = self.store.get_or_create(uuid).await?;
Ok(())
}
Ok(())
}
async fn handle_create(&self, uuid: Uuid) -> Result<()> {
let _ = self.store.get_or_create(uuid).await?;
Ok(())
}
async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
if let Some(update_store) = self.store.get(uuid).await? {
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
// acquire write lock to prevent further writes during snapshot
// the update lock must be acquired BEFORE the write lock to prevent a deadlock
let _lock = update_store.update_lock.lock();
let mut txn = update_store.env.write_txn()?;
// create db snapshot
update_store.snapshot(&mut txn, &path, uuid)?;
futures::executor::block_on(
async move { index_handle.snapshot(uuid, path).await },
)?;
Ok(())
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
@ -245,4 +292,42 @@ where
Ok(info)
}
async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
if let Some(update_store) = self.store.get(uuid).await? {
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
// acquire write lock to prevent further writes during the dump
// the update lock must be acquired BEFORE the write lock to prevent a deadlock
let _lock = update_store.update_lock.lock();
let mut txn = update_store.env.write_txn()?;
// create db dump
update_store.dump(&mut txn, &path, uuid)?;
futures::executor::block_on(
async move { index_handle.dump(uuid, path).await },
)?;
Ok(())
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
}
Ok(())
}
async fn handle_get_size(&self, uuid: Uuid) -> Result<u64> {
let size = match self.store.get(uuid).await? {
Some(update_store) => tokio::task::spawn_blocking(move || -> anyhow::Result<u64> {
let txn = update_store.env.read_txn()?;
update_store.get_size(&txn)
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?,
None => 0,
};
}

View File

@ -78,6 +78,20 @@ where
receiver.await.expect("update actor killed.")
}
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Dump { uuid, path, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn get_size(&self, uuid: Uuid) -> Result<u64> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetSize { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn update(
&self,
meta: UpdateMeta,

View File

@ -31,7 +31,16 @@ pub enum UpdateMsg<D> {
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
Dump {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
GetInfo {
ret: oneshot::Sender<Result<UpdateStoreInfo>>,
},
GetSize {
uuid: Uuid,
ret: oneshot::Sender<Result<u64>>,
},
}

View File

@ -40,8 +40,11 @@ pub trait UpdateActorHandle {
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>;
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()>;
async fn create(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
async fn get_info(&self) -> Result<UpdateStoreInfo>;
async fn get_size(&self, uuid: Uuid) -> Result<u64>;
async fn update(
&self,
meta: UpdateMeta,

View File

@ -499,9 +499,56 @@ impl UpdateStore {
Ok(())
}
pub fn dump(
&self,
txn: &mut heed::RwTxn,
path: impl AsRef<Path>,
uuid: Uuid,
) -> anyhow::Result<()> {
let update_path = path.as_ref().join("updates");
create_dir_all(&update_path)?;
let mut dump_path = update_path.join(format!("update-{}", uuid));
// the write txn passed by the caller already prevents further writes during the dump
create_dir_all(&dump_path)?;
dump_path.push("data.mdb");
// create db dump
self.env.copy_to_path(&dump_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join("update_files");
create_dir_all(&update_files_path)?;
for path in self.pending.iter(&txn)? {
let (_, path) = path?;
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
}
Ok(())
}
pub fn get_info(&self) -> anyhow::Result<UpdateStoreInfo> {
let mut size = self.env.size();
let txn = self.env.read_txn()?;
for entry in self.pending_queue.iter(&txn)? {
let (_, pending) = entry?;
if let Some(path) = pending.content_path() {
size += File::open(path)?.metadata()?.len();
}
}
let processing = match *self.state.read() {
State::Processing(uuid, _) => Some(uuid),
_ => None,
};
Ok(UpdateStoreInfo { size, processing })
}
pub fn get_size(&self, txn: &heed::RoTxn) -> anyhow::Result<u64> {
let mut size = self.env.size();
let txn = self.env.read_txn()?;
for entry in self.pending_queue.iter(&txn)? {
let (_, pending) = entry?;

View File

@ -41,6 +41,9 @@ impl<S: UuidStore> UuidResolverActor<S> {
Some(SnapshotRequest { path, ret }) => {
let _ = ret.send(self.handle_snapshot(path).await);
}
Some(DumpRequest { path, ret }) => {
let _ = ret.send(self.handle_dump(path).await);
}
Some(GetSize { ret }) => {
let _ = ret.send(self.handle_get_size().await);
}
@ -82,6 +85,10 @@ impl<S: UuidStore> UuidResolverActor<S> {
self.store.snapshot(path).await
}
async fn handle_dump(&self, path: PathBuf) -> Result<Vec<Uuid>> {
self.store.dump(path).await
}
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));

View File

@ -68,6 +68,7 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?)
}
/// TODO: we should merge this function with the dump function
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::SnapshotRequest { path, ret };
@ -77,6 +78,15 @@ impl UuidResolverHandle for UuidResolverHandleImpl {
.expect("Uuid resolver actor has been killed")?)
}
async fn dump(&self, path: PathBuf) -> Result<Vec<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::DumpRequest { path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn get_size(&self) -> Result<u64> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::GetSize { ret };

View File

@ -31,6 +31,10 @@ pub enum UuidResolveMsg {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Uuid>>>,
},
DumpRequest {
path: PathBuf,
ret: oneshot::Sender<Result<Vec<Uuid>>>,
},
GetSize {
ret: oneshot::Sender<Result<u64>>,
},

View File

@ -31,6 +31,7 @@ pub trait UuidResolverHandle {
async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn dump(&self, path: PathBuf) -> Result<Vec<Uuid>>;
async fn get_size(&self) -> Result<u64>;
}

View File

@ -21,6 +21,7 @@ pub trait UuidStore {
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn dump(&self, path: PathBuf) -> Result<Vec<Uuid>>;
async fn get_size(&self) -> Result<u64>;
}
@ -130,6 +131,8 @@ impl UuidStore for HeedUuidStore {
.await?
}
// TODO: we should merge this function with the following dump function; it's exactly
// the same code
async fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
let env = self.env.clone();
let db = self.db;
@ -155,6 +158,31 @@ impl UuidStore for HeedUuidStore {
.await?
}
async fn dump(&self, mut path: PathBuf) -> Result<Vec<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push(uuid)
}
// only perform dump if there are indexes
if !entries.is_empty() {
path.push("index_uuids");
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
})
.await?
}
async fn get_size(&self) -> Result<u64> {
Ok(self.env.size())
}