start integrating the index-scheduler in the meilisearch codebase

This commit is contained in:
Tamo
2022-09-14 16:16:53 +02:00
committed by Clément Renault
parent b816535e33
commit fc098022c7
28 changed files with 679 additions and 3170 deletions

View File

@ -1,71 +0,0 @@
use std::fmt;
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::index_uid::IndexUidFormatError;
use meilisearch_types::internal_error;
use tokio::sync::mpsc::error::SendError as MpscSendError;
use tokio::sync::oneshot::error::RecvError as OneshotRecvError;
use uuid::Uuid;
use crate::{error::MilliError, index::error::IndexError, update_file_store::UpdateFileStoreError};
pub type Result<T> = std::result::Result<T, IndexResolverError>;
#[derive(thiserror::Error, Debug)]
pub enum IndexResolverError {
#[error("{0}")]
IndexError(#[from] IndexError),
#[error("Index `{0}` already exists.")]
IndexAlreadyExists(String),
#[error("Index `{0}` not found.")]
UnexistingIndex(String),
#[error("A primary key is already present. It's impossible to update it")]
ExistingPrimaryKey,
#[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("The creation of the `{0}` index has failed due to `Index uuid is already assigned`.")]
UuidAlreadyExists(Uuid),
#[error("{0}")]
Milli(#[from] milli::Error),
#[error("{0}")]
BadlyFormatted(#[from] IndexUidFormatError),
}
impl<T> From<MpscSendError<T>> for IndexResolverError
where
T: Send + Sync + 'static + fmt::Debug,
{
fn from(other: tokio::sync::mpsc::error::SendError<T>) -> Self {
Self::Internal(Box::new(other))
}
}
impl From<OneshotRecvError> for IndexResolverError {
fn from(other: tokio::sync::oneshot::error::RecvError) -> Self {
Self::Internal(Box::new(other))
}
}
internal_error!(
IndexResolverError: milli::heed::Error,
uuid::Error,
std::io::Error,
tokio::task::JoinError,
serde_json::Error,
UpdateFileStoreError
);
impl ErrorCode for IndexResolverError {
fn error_code(&self) -> Code {
match self {
IndexResolverError::IndexError(e) => e.error_code(),
IndexResolverError::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
IndexResolverError::UnexistingIndex(_) => Code::IndexNotFound,
IndexResolverError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent,
IndexResolverError::Internal(_) => Code::Internal,
IndexResolverError::UuidAlreadyExists(_) => Code::CreateIndex,
IndexResolverError::Milli(e) => MilliError(e).error_code(),
IndexResolverError::BadlyFormatted(_) => Code::InvalidIndexUid,
}
}
}

View File

@ -1,223 +0,0 @@
use std::collections::HashSet;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use walkdir::WalkDir;
use milli::heed::types::{SerdeBincode, Str};
use milli::heed::{CompactionOption, Database, Env};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::error::{IndexResolverError, Result};
use crate::tasks::task::TaskId;
#[derive(Serialize, Deserialize)]
pub struct DumpEntry {
pub uid: String,
pub index_meta: IndexMeta,
}
const UUIDS_DB_PATH: &str = "index_uuids";
#[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait IndexMetaStore: Sized {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn get(&self, uid: String) -> Result<(String, Option<IndexMeta>)>;
async fn delete(&self, uid: String) -> Result<Option<IndexMeta>>;
async fn list(&self) -> Result<Vec<(String, IndexMeta)>>;
async fn insert(&self, name: String, meta: IndexMeta) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
async fn get_size(&self) -> Result<u64>;
async fn dump(&self, path: PathBuf) -> Result<()>;
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct IndexMeta {
pub uuid: Uuid,
pub creation_task_id: TaskId,
}
#[derive(Clone)]
pub struct HeedMetaStore {
env: Arc<Env>,
db: Database<Str, SerdeBincode<IndexMeta>>,
}
impl Drop for HeedMetaStore {
fn drop(&mut self) {
if Arc::strong_count(&self.env) == 1 {
self.env.as_ref().clone().prepare_for_closing();
}
}
}
impl HeedMetaStore {
pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> {
let db = env.create_database(Some("uuids"))?;
Ok(Self { env, db })
}
fn get(&self, name: &str) -> Result<Option<IndexMeta>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?;
match db.get(&txn, name)? {
Some(meta) => Ok(Some(meta)),
None => Ok(None),
}
}
fn delete(&self, uid: String) -> Result<Option<IndexMeta>> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? {
Some(meta) => {
db.delete(&mut txn, &uid)?;
txn.commit()?;
Ok(Some(meta))
}
None => Ok(None),
}
}
fn list(&self) -> Result<Vec<(String, IndexMeta)>> {
let env = self.env.clone();
let db = self.db;
let txn = env.read_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (name, meta) = entry?;
entries.push((name.to_string(), meta))
}
Ok(entries)
}
pub(crate) fn insert(&self, name: String, meta: IndexMeta) -> Result<()> {
let env = self.env.clone();
let db = self.db;
let mut txn = env.write_txn()?;
if db.get(&txn, &name)?.is_some() {
return Err(IndexResolverError::IndexAlreadyExists(name));
}
db.put(&mut txn, &name, &meta)?;
txn.commit()?;
Ok(())
}
fn snapshot(&self, mut path: PathBuf) -> Result<HashSet<Uuid>> {
// Write transaction to acquire a lock on the database.
let txn = self.env.write_txn()?;
let mut entries = HashSet::new();
for entry in self.db.iter(&txn)? {
let (_, IndexMeta { uuid, .. }) = entry?;
entries.insert(uuid);
}
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push(UUIDS_DB_PATH);
create_dir_all(&path).unwrap();
path.push("data.mdb");
self.env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
}
fn get_size(&self) -> Result<u64> {
Ok(WalkDir::new(self.env.path())
.into_iter()
.filter_map(|entry| entry.ok())
.filter_map(|entry| entry.metadata().ok())
.filter(|metadata| metadata.is_file())
.fold(0, |acc, m| acc + m.len()))
}
pub fn dump(&self, path: PathBuf) -> Result<()> {
let dump_path = path.join(UUIDS_DB_PATH);
create_dir_all(&dump_path)?;
let dump_file_path = dump_path.join("data.jsonl");
let mut dump_file = File::create(&dump_file_path)?;
let txn = self.env.read_txn()?;
for entry in self.db.iter(&txn)? {
let (uid, index_meta) = entry?;
let uid = uid.to_string();
let entry = DumpEntry { uid, index_meta };
serde_json::to_writer(&mut dump_file, &entry)?;
dump_file.write_all(b"\n").unwrap();
}
Ok(())
}
pub fn load_dump(src: impl AsRef<Path>, env: Arc<milli::heed::Env>) -> Result<()> {
let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl");
let indexes = File::open(&src_indexes)?;
let mut indexes = BufReader::new(indexes);
let mut line = String::new();
let db = Self::new(env)?;
let mut txn = db.env.write_txn()?;
loop {
match indexes.read_line(&mut line) {
Ok(0) => break,
Ok(_) => {
let DumpEntry { uid, index_meta } = serde_json::from_str(&line)?;
db.db.put(&mut txn, &uid, &index_meta)?;
}
Err(e) => return Err(e.into()),
}
line.clear();
}
txn.commit()?;
Ok(())
}
}
#[async_trait::async_trait]
impl IndexMetaStore for HeedMetaStore {
async fn get(&self, name: String) -> Result<(String, Option<IndexMeta>)> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.get(&name).map(|res| (name, res))).await?
}
async fn delete(&self, uid: String) -> Result<Option<IndexMeta>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.delete(uid)).await?
}
async fn list(&self) -> Result<Vec<(String, IndexMeta)>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.list()).await?
}
async fn insert(&self, name: String, meta: IndexMeta) -> Result<()> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.insert(name, meta)).await?
}
async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>> {
let this = self.clone();
tokio::task::spawn_blocking(move || this.snapshot(path)).await?
}
async fn get_size(&self) -> Result<u64> {
self.get_size()
}
async fn dump(&self, path: PathBuf) -> Result<()> {
let this = self.clone();
Ok(tokio::task::spawn_blocking(move || this.dump(path)).await??)
}
}

