fix dump import

This commit is contained in:
Marin Postma
2021-05-27 14:30:20 +02:00
parent c47369839b
commit b258f4f394
7 changed files with 133 additions and 99 deletions

View File

@@ -9,12 +9,11 @@ use anyhow::bail;
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use super::update_handler::UpdateHandler; use super::{Unchecked, Index, Settings, update_handler::UpdateHandler};
use super::{Checked, Index, Settings};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct DumpMeta { struct DumpMeta {
settings: Settings<Checked>, settings: Settings<Unchecked>,
primary_key: Option<String>, primary_key: Option<String>,
} }
@@ -33,7 +32,6 @@ impl Index {
} }
fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> { fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
println!("dumping documents");
let document_file_path = path.as_ref().join(DATA_FILE_NAME); let document_file_path = path.as_ref().join(DATA_FILE_NAME);
let mut document_file = File::create(&document_file_path)?; let mut document_file = File::create(&document_file_path)?;
@@ -61,11 +59,10 @@ impl Index {
} }
fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> { fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
println!("dumping settings");
let meta_file_path = path.as_ref().join(META_FILE_NAME); let meta_file_path = path.as_ref().join(META_FILE_NAME);
let mut meta_file = File::create(&meta_file_path)?; let mut meta_file = File::create(&meta_file_path)?;
let settings = self.settings_txn(txn)?; let settings = self.settings_txn(txn)?.into_unchecked();
let primary_key = self.primary_key(txn)?.map(String::from); let primary_key = self.primary_key(txn)?.map(String::from);
let meta = DumpMeta { settings, primary_key }; let meta = DumpMeta { settings, primary_key };
@@ -84,12 +81,13 @@ impl Index {
.as_ref() .as_ref()
.file_name() .file_name()
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?; .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dst_dir_path = dst.as_ref().join(dir_name); let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
create_dir_all(&dst_dir_path)?; create_dir_all(&dst_dir_path)?;
let meta_path = src.as_ref().join(META_FILE_NAME); let meta_path = src.as_ref().join(META_FILE_NAME);
let mut meta_file = File::open(meta_path)?; let mut meta_file = File::open(meta_path)?;
let DumpMeta { settings, primary_key } = serde_json::from_reader(&mut meta_file)?; let DumpMeta { settings, primary_key } = serde_json::from_reader(&mut meta_file)?;
let settings = settings.check();
let index = Self::open(&dst_dir_path, size as usize)?; let index = Self::open(&dst_dir_path, size as usize)?;
let mut txn = index.write_txn()?; let mut txn = index.write_txn()?;

View File

@@ -87,6 +87,28 @@ impl Settings<Checked> {
_kind: PhantomData, _kind: PhantomData,
} }
} }
pub fn into_unchecked(self) -> Settings<Unchecked> {
let Self {
displayed_attributes,
searchable_attributes,
attributes_for_faceting,
ranking_rules,
stop_words,
distinct_attribute,
..
} = self;
Settings {
displayed_attributes,
searchable_attributes,
attributes_for_faceting,
ranking_rules,
stop_words,
distinct_attribute,
_kind: PhantomData,
}
}
} }
impl Settings<Unchecked> { impl Settings<Unchecked> {

View File

@@ -1,17 +1,18 @@
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; use std::path::{Path, PathBuf};
use crate::{helpers::compression, index_controller::dump_actor::Metadata}; use std::sync::Arc;
use crate::index_controller::{update_actor, uuid_resolver};
use async_stream::stream; use async_stream::stream;
use chrono::Utc; use chrono::Utc;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::{error, info}; use log::{error, info};
use update_actor::UpdateActorHandle; use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle; use uuid_resolver::UuidResolverHandle;
use std::{fs::File, path::{Path, PathBuf}, sync::Arc}; use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::{fs::create_dir_all, sync::{mpsc, oneshot, RwLock}};
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus, DumpTask};
use crate::index_controller::{update_actor, uuid_resolver};
pub const CONCURRENT_DUMP_MSG: usize = 10; pub const CONCURRENT_DUMP_MSG: usize = 10;
const META_FILE_NAME: &'static str = "metadata.json";
pub struct DumpActor<UuidResolver, Update> { pub struct DumpActor<UuidResolver, Update> {
inbox: Option<mpsc::Receiver<DumpMsg>>, inbox: Option<mpsc::Receiver<DumpMsg>>,
@@ -155,54 +156,4 @@ where
}) })
) )
} }
}
/// State for a single dump run; consumed by `DumpTask::run`.
struct DumpTask<U, P> {
/// Directory the dump is built in: the scratch directory, the temporary
/// archive and the final `<uid>.dump` file are all created under it.
path: PathBuf,
/// Handle whose `dump` call yields the uuids later passed to `update_handle`.
uuid_resolver: U,
/// Handle whose `dump` call receives those uuids and the scratch directory.
update_handle: P,
/// Identifier of this dump; used as the stem of the `<uid>.dump` file name.
uid: String,
/// Update database size, recorded in the dump metadata.
update_db_size: u64,
/// Index database size, recorded in the dump metadata.
index_db_size: u64,
}
impl<U, P> DumpTask<U, P>
where
U: UuidResolverHandle + Send + Sync + Clone + 'static,
P: UpdateActorHandle + Send + Sync + Clone + 'static,
{
/// Builds the dump archive `<path>/<uid>.dump` and logs where it was created.
///
/// Steps: create `path`, create a scratch directory inside it, write the
/// metadata file there, let the uuid resolver and the update handle dump into
/// it, then archive the scratch directory into a temporary file that is
/// finally persisted under its `.dump` name.
///
/// Any I/O, serialization, handle or join error is propagated to the caller.
async fn run(self) -> anyhow::Result<()> {
info!("Performing dump.");
create_dir_all(&self.path).await?;
// The scratch directory is created on the blocking pool; the first `?`
// handles the join error, the second the I/O error.
let path_clone = self.path.clone();
let temp_dump_dir = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
// `temp_dump_dir` is a guard and must stay alive until the end of this
// function; only an owned copy of its path is handed around.
let temp_dump_path = temp_dump_dir.path().to_owned();
// Record both database sizes in the metadata file of the dump.
let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
let meta_path = temp_dump_path.join(META_FILE_NAME);
// NOTE(review): synchronous file creation and write inside an async fn.
let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
// The uuids returned by the resolver dump drive the update-handle dump.
let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;
self.update_handle.dump(uuids, temp_dump_path.clone()).await?;
// Archive on the blocking pool. The archive is first written to a temporary
// file in `path` and only then persisted under its final name.
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;
let dump_path = self.path.join(format!("{}.dump", self.uid));
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(())
}
} }

