register dump handler

This commit is contained in:
ad hoc
2022-05-19 14:51:04 +02:00
parent 60a8249de6
commit 414d0907ce
4 changed files with 25 additions and 20 deletions

View File

@@ -27,7 +27,9 @@ use crate::options::{IndexerOpts, SchedulerConfig};
use crate::snapshot::{load_snapshot, SnapshotService};
use crate::tasks::error::TaskError;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId};
use crate::tasks::{BatchHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore};
use crate::tasks::{
BatchHandler, DumpHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore,
};
use error::Result;
use self::error::IndexControllerError;
@@ -74,7 +76,6 @@ pub struct IndexController<U, I> {
pub index_resolver: Arc<IndexResolver<U, I>>,
scheduler: Arc<RwLock<Scheduler>>,
task_store: TaskStore,
dump_path: PathBuf,
pub update_file_store: UpdateFileStore,
}
@@ -86,7 +87,6 @@ impl<U, I> Clone for IndexController<U, I> {
scheduler: self.scheduler.clone(),
update_file_store: self.update_file_store.clone(),
task_store: self.task_store.clone(),
dump_path: self.dump_path.clone(),
}
}
}
@@ -218,15 +218,28 @@ impl IndexControllerBuilder {
update_file_store.clone(),
)?);
let task_store = TaskStore::new(meta_env)?;
let handlers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>> =
vec![index_resolver.clone(), Arc::new(EmptyBatchHandler)];
let scheduler = Scheduler::new(task_store.clone(), handlers, scheduler_config)?;
let dump_path = self
.dump_dst
.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?;
let dump_handler = Arc::new(DumpHandler::new(
update_file_store.clone(),
dump_path,
db_path.as_ref().clone(),
index_size,
task_store_size,
));
let task_store = TaskStore::new(meta_env)?;
// register all the batch handlers for use with the scheduler.
let handlers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>> = vec![
index_resolver.clone(),
dump_handler,
// dummy handler to catch all empty batches
Arc::new(EmptyBatchHandler),
];
let scheduler = Scheduler::new(task_store.clone(), handlers, scheduler_config)?;
if self.schedule_snapshot {
let snapshot_period = self
.snapshot_interval
@@ -250,7 +263,6 @@ impl IndexControllerBuilder {
Ok(IndexController {
index_resolver,
scheduler,
dump_path,
update_file_store,
task_store,
})
@@ -408,9 +420,7 @@ where
}
pub async fn register_dump_task(&self) -> Result<Task> {
let content = TaskContent::Dump {
path: self.dump_path.clone(),
};
let content = TaskContent::Dump;
let task = self.task_store.register(None, content).await?;
self.scheduler.read().await.notify();
Ok(task)

View File

@@ -1,6 +1,6 @@
use async_trait::async_trait;
pub use batch_handlers::empty_handler::EmptyBatchHandler;
pub use batch_handlers::{dump_handler::DumpHandler, empty_handler::EmptyBatchHandler};
pub use scheduler::Scheduler;
pub use task_store::TaskFilter;

View File

@@ -1,5 +1,3 @@
use std::path::PathBuf;
use meilisearch_error::ResponseError;
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use serde::{Deserialize, Serialize};
@@ -142,10 +140,7 @@ pub enum TaskContent {
IndexUpdate {
primary_key: Option<String>,
},
Dump {
#[cfg_attr(test, proptest(value = "PathBuf::from(\".\")"))]
path: PathBuf,
},
Dump,
}
#[cfg(test)]