add dump batch handler

This commit is contained in:
ad hoc
2022-05-19 14:44:24 +02:00
parent 46cdc17701
commit 60a8249de6
9 changed files with 166 additions and 176 deletions

View File

@@ -45,8 +45,8 @@ async fn get_dump_status(
meilisearch: GuardedData<ActionPolicy<{ actions::DUMPS_GET }>, MeiliSearch>, meilisearch: GuardedData<ActionPolicy<{ actions::DUMPS_GET }>, MeiliSearch>,
path: web::Path<DumpParam>, path: web::Path<DumpParam>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let res = meilisearch.dump_info(path.dump_uid.clone()).await?; todo!();
debug!("returns: {:?}", res); // debug!("returns: {:?}", res);
Ok(HttpResponse::Ok().json(res)) // Ok(HttpResponse::Ok().json(res))
} }

View File

@@ -9,7 +9,7 @@ use time::macros::format_description;
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use super::error::{DumpActorError, Result}; use super::error::{DumpError, Result};
use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus}; use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus};
use crate::tasks::Scheduler; use crate::tasks::Scheduler;
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
@@ -106,7 +106,7 @@ impl DumpActor {
let _lock = match self.lock.try_lock() { let _lock = match self.lock.try_lock() {
Some(lock) => lock, Some(lock) => lock,
None => { None => {
ret.send(Err(DumpActorError::DumpAlreadyRunning)) ret.send(Err(DumpError::DumpAlreadyRunning))
.expect("Dump actor is dead"); .expect("Dump actor is dead");
return; return;
} }
@@ -123,7 +123,6 @@ impl DumpActor {
dump_path: self.dump_path.clone(), dump_path: self.dump_path.clone(),
db_path: self.analytics_path.clone(), db_path: self.analytics_path.clone(),
update_file_store: self.update_file_store.clone(), update_file_store: self.update_file_store.clone(),
scheduler: self.scheduler.clone(),
uid: uid.clone(), uid: uid.clone(),
update_db_size: self.update_db_size, update_db_size: self.update_db_size,
index_db_size: self.index_db_size, index_db_size: self.index_db_size,
@@ -155,7 +154,7 @@ impl DumpActor {
async fn handle_dump_info(&self, uid: String) -> Result<DumpInfo> { async fn handle_dump_info(&self, uid: String) -> Result<DumpInfo> {
match self.dump_infos.read().await.get(&uid) { match self.dump_infos.read().await.get(&uid) {
Some(info) => Ok(info.clone()), Some(info) => Ok(info.clone()),
_ => Err(DumpActorError::DumpDoesNotExist(uid)), _ => Err(DumpError::DumpDoesNotExist(uid)),
} }
} }
} }

View File

@@ -3,10 +3,10 @@ use meilisearch_error::{internal_error, Code, ErrorCode};
use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError}; use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError};
pub type Result<T> = std::result::Result<T, DumpActorError>; pub type Result<T> = std::result::Result<T, DumpError>;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum DumpActorError { pub enum DumpError {
#[error("A dump is already processing. You must wait until the current process is finished before requesting another dump.")] #[error("A dump is already processing. You must wait until the current process is finished before requesting another dump.")]
DumpAlreadyRunning, DumpAlreadyRunning,
#[error("Dump `{0}` not found.")] #[error("Dump `{0}` not found.")]
@@ -18,7 +18,7 @@ pub enum DumpActorError {
} }
internal_error!( internal_error!(
DumpActorError: milli::heed::Error, DumpError: milli::heed::Error,
std::io::Error, std::io::Error,
tokio::task::JoinError, tokio::task::JoinError,
tokio::sync::oneshot::error::RecvError, tokio::sync::oneshot::error::RecvError,
@@ -29,13 +29,13 @@ internal_error!(
TaskError TaskError
); );
impl ErrorCode for DumpActorError { impl ErrorCode for DumpError {
fn error_code(&self) -> Code { fn error_code(&self) -> Code {
match self { match self {
DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress, DumpError::DumpAlreadyRunning => Code::DumpAlreadyInProgress,
DumpActorError::DumpDoesNotExist(_) => Code::DumpNotFound, DumpError::DumpDoesNotExist(_) => Code::DumpNotFound,
DumpActorError::Internal(_) => Code::Internal, DumpError::Internal(_) => Code::Internal,
DumpActorError::IndexResolver(e) => e.error_code(), DumpError::IndexResolver(e) => e.error_code(),
} }
} }
} }

View File

@@ -1,7 +1,7 @@
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use super::error::Result; use super::error::Result;
use super::{DumpActorHandle, DumpInfo, DumpMsg}; use super::{DumpActorHandle, DumpMsg};
#[derive(Clone)] #[derive(Clone)]
pub struct DumpActorHandleImpl { pub struct DumpActorHandleImpl {

View File

@@ -1,32 +1,30 @@
use std::fs::File; use std::fs::File;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::bail; use anyhow::bail;
use log::info; use log::{info, trace};
use meilisearch_auth::AuthController;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
pub use actor::DumpActor;
pub use handle_impl::*;
pub use message::DumpMsg;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::sync::RwLock; use tokio::fs::create_dir_all;
use crate::compression::from_tar_gz; use crate::analytics;
use crate::compression::{from_tar_gz, to_tar_gz};
use crate::dump::error::DumpError;
use crate::options::IndexerOpts; use crate::options::IndexerOpts;
use crate::tasks::Scheduler;
use crate::update_file_store::UpdateFileStore; use crate::update_file_store::UpdateFileStore;
use error::Result; use error::Result;
use self::loaders::{v2, v3, v4}; use self::loaders::{v2, v3, v4};
mod actor; // mod actor;
mod compat; mod compat;
pub mod error; pub mod error;
mod handle_impl; // mod handle_impl;
mod loaders; mod loaders;
mod message; // mod message;
const META_FILE_NAME: &str = "metadata.json"; const META_FILE_NAME: &str = "metadata.json";
@@ -51,18 +49,6 @@ impl Metadata {
} }
} }
#[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait DumpActorHandle {
/// Start the creation of a dump
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
async fn create_dump(&self) -> Result<DumpInfo>;
/// Return the status of an already created dump
/// Implementation: [handle_impl::DumpActorHandleImpl::dump_info]
async fn dump_info(&self, uid: String) -> Result<DumpInfo>;
}
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct MetadataV1 { pub struct MetadataV1 {
@@ -159,49 +145,6 @@ pub enum DumpStatus {
Failed, Failed,
} }
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DumpInfo {
pub uid: String,
pub status: DumpStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(with = "time::serde::rfc3339")]
started_at: OffsetDateTime,
#[serde(
skip_serializing_if = "Option::is_none",
with = "time::serde::rfc3339::option"
)]
finished_at: Option<OffsetDateTime>,
}
impl DumpInfo {
pub fn new(uid: String, status: DumpStatus) -> Self {
Self {
uid,
status,
error: None,
started_at: OffsetDateTime::now_utc(),
finished_at: None,
}
}
pub fn with_error(&mut self, error: String) {
self.status = DumpStatus::Failed;
self.finished_at = Some(OffsetDateTime::now_utc());
self.error = Some(error);
}
pub fn done(&mut self) {
self.finished_at = Some(OffsetDateTime::now_utc());
self.status = DumpStatus::Done;
}
pub fn dump_already_in_progress(&self) -> bool {
self.status == DumpStatus::InProgress
}
}
pub fn load_dump( pub fn load_dump(
dst_path: impl AsRef<Path>, dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>, src_path: impl AsRef<Path>,
@@ -313,76 +256,59 @@ fn persist_dump(dst_path: impl AsRef<Path>, tmp_dst: TempDir) -> anyhow::Result<
} }
pub struct DumpJob { pub struct DumpJob {
dump_path: PathBuf, pub dump_path: PathBuf,
db_path: PathBuf, pub db_path: PathBuf,
update_file_store: UpdateFileStore, pub update_file_store: UpdateFileStore,
scheduler: Arc<RwLock<Scheduler>>, pub uid: String,
uid: String, pub update_db_size: usize,
update_db_size: usize, pub index_db_size: usize,
index_db_size: usize,
} }
impl DumpJob { impl DumpJob {
async fn run(self) -> Result<()> { pub async fn run(self) -> Result<()> {
// trace!("Performing dump."); trace!("Performing dump.");
//
// create_dir_all(&self.dump_path).await?; create_dir_all(&self.dump_path).await?;
//
// let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
// let temp_dump_path = temp_dump_dir.path().to_owned(); let temp_dump_path = temp_dump_dir.path().to_owned();
//
// let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size);
// let meta_path = temp_dump_path.join(META_FILE_NAME); let meta_path = temp_dump_path.join(META_FILE_NAME);
// let mut meta_file = File::create(&meta_path)?; let mut meta_file = File::create(&meta_path)?;
// serde_json::to_writer(&mut meta_file, &meta)?; serde_json::to_writer(&mut meta_file, &meta)?;
// analytics::copy_user_id(&self.db_path, &temp_dump_path); analytics::copy_user_id(&self.db_path, &temp_dump_path);
//
// create_dir_all(&temp_dump_path.join("indexes")).await?; create_dir_all(&temp_dump_path.join("indexes")).await?;
//
// let (sender, receiver) = oneshot::channel(); AuthController::dump(&self.db_path, &temp_dump_path)?;
// // TODO: Dump indexes and updates
// self.scheduler
// .write() //TODO(marin): this is not right, the scheduler should dump itself, not do it here...
// .await
// .schedule_job(Job::Dump {
// ret: sender,
// path: temp_dump_path.clone(),
// })
// .await;
//
// // wait until the job has started performing before finishing the dump process
// let sender = receiver.await??;
//
// AuthController::dump(&self.db_path, &temp_dump_path)?;
//
// //TODO(marin): this is not right, the scheduler should dump itself, not do it here...
// self.scheduler // self.scheduler
// .read() // .read()
// .await // .await
// .dump(&temp_dump_path, self.update_file_store.clone()) // .dump(&temp_dump_path, self.update_file_store.clone())
// .await?; // .await?;
//
// let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> { let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
// // for now we simply copy the updates/updates_files // for now we simply copy the updates/updates_files
// // FIXME: We may copy more files than necessary, if new files are added while we are // FIXME: We may copy more files than necessary, if new files are added while we are
// // performing the dump. We need a way to filter them out. // performing the dump. We need a way to filter them out.
//
// let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?;
// to_tar_gz(temp_dump_path, temp_dump_file.path()) to_tar_gz(temp_dump_path, temp_dump_file.path())
// .map_err(|e| DumpActorError::Internal(e.into()))?; .map_err(|e| DumpError::Internal(e.into()))?;
//
// let dump_path = self.dump_path.join(self.uid).with_extension("dump"); let dump_path = self.dump_path.join(self.uid).with_extension("dump");
// temp_dump_file.persist(&dump_path)?; temp_dump_file.persist(&dump_path)?;
//
// Ok(dump_path) Ok(dump_path)
// }) })
// .await??; .await??;
//
// // notify the update loop that we are finished performing the dump. info!("Created dump in {:?}.", dump_path);
// let _ = sender.send(());
//
// info!("Created dump in {:?}.", dump_path);
//
Ok(()) Ok(())
} }
} }

