implement load uuid_resolver

This commit is contained in:
Marin Postma
2021-05-26 20:42:09 +02:00
parent 9278a6fe59
commit e818c33fec
14 changed files with 438 additions and 411 deletions

View File

@@ -1,27 +1,26 @@
use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus}; use super::{DumpError, DumpInfo, DumpMsg, DumpResult, DumpStatus};
use crate::helpers::compression; use crate::helpers::compression;
use crate::index_controller::{index_actor, update_actor, uuid_resolver, IndexMetadata}; use crate::index_controller::{update_actor, uuid_resolver};
use async_stream::stream; use async_stream::stream;
use chrono::Utc; use chrono::Utc;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::{error, info}; use log::{error, info};
use std::{ use std::{
collections::HashSet,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
}; };
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::{fs::create_dir_all, sync::{mpsc, oneshot, RwLock}};
use uuid::Uuid;
pub const CONCURRENT_DUMP_MSG: usize = 10; pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor<UuidResolver, Index, Update> { pub struct DumpActor<UuidResolver, Update> {
inbox: Option<mpsc::Receiver<DumpMsg>>, inbox: Option<mpsc::Receiver<DumpMsg>>,
uuid_resolver: UuidResolver, uuid_resolver: UuidResolver,
index: Index,
update: Update, update: Update,
dump_path: PathBuf, dump_path: PathBuf,
dump_info: Arc<RwLock<Option<DumpInfo>>>, dump_info: Arc<RwLock<Option<DumpInfo>>>,
_update_db_size: u64,
_index_db_size: u64,
} }
/// Generate uid from creation date /// Generate uid from creation date
@@ -29,26 +28,27 @@ fn generate_uid() -> String {
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
} }
impl<UuidResolver, Index, Update> DumpActor<UuidResolver, Index, Update> impl<UuidResolver, Update> DumpActor<UuidResolver, Update>
where where
UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static, UuidResolver: uuid_resolver::UuidResolverHandle + Send + Sync + Clone + 'static,
Index: index_actor::IndexActorHandle + Send + Sync + Clone + 'static,
Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static, Update: update_actor::UpdateActorHandle + Send + Sync + Clone + 'static,
{ {
pub fn new( pub fn new(
inbox: mpsc::Receiver<DumpMsg>, inbox: mpsc::Receiver<DumpMsg>,
uuid_resolver: UuidResolver, uuid_resolver: UuidResolver,
index: Index,
update: Update, update: Update,
dump_path: impl AsRef<Path>, dump_path: impl AsRef<Path>,
_index_db_size: u64,
_update_db_size: u64,
) -> Self { ) -> Self {
Self { Self {
inbox: Some(inbox), inbox: Some(inbox),
uuid_resolver, uuid_resolver,
index,
update, update,
dump_path: dump_path.as_ref().into(), dump_path: dump_path.as_ref().into(),
dump_info: Arc::new(RwLock::new(None)), dump_info: Arc::new(RwLock::new(None)),
_index_db_size,
_update_db_size,
} }
} }
@@ -155,7 +155,7 @@ where
} }
async fn perform_dump<UuidResolver, Update>( async fn perform_dump<UuidResolver, Update>(
dump_path: PathBuf, path: PathBuf,
uuid_resolver: UuidResolver, uuid_resolver: UuidResolver,
update_handle: Update, update_handle: Update,
uid: String, uid: String,
@@ -166,19 +166,23 @@ where
{ {
info!("Performing dump."); info!("Performing dump.");
let dump_path_clone = dump_path.clone(); create_dir_all(&path).await?;
let temp_dump_path = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(dump_path_clone)).await??;
let uuids = uuid_resolver.dump(temp_dump_path.path().to_owned()).await?; let path_clone = path.clone();
let temp_dump_dir = tokio::task::spawn_blocking(|| tempfile::TempDir::new_in(path_clone)).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
update_handle.dump(uuids, temp_dump_path.path().to_owned()).await?; let uuids = uuid_resolver.dump(temp_dump_path.clone()).await?;
update_handle.dump(uuids, temp_dump_path.clone()).await?;
let dump_path = dump_path.join(format!("{}.dump", uid));
let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> { let dump_path = tokio::task::spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?; let temp_dump_file = tempfile::NamedTempFile::new_in(&path)?;
let temp_dump_file_path = temp_dump_file.path().to_owned(); compression::to_tar_gz(temp_dump_path, temp_dump_file.path())?;
compression::to_tar_gz(temp_dump_path, temp_dump_file_path)?;
let dump_path = path.join(format!("{}.dump", uid));
temp_dump_file.persist(&dump_path)?; temp_dump_file.persist(&dump_path)?;
Ok(dump_path) Ok(dump_path)
}) })
.await??; .await??;
@@ -187,29 +191,3 @@ where
Ok(()) Ok(())
} }
/// Collects the metadata of every index known to the uuid resolver.
///
/// Each `(uid, uuid)` pair returned by `uuid_resolver.list()` is completed with the result of
/// `index.get_index_meta(uuid)`. The uid is used for both the `name` and the `uid` field of the
/// resulting `IndexMetadata`.
///
/// # Errors
///
/// Returns the first error produced by the resolver listing or by an index metadata lookup.
async fn list_indexes<UuidResolver, Index>(
    uuid_resolver: &UuidResolver,
    index: &Index,
) -> anyhow::Result<Vec<IndexMetadata>>
where
    UuidResolver: uuid_resolver::UuidResolverHandle,
    Index: index_actor::IndexActorHandle,
{
    let mut indexes = Vec::new();

    for (uid, uuid) in uuid_resolver.list().await? {
        let name = uid.clone();
        let meta = index.get_index_meta(uuid).await?;
        indexes.push(IndexMetadata {
            uuid,
            name,
            uid,
            meta,
        });
    }

    Ok(indexes)
}

