start integrating the index-scheduler in meilisearch-lib

This commit is contained in:
Irevoire
2022-09-21 17:13:09 +02:00
committed by Clément Renault
parent 8f0fd35358
commit 250410495c
7 changed files with 163 additions and 146 deletions

32
Cargo.lock generated
View File

@@ -1020,6 +1020,37 @@ dependencies = [
"syn 1.0.101", "syn 1.0.101",
] ]
[[package]]
name = "derive_builder"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3"
dependencies = [
"derive_builder_macro",
]
[[package]]
name = "derive_builder_core"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f91d4cfa921f1c05904dc3c57b4a32c38aed3340cce209f3a6fd1478babafc4"
dependencies = [
"darling",
"proc-macro2 1.0.39",
"quote 1.0.18",
"syn 1.0.96",
]
[[package]]
name = "derive_builder_macro"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68"
dependencies = [
"derive_builder_core",
"syn 1.0.96",
]
[[package]] [[package]]
name = "derive_more" name = "derive_more"
version = "0.99.17" version = "0.99.17"
@@ -1796,6 +1827,7 @@ dependencies = [
"big_s", "big_s",
"bincode", "bincode",
"csv", "csv",
"derive_builder",
"document-formats", "document-formats",
"file-store", "file-store",
"index", "index",

View File

@@ -22,6 +22,7 @@ thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] }
uuid = { version = "1.1.2", features = ["serde", "v4"] } uuid = { version = "1.1.2", features = ["serde", "v4"] }
synchronoise = "1.0.1" synchronoise = "1.0.1"
derive_builder = "0.11.2"
[dev-dependencies] [dev-dependencies]
nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"} nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"}

View File