View File

@@ -2,7 +2,7 @@ use std::path::Path;
use anyhow::Context; use anyhow::Context;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use log::info; use log::{info, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{index::Index, index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}, option::IndexerOpts}; use crate::{index::Index, index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}, option::IndexerOpts};
@@ -29,6 +29,8 @@ impl MetadataV2 {
self, self,
src: impl AsRef<Path>, src: impl AsRef<Path>,
dst: impl AsRef<Path>, dst: impl AsRef<Path>,
_index_db_size: u64,
_update_db_size: u64,
indexing_options: &IndexerOpts, indexing_options: &IndexerOpts,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!( info!(
@@ -44,23 +46,26 @@ impl MetadataV2 {
let tmp_dst = tempfile::tempdir_in(dst_dir)?; let tmp_dst = tempfile::tempdir_in(dst_dir)?;
info!("Loading index database."); info!("Loading index database.");
let uuid_resolver_path = dst.as_ref().join("uuid_resolver/"); HeedUuidStore::load_dump(src.as_ref(), &tmp_dst)?;
std::fs::create_dir_all(&uuid_resolver_path)?;
HeedUuidStore::load_dump(src.as_ref(), tmp_dst.as_ref())?;
info!("Loading updates."); info!("Loading updates.");
UpdateStore::load_dump(&src, &tmp_dst.as_ref(), self.update_db_size)?; UpdateStore::load_dump(&src, &tmp_dst, self.update_db_size)?;
info!("Loading indexes"); info!("Loading indexes");
let indexes_path = src.as_ref().join("indexes"); let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?; let indexes = indexes_path.read_dir()?;
for index in indexes { for index in indexes {
let index = index?; let index = index?;
Index::load_dump(&index.path(), &dst, self.index_db_size, indexing_options)?; Index::load_dump(&index.path(), &tmp_dst, self.index_db_size, indexing_options)?;
} }
// Persist and atomically rename the db // Persist and atomically rename the db
let persisted_dump = tmp_dst.into_path(); let persisted_dump = tmp_dst.into_path();
if dst.as_ref().exists() {
warn!("Overwriting database at {}", dst.as_ref().display());
std::fs::remove_dir_all(&dst)?;
}
std::fs::rename(&persisted_dump, &dst)?; std::fs::rename(&persisted_dump, &dst)?;
Ok(()) Ok(())

View File

@@ -1,6 +1,7 @@
use std::{fs::File, path::Path}; use std::fs::File;
use std::path::{Path, PathBuf};
use log::error; use log::{error, info};
#[cfg(test)] #[cfg(test)]
use mockall::automock; use mockall::automock;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -12,16 +13,18 @@ use loaders::v2::MetadataV2;
pub use actor::DumpActor; pub use actor::DumpActor;
pub use handle_impl::*; pub use handle_impl::*;
pub use message::DumpMsg; pub use message::DumpMsg;
use tokio::fs::create_dir_all;
use crate::option::IndexerOpts; use super::{update_actor::UpdateActorHandle, uuid_resolver::UuidResolverHandle};
use crate::{helpers::compression, option::IndexerOpts};
use super::uuid_resolver::store::UuidStore;
mod actor; mod actor;
mod handle_impl; mod handle_impl;
mod loaders; mod loaders;
mod message; mod message;
const META_FILE_NAME: &'static str = "metadata.json";
pub type DumpResult<T> = std::result::Result<T, DumpError>; pub type DumpResult<T> = std::result::Result<T, DumpError>;
#[derive(Error, Debug)] #[derive(Error, Debug)]
@@ -66,23 +69,6 @@ impl Metadata {
let meta = MetadataV2::new(index_db_size, update_db_size); let meta = MetadataV2::new(index_db_size, update_db_size);
Self::V2 { meta } Self::V2 { meta }
} }
/// Extract Metadata from `metadata.json` file present at provided `dir_path`
fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
let path = dir_path.join("metadata.json");
let file = File::open(path)?;
let reader = std::io::BufReader::new(file);
let metadata = serde_json::from_reader(reader)?;
Ok(metadata)
}
/// Write Metadata in `metadata.json` file at provided `dir_path`
pub async fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> {
let path = dir_path.join("metadata.json");
tokio::fs::write(path, serde_json::to_string(self)?).await?;
Ok(())
}
} }
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
@@ -125,21 +111,84 @@ impl DumpInfo {
} }
} }
pub fn load_dump<U: UuidStore>( pub fn load_dump(
dst_path: impl AsRef<Path>, dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>, src_path: impl AsRef<Path>,
_index_db_size: u64, index_db_size: u64,
_update_db_size: u64, update_db_size: u64,
indexer_opts: &IndexerOpts, indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let meta_path = src_path.as_ref().join("metadat.json"); let tmp_src = tempfile::tempdir_in(".")?;
let tmp_src_path = tmp_src.path();
compression::from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
let mut meta_file = File::open(&meta_path)?; let mut meta_file = File::open(&meta_path)?;
let meta: Metadata = serde_json::from_reader(&mut meta_file)?; let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
match meta { match meta {
Metadata::V1 { meta } => meta.load_dump(src_path, dst_path)?, Metadata::V1 { meta } => meta.load_dump(&tmp_src_path, dst_path)?,
Metadata::V2 { meta } => meta.load_dump(src_path.as_ref(), dst_path.as_ref(), indexer_opts)?, Metadata::V2 { meta } => meta.load_dump(
&tmp_src_path,
dst_path.as_ref(),
index_db_size,
update_db_size,
indexer_opts,
)?,
} }
Ok(()) Ok(())
} }
/// State for a single dump run; consumed by `DumpTask::run`.
struct DumpTask<U, P> {
/// Directory the dump is built in: the scratch directory, the temporary
/// archive and the final `<uid>.dump` file are all created under it.
path: PathBuf,
/// Handle whose `dump` call yields the uuids later passed to `update_handle`.
uuid_resolver: U,
/// Handle whose `dump` call receives those uuids and the scratch directory.
update_handle: P,
/// Identifier of this dump; used as the stem of the `<uid>.dump` file name.
uid: String,
/// Update database size, recorded in the dump metadata.
update_db_size: u64,
/// Index database size, recorded in the dump metadata.
index_db_size: u64,
}
impl<U, P> DumpTask<U, P>
where
    U: UuidResolverHandle + Send + Sync + Clone + 'static,
    P: UpdateActorHandle + Send + Sync + Clone + 'static,
{
    /// Builds the dump archive `<path>/<uid>.dump` and logs where it was created.
    ///
    /// Steps: create `path`, create a scratch directory inside it, write the
    /// metadata file there, let the uuid resolver and the update handle dump
    /// into it, then archive the scratch directory into a temporary file that
    /// is finally persisted under its `.dump` name.
    ///
    /// All synchronous filesystem work (scratch directory creation, metadata
    /// write, archiving) runs on the blocking thread pool so the async
    /// executor is never stalled on disk I/O.
    ///
    /// # Errors
    ///
    /// Propagates any I/O, serialization, handle or task-join error.
    async fn run(self) -> anyhow::Result<()> {
        info!("Performing dump.");

        create_dir_all(&self.path).await?;

        let path_clone = self.path.clone();
        // `temp_dump_dir` is a guard and must stay alive until the end of this
        // function; only an owned copy of its path is handed around.
        let temp_dump_dir =
            tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
        let temp_dump_path = temp_dump_dir.path().to_owned();

        // Record both database sizes in the metadata file of the dump. The
        // write is synchronous, so it is offloaded like the other filesystem
        // steps of this function.
        let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
        let meta_path = temp_dump_path.join(META_FILE_NAME);
        tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
            let mut meta_file = File::create(&meta_path)?;
            serde_json::to_writer(&mut meta_file, &meta)?;
            Ok(())
        })
        .await??;

        // The uuids returned by the resolver dump drive the update-handle dump.
        let uuids = self.uuid_resolver.dump(temp_dump_path.clone()).await?;
        self.update_handle
            .dump(uuids, temp_dump_path.clone())
            .await?;

        // The archive is first written to a temporary file in `path` and only
        // then persisted under its final name.
        let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
            let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
            compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;

            let dump_path = self.path.join(format!("{}.dump", self.uid));
            temp_dump_file.persist(&dump_path)?;

            Ok(dump_path)
        })
        .await??;

        info!("Created dump in {:?}.", dump_path);

        Ok(())
    }
}