View File

@@ -29,13 +29,15 @@ impl DumpActorHandleImpl {
pub fn new( pub fn new(
path: impl AsRef<Path>, path: impl AsRef<Path>,
uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl, uuid_resolver: crate::index_controller::uuid_resolver::UuidResolverHandleImpl,
index: crate::index_controller::index_actor::IndexActorHandleImpl,
update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>, update: crate::index_controller::update_actor::UpdateActorHandleImpl<Bytes>,
index_db_size: u64,
update_db_size: u64,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let (sender, receiver) = mpsc::channel(10); let (sender, receiver) = mpsc::channel(10);
let actor = DumpActor::new(receiver, uuid_resolver, index, update, path); let actor = DumpActor::new(receiver, uuid_resolver, update, path, index_db_size, update_db_size);
tokio::task::spawn(actor.run()); tokio::task::spawn(actor.run());
Ok(Self { sender }) Ok(Self { sender })
} }
} }

View File

@@ -0,0 +1,2 @@
pub mod v1;
pub mod v2;

View File

@@ -0,0 +1,137 @@
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::index_controller::IndexMetadata;
/// Metadata carried by a V1 dump: a database version string and the metadata of its indexes.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataV1 {
/// Database version string stored in the dump metadata.
db_version: String,
/// Metadata of each index stored in the dump metadata.
indexes: Vec<IndexMetadata>,
}
impl MetadataV1 {
/// Intended entry point for loading a V1 dump; takes a source and a destination path.
///
/// # Panics
///
/// Always panics: the body is a `todo!` placeholder and both path arguments are unused.
pub fn load_dump(self, _src: impl AsRef<Path>, _dst: impl AsRef<Path>) -> anyhow::Result<()> {
todo!("implement load v1")
}
}
// This is the settings used in the last version of meilisearch exporting dump in V1
//#[derive(Default, Clone, Serialize, Deserialize, Debug)]
//#[serde(rename_all = "camelCase", deny_unknown_fields)]
//struct Settings {
//#[serde(default, deserialize_with = "deserialize_some")]
//pub ranking_rules: Option<Option<Vec<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub distinct_attribute: Option<Option<String>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub searchable_attributes: Option<Option<Vec<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub displayed_attributes: Option<Option<BTreeSet<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub stop_words: Option<Option<BTreeSet<String>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
//#[serde(default, deserialize_with = "deserialize_some")]
//pub attributes_for_faceting: Option<Option<Vec<String>>>,
//}
///// we need to **always** be able to convert the old settings to the settings currently being used
//impl From<Settings> for index_controller::Settings<Unchecked> {
//fn from(settings: Settings) -> Self {
//if settings.synonyms.flatten().is_some() {
//error!("`synonyms` are not yet implemented and thus will be ignored");
//}
//Self {
//distinct_attribute: settings.distinct_attribute,
//// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
//displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
//searchable_attributes: settings.searchable_attributes,
//// we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
//// representing the name of the faceted field + the type of the field. Since the type
//// was not known in the V1 of the dump we are just going to assume everything is a
//// String
//attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
//// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
//ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
//match criterion.as_str() {
//"words" | "typo" | "proximity" | "attribute" => Some(criterion),
//s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
//"wordsPosition" => {
//warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPositon` will be ignored");
//Some(String::from("words"))
//}
//"exactness" => {
//error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
//None
//}
//s => {
//error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
//None
//}
//}
//}).collect())),
//// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
//stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
//_kind: PhantomData,
//}
//}
//}
///// Extract Settings from `settings.json` file present at provided `dir_path`
//fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
//let path = dir_path.join("settings.json");
//let file = File::open(path)?;
//let reader = std::io::BufReader::new(file);
//let metadata = serde_json::from_reader(reader)?;
//Ok(metadata)
//}
//pub fn import_dump(
//size: usize,
//uuid: Uuid,
//dump_path: &Path,
//db_path: &Path,
//primary_key: Option<&str>,
//) -> anyhow::Result<()> {
//let index_path = db_path.join(&format!("indexes/index-{}", uuid));
//info!("Importing a dump from an old version of meilisearch with dump version 1");
//std::fs::create_dir_all(&index_path)?;
//let mut options = EnvOpenOptions::new();
//options.map_size(size);
//let index = milli::Index::new(options, index_path)?;
//let index = Index(Arc::new(index));
//// extract `settings.json` file and import content
//let settings = import_settings(&dump_path)?;
//let settings: index_controller::Settings<Unchecked> = settings.into();
//let update_builder = UpdateBuilder::new(0);
//index.update_settings(&settings.check(), update_builder)?;
//let update_builder = UpdateBuilder::new(1);
//let file = File::open(&dump_path.join("documents.jsonl"))?;
//let reader = std::io::BufReader::new(file);
//// TODO: TAMO: waiting for milli. We should use the result
//let _ = index.update_documents(
//UpdateFormat::JsonStream,
//IndexDocumentsMethod::ReplaceDocuments,
//Some(reader),
//update_builder,
//primary_key,
//);
//// the last step: we extract the original milli::Index and close it
//Arc::try_unwrap(index.0)
//.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
//.unwrap()
//.prepare_for_closing()
//.wait();
//// at this point we should handle the import of the updates, but since the update logic is not handled in
//// meilisearch we are just going to ignore this part
//Ok(())
//}