View File

@ -1,685 +0,0 @@
pub mod error;
pub mod index_store;
pub mod meta_store;
use std::convert::TryFrom;
use std::path::Path;
use std::sync::Arc;
use error::{IndexResolverError, Result};
use index_store::{IndexStore, MapIndexStore};
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meta_store::{HeedMetaStore, IndexMetaStore};
use milli::heed::Env;
use milli::update::{DocumentDeletionResult, IndexerConfig};
use time::OffsetDateTime;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use crate::index::{error::Result as IndexResult, Index};
use crate::options::IndexerOpts;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult};
use crate::update_file_store::UpdateFileStore;
use self::meta_store::IndexMeta;
pub type HardStateIndexResolver = IndexResolver<HeedMetaStore, MapIndexStore>;
#[cfg(not(test))]
pub use real::IndexResolver;
#[cfg(test)]
pub use test::MockIndexResolver as IndexResolver;
pub fn create_index_resolver(
path: impl AsRef<Path>,
index_size: usize,
indexer_opts: &IndexerOpts,
meta_env: Arc<milli::heed::Env>,
file_store: UpdateFileStore,
) -> anyhow::Result<HardStateIndexResolver> {
let uuid_store = HeedMetaStore::new(meta_env)?;
let index_store = MapIndexStore::new(&path, index_size, indexer_opts)?;
Ok(IndexResolver::new(uuid_store, index_store, file_store))
}
mod real {
use super::*;
pub struct IndexResolver<U, I> {
pub(super) index_uuid_store: U,
pub(super) index_store: I,
pub(super) file_store: UpdateFileStore,
}
impl IndexResolver<HeedMetaStore, MapIndexStore> {
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
env: Arc<Env>,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
HeedMetaStore::load_dump(&src, env)?;
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
let indexer_config = IndexerConfig::try_from(indexer_opts)?;
for index in indexes {
Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?;
}
Ok(())
}
}
impl<U, I> IndexResolver<U, I>
where
U: IndexMetaStore,
I: IndexStore,
{
pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self {
Self {
index_uuid_store,
index_store,
file_store,
}
}
pub async fn process_document_addition_batch(&self, tasks: &mut [Task]) {
fn get_content_uuid(task: &Task) -> Uuid {
match task {
Task {
content: TaskContent::DocumentAddition { content_uuid, .. },
..
} => *content_uuid,
_ => panic!("unexpected task in the document addition batch"),
}
}
let content_uuids = tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
match tasks.first() {
Some(Task {
id,
content:
TaskContent::DocumentAddition {
merge_strategy,
primary_key,
allow_index_creation,
index_uid,
..
},
..
}) => {
let primary_key = primary_key.clone();
let method = *merge_strategy;
let index = if *allow_index_creation {
self.get_or_create_index(index_uid.clone(), *id).await
} else {
self.get_index(index_uid.as_str().to_string()).await
};
// If the index doesn't exist and we are not allowed to create it with the first
// task, we must fails the whole batch.
let now = OffsetDateTime::now_utc();
let index = match index {
Ok(index) => index,
Err(e) => {
let error = ResponseError::from(e);
for task in tasks.iter_mut() {
task.events.push(TaskEvent::Failed {
error: error.clone(),
timestamp: now,
});
}
return;
}
};
let file_store = self.file_store.clone();
let result = spawn_blocking(move || {
index.update_documents(
method,
primary_key,
file_store,
content_uuids.into_iter(),
)
})
.await;
match result {
Ok(Ok(results)) => {
for (task, result) in tasks.iter_mut().zip(results) {
let event = match result {
Ok(addition) => {
TaskEvent::succeeded(TaskResult::DocumentAddition {
indexed_documents: addition.indexed_documents,
})
}
Err(error) => {
TaskEvent::failed(IndexResolverError::from(error))
}
};
task.events.push(event);
}
}
Ok(Err(e)) => {
let event = TaskEvent::failed(e);
for task in tasks.iter_mut() {
task.events.push(event.clone());
}
}
Err(e) => {
let event = TaskEvent::failed(IndexResolverError::from(e));
for task in tasks.iter_mut() {
task.events.push(event.clone());
}
}
}
}
_ => panic!("invalid batch!"),
}
}
pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> {
self.file_store.delete(content_uuid).await?;
Ok(())
}
async fn process_task_inner(&self, task: &Task) -> Result<TaskResult> {
match &task.content {
TaskContent::DocumentAddition { .. } => {
panic!("updates should be handled by batch")
}
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Ids(ids),
index_uid,
} => {
let ids = ids.clone();
let index = self.get_index(index_uid.clone().into_inner()).await?;
let DocumentDeletionResult {
deleted_documents, ..
} = spawn_blocking(move || index.delete_documents(&ids)).await??;
Ok(TaskResult::DocumentDeletion { deleted_documents })
}
TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Clear,
index_uid,
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
let number_documents = index.stats()?.number_of_documents;
index.clear_documents()?;
Ok(number_documents)
})
.await??;
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::SettingsUpdate {
settings,
is_deletion,
allow_index_creation,
index_uid,
} => {
let index = if *is_deletion || !*allow_index_creation {
self.get_index(index_uid.clone().into_inner()).await?
} else {
self.get_or_create_index(index_uid.clone(), task.id).await?
};
let settings = settings.clone();
spawn_blocking(move || index.update_settings(&settings.check())).await??;
Ok(TaskResult::Other)
}
TaskContent::IndexDeletion { index_uid } => {
let index = self.delete_index(index_uid.clone().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
Ok(index.stats()?.number_of_documents)
})
.await??;
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::IndexCreation {
primary_key,
index_uid,
} => {
let index = self.create_index(index_uid.clone(), task.id).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
}
Ok(TaskResult::Other)
}
TaskContent::IndexUpdate {
primary_key,
index_uid,
} => {
let index = self.get_index(index_uid.clone().into_inner()).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
spawn_blocking(move || index.update_primary_key(primary_key)).await??;
}
Ok(TaskResult::Other)
}
_ => unreachable!("Invalid task for index resolver"),
}
}
pub async fn process_task(&self, task: &mut Task) {
match self.process_task_inner(task).await {
Ok(res) => task.events.push(TaskEvent::succeeded(res)),
Err(e) => task.events.push(TaskEvent::failed(e)),
}
}
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
for (_, index) in self.list().await? {
index.dump(&path)?;
}
self.index_uuid_store.dump(path.as_ref().to_owned()).await?;
Ok(())
}
async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result<Index> {
match self.index_uuid_store.get(uid.into_inner()).await? {
(uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)),
(uid, None) => {
let uuid = Uuid::new_v4();
let index = self.index_store.create(uuid).await?;
match self
.index_uuid_store
.insert(
uid,
IndexMeta {
uuid,
creation_task_id,
},
)
.await
{
Err(e) => {
match self.index_store.delete(uuid).await {
Ok(Some(index)) => {
index.close();
}
Ok(None) => (),
Err(e) => log::error!("Error while deleting index: {:?}", e),
}
Err(e)
}
Ok(()) => Ok(index),
}
}
}
}
/// Get or create an index with name `uid`.
pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> {
match self.create_index(uid, task_id).await {
Ok(index) => Ok(index),
Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await,
Err(e) => Err(e),
}
}
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
let uuids = self.index_uuid_store.list().await?;
let mut indexes = Vec::new();
for (name, IndexMeta { uuid, .. }) in uuids {
match self.index_store.get(uuid).await? {
Some(index) => indexes.push((name, index)),
None => {
// we found an unexisting index, we remove it from the uuid store
let _ = self.index_uuid_store.delete(name).await;
}
}
}
Ok(indexes)
}
pub async fn delete_index(&self, uid: String) -> Result<Index> {
match self.index_uuid_store.delete(uid.clone()).await? {
Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? {
Some(index) => {
index.clone().close();
Ok(index)
}
None => Err(IndexResolverError::UnexistingIndex(uid)),
},
None => Err(IndexResolverError::UnexistingIndex(uid)),
}
}
pub async fn get_index(&self, uid: String) -> Result<Index> {
match self.index_uuid_store.get(uid).await? {
(name, Some(IndexMeta { uuid, .. })) => {
match self.index_store.get(uuid).await? {
Some(index) => Ok(index),
None => {
// For some reason we got a uuid to an unexisting index, we return an error,
// and remove the uuid from the uuid store.
let _ = self.index_uuid_store.delete(name.clone()).await;
Err(IndexResolverError::UnexistingIndex(name))
}
}
}
(name, _) => Err(IndexResolverError::UnexistingIndex(name)),
}
}
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> {
let (uid, meta) = self.index_uuid_store.get(index_uid).await?;
meta.map(
|IndexMeta {
creation_task_id, ..
}| creation_task_id,
)
.ok_or(IndexResolverError::UnexistingIndex(uid))
}
}
}
#[cfg(test)]
mod test {
use crate::index::IndexStats;
use super::index_store::MockIndexStore;
use super::meta_store::MockIndexMetaStore;
use super::*;
use futures::future::ok;
use milli::FieldDistribution;
use nelson::Mocker;
pub enum MockIndexResolver<U, I> {
Real(super::real::IndexResolver<U, I>),
Mock(Mocker),
}
impl MockIndexResolver<HeedMetaStore, MapIndexStore> {
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
env: Arc<Env>,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
super::real::IndexResolver::load_dump(src, dst, index_db_size, env, indexer_opts)
}
}
impl<U, I> MockIndexResolver<U, I>
where
U: IndexMetaStore,
I: IndexStore,
{
pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self {
Self::Real(super::real::IndexResolver {
index_uuid_store,
index_store,
file_store,
})
}
pub fn mock(mocker: Mocker) -> Self {
Self::Mock(mocker)
}
pub async fn process_document_addition_batch(&self, tasks: &mut [Task]) {
match self {
IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await,
IndexResolver::Mock(m) => unsafe {
m.get("process_document_addition_batch").call(tasks)
},
}
}
pub async fn process_task(&self, task: &mut Task) {
match self {
IndexResolver::Real(r) => r.process_task(task).await,
IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) },
}
}
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
match self {
IndexResolver::Real(r) => r.dump(path).await,
IndexResolver::Mock(_) => todo!(),
}
}
/// Get or create an index with name `uid`.
pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> {
match self {
IndexResolver::Real(r) => r.get_or_create_index(uid, task_id).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
match self {
IndexResolver::Real(r) => r.list().await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn delete_index(&self, uid: String) -> Result<Index> {
match self {
IndexResolver::Real(r) => r.delete_index(uid).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn get_index(&self, uid: String) -> Result<Index> {
match self {
IndexResolver::Real(r) => r.get_index(uid).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> {
match self {
IndexResolver::Real(r) => r.get_index_creation_task_id(index_uid).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> {
match self {
IndexResolver::Real(r) => r.delete_content_file(content_uuid).await,
IndexResolver::Mock(m) => unsafe {
m.get("delete_content_file").call(content_uuid)
},
}
}
}
#[actix_rt::test]
async fn test_remove_unknown_index() {
let mut meta_store = MockIndexMetaStore::new();
meta_store
.expect_delete()
.once()
.returning(|_| Box::pin(ok(None)));
let index_store = MockIndexStore::new();
let mocker = Mocker::default();
let file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(meta_store, index_store, file_store);
let mut task = Task {
id: 1,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};
index_resolver.process_task(&mut task).await;
assert!(matches!(task.events[0], TaskEvent::Failed { .. }));
}
#[actix_rt::test]
async fn test_remove_index() {
let mut meta_store = MockIndexMetaStore::new();
meta_store.expect_delete().once().returning(|_| {
Box::pin(ok(Some(IndexMeta {
uuid: Uuid::new_v4(),
creation_task_id: 1,
})))
});
let mut index_store = MockIndexStore::new();
index_store.expect_delete().once().returning(|_| {
let mocker = Mocker::default();
mocker.when::<(), ()>("close").then(|_| ());
mocker
.when::<(), IndexResult<IndexStats>>("stats")
.then(|_| {
Ok(IndexStats {
size: 10,
number_of_documents: 10,
is_indexing: None,
field_distribution: FieldDistribution::default(),
})
});
Box::pin(ok(Some(Index::mock(mocker))))
});
let mocker = Mocker::default();
let file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(meta_store, index_store, file_store);
let mut task = Task {
id: 1,
content: TaskContent::IndexDeletion {
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};
index_resolver.process_task(&mut task).await;
assert!(matches!(task.events[0], TaskEvent::Succeeded { .. }));
}
#[actix_rt::test]
async fn test_delete_documents() {
let mut meta_store = MockIndexMetaStore::new();
meta_store.expect_get().once().returning(|_| {
Box::pin(ok((
"test".to_string(),
Some(IndexMeta {
uuid: Uuid::new_v4(),
creation_task_id: 1,
}),
)))
});
let mut index_store = MockIndexStore::new();
index_store.expect_get().once().returning(|_| {
let mocker = Mocker::default();
mocker
.when::<(), IndexResult<()>>("clear_documents")
.once()
.then(|_| Ok(()));
mocker
.when::<(), IndexResult<IndexStats>>("stats")
.once()
.then(|_| {
Ok(IndexStats {
size: 10,
number_of_documents: 10,
is_indexing: None,
field_distribution: FieldDistribution::default(),
})
});
Box::pin(ok(Some(Index::mock(mocker))))
});
let mocker = Mocker::default();
let file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(meta_store, index_store, file_store);
let mut task = Task {
id: 1,
content: TaskContent::DocumentDeletion {
deletion: DocumentDeletion::Clear,
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};
index_resolver.process_task(&mut task).await;
assert!(matches!(task.events[0], TaskEvent::Succeeded { .. }));
}
#[actix_rt::test]
async fn test_index_update() {
let mut meta_store = MockIndexMetaStore::new();
meta_store.expect_get().once().returning(|_| {
Box::pin(ok((
"test".to_string(),
Some(IndexMeta {
uuid: Uuid::new_v4(),
creation_task_id: 1,
}),
)))
});
let mut index_store = MockIndexStore::new();
index_store.expect_get().once().returning(|_| {
let mocker = Mocker::default();
mocker
.when::<String, IndexResult<crate::index::IndexMeta>>("update_primary_key")
.once()
.then(|_| {
Ok(crate::index::IndexMeta {
created_at: OffsetDateTime::now_utc(),
updated_at: OffsetDateTime::now_utc(),
primary_key: Some("key".to_string()),
})
});
Box::pin(ok(Some(Index::mock(mocker))))
});
let mocker = Mocker::default();
let file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(meta_store, index_store, file_store);
let mut task = Task {
id: 1,
content: TaskContent::IndexUpdate {
primary_key: Some("key".to_string()),
index_uid: IndexUid::new_unchecked("test"),
},
events: Vec::new(),
};
index_resolver.process_task(&mut task).await;
assert!(matches!(task.events[0], TaskEvent::Succeeded { .. }));
}
}