View File

@@ -25,6 +25,8 @@ use uuid_resolver::{UuidResolverError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt; use crate::option::Opt;
use self::dump_actor::load_dump;
mod dump_actor; mod dump_actor;
mod index_actor; mod index_actor;
mod snapshot; mod snapshot;
@@ -91,8 +93,14 @@ impl IndexController {
options.ignore_snapshot_if_db_exists, options.ignore_snapshot_if_db_exists,
options.ignore_missing_snapshot, options.ignore_missing_snapshot,
)?; )?;
} else if let Some(ref _path) = options.import_dump { } else if let Some(ref src_path) = options.import_dump {
todo!("implement load dump") load_dump(
&options.db_path,
src_path,
options.max_mdb_size.get_bytes(),
options.max_udb_size.get_bytes(),
&options.indexer_options,
)?;
} }
std::fs::create_dir_all(&path)?; std::fs::create_dir_all(&path)?;

View File

@@ -178,6 +178,7 @@ impl HeedUuidStore {
Ok(0) => break, Ok(0) => break,
Ok(_) => { Ok(_) => {
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?; let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
println!("importing {} {}", uid, uuid);
db.db.put(&mut txn, &uid, uuid.as_bytes())?; db.db.put(&mut txn, &uid, uuid.as_bytes())?;
} }
Err(e) => Err(e)?, Err(e) => Err(e)?,