View File

@@ -0,0 +1,179 @@
use std::{fs::File, io::BufReader, marker::PhantomData, path::Path};
use anyhow::Context;
use chrono::{DateTime, Utc};
use log::info;
use serde::{Deserialize, Serialize};
use crate::index_controller::uuid_resolver::store::UuidStore;
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataV2<U> {
db_version: String,
index_db_size: usize,
update_db_size: usize,
dump_date: DateTime<Utc>,
_pth: PhantomData<U>,
}
impl<U> MetadataV2<U>
where
    U: UuidStore,
{
    /// Loads the V2 dump found at `src` into a database located at `dst`.
    ///
    /// The database is first built in a temporary directory created in the parent directory of
    /// `dst`. Once every component has been loaded, that directory is renamed to `dst`. If any
    /// step fails, the temporary directory is removed when it goes out of scope, so no partial
    /// database is left at `dst`.
    ///
    /// # Errors
    ///
    /// Fails if `dst` has no parent directory, if the temporary directory cannot be created, if
    /// a loading step fails, or if the final rename to `dst` fails.
    pub fn load_dump(self, src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
        info!(
            "Loading dump from {}, dump database version: {}, dump version: V2",
            self.dump_date, self.db_version
        );

        let dst = dst.as_ref();

        // get dir in which to load the db:
        let dst_dir = dst
            .parent()
            .with_context(|| format!("Invalid db path: {}", dst.display()))?;

        let tmp_dst = tempfile::tempdir_in(dst_dir)?;

        self.load_index_resolver(&src, tmp_dst.path())?;
        load_updates(&src, tmp_dst.path())?;
        load_indexes(&src, tmp_dst.path())?;

        // Everything loaded: move the freshly built database into place. The rename happens
        // while `tmp_dst` still owns the directory, so a failed rename is cleaned up on drop.
        std::fs::rename(tmp_dst.path(), dst)?;
        // The directory now lives at `dst`; release it from the guard so nothing is cleaned up.
        let _ = tmp_dst.into_path();

        Ok(())
    }

    /// Restores the uuid resolver from the dump at `src` by delegating to `U::load_dump`.
    ///
    /// The `uuid_resolver/` directory is created under `dst` before delegating.
    fn load_index_resolver(
        &self,
        src: impl AsRef<Path>,
        dst: impl AsRef<Path>,
    ) -> anyhow::Result<()> {
        info!("Loading index database.");
        let uuid_resolver_path = dst.as_ref().join("uuid_resolver/");
        std::fs::create_dir_all(&uuid_resolver_path)?;

        // NOTE(review): `U::load_dump` is handed `dst`, not `uuid_resolver_path` — presumably
        // the store derives its own sub-directory; confirm against the `UuidStore` contract.
        U::load_dump(src.as_ref(), dst.as_ref())?;

        Ok(())
    }
}
/// Placeholder for restoring the updates of a dump.
///
/// Both path arguments are currently unused, hence the leading underscores (same convention
/// as the V1 loader).
///
/// # Panics
///
/// Always panics after logging: the body is a `todo!` placeholder.
fn load_updates(_src: impl AsRef<Path>, _dst: impl AsRef<Path>) -> anyhow::Result<()> {
    info!("Loading updates.");
    todo!()
}
/// Placeholder for restoring the indexes of a dump.
///
/// Both path arguments are currently unused, hence the leading underscores (same convention
/// as the V1 loader).
///
/// # Panics
///
/// Always panics after logging: the body is a `todo!` placeholder.
fn load_indexes(_src: impl AsRef<Path>, _dst: impl AsRef<Path>) -> anyhow::Result<()> {
    info!("Loading indexes");
    todo!()
}
// Extract Settings from `settings.json` file present at provided `dir_path`
//fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
//let path = dir_path.join("settings.json");
//let file = File::open(path)?;
//let reader = BufReader::new(file);
//let metadata: Settings<Unchecked> = serde_json::from_reader(reader)?;
//Ok(metadata.check())
//}
//pub fn import_dump(
//_db_size: usize,
//update_db_size: usize,
//_uuid: Uuid,
//dump_path: impl AsRef<Path>,
//db_path: impl AsRef<Path>,
//_primary_key: Option<&str>,
//) -> anyhow::Result<()> {
//info!("Dump import started.");
//info!("Importing outstanding updates...");
//import_updates(&dump_path, &db_path, update_db_size)?;
//info!("done importing updates");
//Ok(())
////let index_path = db_path.join(&format!("indexes/index-{}", uuid));
////std::fs::create_dir_all(&index_path)?;
////let mut options = EnvOpenOptions::new();
////options.map_size(size);
////let index = milli::Index::new(options, index_path)?;
////let index = Index(Arc::new(index));
////let mut txn = index.write_txn()?;
////info!("importing the settings...");
////// extract `settings.json` file and import content
////let settings = import_settings(&dump_path)?;
////let update_builder = UpdateBuilder::new(0);
////index.update_settings_txn(&mut txn, &settings, update_builder)?;
////// import the documents in the index
////let update_builder = UpdateBuilder::new(1);
////let file = File::open(&dump_path.join("documents.jsonl"))?;
////let reader = std::io::BufReader::new(file);
////info!("importing the documents...");
////// TODO: TAMO: currently we ignore any error caused by the importation of the documents because
////// if there is no documents nor primary key it'll throw an anyhow error, but we must remove
////// this before the merge on main
////index.update_documents_txn(
////&mut txn,
////UpdateFormat::JsonStream,
////IndexDocumentsMethod::ReplaceDocuments,
////Some(reader),
////update_builder,
////primary_key,
////)?;
////txn.commit()?;
////// the last step: we extract the original milli::Index and close it
////Arc::try_unwrap(index.0)
////.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
////.unwrap()
////.prepare_for_closing()
////.wait();
////info!("importing the updates...");
////import_updates(dump_path, db_path)
//}
//fn import_updates(
//src_path: impl AsRef<Path>,
//dst_path: impl AsRef<Path>,
//_update_db_size: usize
//) -> anyhow::Result<()> {
//let dst_update_path = dst_path.as_ref().join("updates");
//std::fs::create_dir_all(&dst_update_path)?;
//let dst_update_files_path = dst_update_path.join("update_files");
//std::fs::create_dir_all(&dst_update_files_path)?;
//let options = EnvOpenOptions::new();
//let (update_store, _) = UpdateStore::create(options, &dst_update_path)?;
//let src_update_path = src_path.as_ref().join("updates");
//let src_update_files_path = src_update_path.join("update_files");
//let update_data = File::open(&src_update_path.join("data.jsonl"))?;
//let mut update_data = BufReader::new(update_data);
//let mut wtxn = update_store.env.write_txn()?;
//let mut line = String::new();
//loop {
//match update_data.read_line(&mut line) {
//Ok(_) => {
//let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?;
//if let Some(path) = update.content_path_mut() {
//let dst_file_path = dst_update_files_path.join(&path);
//let src_file_path = src_update_files_path.join(&path);
//*path = dst_update_files_path.join(&path);
//std::fs::copy(src_file_path, dst_file_path)?;
//}
//update_store.register_raw_updates(&mut wtxn, update, uuid)?;
//}
//_ => break,
//}
//}
//wtxn.commit()?;
//Ok(())
//}