View File

@@ -6,7 +6,7 @@ use tokio::task::JoinError;
use super::DocumentAdditionFormat; use super::DocumentAdditionFormat;
use crate::document_formats::DocumentFormatError; use crate::document_formats::DocumentFormatError;
use crate::dump::error::DumpActorError; use crate::dump::error::DumpError;
use crate::index::error::IndexError; use crate::index::error::IndexError;
use crate::tasks::error::TaskError; use crate::tasks::error::TaskError;
use crate::update_file_store::UpdateFileStoreError; use crate::update_file_store::UpdateFileStoreError;
@@ -28,7 +28,7 @@ pub enum IndexControllerError {
#[error("{0}")] #[error("{0}")]
TaskError(#[from] TaskError), TaskError(#[from] TaskError),
#[error("{0}")] #[error("{0}")]
DumpError(#[from] DumpActorError), DumpError(#[from] DumpError),
#[error("{0}")] #[error("{0}")]
DocumentFormatError(#[from] DocumentFormatError), DocumentFormatError(#[from] DocumentFormatError),
#[error("A {0} payload is missing.")] #[error("A {0} payload is missing.")]

View File

@@ -13,13 +13,13 @@ use futures::StreamExt;
use milli::update::IndexDocumentsMethod; use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::sync::{mpsc, RwLock}; use tokio::sync::RwLock;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use tokio::time::sleep; use tokio::time::sleep;
use uuid::Uuid; use uuid::Uuid;
use crate::document_formats::{read_csv, read_json, read_ndjson}; use crate::document_formats::{read_csv, read_json, read_ndjson};
use crate::dump::{self, load_dump, DumpActor, DumpActorHandle, DumpActorHandleImpl, DumpInfo}; use crate::dump::load_dump;
use crate::index::{ use crate::index::{
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
}; };
@@ -75,7 +75,6 @@ pub struct IndexController<U, I> {
scheduler: Arc<RwLock<Scheduler>>, scheduler: Arc<RwLock<Scheduler>>,
task_store: TaskStore, task_store: TaskStore,
dump_path: PathBuf, dump_path: PathBuf,
dump_handle: dump::DumpActorHandleImpl,
pub update_file_store: UpdateFileStore, pub update_file_store: UpdateFileStore,
} }
@@ -85,7 +84,6 @@ impl<U, I> Clone for IndexController<U, I> {
Self { Self {
index_resolver: self.index_resolver.clone(), index_resolver: self.index_resolver.clone(),
scheduler: self.scheduler.clone(), scheduler: self.scheduler.clone(),
dump_handle: self.dump_handle.clone(),
update_file_store: self.update_file_store.clone(), update_file_store: self.update_file_store.clone(),
task_store: self.task_store.clone(), task_store: self.task_store.clone(),
dump_path: self.dump_path.clone(), dump_path: self.dump_path.clone(),
@@ -228,23 +226,6 @@ impl IndexControllerBuilder {
let dump_path = self let dump_path = self
.dump_dst .dump_dst
.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?;
let dump_handle = {
let analytics_path = &db_path;
let (sender, receiver) = mpsc::channel(10);
let actor = DumpActor::new(
receiver,
update_file_store.clone(),
scheduler.clone(),
dump_path.clone(),
analytics_path,
index_size,
task_store_size,
);
tokio::task::spawn_local(actor.run());
DumpActorHandleImpl { sender }
};
if self.schedule_snapshot { if self.schedule_snapshot {
let snapshot_period = self let snapshot_period = self
@@ -269,7 +250,6 @@ impl IndexControllerBuilder {
Ok(IndexController { Ok(IndexController {
index_resolver, index_resolver,
scheduler, scheduler,
dump_handle,
dump_path, dump_path,
update_file_store, update_file_store,
task_store, task_store,
@@ -633,14 +613,6 @@ where
indexes, indexes,
}) })
} }
pub async fn create_dump(&self) -> Result<DumpInfo> {
Ok(self.dump_handle.create_dump().await?)
}
pub async fn dump_info(&self, uid: String) -> Result<DumpInfo> {
Ok(self.dump_handle.dump_info(uid).await?)
}
} }
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T { pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {

View File

@@ -0,0 +1,92 @@
use std::path::{Path, PathBuf};
use log::{error, trace};
use time::{macros::format_description, OffsetDateTime};
use crate::dump::DumpJob;
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::BatchHandler;
use crate::update_file_store::UpdateFileStore;
pub struct DumpHandler {
update_file_store: UpdateFileStore,
dump_path: PathBuf,
db_path: PathBuf,
update_db_size: usize,
index_db_size: usize,
}
/// Generate uid from creation date
fn generate_uid() -> String {
OffsetDateTime::now_utc()
.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
))
.unwrap()
}
impl DumpHandler {
pub fn new(
update_file_store: UpdateFileStore,
dump_path: impl AsRef<Path>,
db_path: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
) -> Self {
Self {
update_file_store,
dump_path: dump_path.as_ref().into(),
db_path: db_path.as_ref().into(),
index_db_size,
update_db_size,
}
}
async fn create_dump(&self) {
let uid = generate_uid();
let task = DumpJob {
dump_path: self.dump_path.clone(),
db_path: self.db_path.clone(),
update_file_store: self.update_file_store.clone(),
uid: uid.clone(),
update_db_size: self.update_db_size,
index_db_size: self.index_db_size,
};
let task_result = tokio::task::spawn_local(task.run()).await;
match task_result {
Ok(Ok(())) => {
trace!("Dump succeed");
}
Ok(Err(e)) => {
error!("Dump failed: {}", e);
}
Err(_) => {
error!("Dump panicked. Dump status set to failed");
}
};
}
}
#[async_trait::async_trait]
impl BatchHandler for DumpHandler {
fn accept(&self, batch: &Batch) -> bool {
matches!(batch.content, BatchContent::Dump { .. })
}
async fn process_batch(&self, batch: Batch) -> Batch {
match batch.content {
BatchContent::Dump { .. } => {
self.create_dump().await;
batch
}
_ => unreachable!("invalid batch content for dump"),
}
}
async fn finish(&self, _: &Batch) {
()
}
}

View File

@@ -1,2 +1,3 @@
pub mod dump_handler;
pub mod empty_handler; pub mod empty_handler;
mod index_resolver_handler; mod index_resolver_handler;