@@ -1,6 +1,8 @@
use milli::heed; use milli::heed;
use thiserror::Error; use thiserror::Error;
use crate::TaskId;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum Error { pub enum Error {
#[error("Index `{0}` not found")] #[error("Index `{0}` not found")]
@@ -9,6 +11,8 @@ pub enum Error {
IndexAlreadyExists(String), IndexAlreadyExists(String),
#[error("Corrupted task queue.")] #[error("Corrupted task queue.")]
CorruptedTaskQueue, CorruptedTaskQueue,
#[error("Task `{0}` not found")]
TaskNotFound(TaskId),
#[error(transparent)] #[error(transparent)]
Heed(#[from] heed::Error), Heed(#[from] heed::Error),
#[error(transparent)] #[error(transparent)]

View File

@@ -107,6 +107,16 @@ impl IndexMapper {
Ok(index) Ok(index)
} }
pub fn indexes(&self, rtxn: &RoTxn) -> Result<Vec<Index>> {
self.index_mapping
.iter(&rtxn)?
.map(|ret| {
ret.map_err(Error::from)
.and_then(|(name, _)| self.index(rtxn, name))
})
.collect()
}
/// Swap two index name. /// Swap two index name.
pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> { pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> {
let lhs_uuid = self let lhs_uuid = self

View File

@@ -1,6 +1,6 @@
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
use crate::task::{Kind, KindWithContent, Status, Task, TaskView}; use crate::task::{Kind, KindWithContent, Status, Task, TaskView};
use crate::{Error, Result}; use crate::{Error, Result, TaskId};
use file_store::FileStore; use file_store::FileStore;
use index::Index; use index::Index;
use milli::update::IndexerConfig; use milli::update::IndexerConfig;
@@ -20,7 +20,7 @@ use serde::Deserialize;
const DEFAULT_LIMIT: fn() -> u32 = || 20; const DEFAULT_LIMIT: fn() -> u32 = || 20;
#[derive(Debug, Clone, Deserialize)] #[derive(derive_builder::Builder, Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Query { pub struct Query {
#[serde(default = "DEFAULT_LIMIT")] #[serde(default = "DEFAULT_LIMIT")]
@@ -32,6 +32,38 @@ pub struct Query {
index_uid: Option<Vec<String>>, index_uid: Option<Vec<String>>,
} }
impl Default for Query {
fn default() -> Self {
Self {
limit: DEFAULT_LIMIT(),
from: None,
status: None,
kind: None,
index_uid: None,
}
}
}
impl Query {
pub fn with_status(self, status: Status) -> Self {
let mut status_vec = self.status.unwrap_or_default();
status_vec.push(status);
Self {
status: Some(status_vec),
..self
}
}
pub fn with_kind(self, kind: Kind) -> Self {
let mut kind_vec = self.kind.unwrap_or_default();
kind_vec.push(kind);
Self {
kind: Some(kind_vec),
..self
}
}
}
pub mod db_name { pub mod db_name {
pub const ALL_TASKS: &str = "all-tasks"; pub const ALL_TASKS: &str = "all-tasks";
pub const STATUS: &str = "status"; pub const STATUS: &str = "status";
@@ -73,20 +105,20 @@ pub struct IndexScheduler {
impl IndexScheduler { impl IndexScheduler {
pub fn new( pub fn new(
db_path: PathBuf, tasks_path: PathBuf,
update_file_path: PathBuf, update_file_path: PathBuf,
indexes_path: PathBuf, indexes_path: PathBuf,
index_size: usize, index_size: usize,
indexer_config: IndexerConfig, indexer_config: IndexerConfig,
) -> Result<Self> { ) -> Result<Self> {
std::fs::create_dir_all(&db_path)?; std::fs::create_dir_all(&tasks_path)?;
std::fs::create_dir_all(&update_file_path)?; std::fs::create_dir_all(&update_file_path)?;
std::fs::create_dir_all(&indexes_path)?; std::fs::create_dir_all(&indexes_path)?;
let mut options = heed::EnvOpenOptions::new(); let mut options = heed::EnvOpenOptions::new();
options.max_dbs(6); options.max_dbs(6);
let env = options.open(db_path)?; let env = options.open(tasks_path)?;
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
let wake_up = SignalEvent::auto(true); let wake_up = SignalEvent::auto(true);
@@ -115,6 +147,12 @@ impl IndexScheduler {
self.index_mapper.index(&rtxn, name) self.index_mapper.index(&rtxn, name)
} }
/// Return and open all the indexes.
pub fn indexes(&self) -> Result<Vec<Index>> {
let rtxn = self.env.read_txn()?;
self.index_mapper.indexes(&rtxn)
}
/// Returns the tasks corresponding to the query. /// Returns the tasks corresponding to the query.
pub fn get_tasks(&self, query: Query) -> Result<Vec<TaskView>> { pub fn get_tasks(&self, query: Query) -> Result<Vec<TaskView>> {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
@@ -155,6 +193,15 @@ impl IndexScheduler {
Ok(tasks.into_iter().map(|task| task.as_task_view()).collect()) Ok(tasks.into_iter().map(|task| task.as_task_view()).collect())
} }
/// Returns the tasks corresponding to the query.
pub fn task(&self, uid: TaskId) -> Result<TaskView> {
let rtxn = self.env.read_txn()?;
self.get_task(&rtxn, uid).and_then(|opt| {
opt.ok_or(Error::TaskNotFound(uid))
.map(|task| task.as_task_view())
})
}
/// Register a new task in the scheduler. If it fails and data was associated with the task /// Register a new task in the scheduler. If it fails and data was associated with the task
/// it tries to delete the file. /// it tries to delete the file.
pub fn register(&self, task: KindWithContent) -> Result<TaskView> { pub fn register(&self, task: KindWithContent) -> Result<TaskView> {

View File

@@ -9,12 +9,12 @@ mod utils;
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32; pub type TaskId = u32;
pub use crate::index_scheduler::IndexScheduler; pub use crate::index_scheduler::{IndexScheduler, Query};
pub use error::Error; pub use error::Error;
/// from the exterior you don't need to know there is multiple type of `Kind` /// from the exterior you don't need to know there is multiple type of `Kind`
pub use task::KindWithContent as TaskKind; pub use task::KindWithContent;
/// from the exterior you don't need to know there is multiple type of `Task` /// from the exterior you don't need to know there is multiple type of `Task`
pub use task::TaskView as Task; pub use task::TaskView;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@@ -1,39 +1,30 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt; use std::fmt;
use std::io::Cursor;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use actix_web::error::PayloadError; use actix_web::error::PayloadError;
use bytes::Bytes; use bytes::Bytes;
use file_store::FileStore;
use futures::Stream; use futures::Stream;
use futures::StreamExt; use index_scheduler::task::{Status, Task};
use index_scheduler::IndexScheduler; use index_scheduler::{IndexScheduler, KindWithContent, TaskId, TaskView};
use index_scheduler::TaskKind;
use meilisearch_auth::SearchRules; use meilisearch_auth::SearchRules;
use meilisearch_types::index_uid::IndexUid;
use milli::update::{IndexDocumentsMethod, IndexerConfig}; use milli::update::{IndexDocumentsMethod, IndexerConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use tokio::time::sleep; use tokio::time::sleep;
use uuid::Uuid; use uuid::Uuid;
use crate::document_formats::{read_csv, read_json, read_ndjson};
// use crate::dump::{self, load_dump, DumpHandler}; // use crate::dump::{self, load_dump, DumpHandler};
use crate::options::{IndexerOpts, SchedulerConfig}; use crate::options::{IndexerOpts, SchedulerConfig};
use crate::snapshot::{load_snapshot, SnapshotService}; // use crate::snapshot::{load_snapshot, SnapshotService};
use error::Result; use error::Result;
use index::{ use index::{
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, Checked, Document, Index, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
}; };
use self::error::IndexControllerError;
pub mod error; pub mod error;
pub mod versioning; pub mod versioning;
@@ -187,7 +178,6 @@ impl IndexControllerBuilder {
let meta_env = Arc::new(open_meta_env(db_path.as_ref(), task_store_size)?); let meta_env = Arc::new(open_meta_env(db_path.as_ref(), task_store_size)?);
let file_store = FileStore::new(&db_path)?;
// Create or overwrite the version file for this DB // Create or overwrite the version file for this DB
versioning::create_version_file(db_path.as_ref())?; versioning::create_version_file(db_path.as_ref())?;
@@ -204,13 +194,15 @@ impl IndexControllerBuilder {
max_positions_per_attributes: None, max_positions_per_attributes: None,
}; };
let scheduler = IndexScheduler::new( let index_scheduler = IndexScheduler::new(
db_path.as_ref().to_path_buf(), db_path.as_ref().join("tasks"),
db_path.as_ref().join("update_files"),
db_path.as_ref().join("indexes"),
index_size, index_size,
indexer_config, indexer_config,
file_store, )?;
);
/*
if self.schedule_snapshot { if self.schedule_snapshot {
let snapshot_period = self let snapshot_period = self
.snapshot_interval .snapshot_interval
@@ -230,10 +222,9 @@ impl IndexControllerBuilder {
tokio::task::spawn_local(snapshot_service.run()); tokio::task::spawn_local(snapshot_service.run());
} }
*/
Ok(Meilisearch { Ok(Meilisearch { index_scheduler })
index_scheduler: scheduler,
})
} }
/// Set the index controller builder's max update store size. /// Set the index controller builder's max update store size.
@@ -318,100 +309,25 @@ impl Meilisearch {
IndexControllerBuilder::default() IndexControllerBuilder::default()
} }
pub async fn register_task(&self, task: TaskKind) -> Result<Task> { pub async fn register_task(&self, task: KindWithContent) -> Result<TaskView> {
Ok(self.index_scheduler.register(task).await?) let this = self.clone();
} Ok(
tokio::task::spawn_blocking(move || this.clone().index_scheduler.register(task))
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> { .await??,
let task = self.scheduler.read().await.get_task(id, filter).await?;
Ok(task)
}
pub async fn get_index_task(&self, index_uid: String, task_id: TaskId) -> Result<Task> {
let creation_task_id = self
.index_resolver
.get_index_creation_task_id(index_uid.clone())
.await?;
if task_id < creation_task_id {
return Err(TaskError::UnexistingTask(task_id).into());
}
let mut filter = TaskFilter::default();
filter.filter_index(index_uid);
let task = self
.scheduler
.read()
.await
.get_task(task_id, Some(filter))
.await?;
Ok(task)
}
pub async fn list_tasks(
&self,
filter: Option<TaskFilter>,
limit: Option<usize>,
offset: Option<TaskId>,
) -> Result<Vec<Task>> {
let tasks = self
.scheduler
.read()
.await
.list_tasks(offset, filter, limit)
.await?;
Ok(tasks)
}
pub async fn list_index_task(
&self,
index_uid: String,
limit: Option<usize>,
offset: Option<TaskId>,
) -> Result<Vec<Task>> {
let task_id = self
.index_resolver
.get_index_creation_task_id(index_uid.clone())
.await?;
let mut filter = TaskFilter::default();
filter.filter_index(index_uid);
let tasks = self
.scheduler
.read()
.await
.list_tasks(
Some(offset.unwrap_or_default() + task_id),
Some(filter),
limit,
) )
.await?;
Ok(tasks)
} }
pub async fn list_indexes(&self) -> Result<Vec<IndexMetadata>> { pub async fn get_task(&self, id: TaskId) -> Result<TaskView> {
let indexes = self.index_resolver.list().await?; Ok(self.index_scheduler.task(id)?)
let mut ret = Vec::new();
for (uid, index) in indexes {
let meta = index.meta()?;
let meta = IndexMetadata {
uuid: index.uuid(),
uid,
meta,
};
ret.push(meta);
} }
Ok(ret) pub async fn list_tasks(&self, filter: index_scheduler::Query) -> Result<Vec<TaskView>> {
Ok(self.index_scheduler.get_tasks(filter)?)
} }
pub async fn settings(&self, uid: String) -> Result<Settings<Checked>> { pub async fn list_indexes(&self) -> Result<Vec<Index>> {
let index = self.index_resolver.get_index(uid).await?; let this = self.clone();
let settings = spawn_blocking(move || index.settings()).await??; Ok(spawn_blocking(move || this.index_scheduler.indexes()).await??)
Ok(settings)
} }
/// Return the total number of documents contained in the index + the selected documents. /// Return the total number of documents contained in the index + the selected documents.
@@ -422,11 +338,12 @@ impl Meilisearch {
limit: usize, limit: usize,
attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
) -> Result<(u64, Vec<Document>)> { ) -> Result<(u64, Vec<Document>)> {
let index = self.index_resolver.get_index(uid).await?; let this = self.clone();
let result = spawn_blocking(move || -> Result<_> {
spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)) let index = this.index_scheduler.index(&uid)?;
.await??; Ok(index.retrieve_documents(offset, limit, attributes_to_retrieve)?)
Ok(result) })
.await?
} }
pub async fn document( pub async fn document(
@@ -435,35 +352,38 @@ impl Meilisearch {
doc_id: String, doc_id: String,
attributes_to_retrieve: Option<Vec<String>>, attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> { ) -> Result<Document> {
let index = self.index_resolver.get_index(uid).await?; let this = self.clone();
let document = spawn_blocking(move || -> Result<_> {
spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) let index = this.index_scheduler.index(&uid)?;
.await??; Ok(index.retrieve_document(doc_id, attributes_to_retrieve)?)
Ok(document) })
.await?
} }
pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> { pub async fn search(&self, uid: String, query: SearchQuery) -> Result<SearchResult> {
let index = self.index_resolver.get_index(uid).await?; let this = self.clone();
let result = spawn_blocking(move || index.perform_search(query)).await??; spawn_blocking(move || -> Result<_> {
Ok(result) let index = this.index_scheduler.index(&uid)?;
Ok(index.perform_search(query)?)
})
.await?
} }
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> { pub async fn get_index(&self, uid: String) -> Result<Index> {
let index = self.index_resolver.get_index(uid.clone()).await?; let this = self.clone();
let uuid = index.uuid(); Ok(spawn_blocking(move || this.index_scheduler.index(&uid)).await??)
let meta = spawn_blocking(move || index.meta()).await??;
let meta = IndexMetadata { uuid, uid, meta };
Ok(meta)
} }
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> { pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?; let processing_tasks = self
.index_scheduler
.get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?;
// Check if the currently indexing update is from our index. // Check if the currently indexing update is from our index.
let is_indexing = processing_tasks let is_indexing = processing_tasks.first().map_or(false, |task| {
.first() task.index_uid.as_ref().map_or(false, |u| u == &uid)
.map_or(false, |task| task.index_uid().map_or(false, |u| u == uid)); });
let index = self.index_resolver.get_index(uid).await?; let index = self.get_index(uid).await?;
let mut stats = spawn_blocking(move || index.stats()).await??; let mut stats = spawn_blocking(move || index.stats()).await??;
stats.is_indexing = Some(is_indexing); stats.is_indexing = Some(is_indexing);
@@ -474,12 +394,15 @@ impl Meilisearch {
let mut last_task: Option<OffsetDateTime> = None; let mut last_task: Option<OffsetDateTime> = None;
let mut indexes = BTreeMap::new(); let mut indexes = BTreeMap::new();
let mut database_size = 0; let mut database_size = 0;
let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?; let processing_tasks = self
.index_scheduler
.get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?;
for (index_uid, index) in self.index_resolver.list().await? { for index in self.list_indexes().await? {
if !search_rules.is_index_authorized(&index_uid) { if !search_rules.is_index_authorized(&index.name) {
continue; continue;
} }
let index_name = index.name.clone();
let (mut stats, meta) = let (mut stats, meta) =
spawn_blocking::<_, Result<(IndexStats, IndexMeta)>>(move || { spawn_blocking::<_, Result<(IndexStats, IndexMeta)>>(move || {
@@ -496,10 +419,10 @@ impl Meilisearch {
// Check if the currently indexing update is from our index. // Check if the currently indexing update is from our index.
stats.is_indexing = processing_tasks stats.is_indexing = processing_tasks
.first() .first()
.and_then(|p| p.index_uid().map(|u| u == index_uid)) .and_then(|p| p.index_uid.as_ref().map(|u| u == &index_name))
.or(Some(false)); .or(Some(false));
indexes.insert(index_uid, stats); indexes.insert(index_name, stats);
} }
Ok(Stats { Ok(Stats {