View File

@@ -1,26 +1,18 @@
mod actor; mod actor;
mod handle_impl; mod handle_impl;
mod message; mod message;
mod v1; mod loaders;
mod v2;
use std::{fs::File, path::Path, sync::Arc}; use std::{fs::File, path::Path};
use anyhow::bail; use log::error;
use heed::EnvOpenOptions;
use log::{error, info};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
#[cfg(test)] #[cfg(test)]
use mockall::automock; use mockall::automock;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tempfile::TempDir;
use thiserror::Error; use thiserror::Error;
use uuid::Uuid;
use super::IndexMetadata; use loaders::v1::MetadataV1;
use crate::helpers::compression; use loaders::v2::MetadataV2;
use crate::index::Index;
use crate::index_controller::uuid_resolver;
pub use actor::DumpActor; pub use actor::DumpActor;
pub use handle_impl::*; pub use handle_impl::*;
@@ -40,31 +32,6 @@ pub enum DumpError {
DumpDoesNotExist(String), DumpDoesNotExist(String),
} }
/// Version of the dump format, stored in the `dump_version` field of the dump `Metadata`.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
enum DumpVersion {
V1,
V2,
}
impl DumpVersion {
    /// Dump version assigned to newly created metadata.
    const CURRENT: Self = Self::V2;

    /// Runs the index import routine that matches this dump version.
    ///
    /// All arguments are forwarded unchanged to `v1::import_index` or `v2::import_index`.
    pub fn import_index(
        self,
        size: usize,
        uuid: Uuid,
        dump_path: &Path,
        db_path: &Path,
        primary_key: Option<&str>,
    ) -> anyhow::Result<()> {
        // Pick the importer first, then call it once with the shared argument list.
        let importer: fn(usize, Uuid, &Path, &Path, Option<&str>) -> anyhow::Result<()> =
            match self {
                Self::V1 => v1::import_index,
                Self::V2 => v2::import_index,
            };

        importer(size, uuid, dump_path, db_path, primary_key)
    }
}
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, automock)] #[cfg_attr(test, automock)]
pub trait DumpActorHandle { pub trait DumpActorHandle {
@@ -78,23 +45,19 @@ pub trait DumpActorHandle {
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase", tag = "dump_version")]
pub struct Metadata { pub enum Metadata {
indexes: Vec<IndexMetadata>, V1 {
db_version: String, #[serde(flatten)]
dump_version: DumpVersion, meta: MetadataV1,
},
V2 {
#[serde(flatten)]
meta: MetadataV2,
},
} }
impl Metadata { impl Metadata {
/// Create a Metadata with the current dump version of meilisearch.
pub fn new(indexes: Vec<IndexMetadata>, db_version: String) -> Self {
Metadata {
indexes,
db_version,
dump_version: DumpVersion::CURRENT,
}
}
/// Extract Metadata from `metadata.json` file present at provided `dir_path` /// Extract Metadata from `metadata.json` file present at provided `dir_path`
fn from_path(dir_path: &Path) -> anyhow::Result<Self> { fn from_path(dir_path: &Path) -> anyhow::Result<Self> {
let path = dir_path.join("metadata.json"); let path = dir_path.join("metadata.json");
@@ -155,80 +118,19 @@ impl DumpInfo {
} }
pub fn load_dump( pub fn load_dump(
db_path: impl AsRef<Path>, dst_path: impl AsRef<Path>,
dump_path: impl AsRef<Path>, src_path: impl AsRef<Path>,
size: usize, _index_db_size: u64,
_update_db_size: u64,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("Importing dump from {}...", dump_path.as_ref().display()); let meta_path = src_path.as_ref().join("metadata.json");
let db_path = db_path.as_ref(); let mut meta_file = File::open(&meta_path)?;
let dump_path = dump_path.as_ref(); let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?;
// extract the dump in a temporary directory match meta {
let tmp_dir = TempDir::new_in(db_path)?; Metadata::V1 { meta } => meta.load_dump(src_path, dst_path)?,
let tmp_dir_path = tmp_dir.path(); Metadata::V2 { meta } => meta.load_dump(src_path, dst_path)?,
compression::from_tar_gz(dump_path, tmp_dir_path)?;
// read dump metadata
let metadata = Metadata::from_path(&tmp_dir_path)?;
// remove indexes which have same `uuid` than indexes to import and create empty indexes
let existing_index_uids = uuid_resolver.list()?;
info!("Deleting indexes already present in the db and provided in the dump...");
for idx in &metadata.indexes {
if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) {
// if we find the index in the `uuid_resolver` it's supposed to exist on the file system
// and we want to delete it
let path = db_path.join(&format!("indexes/index-{}", uuid));
info!("Deleting {}", path.display());
use std::io::ErrorKind::*;
match std::fs::remove_dir_all(path) {
Ok(()) => (),
// if an index was present in the metadata but missing of the fs we can ignore the
// problem because we are going to create it later
Err(e) if e.kind() == NotFound => (),
Err(e) => bail!(e),
}
} else {
// if the index does not exist in the `uuid_resolver` we create it
uuid_resolver.create_uuid(idx.uid.clone(), false)?;
}
} }
// import each indexes content
for idx in metadata.indexes {
let dump_path = tmp_dir_path.join(&idx.uid);
// this cannot fail since we created all the missing uuid in the previous loop
let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
info!(
"Importing dump from {} into {}...",
dump_path.display(),
db_path.display()
);
metadata.dump_version.import_index(
size,
uuid,
&dump_path,
&db_path,
idx.meta.primary_key.as_ref().map(|s| s.as_ref()),
)?;
info!("Dump importation from {} succeed", dump_path.display());
}
// finally we can move all the unprocessed update file into our new DB
// this directory may not exists
let update_path = tmp_dir_path.join("update_files");
let db_update_path = db_path.join("updates/update_files");
if update_path.exists() {
let _ = std::fs::remove_dir_all(db_update_path);
std::fs::rename(
tmp_dir_path.join("update_files"),
db_path.join("updates/update_files"),
)?;
}
info!("Dump importation from {} succeed", dump_path.display());
Ok(()) Ok(())
} }

View File

@@ -1,122 +0,0 @@
use std::{collections::{BTreeMap, BTreeSet}, marker::PhantomData};
use log::warn;
use serde::{Deserialize, Serialize};
use crate::{index::Unchecked, index_controller};
use crate::index::deserialize_some;
use super::*;
/// Settings layout used by the last version of meilisearch that exported dumps in V1.
///
/// Keys are camelCase and unknown keys are rejected (`deny_unknown_fields`).
// NOTE(review): the `Option<Option<_>>` fields combined with `deserialize_some` presumably
// distinguish an absent key from an explicit `null` — confirm against
// `crate::index::deserialize_some`.
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct Settings {
#[serde(default, deserialize_with = "deserialize_some")]
pub ranking_rules: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub distinct_attribute: Option<Option<String>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub displayed_attributes: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub stop_words: Option<Option<BTreeSet<String>>>,
// Parsed for compatibility; the conversion below only logs an error when it is set.
#[serde(default, deserialize_with = "deserialize_some")]
pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub attributes_for_faceting: Option<Option<Vec<String>>>,
}
/// we need to **always** be able to convert the old settings to the settings currently being used
impl From<Settings> for index_controller::Settings<Unchecked> {
fn from(settings: Settings) -> Self {
if settings.synonyms.flatten().is_some() {
error!("`synonyms` are not yet implemented and thus will be ignored");
}
Self {
distinct_attribute: settings.distinct_attribute,
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
searchable_attributes: settings.searchable_attributes,
// we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
// representing the name of the faceted field + the type of the field. Since the type
// was not known in the V1 of the dump we are just going to assume everything is a
// String
attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
match criterion.as_str() {
"words" | "typo" | "proximity" | "attribute" => Some(criterion),
s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
"wordsPosition" => {
warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPositon` will be ignored");
Some(String::from("words"))
}
"exactness" => {
error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
None
}
s => {
error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
None
}
}
}).collect())),
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
_kind: PhantomData,
}
}
}
/// Reads `settings.json` from `dir_path` and deserializes it into the V1 [`Settings`].
///
/// # Errors
///
/// Fails if the file cannot be opened or if its content is not valid for [`Settings`].
fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
    let settings_file = File::open(dir_path.join("settings.json"))?;
    let settings = serde_json::from_reader(std::io::BufReader::new(settings_file))?;
    Ok(settings)
}
/// Imports one index of a V1 dump into `indexes/index-{uuid}` under `db_path`.
///
/// Settings are read from `settings.json` and documents from `documents.jsonl`, both located
/// in `dump_path`. `size` is used as the map size of the index environment.
///
/// # Errors
///
/// Fails if the index directory or environment cannot be created, if the settings cannot be
/// read or applied, or if `documents.jsonl` cannot be opened. The result of the document
/// import itself is discarded (see the TODO below).
///
/// # Panics
///
/// Panics if another reference to the index is still alive when it is closed.
pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
info!("Importing a dump from an old version of meilisearch with dump version 1");
std::fs::create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
// extract `settings.json` file and import content
let settings = import_settings(&dump_path)?;
// convert the V1 settings into the current (unchecked) settings before applying them
let settings: index_controller::Settings<Unchecked> = settings.into();
let update_builder = UpdateBuilder::new(0);
index.update_settings(&settings.check(), update_builder)?;
// import the documents with a second update builder
let update_builder = UpdateBuilder::new(1);
let file = File::open(&dump_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
// TODO: TAMO: waiting for milli. We should use the result
let _ = index.update_documents(
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
Some(reader),
update_builder,
primary_key,
);
// the last step: we extract the original milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
.unwrap()
.prepare_for_closing()
.wait();
// at this point we should handle the import of the updates, but since the update logic is not handled in
// meilisearch we are just going to ignore this part
Ok(())
}

View File

@@ -1,89 +0,0 @@
use heed::EnvOpenOptions;
use log::info;
use uuid::Uuid;
use crate::{index::Unchecked, index_controller::{UpdateStatus, update_actor::UpdateStore}};
use std::io::BufRead;
use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}};
use crate::index::{Checked, Index};
use crate::index_controller::Settings;
use std::{fs::File, path::Path, sync::Arc};
/// Reads `settings.json` from `dir_path` and returns the settings after validation.
///
/// The file is deserialized as `Settings<Unchecked>` and converted with `check()`.
///
/// # Errors
///
/// Fails if the file cannot be opened or if its content cannot be deserialized.
fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
    let path = dir_path.join("settings.json");
    let file = File::open(path)?;
    let reader = std::io::BufReader::new(file);
    // No debug dump of the parsed settings on stdout: progress is reported by the caller
    // through the logger.
    let metadata: Settings<Unchecked> = serde_json::from_reader(reader)?;
    Ok(metadata.check())
}
/// Imports one index of a V2 dump into `indexes/index-{uuid}` under `db_path`, then imports
/// its updates.
///
/// Settings (`settings.json`) and documents (`documents.jsonl`) are read from `dump_path` and
/// applied inside a single write transaction, which is committed before the index is closed.
/// `size` is used as the map size of the index environment.
///
/// # Errors
///
/// Fails if any filesystem, settings, document or transaction step fails, or if
/// `import_updates` fails.
///
/// # Panics
///
/// Panics if another reference to the index is still alive when it is closed.
pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
std::fs::create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index));
// settings and documents share this transaction; nothing is visible until `commit`
let mut txn = index.write_txn()?;
info!("importing the settings...");
// extract `settings.json` file and import content
let settings = import_settings(&dump_path)?;
let update_builder = UpdateBuilder::new(0);
index.update_settings_txn(&mut txn, &settings, update_builder)?;
// import the documents in the index
let update_builder = UpdateBuilder::new(1);
let file = File::open(&dump_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file);
info!("importing the documents...");
// TODO: TAMO: this note used to say that errors from the document import are ignored; as
// written, the call below propagates them with `?`. If there is no documents nor primary key
// it'll throw an anyhow error; this must be settled before the merge on main
index.update_documents_txn(
&mut txn,
UpdateFormat::JsonStream,
IndexDocumentsMethod::ReplaceDocuments,
Some(reader),
update_builder,
primary_key,
)?;
txn.commit()?;
// the last step: we extract the original milli::Index and close it
Arc::try_unwrap(index.0)
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
.unwrap()
.prepare_for_closing()
.wait();
info!("importing the updates...");
import_updates(uuid, dump_path, db_path)
}
/// Registers every update listed in `updates.jsonl` (one JSON `UpdateStatus` per line, read
/// from `dump_path`) into an update store created at `db_path/updates`.
///
/// When an update exposes a content path, that path is rewritten to live under
/// `db_path/updates/update_files`. All updates are registered for `uuid` in one write
/// transaction, committed at the end.
///
/// # Errors
///
/// Fails on any filesystem, deserialization, store or transaction error.
fn import_updates(uuid: Uuid, dump_path: &Path, db_path: &Path) -> anyhow::Result<()> {
    let updates_dir = db_path.join("updates");
    let update_files_dir = updates_dir.join("update_files");

    // create an UpdateStore to import the updates
    let env_options = EnvOpenOptions::new();
    std::fs::create_dir_all(&updates_dir)?;
    let (store, _) = UpdateStore::create(env_options, &updates_dir)?;

    let dump_file = File::open(&dump_path.join("updates.jsonl"))?;
    let lines = std::io::BufReader::new(dump_file).lines();

    let mut wtxn = store.env.write_txn()?;
    for line in lines {
        let mut status: UpdateStatus = serde_json::from_str(&line?)?;
        if let Some(content_path) = status.content_path_mut() {
            *content_path = update_files_dir.join(&*content_path);
        }
        store.register_raw_updates(&mut wtxn, status, uuid)?;
    }
    wtxn.commit()?;

    Ok(())
}

View File

@@ -14,22 +14,20 @@ use tokio::sync::mpsc;
use tokio::time::sleep; use tokio::time::sleep;
use uuid::Uuid; use uuid::Uuid;
pub use updates::*;
pub use dump_actor::{DumpInfo, DumpStatus};
use dump_actor::DumpActorHandle; use dump_actor::DumpActorHandle;
pub use dump_actor::{DumpInfo, DumpStatus};
use index_actor::IndexActorHandle; use index_actor::IndexActorHandle;
use snapshot::{SnapshotService, load_snapshot}; use snapshot::{load_snapshot, SnapshotService};
use update_actor::UpdateActorHandle; use update_actor::UpdateActorHandle;
pub use updates::*;
use uuid_resolver::{UuidResolverError, UuidResolverHandle}; use uuid_resolver::{UuidResolverError, UuidResolverHandle};
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt; use crate::option::Opt;
use dump_actor::load_dump; mod dump_actor;
mod index_actor; mod index_actor;
mod snapshot; mod snapshot;
mod dump_actor;
mod update_actor; mod update_actor;
mod update_handler; mod update_handler;
mod updates; mod updates;
@@ -94,13 +92,8 @@ impl IndexController {
options.ignore_snapshot_if_db_exists, options.ignore_snapshot_if_db_exists,
options.ignore_missing_snapshot, options.ignore_missing_snapshot,
)?; )?;
} else if let Some(ref path) = options.import_dump { } else if let Some(ref _path) = options.import_dump {
load_dump( todo!("implement load dump")
&options.db_path,
path,
index_size,
)?;
} }
std::fs::create_dir_all(&path)?; std::fs::create_dir_all(&path)?;
@@ -112,7 +105,13 @@ impl IndexController {
&path, &path,
update_store_size, update_store_size,
)?; )?;
let dump_handle = dump_actor::DumpActorHandleImpl::new(&options.dumps_dir, uuid_resolver.clone(), index_handle.clone(), update_handle.clone())?; let dump_handle = dump_actor::DumpActorHandleImpl::new(
&options.dumps_dir,
uuid_resolver.clone(),
update_handle.clone(),
options.max_mdb_size.get_bytes(),
options.max_udb_size.get_bytes(),
)?;
if options.schedule_snapshot { if options.schedule_snapshot {
let snapshot_service = SnapshotService::new( let snapshot_service = SnapshotService::new(
@@ -158,7 +157,8 @@ impl IndexController {
// prevent dead_locking between the update_handle::update that waits for the update to be // prevent dead_locking between the update_handle::update that waits for the update to be
// registered and the update_actor that waits for the payload to be sent to it. // registered and the update_actor that waits for the payload to be sent to it.
tokio::task::spawn_local(async move { tokio::task::spawn_local(async move {
payload.for_each(|r| async { payload
.for_each(|r| async {
let _ = sender.send(r).await; let _ = sender.send(r).await;
}) })
.await .await

View File

@@ -1,7 +1,7 @@
mod actor; mod actor;
mod handle_impl; mod handle_impl;
mod message; mod message;
mod store; pub mod store;
use std::{collections::HashSet, path::PathBuf}; use std::{collections::HashSet, path::PathBuf};

View File

@@ -15,7 +15,7 @@ use super::UpdateStore;
use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus}; use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct UpdateEntry { pub struct UpdateEntry {
uuid: Uuid, uuid: Uuid,
update: UpdateStatus, update: UpdateStatus,
} }

View File

@@ -1,4 +1,4 @@
mod dump; pub mod dump;
mod codec; mod codec;
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
@@ -115,7 +115,6 @@ impl UpdateStore {
let (notification_sender, notification_receiver) = mpsc::channel(10); let (notification_sender, notification_receiver) = mpsc::channel(10);
// Send a first notification to trigger the process. // Send a first notification to trigger the process.
let _ = notification_sender.send(());
Ok(( Ok((
Self { Self {
@@ -138,6 +137,9 @@ impl UpdateStore {
let (update_store, mut notification_receiver) = Self::create(options, path)?; let (update_store, mut notification_receiver) = Self::create(options, path)?;
let update_store = Arc::new(update_store); let update_store = Arc::new(update_store);
// trigger the update loop
let _ = update_store.notification_sender.send(());
// Init update loop to perform any pending updates at launch. // Init update loop to perform any pending updates at launch.
// Since we just launched the update store, and we still own the receiving end of the // Since we just launched the update store, and we still own the receiving end of the
// channel, this call is guaranteed to succeed. // channel, this call is guaranteed to succeed.

View File

@@ -1,7 +1,7 @@
mod actor; mod actor;
mod handle_impl; mod handle_impl;
mod message; mod message;
mod store; pub mod store;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::PathBuf; use std::path::PathBuf;

View File

@@ -1,4 +1,4 @@
use std::{collections::HashSet, io::Write}; use std::{collections::HashSet, io::{BufReader, BufRead, Write}};
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@@ -7,12 +7,19 @@ use heed::{
CompactionOption, Database, Env, EnvOpenOptions, CompactionOption, Database, Env, EnvOpenOptions,
}; };
use uuid::Uuid; use uuid::Uuid;
use serde::{Serialize, Deserialize};
use super::{Result, UuidResolverError, UUID_STORE_SIZE}; use super::{Result, UuidResolverError, UUID_STORE_SIZE};
use crate::helpers::EnvSizer; use crate::helpers::EnvSizer;
/// A single record of the uuid resolver dump.
///
/// The store's `dump` serializes one `DumpEntry` as JSON per line of the dump
/// file, and `load_dump` parses each line back into a `DumpEntry` before
/// inserting it into the database.
#[derive(Serialize, Deserialize)]
struct DumpEntry {
// Uuid stored as the database value for `uid`.
uuid: Uuid,
// Index uid, used as the database key.
uid: String,
}
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait UuidStore { pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise. // the uuid otherwise.
async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>; async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
@@ -23,6 +30,7 @@ pub trait UuidStore {
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>; async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>; async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>; async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
fn load_dump(src: &Path, dst: &Path) -> Result<()>;
} }
#[derive(Clone)] #[derive(Clone)]
@@ -62,11 +70,7 @@ impl HeedUuidStore {
Ok(uuid) Ok(uuid)
} }
} }
} } pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { let env = self.env.clone(); let db = self.db;
pub fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?; let txn = env.read_txn()?;
match db.get(&txn, &name)? { match db.get(&txn, &name)? {
Some(uuid) => { Some(uuid) => {
@@ -149,11 +153,14 @@ impl HeedUuidStore {
let txn = self.env.read_txn()?; let txn = self.env.read_txn()?;
for entry in self.db.iter(&txn)? { for entry in self.db.iter(&txn)? {
let entry = entry?; let (uid, uuid) = entry?;
let uuid = Uuid::from_slice(entry.1)?; let uuid = Uuid::from_slice(entry.1)?;
uuids.insert(uuid); uuids.insert(uuid);
serde_json::to_writer(&mut dump_file, &serde_json::json!({ "uid": entry.0, "uuid": uuid let entry = DumpEntry {
}))?; dump_file.write(b"\n").unwrap(); uuid, uid
};
serde_json::to_writer(&mut dump_file, &entry)?;
dump_file.write(b"\n").unwrap();
} }
Ok(uuids) Ok(uuids)
@@ -200,4 +207,33 @@ impl UuidStore for HeedUuidStore {
let this = self.clone(); let this = self.clone();
tokio::task::spawn_blocking(move || this.dump(path)).await? tokio::task::spawn_blocking(move || this.dump(path)).await?
} }
/// Rebuilds the uuid store located under `dst` from the dump found in `src`.
///
/// The file `index_uuids/data.jsonl` inside `src` is read line by line. Every
/// line is expected to hold one JSON-encoded `DumpEntry`, which is inserted in
/// the database as `uid -> uuid bytes`. All insertions go through a single
/// write transaction that is committed only once every line has been imported.
///
/// This is a plain (non-`async`) function so that it matches the
/// `fn load_dump(src: &Path, dst: &Path) -> Result<()>` declaration of the
/// `UuidStore` trait; the body only performs blocking I/O and never awaits.
///
/// # Errors
///
/// Returns an error if the destination directory cannot be created, if the
/// dump file cannot be opened or read, if a line is not a valid `DumpEntry`,
/// or if the store cannot be opened, written to, or committed.
fn load_dump(src: &Path, dst: &Path) -> Result<()> {
    // NOTE(review): `Self::new(dst)` may already create this directory; kept
    // from the original implementation until that is confirmed.
    let uuid_resolver_path = dst.join("uuid_resolver/");
    std::fs::create_dir_all(&uuid_resolver_path)?;

    let src_indexes = src.join("index_uuids/data.jsonl");
    let mut indexes = BufReader::new(File::open(&src_indexes)?);

    let db = Self::new(dst)?;
    let mut txn = db.env.write_txn()?;

    // A single buffer is reused for every line; `read_line` returns 0 at EOF.
    let mut line = String::new();
    while indexes.read_line(&mut line)? != 0 {
        let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
        db.db.put(&mut txn, &uid, uuid.as_bytes())?;
        line.clear();
    }

    txn.commit()?;
    Ok(())
}
} }