Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-07-27 16:51:01 +00:00)
restructure project
@@ -0,0 +1,127 @@
mod index_actor;
mod update_actor;
mod uuid_resolver;
mod update_store;
mod update_handler;

use std::path::Path;

use actix_web::web::{Bytes, Payload};
use futures::stream::StreamExt;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use super::{IndexMetadata, UpdateMeta, UpdateStatus};
use crate::index::{SearchQuery, SearchResult, Settings};

pub struct IndexController {
    uuid_resolver: uuid_resolver::UuidResolverHandle,
    index_handle: index_actor::IndexActorHandle,
    update_handle: update_actor::UpdateActorHandle<Bytes>,
}

enum IndexControllerMsg {
    CreateIndex {
        uuid: Uuid,
        primary_key: Option<String>,
        ret: oneshot::Sender<anyhow::Result<IndexMetadata>>,
    },
    Shutdown,
}

impl IndexController {
    pub fn new(path: impl AsRef<Path>) -> Self {
        let uuid_resolver = uuid_resolver::UuidResolverHandle::new();
        let index_actor = index_actor::IndexActorHandle::new(&path);
        let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path);
        Self { uuid_resolver, index_handle: index_actor, update_handle }
    }

    pub async fn add_documents(
        &self,
        index: String,
        method: milli::update::IndexDocumentsMethod,
        format: milli::update::UpdateFormat,
        mut payload: Payload,
        primary_key: Option<String>,
    ) -> anyhow::Result<super::UpdateStatus> {
        let uuid = self.uuid_resolver.get_or_create(index).await?;
        let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
        let (sender, receiver) = mpsc::channel(10);

        // It is necessary to spawn a local task to send the payload to the update handle, to
        // prevent a deadlock between `update_handle::update`, which waits for the update to be
        // registered, and the update actor, which waits for the payload to be sent to it.
        tokio::task::spawn_local(async move {
            while let Some(bytes) = payload.next().await {
                match bytes {
                    Ok(bytes) => {
                        let _ = sender.send(Ok(bytes)).await;
                    }
                    Err(e) => {
                        let error: Box<dyn std::error::Error + Sync + Send + 'static> = Box::new(e);
                        let _ = sender.send(Err(error)).await;
                    }
                }
            }
        });

        // This must be done *AFTER* spawning the task.
        let status = self.update_handle.update(meta, receiver, uuid).await?;
        Ok(status)
    }
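The comment above describes the deadlock this channel avoids: registering the update and streaming its payload must make progress concurrently, or a full buffer stalls both sides. Below is a minimal, self-contained sketch of the same producer/consumer shape, using plain tokio types instead of the actix `Payload`; everything in it is illustrative, not part of this codebase.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel: `send` blocks once 2 items are in flight.
    let (sender, mut receiver) = mpsc::channel::<u32>(2);

    // The producer runs on its own task, so the consumer below can drain
    // the channel concurrently; awaiting both sequentially on one task
    // would deadlock as soon as the buffer fills.
    tokio::spawn(async move {
        for chunk in 0..10 {
            if sender.send(chunk).await.is_err() {
                break; // receiver dropped
            }
        }
    });

    // The consumer plays the role of update_handle.update(...).
    let mut total = 0;
    while let Some(chunk) = receiver.recv().await {
        total += chunk;
    }
    assert_eq!(total, 45);
}
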

    fn clear_documents(&self, index: String) -> anyhow::Result<UpdateStatus> {
        todo!()
    }

    fn delete_documents(&self, index: String, document_ids: Vec<String>) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    fn update_settings(&self, index_uid: String, settings: Settings) -> anyhow::Result<super::UpdateStatus> {
        todo!()
    }

    pub async fn create_index(&self, index_settings: super::IndexSettings) -> anyhow::Result<super::IndexMetadata> {
        let super::IndexSettings { name, primary_key } = index_settings;
        let uuid = self.uuid_resolver.create(name.unwrap()).await?;
        let index_meta = self.index_handle.create_index(uuid, primary_key).await?;
        Ok(index_meta)
    }

    fn delete_index(&self, index_uid: String) -> anyhow::Result<()> {
        todo!()
    }

    fn swap_indices(&self, index1_uid: String, index2_uid: String) -> anyhow::Result<()> {
        todo!()
    }

    pub fn index(&self, name: String) -> anyhow::Result<Option<std::sync::Arc<milli::Index>>> {
        todo!()
    }

    fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<UpdateStatus>> {
        todo!()
    }

    fn all_update_status(&self, index: String) -> anyhow::Result<Vec<super::UpdateStatus>> {
        todo!()
    }

    pub fn list_indexes(&self) -> anyhow::Result<Vec<super::IndexMetadata>> {
        todo!()
    }

    fn update_index(&self, name: String, index_settings: super::IndexSettings) -> anyhow::Result<super::IndexMetadata> {
        todo!()
    }

    pub async fn search(&self, name: String, query: SearchQuery) -> anyhow::Result<SearchResult> {
        let uuid = self.uuid_resolver.resolve(name).await.unwrap().unwrap();
        let result = self.index_handle.search(uuid, query).await?;
        Ok(result)
    }
}

meilisearch-http/src/index_controller/index_actor.rs (new file, 590 lines)
@@ -0,0 +1,590 @@
use std::collections::{hash_map::Entry, HashMap};
use std::fs::{create_dir_all, remove_dir_all, File};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::pin_mut;
use futures::stream::StreamExt;
use heed::EnvOpenOptions;
use log::info;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;

use super::get_arc_ownership_blocking;
use super::update_handler::UpdateHandler;
use crate::index::UpdateResult as UResult;
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{
    updates::{Failed, Processed, Processing},
    UpdateMeta,
};
use crate::option::IndexerOpts;

pub type Result<T> = std::result::Result<T, IndexError>;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
    uuid: Uuid,
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
    primary_key: Option<String>,
}

enum IndexMsg {
    CreateIndex {
        uuid: Uuid,
        primary_key: Option<String>,
        ret: oneshot::Sender<Result<IndexMeta>>,
    },
    Update {
        meta: Processing<UpdateMeta>,
        data: std::fs::File,
        ret: oneshot::Sender<Result<UpdateResult>>,
    },
    Search {
        uuid: Uuid,
        query: SearchQuery,
        ret: oneshot::Sender<anyhow::Result<SearchResult>>,
    },
    Settings {
        uuid: Uuid,
        ret: oneshot::Sender<Result<Settings>>,
    },
    Documents {
        uuid: Uuid,
        attributes_to_retrieve: Option<Vec<String>>,
        offset: usize,
        limit: usize,
        ret: oneshot::Sender<Result<Vec<Document>>>,
    },
    Document {
        uuid: Uuid,
        attributes_to_retrieve: Option<Vec<String>>,
        doc_id: String,
        ret: oneshot::Sender<Result<Document>>,
    },
    Delete {
        uuid: Uuid,
        ret: oneshot::Sender<Result<()>>,
    },
    GetMeta {
        uuid: Uuid,
        ret: oneshot::Sender<Result<Option<IndexMeta>>>,
    },
}

struct IndexActor<S> {
    read_receiver: Option<mpsc::Receiver<IndexMsg>>,
    write_receiver: Option<mpsc::Receiver<IndexMsg>>,
    update_handler: Arc<UpdateHandler>,
    store: S,
}

#[derive(Error, Debug)]
pub enum IndexError {
    #[error("error with index: {0}")]
    Error(#[from] anyhow::Error),
    #[error("index already exists")]
    IndexAlreadyExists,
    #[error("index doesn't exist")]
    UnexistingIndex,
}

#[async_trait::async_trait]
trait IndexStore {
    async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
    async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
    where
        F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
        R: Sync + Send + 'static;
    async fn get_or_create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
    async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
    async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>>;
    async fn get_meta(&self, uuid: &Uuid) -> Result<Option<IndexMeta>>;
}

impl<S: IndexStore + Sync + Send> IndexActor<S> {
    fn new(
        read_receiver: mpsc::Receiver<IndexMsg>,
        write_receiver: mpsc::Receiver<IndexMsg>,
        store: S,
    ) -> Result<Self> {
        let options = IndexerOpts::default();
        let update_handler = UpdateHandler::new(&options).map_err(|e| IndexError::Error(e.into()))?;
        let update_handler = Arc::new(update_handler);
        let read_receiver = Some(read_receiver);
        let write_receiver = Some(write_receiver);
        Ok(Self {
            read_receiver,
            write_receiver,
            store,
            update_handler,
        })
    }

    /// `run` polls the write_receiver and read_receiver concurrently: messages sent
    /// through the read channel are processed concurrently, while messages sent through
    /// the write channel are processed one at a time.
    async fn run(mut self) {
        let mut read_receiver = self
            .read_receiver
            .take()
            .expect("Index Actor must have an inbox at this point.");

        let read_stream = stream! {
            loop {
                match read_receiver.recv().await {
                    Some(msg) => yield msg,
                    None => break,
                }
            }
        };

        let mut write_receiver = self
            .write_receiver
            .take()
            .expect("Index Actor must have an inbox at this point.");

        let write_stream = stream! {
            loop {
                match write_receiver.recv().await {
                    Some(msg) => yield msg,
                    None => break,
                }
            }
        };

        pin_mut!(write_stream);
        pin_mut!(read_stream);

        let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg));
        let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg));

        let fut1: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut1);
        let fut2: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut2);

        tokio::join!(fut1, fut2);
    }
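The two `for_each_concurrent` limits above are the entire scheduling policy: up to ten reads may be in flight at once, while writes are strictly serialized. A stand-alone sketch of that shape, with illustrative values only:

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    let reads = stream::iter(0..5);
    let writes = stream::iter(0..5);

    // Up to 10 reads may run at once...
    let read_fut = reads.for_each_concurrent(Some(10), |n| async move {
        println!("read {}", n);
    });
    // ...while writes are processed one at a time (limit = 1).
    let write_fut = writes.for_each_concurrent(Some(1), |n| async move {
        println!("write {}", n);
    });

    // Drive both loops together, as the actor does with tokio::join!.
    tokio::join!(read_fut, write_fut);
}
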

    async fn handle_message(&self, msg: IndexMsg) {
        use IndexMsg::*;
        match msg {
            CreateIndex {
                uuid,
                primary_key,
                ret,
            } => {
                let _ = ret.send(self.handle_create_index(uuid, primary_key).await);
            }
            Update { ret, meta, data } => {
                let _ = ret.send(self.handle_update(meta, data).await);
            }
            Search { ret, query, uuid } => {
                let _ = ret.send(self.handle_search(uuid, query).await);
            }
            Settings { ret, uuid } => {
                let _ = ret.send(self.handle_settings(uuid).await);
            }
            Documents {
                ret,
                uuid,
                attributes_to_retrieve,
                offset,
                limit,
            } => {
                let _ = ret.send(
                    self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve)
                        .await,
                );
            }
            Document {
                uuid,
                attributes_to_retrieve,
                doc_id,
                ret,
            } => {
                let _ = ret.send(
                    self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve)
                        .await,
                );
            }
            Delete { uuid, ret } => {
                let _ = ret.send(self.handle_delete(uuid).await);
            }
            GetMeta { uuid, ret } => {
                let _ = ret.send(self.handle_get_meta(uuid).await);
            }
        }
    }

    async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result<SearchResult> {
        let index = self
            .store
            .get(uuid)
            .await?
            .ok_or(IndexError::UnexistingIndex)?;
        tokio::task::spawn_blocking(move || index.perform_search(query)).await?
    }
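`perform_search` is synchronous, so it is shipped to tokio's blocking thread pool rather than run on the actor's event loop. The pattern in isolation, with a hypothetical stand-in for the search call:

#[tokio::main]
async fn main() -> Result<(), tokio::task::JoinError> {
    // Move owned data into the closure, run the CPU-bound work on
    // tokio's blocking pool, and await the result.
    let query = "hello".to_string();
    let result = tokio::task::spawn_blocking(move || {
        // Stand-in for index.perform_search(query): any sync, CPU-heavy call.
        query.chars().rev().collect::<String>()
    })
    .await?; // JoinError if the blocking task panicked
    assert_eq!(result, "olleh");
    Ok(())
}
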

    async fn handle_create_index(
        &self,
        uuid: Uuid,
        primary_key: Option<String>,
    ) -> Result<IndexMeta> {
        self.store.create_index(uuid, primary_key).await
    }

    async fn handle_update(
        &self,
        meta: Processing<UpdateMeta>,
        data: File,
    ) -> Result<UpdateResult> {
        info!("Processing update {}", meta.id());
        let uuid = meta.index_uuid().clone();
        let update_handler = self.update_handler.clone();
        let handle = self
            .store
            .update_index(uuid, |index| {
                let handle = tokio::task::spawn_blocking(move || {
                    update_handler.handle_update(meta, data, index)
                });
                Ok(handle)
            })
            .await?;

        handle.await.map_err(|e| IndexError::Error(e.into()))
    }

    async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
        let index = self
            .store
            .get(uuid)
            .await?
            .ok_or(IndexError::UnexistingIndex)?;
        tokio::task::spawn_blocking(move || index.settings().map_err(IndexError::Error))
            .await
            .map_err(|e| IndexError::Error(e.into()))?
    }

    async fn handle_fetch_documents(
        &self,
        uuid: Uuid,
        offset: usize,
        limit: usize,
        attributes_to_retrieve: Option<Vec<String>>,
    ) -> Result<Vec<Document>> {
        let index = self
            .store
            .get(uuid)
            .await?
            .ok_or(IndexError::UnexistingIndex)?;
        tokio::task::spawn_blocking(move || {
            index
                .retrieve_documents(offset, limit, attributes_to_retrieve)
                .map_err(IndexError::Error)
        })
        .await
        .map_err(|e| IndexError::Error(e.into()))?
    }

    async fn handle_fetch_document(
        &self,
        uuid: Uuid,
        doc_id: String,
        attributes_to_retrieve: Option<Vec<String>>,
    ) -> Result<Document> {
        let index = self
            .store
            .get(uuid)
            .await?
            .ok_or(IndexError::UnexistingIndex)?;
        tokio::task::spawn_blocking(move || {
            index
                .retrieve_document(doc_id, attributes_to_retrieve)
                .map_err(IndexError::Error)
        })
        .await
        .map_err(|e| IndexError::Error(e.into()))?
    }

    async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
        let index = self.store.delete(&uuid).await?;

        if let Some(index) = index {
            tokio::task::spawn(async move {
                let index = index.0;
                let store = get_arc_ownership_blocking(index).await;
                tokio::task::spawn_blocking(move || {
                    store.prepare_for_closing().wait();
                    info!("Index {} was closed.", uuid);
                });
            });
        }

        Ok(())
    }

    async fn handle_get_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> {
        let result = self.store.get_meta(&uuid).await?;
        Ok(result)
    }
}

#[derive(Clone)]
pub struct IndexActorHandle {
    read_sender: mpsc::Sender<IndexMsg>,
    write_sender: mpsc::Sender<IndexMsg>,
}

impl IndexActorHandle {
    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
        let (read_sender, read_receiver) = mpsc::channel(100);
        let (write_sender, write_receiver) = mpsc::channel(100);

        let store = MapIndexStore::new(path);
        let actor = IndexActor::new(read_receiver, write_receiver, store)?;
        tokio::task::spawn(actor.run());
        Ok(Self {
            read_sender,
            write_sender,
        })
    }
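Every method of this handle follows the same round trip: pack a oneshot sender into the message, push it down the actor's mpsc inbox, and await the reply. A minimal, self-contained version of that protocol (the `Msg`/`Ping` names are made up for the sketch):

use tokio::sync::{mpsc, oneshot};

enum Msg {
    Ping { ret: oneshot::Sender<&'static str> },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Msg>(8);

    // The actor task: receive a message, reply through its oneshot.
    tokio::spawn(async move {
        while let Some(Msg::Ping { ret }) = rx.recv().await {
            let _ = ret.send("pong");
        }
    });

    // The handle side: create the oneshot, send, await the answer.
    let (ret, receiver) = oneshot::channel();
    let _ = tx.send(Msg::Ping { ret }).await;
    assert_eq!(receiver.await.expect("actor has been killed"), "pong");
}
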

    pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::CreateIndex {
            ret,
            uuid,
            primary_key,
        };
        let _ = self.read_sender.send(msg).await;
        receiver.await.expect("IndexActor has been killed")
    }

    pub async fn update(
        &self,
        meta: Processing<UpdateMeta>,
        data: std::fs::File,
    ) -> anyhow::Result<UpdateResult> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::Update { ret, meta, data };
        let _ = self.read_sender.send(msg).await;
        Ok(receiver.await.expect("IndexActor has been killed")?)
    }

    pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::Search { uuid, query, ret };
        let _ = self.read_sender.send(msg).await;
        Ok(receiver.await.expect("IndexActor has been killed")?)
    }

    pub async fn settings(&self, uuid: Uuid) -> Result<Settings> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::Settings { uuid, ret };
        let _ = self.read_sender.send(msg).await;
        Ok(receiver.await.expect("IndexActor has been killed")?)
    }

    pub async fn documents(
        &self,
        uuid: Uuid,
        offset: usize,
        limit: usize,
        attributes_to_retrieve: Option<Vec<String>>,
    ) -> Result<Vec<Document>> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::Documents {
            uuid,
            ret,
            offset,
            attributes_to_retrieve,
            limit,
        };
        let _ = self.read_sender.send(msg).await;
        Ok(receiver.await.expect("IndexActor has been killed")?)
    }

    pub async fn document(
        &self,
        uuid: Uuid,
        doc_id: String,
        attributes_to_retrieve: Option<Vec<String>>,
    ) -> Result<Document> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::Document {
            uuid,
            ret,
            doc_id,
            attributes_to_retrieve,
        };
        let _ = self.read_sender.send(msg).await;
        Ok(receiver.await.expect("IndexActor has been killed")?)
    }

    pub async fn delete(&self, uuid: Uuid) -> Result<()> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::Delete { uuid, ret };
        let _ = self.read_sender.send(msg).await;
        Ok(receiver.await.expect("IndexActor has been killed")?)
    }

    pub async fn get_index_meta(&self, uuid: Uuid) -> Result<Option<IndexMeta>> {
        let (ret, receiver) = oneshot::channel();
        let msg = IndexMsg::GetMeta { uuid, ret };
        let _ = self.read_sender.send(msg).await;
        Ok(receiver.await.expect("IndexActor has been killed")?)
    }
}

struct MapIndexStore {
    root: PathBuf,
    meta_store: AsyncMap<Uuid, IndexMeta>,
    index_store: AsyncMap<Uuid, Index>,
}

#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
    async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
        let meta = match self.meta_store.write().await.entry(uuid.clone()) {
            Entry::Vacant(entry) => {
                let now = Utc::now();
                let meta = IndexMeta {
                    uuid,
                    created_at: now.clone(),
                    updated_at: now,
                    primary_key,
                };
                entry.insert(meta).clone()
            }
            Entry::Occupied(_) => return Err(IndexError::IndexAlreadyExists),
        };

        let db_path = self.root.join(format!("index-{}", meta.uuid));

        let index: Result<Index> = tokio::task::spawn_blocking(move || {
            create_dir_all(&db_path).expect("can't create db");
            let mut options = EnvOpenOptions::new();
            options.map_size(4096 * 100_000);
            let index = milli::Index::new(options, &db_path).map_err(IndexError::Error)?;
            let index = Index(Arc::new(index));
            Ok(index)
        })
        .await
        .expect("thread died");

        self.index_store
            .write()
            .await
            .insert(meta.uuid.clone(), index?);

        Ok(meta)
    }

    async fn get_or_create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
        match self.index_store.write().await.entry(uuid.clone()) {
            Entry::Vacant(index_entry) => match self.meta_store.write().await.entry(uuid.clone()) {
                Entry::Vacant(meta_entry) => {
                    let now = Utc::now();
                    let meta = IndexMeta {
                        uuid,
                        created_at: now.clone(),
                        updated_at: now,
                        primary_key,
                    };
                    let meta = meta_entry.insert(meta);
                    let db_path = self.root.join(format!("index-{}", meta.uuid));

                    let index: Result<Index> = tokio::task::spawn_blocking(move || {
                        create_dir_all(&db_path).expect("can't create db");
                        let mut options = EnvOpenOptions::new();
                        options.map_size(4096 * 100_000);
                        let index = milli::Index::new(options, &db_path).map_err(IndexError::Error)?;
                        let index = Index(Arc::new(index));
                        Ok(index)
                    })
                    .await
                    .expect("thread died");

                    Ok(index_entry.insert(index?).clone())
                }
                Entry::Occupied(entry) => {
                    let meta = entry.get();
                    let db_path = self.root.join(format!("index-{}", meta.uuid));

                    let index: Result<Index> = tokio::task::spawn_blocking(move || {
                        create_dir_all(&db_path).expect("can't create db");
                        let mut options = EnvOpenOptions::new();
                        options.map_size(4096 * 100_000);
                        let index = milli::Index::new(options, &db_path).map_err(IndexError::Error)?;
                        let index = Index(Arc::new(index));
                        Ok(index)
                    })
                    .await
                    .expect("thread died");

                    Ok(index_entry.insert(index?).clone())
                }
            },
            Entry::Occupied(entry) => Ok(entry.get().clone()),
        }
    }

    async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
        Ok(self.index_store.read().await.get(&uuid).cloned())
    }

    async fn delete(&self, uuid: &Uuid) -> Result<Option<Index>> {
        let index = self.index_store.write().await.remove(&uuid);
        if index.is_some() {
            let db_path = self.root.join(format!("index-{}", uuid));
            remove_dir_all(db_path).unwrap();
        }
        Ok(index)
    }

    async fn get_meta(&self, uuid: &Uuid) -> Result<Option<IndexMeta>> {
        Ok(self.meta_store.read().await.get(uuid).cloned())
    }

    async fn update_index<R, F>(&self, uuid: Uuid, f: F) -> Result<R>
    where
        F: FnOnce(Index) -> Result<R> + Send + Sync + 'static,
        R: Sync + Send + 'static,
    {
        let index = self.get_or_create(uuid.clone(), None).await?;
        let mut meta = self
            .get_meta(&uuid)
            .await?
            .ok_or(IndexError::UnexistingIndex)?;
        match f(index) {
            Ok(r) => {
                meta.updated_at = Utc::now();
                self.meta_store.write().await.insert(uuid, meta);
                Ok(r)
            }
            Err(e) => Err(e),
        }
    }
}

impl MapIndexStore {
    fn new(root: impl AsRef<Path>) -> Self {
        let mut root = root.as_ref().to_owned();
        root.push("indexes/");
        let meta_store = Arc::new(RwLock::new(HashMap::new()));
        let index_store = Arc::new(RwLock::new(HashMap::new()));
        Self {
            meta_store,
            index_store,
            root,
        }
    }
}

@@ -0,0 +1,607 @@
use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context};
use chrono::{DateTime, Utc};
use dashmap::{mapref::entry::Entry, DashMap};
use heed::{
    types::{ByteSlice, SerdeJson, Str},
    Database, Env, EnvOpenOptions, RoTxn, RwTxn,
};
use log::{error, info};
use milli::Index;
use rayon::ThreadPool;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use super::update_handler::UpdateHandler;
use super::{UpdateMeta, UpdateResult};
use crate::option::IndexerOpts;

type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct IndexMeta {
    update_store_size: u64,
    index_store_size: u64,
    pub uuid: Uuid,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

impl IndexMeta {
    fn open(
        &self,
        path: impl AsRef<Path>,
        thread_pool: Arc<ThreadPool>,
        indexer_options: &IndexerOpts,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
        let update_path = make_update_db_path(&path, &self.uuid);
        let index_path = make_index_db_path(&path, &self.uuid);

        create_dir_all(&update_path)?;
        create_dir_all(&index_path)?;

        let mut options = EnvOpenOptions::new();
        options.map_size(self.index_store_size as usize);
        let index = Arc::new(Index::new(options, index_path)?);

        let mut options = EnvOpenOptions::new();
        options.map_size(self.update_store_size as usize);
        let handler = UpdateHandler::new(indexer_options, index.clone(), thread_pool)?;
        let update_store = UpdateStore::open(options, update_path, handler)?;

        Ok((index, update_store))
    }
}

pub struct IndexStore {
    env: Env,
    name_to_uuid: Database<Str, ByteSlice>,
    uuid_to_index: DashMap<Uuid, (Arc<Index>, Arc<UpdateStore>)>,
    uuid_to_index_meta: Database<ByteSlice, SerdeJson<IndexMeta>>,

    thread_pool: Arc<ThreadPool>,
    indexer_options: IndexerOpts,
}

impl IndexStore {
    pub fn new(path: impl AsRef<Path>, indexer_options: IndexerOpts) -> anyhow::Result<Self> {
        let env = EnvOpenOptions::new()
            .map_size(4096 * 100)
            .max_dbs(2)
            .open(path)?;

        let uuid_to_index = DashMap::new();
        let name_to_uuid = open_or_create_database(&env, Some("name_to_uid"))?;
        let uuid_to_index_meta = open_or_create_database(&env, Some("uid_to_index_db"))?;

        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(indexer_options.indexing_jobs.unwrap_or(0))
            .build()?;
        let thread_pool = Arc::new(thread_pool);

        Ok(Self {
            env,
            name_to_uuid,
            uuid_to_index,
            uuid_to_index_meta,

            thread_pool,
            indexer_options,
        })
    }

    pub fn delete(&self, index_uid: impl AsRef<str>) -> anyhow::Result<()> {
        // We remove the references to the index from the index map so it is not accessible anymore.
        let mut txn = self.env.write_txn()?;
        let uuid = self
            .index_uuid(&txn, &index_uid)?
            .with_context(|| format!("Index {:?} doesn't exist", index_uid.as_ref()))?;
        self.name_to_uuid.delete(&mut txn, index_uid.as_ref())?;
        self.uuid_to_index_meta.delete(&mut txn, uuid.as_bytes())?;
        txn.commit()?;
        // If the index was loaded (i.e. it is present in the uuid_to_index map), then we need to
        // close it. The process goes as follows:
        //
        // 1) We want to remove any pending updates from the store.
        // 2) We try to get ownership of the update store so we can close it. It may take a
        // couple of tries, but since the update store event loop only has a weak reference to
        // itself, and we are the only other function holding a reference to it otherwise, we will
        // get it eventually.
        // 3) We request a closing of the update store.
        // 4) We can take ownership of the index, and close it.
        // 5) We remove all the files from the file system.
        let index_uid = index_uid.as_ref().to_string();
        let path = self.env.path().to_owned();
        if let Some((_, (index, updates))) = self.uuid_to_index.remove(&uuid) {
            std::thread::spawn(move || {
                info!("Preparing for {:?} deletion.", index_uid);
                // This error is non-fatal, but may delay the deletion.
                if let Err(e) = updates.abort_pendings() {
                    error!(
                        "error aborting pending updates when deleting index {:?}: {}",
                        index_uid, e
                    );
                }
                let updates = get_arc_ownership_blocking(updates);
                let close_event = updates.prepare_for_closing();
                close_event.wait();
                info!("closed update store for {:?}", index_uid);

                let index = get_arc_ownership_blocking(index);
                let close_event = index.prepare_for_closing();
                close_event.wait();

                let update_path = make_update_db_path(&path, &uuid);
                let index_path = make_index_db_path(&path, &uuid);

                if let Err(e) = remove_dir_all(index_path) {
                    error!("error removing index {:?}: {}", index_uid, e);
                }

                if let Err(e) = remove_dir_all(update_path) {
                    error!("error removing index {:?}: {}", index_uid, e);
                }

                info!("index {:?} deleted.", index_uid);
            });
        }

        Ok(())
    }

    fn index_uuid(&self, txn: &RoTxn, name: impl AsRef<str>) -> anyhow::Result<Option<Uuid>> {
        match self.name_to_uuid.get(txn, name.as_ref())? {
            Some(bytes) => {
                let uuid = Uuid::from_slice(bytes)?;
                Ok(Some(uuid))
            }
            None => Ok(None),
        }
    }

    fn retrieve_index(
        &self,
        txn: &RoTxn,
        uid: Uuid,
    ) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
        match self.uuid_to_index.entry(uid.clone()) {
            Entry::Vacant(entry) => match self.uuid_to_index_meta.get(txn, uid.as_bytes())? {
                Some(meta) => {
                    let path = self.env.path();
                    let (index, updates) =
                        meta.open(path, self.thread_pool.clone(), &self.indexer_options)?;
                    entry.insert((index.clone(), updates.clone()));
                    Ok(Some((index, updates)))
                }
                None => Ok(None),
            },
            Entry::Occupied(entry) => {
                let (index, updates) = entry.get();
                Ok(Some((index.clone(), updates.clone())))
            }
        }
    }
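`retrieve_index` above lazily fills the in-memory cache through DashMap's entry API, so the miss check and the subsequent insert happen under a single shard lock. The same pattern on a toy cache (the loader here is hypothetical):

use dashmap::{mapref::entry::Entry, DashMap};

fn get_or_load(cache: &DashMap<u32, String>, key: u32) -> String {
    match cache.entry(key) {
        // Cache miss: "load" the value (here trivially) and insert it
        // while still holding the entry's shard lock.
        Entry::Vacant(entry) => {
            let loaded = format!("value-{}", key);
            entry.insert(loaded.clone());
            loaded
        }
        // Cache hit: clone out the stored value.
        Entry::Occupied(entry) => entry.get().clone(),
    }
}

fn main() {
    let cache = DashMap::new();
    assert_eq!(get_or_load(&cache, 7), "value-7");
    assert_eq!(cache.len(), 1);
    assert_eq!(get_or_load(&cache, 7), "value-7"); // second call hits the cache
}
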

    fn get_index_txn(
        &self,
        txn: &RoTxn,
        name: impl AsRef<str>,
    ) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
        match self.index_uuid(&txn, name)? {
            Some(uid) => self.retrieve_index(&txn, uid),
            None => Ok(None),
        }
    }

    pub fn index(
        &self,
        name: impl AsRef<str>,
    ) -> anyhow::Result<Option<(Arc<Index>, Arc<UpdateStore>)>> {
        let txn = self.env.read_txn()?;
        self.get_index_txn(&txn, name)
    }

    /// Use this function to perform an update on an index.
    /// This function also puts a lock on what index is allowed to perform an update.
    pub fn update_index<F, T>(&self, name: impl AsRef<str>, f: F) -> anyhow::Result<(T, IndexMeta)>
    where
        F: FnOnce(&Index) -> anyhow::Result<T>,
    {
        let mut txn = self.env.write_txn()?;
        let (index, _) = self
            .get_index_txn(&txn, &name)?
            .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
        let result = f(index.as_ref());
        match result {
            Ok(ret) => {
                let meta = self.update_meta(&mut txn, name, |meta| meta.updated_at = Utc::now())?;
                txn.commit()?;
                Ok((ret, meta))
            }
            Err(e) => Err(e),
        }
    }

    pub fn index_with_meta(
        &self,
        name: impl AsRef<str>,
    ) -> anyhow::Result<Option<(Arc<Index>, IndexMeta)>> {
        let txn = self.env.read_txn()?;
        let uuid = self.index_uuid(&txn, &name)?;
        match uuid {
            Some(uuid) => {
                let meta = self
                    .uuid_to_index_meta
                    .get(&txn, uuid.as_bytes())?
                    .with_context(|| {
                        format!("unable to retrieve metadata for index {:?}", name.as_ref())
                    })?;
                let (index, _) = self
                    .retrieve_index(&txn, uuid)?
                    .with_context(|| format!("unable to retrieve index {:?}", name.as_ref()))?;
                Ok(Some((index, meta)))
            }
            None => Ok(None),
        }
    }

    fn update_meta<F>(
        &self,
        txn: &mut RwTxn,
        name: impl AsRef<str>,
        f: F,
    ) -> anyhow::Result<IndexMeta>
    where
        F: FnOnce(&mut IndexMeta),
    {
        let uuid = self
            .index_uuid(txn, &name)?
            .with_context(|| format!("Index {:?} doesn't exist", name.as_ref()))?;
        let mut meta = self
            .uuid_to_index_meta
            .get(txn, uuid.as_bytes())?
            .with_context(|| format!("couldn't retrieve metadata for index {:?}", name.as_ref()))?;
        f(&mut meta);
        self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;
        Ok(meta)
    }

    pub fn get_or_create_index(
        &self,
        name: impl AsRef<str>,
        update_size: u64,
        index_size: u64,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>)> {
        let mut txn = self.env.write_txn()?;
        match self.get_index_txn(&txn, name.as_ref())? {
            Some(res) => Ok(res),
            None => {
                let uuid = Uuid::new_v4();
                let (index, updates, _) =
                    self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?;
                // If we fail to commit the transaction, we must delete the database from the
                // file system.
                if let Err(e) = txn.commit() {
                    self.clean_db(uuid);
                    return Err(e)?;
                }
                Ok((index, updates))
            }
        }
    }

    // Remove all the files and data associated with a db uuid.
    fn clean_db(&self, uuid: Uuid) {
        let update_db_path = make_update_db_path(self.env.path(), &uuid);
        let index_db_path = make_index_db_path(self.env.path(), &uuid);

        remove_dir_all(update_db_path).expect("Failed to clean database");
        remove_dir_all(index_db_path).expect("Failed to clean database");

        self.uuid_to_index.remove(&uuid);
    }

    fn create_index_txn(
        &self,
        txn: &mut RwTxn,
        uuid: Uuid,
        name: impl AsRef<str>,
        update_store_size: u64,
        index_store_size: u64,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
        let created_at = Utc::now();
        let updated_at = created_at;
        let meta = IndexMeta {
            update_store_size,
            index_store_size,
            uuid: uuid.clone(),
            created_at,
            updated_at,
        };

        self.name_to_uuid.put(txn, name.as_ref(), uuid.as_bytes())?;
        self.uuid_to_index_meta.put(txn, uuid.as_bytes(), &meta)?;

        let path = self.env.path();
        let (index, update_store) =
            match meta.open(path, self.thread_pool.clone(), &self.indexer_options) {
                Ok(res) => res,
                Err(e) => {
                    self.clean_db(uuid);
                    return Err(e);
                }
            };

        self.uuid_to_index
            .insert(uuid, (index.clone(), update_store.clone()));

        Ok((index, update_store, meta))
    }

    /// Same as `get_or_create`, but returns an error if the index already exists.
    pub fn create_index(
        &self,
        name: impl AsRef<str>,
        update_size: u64,
        index_size: u64,
    ) -> anyhow::Result<(Arc<Index>, Arc<UpdateStore>, IndexMeta)> {
        let uuid = Uuid::new_v4();
        let mut txn = self.env.write_txn()?;

        if self.name_to_uuid.get(&txn, name.as_ref())?.is_some() {
            bail!("index {:?} already exists", name.as_ref())
        }

        let result = self.create_index_txn(&mut txn, uuid, name, update_size, index_size)?;
        // If we fail to commit the transaction, we must delete the database from the
        // file system.
        if let Err(e) = txn.commit() {
            self.clean_db(uuid);
            return Err(e)?;
        }
        Ok(result)
    }

    /// Returns each index associated with its metadata:
    /// (index_name, IndexMeta, primary_key)
    /// This method will force all the indexes to be loaded.
    pub fn list_indexes(&self) -> anyhow::Result<Vec<(String, IndexMeta, Option<String>)>> {
        let txn = self.env.read_txn()?;
        let metas = self.name_to_uuid.iter(&txn)?.filter_map(|entry| {
            entry
                .map_err(|e| {
                    error!("error decoding entry while listing indexes: {}", e);
                    e
                })
                .ok()
        });
        let mut indexes = Vec::new();
        for (name, uuid) in metas {
            // Get the index to retrieve its primary key.
            let (index, _) = self
                .get_index_txn(&txn, name)?
                .with_context(|| format!("could not load index {:?}", name))?;
            let primary_key = index.primary_key(&index.read_txn()?)?.map(String::from);
            // Retrieve the meta.
            let meta = self
                .uuid_to_index_meta
                .get(&txn, &uuid)?
                .with_context(|| format!("could not retrieve meta for index {:?}", name))?;
            indexes.push((name.to_owned(), meta, primary_key));
        }
        Ok(indexes)
    }
}

// Loops on an Arc to get ownership of the wrapped value. This function sleeps 100ms before retrying.
fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
    loop {
        match Arc::try_unwrap(item) {
            Ok(item) => return item,
            Err(item_arc) => {
                item = item_arc;
                std::thread::sleep(Duration::from_millis(100));
                continue;
            }
        }
    }
}
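`Arc::try_unwrap` only succeeds once every other clone has been dropped, which is why the loop above retries with a sleep. A compact, std-only demonstration of how such a loop unblocks once the last outside reference goes away:

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn take_ownership<T>(mut item: Arc<T>) -> T {
    loop {
        match Arc::try_unwrap(item) {
            Ok(inner) => return inner, // we held the last reference
            Err(arc) => {
                // Someone else still holds a clone; back off and retry.
                item = arc;
                thread::sleep(Duration::from_millis(100));
            }
        }
    }
}

fn main() {
    let shared = Arc::new(String::from("index"));
    let clone = Arc::clone(&shared);

    // Drop the second reference from another thread after a short delay.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(250));
        drop(clone);
    });

    // Blocks (retrying) until the spawned thread releases its clone.
    assert_eq!(take_ownership(shared), "index");
}
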

fn open_or_create_database<K: 'static, V: 'static>(
    env: &Env,
    name: Option<&str>,
) -> anyhow::Result<Database<K, V>> {
    match env.open_database::<K, V>(name)? {
        Some(db) => Ok(db),
        None => Ok(env.create_database::<K, V>(name)?),
    }
}

fn make_update_db_path(path: impl AsRef<Path>, uuid: &Uuid) -> PathBuf {
    let mut path = path.as_ref().to_path_buf();
    path.push(format!("update{}", uuid));
    path
}

fn make_index_db_path(path: impl AsRef<Path>, uuid: &Uuid) -> PathBuf {
    let mut path = path.as_ref().to_path_buf();
    path.push(format!("index{}", uuid));
    path
}

#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;

    #[test]
    fn test_make_update_db_path() {
        let uuid = Uuid::new_v4();
        assert_eq!(
            make_update_db_path("/home", &uuid),
            PathBuf::from(format!("/home/update{}", uuid))
        );
    }

    #[test]
    fn test_make_index_db_path() {
        let uuid = Uuid::new_v4();
        assert_eq!(
            make_index_db_path("/home", &uuid),
            PathBuf::from(format!("/home/index{}", uuid))
        );
    }

    mod index_store {
        use super::*;

        #[test]
        fn test_index_uuid() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();

            let name = "foobar";
            let txn = store.env.read_txn().unwrap();
            // The name is not found if the uuid is not present in the db.
            assert!(store.index_uuid(&txn, &name).unwrap().is_none());
            drop(txn);

            // Insert a uuid in the name_to_uuid db:
            let uuid = Uuid::new_v4();
            let mut txn = store.env.write_txn().unwrap();
            store
                .name_to_uuid
                .put(&mut txn, &name, uuid.as_bytes())
                .unwrap();
            txn.commit().unwrap();

            // Check that the uuid is there.
            let txn = store.env.read_txn().unwrap();
            assert_eq!(store.index_uuid(&txn, &name).unwrap(), Some(uuid));
        }

        #[test]
        fn test_retrieve_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let uuid = Uuid::new_v4();

            let txn = store.env.read_txn().unwrap();
            assert!(store.retrieve_index(&txn, uuid).unwrap().is_none());

            let created_at = Utc::now();
            let updated_at = created_at;

            let meta = IndexMeta {
                update_store_size: 4096 * 100,
                index_store_size: 4096 * 100,
                uuid: uuid.clone(),
                created_at,
                updated_at,
            };
            let mut txn = store.env.write_txn().unwrap();
            store
                .uuid_to_index_meta
                .put(&mut txn, uuid.as_bytes(), &meta)
                .unwrap();
            txn.commit().unwrap();

            // The index cache should be empty.
            assert!(store.uuid_to_index.is_empty());

            let txn = store.env.read_txn().unwrap();
            assert!(store.retrieve_index(&txn, uuid).unwrap().is_some());
            assert_eq!(store.uuid_to_index.len(), 1);
        }

        #[test]
        fn test_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let name = "foobar";

            assert!(store.index(&name).unwrap().is_none());

            let created_at = Utc::now();
            let updated_at = created_at;

            let uuid = Uuid::new_v4();
            let meta = IndexMeta {
                update_store_size: 4096 * 100,
                index_store_size: 4096 * 100,
                uuid: uuid.clone(),
                created_at,
                updated_at,
            };
            let mut txn = store.env.write_txn().unwrap();
            store
                .name_to_uuid
                .put(&mut txn, &name, uuid.as_bytes())
                .unwrap();
            store
                .uuid_to_index_meta
                .put(&mut txn, uuid.as_bytes(), &meta)
                .unwrap();
            txn.commit().unwrap();

            assert!(store.index(&name).unwrap().is_some());
        }

        #[test]
        fn test_get_or_create_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let name = "foobar";

            let update_store_size = 4096 * 100;
            let index_store_size = 4096 * 100;
            store
                .get_or_create_index(&name, update_store_size, index_store_size)
                .unwrap();
            let txn = store.env.read_txn().unwrap();
            let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
            assert_eq!(store.uuid_to_index.len(), 1);
            assert!(uuid.is_some());
            let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
            let meta = store
                .uuid_to_index_meta
                .get(&txn, uuid.as_bytes())
                .unwrap()
                .unwrap();
            assert_eq!(meta.update_store_size, update_store_size);
            assert_eq!(meta.index_store_size, index_store_size);
            assert_eq!(meta.uuid, uuid);
        }

        #[test]
        fn test_create_index() {
            let temp = tempfile::tempdir().unwrap();
            let store = IndexStore::new(temp, IndexerOpts::default()).unwrap();
            let name = "foobar";

            let update_store_size = 4096 * 100;
            let index_store_size = 4096 * 100;
            let uuid = Uuid::new_v4();
            let mut txn = store.env.write_txn().unwrap();
            store
                .create_index_txn(&mut txn, uuid, name, update_store_size, index_store_size)
                .unwrap();
            let uuid = store.name_to_uuid.get(&txn, &name).unwrap();
            assert_eq!(store.uuid_to_index.len(), 1);
            assert!(uuid.is_some());
            let uuid = Uuid::from_slice(uuid.unwrap()).unwrap();
            let meta = store
                .uuid_to_index_meta
                .get(&txn, uuid.as_bytes())
                .unwrap()
                .unwrap();
            assert_eq!(meta.update_store_size, update_store_size);
            assert_eq!(meta.index_store_size, index_store_size);
            assert_eq!(meta.uuid, uuid);
        }
    }
}

@@ -0,0 +1,228 @@
mod update_store;
mod index_store;
mod update_handler;

use std::path::Path;
use std::sync::Arc;

use anyhow::{bail, Context};
use itertools::Itertools;
use milli::Index;

use crate::option::IndexerOpts;
use index_store::IndexStore;
use super::IndexController;
use super::updates::UpdateStatus;
use super::{UpdateMeta, UpdateResult, IndexMetadata, IndexSettings};

pub struct LocalIndexController {
    indexes: IndexStore,
    update_db_size: u64,
    index_db_size: u64,
}

impl LocalIndexController {
    pub fn new(
        path: impl AsRef<Path>,
        opt: IndexerOpts,
        index_db_size: u64,
        update_db_size: u64,
    ) -> anyhow::Result<Self> {
        let indexes = IndexStore::new(path, opt)?;
        Ok(Self { indexes, index_db_size, update_db_size })
    }
}

impl IndexController for LocalIndexController {
    fn add_documents<S: AsRef<str>>(
        &self,
        index: S,
        method: milli::update::IndexDocumentsMethod,
        format: milli::update::UpdateFormat,
        data: &[u8],
        primary_key: Option<String>,
    ) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
        let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
        let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
        let pending = update_store.register_update(meta, data)?;
        Ok(pending.into())
    }

    fn update_settings<S: AsRef<str>>(
        &self,
        index: S,
        settings: super::Settings,
    ) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateResult, String>> {
        let (_, update_store) = self.indexes.get_or_create_index(&index, self.update_db_size, self.index_db_size)?;
        let meta = UpdateMeta::Settings(settings);
        let pending = update_store.register_update(meta, &[])?;
        Ok(pending.into())
    }

    fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
        let index_name = index_settings.name.context("Missing name for index")?;
        let (index, _, meta) = self.indexes.create_index(&index_name, self.update_db_size, self.index_db_size)?;
        if let Some(ref primary_key) = index_settings.primary_key {
            if let Err(e) = update_primary_key(index, primary_key).context("error creating index") {
                // TODO: index creation could not be completed, delete everything.
                Err(e)?
            }
        }

        let meta = IndexMetadata {
            uid: index_name,
            uuid: meta.uuid.clone(),
            created_at: meta.created_at,
            updated_at: meta.created_at,
            primary_key: index_settings.primary_key,
        };

        Ok(meta)
    }

    fn delete_index<S: AsRef<str>>(&self, index_uid: S) -> anyhow::Result<()> {
        self.indexes.delete(index_uid)
    }

    fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, _index1_uid: S1, _index2_uid: S2) -> anyhow::Result<()> {
        todo!()
    }

    fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>> {
        let index = self.indexes.index(name)?.map(|(i, _)| i);
        Ok(index)
    }

    fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
        match self.indexes.index(&index)? {
            Some((_, update_store)) => Ok(update_store.meta(id)?),
            None => bail!("index {:?} doesn't exist", index.as_ref()),
        }
    }

    fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
        match self.indexes.index(&index)? {
            Some((_, update_store)) => {
                let updates = update_store.iter_metas(|processing, processed, pending, aborted, failed| {
                    Ok(processing
                        .map(UpdateStatus::from)
                        .into_iter()
                        .chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u)))
                        .chain(aborted.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                        .chain(processed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                        .chain(failed.filter_map(Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                        .sorted_by(|a, b| a.id().cmp(&b.id()))
                        .collect())
                })?;
                Ok(updates)
            }
            None => bail!("index {} doesn't exist.", index.as_ref()),
        }
    }
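`all_update_status` above merges five differently-typed status iterators and orders the result by update id with itertools' `sorted_by`. The same shape reduced to plain integers (purely illustrative):

use itertools::Itertools;

fn main() {
    let processing = Some(7);
    let pending = vec![2, 9];
    let processed = vec![1, 4];
    let failed = vec![3];

    // Chain the sources into one iterator, then sort by id, exactly as
    // all_update_status does with its UpdateStatus variants.
    let merged: Vec<u64> = processing
        .into_iter()
        .chain(pending)
        .chain(processed)
        .chain(failed)
        .sorted_by(|a, b| a.cmp(b))
        .collect();

    assert_eq!(merged, vec![1, 2, 3, 4, 7, 9]);
}
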

    fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
        let metas = self.indexes.list_indexes()?;
        let mut output_meta = Vec::new();
        for (uid, meta, primary_key) in metas {
            let created_at = meta.created_at;
            let uuid = meta.uuid;
            let updated_at = self
                .all_update_status(&uid)?
                .iter()
                .filter_map(|u| u.processed().map(|u| u.processed_at))
                .max()
                .unwrap_or(created_at);

            let index_meta = IndexMetadata {
                uid,
                created_at,
                updated_at,
                uuid,
                primary_key,
            };
            output_meta.push(index_meta);
        }
        Ok(output_meta)
    }

    fn update_index(&self, uid: impl AsRef<str>, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
        if index_settings.name.is_some() {
            bail!("can't update an index name.")
        }

        let (primary_key, meta) = match index_settings.primary_key {
            Some(ref primary_key) => {
                self.indexes
                    .update_index(&uid, |index| {
                        let mut txn = index.write_txn()?;
                        if index.primary_key(&txn)?.is_some() {
                            bail!("primary key already exists.")
                        }
                        index.put_primary_key(&mut txn, primary_key)?;
                        txn.commit()?;
                        Ok(Some(primary_key.clone()))
                    })?
            },
            None => {
                let (index, meta) = self.indexes
                    .index_with_meta(&uid)?
                    .with_context(|| format!("index {:?} doesn't exist.", uid.as_ref()))?;
                let primary_key = index
                    .primary_key(&index.read_txn()?)?
                    .map(String::from);
                (primary_key, meta)
            },
        };

        Ok(IndexMetadata {
            uid: uid.as_ref().to_string(),
            uuid: meta.uuid.clone(),
            created_at: meta.created_at,
            updated_at: meta.updated_at,
            primary_key,
        })
    }

    fn clear_documents(&self, index: impl AsRef<str>) -> anyhow::Result<super::UpdateStatus> {
        let (_, update_store) = self.indexes.index(&index)?
            .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
        let meta = UpdateMeta::ClearDocuments;
        let pending = update_store.register_update(meta, &[])?;
        Ok(pending.into())
    }

    fn delete_documents(&self, index: impl AsRef<str>, document_ids: Vec<String>) -> anyhow::Result<super::UpdateStatus> {
        let (_, update_store) = self.indexes.index(&index)?
            .with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
        let meta = UpdateMeta::DeleteDocuments;
        let content = serde_json::to_vec(&document_ids)?;
        let pending = update_store.register_update(meta, &content)?;
        Ok(pending.into())
    }
}

fn update_primary_key(index: impl AsRef<Index>, primary_key: impl AsRef<str>) -> anyhow::Result<()> {
    let index = index.as_ref();
    let mut txn = index.write_txn()?;
    if index.primary_key(&txn)?.is_some() {
        bail!("primary key already set.")
    }
    index.put_primary_key(&mut txn, primary_key.as_ref())?;
    txn.commit()?;
    Ok(())
}

#[cfg(test)]
mod test {
    use super::*;
    use tempfile::tempdir;
    use crate::make_index_controller_tests;

    make_index_controller_tests!({
        let options = IndexerOpts::default();
        let path = tempdir().unwrap();
        let size = 4096 * 100;
        LocalIndexController::new(path, options, size, size).unwrap()
    });
}

meilisearch-http/src/index_controller/mod.rs (new file, 257 lines)
@@ -0,0 +1,257 @@
mod index_actor;
mod update_actor;
mod update_handler;
mod update_store;
mod updates;
mod uuid_resolver;

use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use actix_web::web::{Bytes, Payload};
use futures::stream::StreamExt;
use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Serialize, Deserialize};
use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use uuid::Uuid;

pub use updates::{Processed, Processing, Failed};
use crate::index::{SearchResult, SearchQuery, Document};
use crate::index::{UpdateResult, Settings, Facets};

pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMetadata {
    name: String,
    #[serde(flatten)]
    meta: index_actor::IndexMeta,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UpdateMeta {
    DocumentsAddition {
        method: IndexDocumentsMethod,
        format: UpdateFormat,
        primary_key: Option<String>,
    },
    ClearDocuments,
    DeleteDocuments,
    Settings(Settings),
    Facets(Facets),
}

#[derive(Clone, Debug)]
pub struct IndexSettings {
    pub name: Option<String>,
    pub primary_key: Option<String>,
}

pub struct IndexController {
    uuid_resolver: uuid_resolver::UuidResolverHandle,
    index_handle: index_actor::IndexActorHandle,
    update_handle: update_actor::UpdateActorHandle<Bytes>,
}

enum IndexControllerMsg {
    CreateIndex {
        uuid: Uuid,
        primary_key: Option<String>,
        ret: oneshot::Sender<anyhow::Result<IndexMetadata>>,
    },
    Shutdown,
}

impl IndexController {
    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
        let uuid_resolver = uuid_resolver::UuidResolverHandle::new();
        let index_actor = index_actor::IndexActorHandle::new(&path)?;
        let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path);
        Ok(Self { uuid_resolver, index_handle: index_actor, update_handle })
    }

    pub async fn add_documents(
        &self,
        index: String,
        method: milli::update::IndexDocumentsMethod,
        format: milli::update::UpdateFormat,
        mut payload: Payload,
        primary_key: Option<String>,
    ) -> anyhow::Result<UpdateStatus> {
        let uuid = self.uuid_resolver.get_or_create(index).await?;
        let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
        let (sender, receiver) = mpsc::channel(10);

        // It is necessary to spawn a local task to send the payload to the update handle, to
        // prevent a deadlock between `update_handle::update`, which waits for the update to be
        // registered, and the update actor, which waits for the payload to be sent to it.
        tokio::task::spawn_local(async move {
            while let Some(bytes) = payload.next().await {
                match bytes {
                    Ok(bytes) => {
                        let _ = sender.send(Ok(bytes)).await;
                    }
                    Err(e) => {
                        let error: Box<dyn std::error::Error + Sync + Send + 'static> = Box::new(e);
                        let _ = sender.send(Err(error)).await;
                    }
                }
            }
        });

        // This must be done *AFTER* spawning the task.
        let status = self.update_handle.update(meta, receiver, uuid).await?;
        Ok(status)
    }

    pub async fn clear_documents(&self, index: String) -> anyhow::Result<UpdateStatus> {
        let uuid = self.uuid_resolver.resolve(index).await?;
        let meta = UpdateMeta::ClearDocuments;
        let (_, receiver) = mpsc::channel(1);
        let status = self.update_handle.update(meta, receiver, uuid).await?;
        Ok(status)
    }

    pub async fn delete_documents(&self, index: String, document_ids: Vec<String>) -> anyhow::Result<UpdateStatus> {
        let uuid = self.uuid_resolver.resolve(index).await?;
        let meta = UpdateMeta::DeleteDocuments;
        let (sender, receiver) = mpsc::channel(10);

        tokio::task::spawn(async move {
            let json = serde_json::to_vec(&document_ids).unwrap();
            let bytes = Bytes::from(json);
            let _ = sender.send(Ok(bytes)).await;
        });

        let status = self.update_handle.update(meta, receiver, uuid).await?;
        Ok(status)
    }

    pub async fn update_settings(&self, index_uid: String, settings: Settings) -> anyhow::Result<UpdateStatus> {
        let uuid = self.uuid_resolver.get_or_create(index_uid).await?;
        let meta = UpdateMeta::Settings(settings);
        // Nothing to send; drop the sender right away so as not to block the update actor.
        let (_, receiver) = mpsc::channel(1);

        let status = self.update_handle.update(meta, receiver, uuid).await?;
        Ok(status)
    }
|
||||
pub async fn create_index(&self, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
|
||||
let IndexSettings { name, primary_key } = index_settings;
|
||||
let name = name.unwrap();
|
||||
let uuid = self.uuid_resolver.create(name.clone()).await?;
|
||||
let meta = self.index_handle.create_index(uuid, primary_key).await?;
|
||||
let meta = IndexMetadata { name, meta };
|
||||
|
||||
Ok(meta)
|
||||
}
|
||||
|
||||
pub async fn delete_index(&self, index_uid: String) -> anyhow::Result<()> {
|
||||
let uuid = self.uuid_resolver
|
||||
.delete(index_uid)
|
||||
.await?;
|
||||
self.update_handle.delete(uuid.clone()).await?;
|
||||
self.index_handle.delete(uuid).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<UpdateStatus>> {
|
||||
let uuid = self.uuid_resolver
|
||||
.resolve(index)
|
||||
.await?;
|
||||
let result = self.update_handle.update_status(uuid, id).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn all_update_status(&self, index: String) -> anyhow::Result<Vec<UpdateStatus>> {
|
||||
let uuid = self.uuid_resolver
|
||||
.resolve(index).await?;
|
||||
let result = self.update_handle.get_all_updates_status(uuid).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
|
||||
let uuids = self.uuid_resolver.list().await?;
|
||||
|
||||
let mut ret = Vec::new();
|
||||
|
||||
for (name, uuid) in uuids {
|
||||
if let Some(meta) = self.index_handle.get_index_meta(uuid).await? {
|
||||
let meta = IndexMetadata { name, meta };
|
||||
ret.push(meta);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn settings(&self, index: String) -> anyhow::Result<Settings> {
|
||||
let uuid = self.uuid_resolver
|
||||
.resolve(index.clone())
|
||||
.await?;
|
||||
let settings = self.index_handle.settings(uuid).await?;
|
||||
Ok(settings)
|
||||
}
|
||||
|
||||
pub async fn documents(
|
||||
&self,
|
||||
index: String,
|
||||
offset: usize,
|
||||
limit: usize,
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
) -> anyhow::Result<Vec<Document>> {
|
||||
let uuid = self.uuid_resolver
|
||||
.resolve(index.clone())
|
||||
.await?;
|
||||
let documents = self.index_handle.documents(uuid, offset, limit, attributes_to_retrieve).await?;
|
||||
Ok(documents)
|
||||
}
|
||||
|
||||
pub async fn document(
|
||||
&self,
|
||||
index: String,
|
||||
doc_id: String,
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
) -> anyhow::Result<Document> {
|
||||
let uuid = self.uuid_resolver
|
||||
.resolve(index.clone())
|
||||
.await?;
|
||||
let document = self.index_handle.document(uuid, doc_id, attributes_to_retrieve).await?;
|
||||
Ok(document)
|
||||
}
|
||||
|
||||
fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub async fn search(&self, name: String, query: SearchQuery) -> anyhow::Result<SearchResult> {
|
||||
let uuid = self.uuid_resolver.resolve(name).await?;
|
||||
let result = self.index_handle.search(uuid, query).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn get_index(&self, name: String) -> anyhow::Result<Option<IndexMetadata>> {
|
||||
let uuid = self.uuid_resolver.resolve(name.clone()).await?;
|
||||
let result = self.index_handle
|
||||
.get_index_meta(uuid)
|
||||
.await?
|
||||
.map(|meta| IndexMetadata { name, meta });
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
|
||||
loop {
|
||||
match Arc::try_unwrap(item) {
|
||||
Ok(item) => return item,
|
||||
Err(item_arc) => {
|
||||
item = item_arc;
|
||||
sleep(Duration::from_millis(100)).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
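Note: get_arc_ownership_blocking polls Arc::try_unwrap every 100ms until all other clones of the Arc have been dropped, then hands back sole ownership. The sketch below is not part of the diff; it shows the behaviour in isolation, assuming a Tokio runtime and get_arc_ownership_blocking in scope. The main function, the String payload, and the timings are illustrative only.

use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let item = Arc::new(String::from("update store"));
    let other_owner = Arc::clone(&item);

    // Simulate another task releasing its reference after some work.
    tokio::spawn(async move {
        sleep(Duration::from_millis(300)).await;
        drop(other_owner);
    });

    // Resolves once we are the sole owner of the Arc.
    let owned: String = get_arc_ownership_blocking(item).await;
    println!("took ownership of: {}", owned);
}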
315 meilisearch-http/src/index_controller/update_actor.rs Normal file
@ -0,0 +1,315 @@
use std::collections::{hash_map::Entry, HashMap};
use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use itertools::Itertools;
use log::info;
use super::index_actor::IndexActorHandle;
use thiserror::Error;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;

use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus};
use super::get_arc_ownership_blocking;

pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>;

#[derive(Debug, Error)]
pub enum UpdateError {
    #[error("error with update: {0}")]
    Error(Box<dyn std::error::Error + Sync + Send + 'static>),
    #[error("Index {0} doesn't exist.")]
    UnexistingIndex(Uuid),
}

enum UpdateMsg<D> {
    Update {
        uuid: Uuid,
        meta: UpdateMeta,
        data: mpsc::Receiver<PayloadData<D>>,
        ret: oneshot::Sender<Result<UpdateStatus>>,
    },
    ListUpdates {
        uuid: Uuid,
        ret: oneshot::Sender<Result<Vec<UpdateStatus>>>,
    },
    GetUpdate {
        uuid: Uuid,
        ret: oneshot::Sender<Result<Option<UpdateStatus>>>,
        id: u64,
    },
    Delete {
        uuid: Uuid,
        ret: oneshot::Sender<Result<()>>,
    },
}

struct UpdateActor<D, S> {
    path: PathBuf,
    store: S,
    inbox: mpsc::Receiver<UpdateMsg<D>>,
}

#[async_trait::async_trait]
trait UpdateStoreStore {
    async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
    async fn delete(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>;
    async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>>;
}

impl<D, S> UpdateActor<D, S>
where
    D: AsRef<[u8]> + Sized + 'static,
    S: UpdateStoreStore,
{
    fn new(store: S, inbox: mpsc::Receiver<UpdateMsg<D>>, path: impl AsRef<Path>) -> Self {
        let path = path.as_ref().to_owned().join("update_files");
        create_dir_all(&path).unwrap();
        Self { store, inbox, path }
    }

    async fn run(mut self) {
        use UpdateMsg::*;

        info!("Started update actor.");

        loop {
            match self.inbox.recv().await {
                Some(Update {
                    uuid,
                    meta,
                    data,
                    ret,
                }) => {
                    let _ = ret.send(self.handle_update(uuid, meta, data).await);
                }
                Some(ListUpdates { uuid, ret }) => {
                    let _ = ret.send(self.handle_list_updates(uuid).await);
                }
                Some(GetUpdate { uuid, ret, id }) => {
                    let _ = ret.send(self.handle_get_update(uuid, id).await);
                }
                Some(Delete { uuid, ret }) => {
                    let _ = ret.send(self.handle_delete(uuid).await);
                }
                // All handles have been dropped; stop the actor.
                None => break,
            }
        }
    }

    async fn handle_update(
        &self,
        uuid: Uuid,
        meta: UpdateMeta,
        mut payload: mpsc::Receiver<PayloadData<D>>,
    ) -> Result<UpdateStatus> {
        let update_store = self.store.get_or_create(uuid).await?;
        let update_file_id = uuid::Uuid::new_v4();
        let path = self.path.join(format!("update_{}", update_file_id));
        let mut file = File::create(&path).await
            .map_err(|e| UpdateError::Error(Box::new(e)))?;

        while let Some(bytes) = payload.recv().await {
            match bytes {
                Ok(bytes) => {
                    file.write_all(bytes.as_ref()).await
                        .map_err(|e| UpdateError::Error(Box::new(e)))?;
                }
                Err(e) => {
                    return Err(UpdateError::Error(e));
                }
            }
        }

        file.flush().await
            .map_err(|e| UpdateError::Error(Box::new(e)))?;

        let file = file.into_std().await;

        tokio::task::spawn_blocking(move || {
            let result = update_store
                .register_update(meta, path, uuid)
                .map(UpdateStatus::Pending)
                .map_err(|e| UpdateError::Error(Box::new(e)));
            result
        })
        .await
        .map_err(|e| UpdateError::Error(Box::new(e)))?
    }

    async fn handle_list_updates(
        &self,
        uuid: Uuid,
    ) -> Result<Vec<UpdateStatus>> {
        let store = self.store.get(&uuid).await?;
        tokio::task::spawn_blocking(move || {
            let result = match store {
                Some(update_store) => {
                    // The iterators arrive in the order documented on `iter_metas`:
                    // processing, processed, aborted, pending, failed.
                    let updates = update_store.iter_metas(|processing, processed, aborted, pending, failed| {
                        Ok(processing
                            .map(UpdateStatus::from)
                            .into_iter()
                            .chain(pending.filter_map(|p| p.ok()).map(|(_, u)| UpdateStatus::from(u)))
                            .chain(aborted.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                            .chain(processed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                            .chain(failed.filter_map(std::result::Result::ok).map(|(_, u)| UpdateStatus::from(u)))
                            .sorted_by(|a, b| a.id().cmp(&b.id()))
                            .collect())
                    })
                    .map_err(|e| UpdateError::Error(Box::new(e)))?;
                    Ok(updates)
                }
                None => Err(UpdateError::UnexistingIndex(uuid)),
            };
            result
        }).await
        .map_err(|e| UpdateError::Error(Box::new(e)))?
    }

    async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<Option<UpdateStatus>> {
        let store = self.store
            .get(&uuid)
            .await?
            .ok_or(UpdateError::UnexistingIndex(uuid))?;
        let result = store.meta(id)
            .map_err(|e| UpdateError::Error(Box::new(e)))?;
        Ok(result)
    }

    async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
        let store = self.store
            .delete(&uuid)
            .await?;

        if let Some(store) = store {
            tokio::task::spawn(async move {
                // Wait until we are the last owner of the store before closing it.
                let store = get_arc_ownership_blocking(store).await;
                tokio::task::spawn_blocking(move || {
                    store.prepare_for_closing().wait();
                    info!("Update store {} was closed.", uuid);
                });
            });
        }

        Ok(())
    }
}

#[derive(Clone)]
pub struct UpdateActorHandle<D> {
    sender: mpsc::Sender<UpdateMsg<D>>,
}

impl<D> UpdateActorHandle<D>
where
    D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
    pub fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self {
        let path = path.as_ref().to_owned().join("updates");
        let (sender, receiver) = mpsc::channel(100);
        let store = MapUpdateStoreStore::new(index_handle, &path);
        let actor = UpdateActor::new(store, receiver, path);

        tokio::task::spawn(actor.run());

        Self { sender }
    }

    pub async fn update(
        &self,
        meta: UpdateMeta,
        data: mpsc::Receiver<PayloadData<D>>,
        uuid: Uuid,
    ) -> Result<UpdateStatus> {
        let (ret, receiver) = oneshot::channel();
        let msg = UpdateMsg::Update {
            uuid,
            data,
            meta,
            ret,
        };
        let _ = self.sender.send(msg).await;
        receiver.await.expect("update actor killed.")
    }

    pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
        let (ret, receiver) = oneshot::channel();
        let msg = UpdateMsg::ListUpdates { uuid, ret };
        let _ = self.sender.send(msg).await;
        receiver.await.expect("update actor killed.")
    }

    pub async fn update_status(&self, uuid: Uuid, id: u64) -> Result<Option<UpdateStatus>> {
        let (ret, receiver) = oneshot::channel();
        let msg = UpdateMsg::GetUpdate { uuid, id, ret };
        let _ = self.sender.send(msg).await;
        receiver.await.expect("update actor killed.")
    }

    pub async fn delete(&self, uuid: Uuid) -> Result<()> {
        let (ret, receiver) = oneshot::channel();
        let msg = UpdateMsg::Delete { uuid, ret };
        let _ = self.sender.send(msg).await;
        receiver.await.expect("update actor killed.")
    }
}

struct MapUpdateStoreStore {
    db: Arc<RwLock<HashMap<Uuid, Arc<UpdateStore>>>>,
    index_handle: IndexActorHandle,
    path: PathBuf,
}

impl MapUpdateStoreStore {
    fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self {
        let db = Arc::new(RwLock::new(HashMap::new()));
        let path = path.as_ref().to_owned();
        Self {
            db,
            index_handle,
            path,
        }
    }
}

#[async_trait::async_trait]
impl UpdateStoreStore for MapUpdateStoreStore {
    async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>> {
        match self.db.write().await.entry(uuid) {
            Entry::Vacant(e) => {
                let mut options = heed::EnvOpenOptions::new();
                options.map_size(4096 * 100_000);
                let path = self.path.join(format!("updates-{}", e.key()));
                create_dir_all(&path).unwrap();
                let index_handle = self.index_handle.clone();
                let store = UpdateStore::open(options, &path, move |meta, file| {
                    futures::executor::block_on(index_handle.update(meta, file))
                })
                .unwrap();
                let store = e.insert(store);
                Ok(store.clone())
            }
            Entry::Occupied(e) => Ok(e.get().clone()),
        }
    }

    async fn get(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> {
        Ok(self.db.read().await.get(uuid).cloned())
    }

    async fn delete(&self, uuid: &Uuid) -> Result<Option<Arc<UpdateStore>>> {
        let store = self.db.write().await.remove(uuid);
        if store.is_some() {
            let path = self.path.join(format!("updates-{}", uuid));
            remove_dir_all(path).unwrap();
        }
        Ok(store)
    }
}
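Both UpdateActorHandle above and UuidResolverHandle further down follow the same request-reply actor pattern: a cloneable handle owns an mpsc sender, every request carries a oneshot sender for the reply, and a single spawned task owns the state. The sketch below is not part of the diff; it boils the pattern down to its minimum, and all names (Msg, Handle, ping) are illustrative.

use tokio::sync::{mpsc, oneshot};

enum Msg {
    Ping { ret: oneshot::Sender<&'static str> },
}

async fn run_actor(mut inbox: mpsc::Receiver<Msg>) {
    // The actor owns its state and serializes access by handling one message at a time.
    while let Some(Msg::Ping { ret }) = inbox.recv().await {
        let _ = ret.send("pong");
    }
}

#[derive(Clone)]
struct Handle {
    sender: mpsc::Sender<Msg>,
}

impl Handle {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel(100);
        tokio::spawn(run_actor(receiver));
        Self { sender }
    }

    async fn ping(&self) -> &'static str {
        let (ret, receiver) = oneshot::channel();
        let _ = self.sender.send(Msg::Ping { ret }).await;
        receiver.await.expect("actor killed")
    }
}

#[tokio::main]
async fn main() {
    let handle = Handle::new();
    assert_eq!(handle.ping().await, "pong");
}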
98 meilisearch-http/src/index_controller/update_handler.rs Normal file
@ -0,0 +1,98 @@
use std::fs::File;

use anyhow::Result;
use grenad::CompressionType;
use milli::update::UpdateBuilder;
use crate::index::Index;
use rayon::ThreadPool;

use crate::index_controller::updates::{Failed, Processed, Processing};
use crate::index_controller::UpdateMeta;
use crate::index::UpdateResult;
use crate::option::IndexerOpts;

pub struct UpdateHandler {
    max_nb_chunks: Option<usize>,
    chunk_compression_level: Option<u32>,
    thread_pool: ThreadPool,
    log_frequency: usize,
    max_memory: usize,
    linked_hash_map_size: usize,
    chunk_compression_type: CompressionType,
    chunk_fusing_shrink_size: u64,
}

impl UpdateHandler {
    pub fn new(
        opt: &IndexerOpts,
    ) -> anyhow::Result<Self> {
        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(opt.indexing_jobs.unwrap_or(0))
            .build()?;
        Ok(Self {
            max_nb_chunks: opt.max_nb_chunks,
            chunk_compression_level: opt.chunk_compression_level,
            thread_pool,
            log_frequency: opt.log_every_n,
            max_memory: opt.max_memory.get_bytes() as usize,
            linked_hash_map_size: opt.linked_hash_map_size,
            chunk_compression_type: opt.chunk_compression_type,
            chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
        })
    }

    fn update_builder(&self, update_id: u64) -> UpdateBuilder {
        // We prepare the update by using the update builder.
        let mut update_builder = UpdateBuilder::new(update_id);
        if let Some(max_nb_chunks) = self.max_nb_chunks {
            update_builder.max_nb_chunks(max_nb_chunks);
        }
        if let Some(chunk_compression_level) = self.chunk_compression_level {
            update_builder.chunk_compression_level(chunk_compression_level);
        }
        update_builder.thread_pool(&self.thread_pool);
        update_builder.log_every_n(self.log_frequency);
        update_builder.max_memory(self.max_memory);
        update_builder.linked_hash_map_size(self.linked_hash_map_size);
        update_builder.chunk_compression_type(self.chunk_compression_type);
        update_builder.chunk_fusing_shrink_size(self.chunk_fusing_shrink_size);
        update_builder
    }


    pub fn handle_update(
        &self,
        meta: Processing<UpdateMeta>,
        content: File,
        index: Index,
    ) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
        use UpdateMeta::*;

        let update_id = meta.id();

        let update_builder = self.update_builder(update_id);

        let result = match meta.meta() {
            DocumentsAddition {
                method,
                format,
                primary_key,
            } => index.update_documents(
                *format,
                *method,
                content,
                update_builder,
                primary_key.as_deref(),
            ),
            ClearDocuments => index.clear_documents(update_builder),
            DeleteDocuments => index.delete_documents(content, update_builder),
            Settings(settings) => index.update_settings(settings, update_builder),
            Facets(levels) => index.update_facets(levels, update_builder),
        };

        match result {
            Ok(result) => Ok(meta.process(result)),
            Err(e) => Err(meta.fail(e.to_string())),
        }
    }
}
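For orientation, this is roughly how a caller (in practice the index actor) is expected to drive the handler. The sketch is not part of the diff; the function and its signature are hypothetical, and merely name the values the real caller already owns.

fn apply_one_update(
    handler: &UpdateHandler,
    processing: Processing<UpdateMeta>,
    content: File,
    index: Index,
) {
    match handler.handle_update(processing, content, index) {
        // Both arms carry the full history of the update (enqueued/started timestamps).
        Ok(processed) => log::info!("update {} succeeded", processed.id()),
        Err(failed) => log::error!("update {} failed", failed.id()),
    }
}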
444 meilisearch-http/src/index_controller/update_store.rs Normal file
@ -0,0 +1,444 @@
use std::fs::remove_file;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
use heed::{Database, Env, EnvOpenOptions};
use serde::{Deserialize, Serialize};
use std::fs::File;
use tokio::sync::mpsc;
use uuid::Uuid;

use crate::index_controller::updates::*;

type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;

#[derive(Clone)]
pub struct UpdateStore<M, N, E> {
    env: Env,
    pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>,
    pending: Database<OwnedType<BEU64>, SerdeJson<PathBuf>>,
    processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
    failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
    aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
    processing: Arc<RwLock<Option<Processing<M>>>>,
    notification_sender: mpsc::Sender<()>,
}

pub trait HandleUpdate<M, N, E> {
    fn handle_update(
        &mut self,
        meta: Processing<M>,
        content: File,
    ) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>;
}

impl<M, N, E, F> HandleUpdate<M, N, E> for F
where
    F: FnMut(Processing<M>, File) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>>,
{
    fn handle_update(
        &mut self,
        meta: Processing<M>,
        content: File,
    ) -> anyhow::Result<Result<Processed<M, N>, Failed<M, E>>> {
        self(meta, content)
    }
}

impl<M, N, E> UpdateStore<M, N, E>
where
    M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone,
    N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
    E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
{
    pub fn open<P, U>(
        mut options: EnvOpenOptions,
        path: P,
        update_handler: U,
    ) -> heed::Result<Arc<Self>>
    where
        P: AsRef<Path>,
        U: HandleUpdate<M, N, E> + Sync + Clone + Send + 'static,
    {
        options.max_dbs(5);

        let env = options.open(path)?;
        let pending_meta = env.create_database(Some("pending-meta"))?;
        let pending = env.create_database(Some("pending"))?;
        let processed_meta = env.create_database(Some("processed-meta"))?;
        let aborted_meta = env.create_database(Some("aborted-meta"))?;
        let failed_meta = env.create_database(Some("failed-meta"))?;
        let processing = Arc::new(RwLock::new(None));

        let (notification_sender, mut notification_receiver) = mpsc::channel(10);
        // Send a first notification to trigger the processing loop. `try_send` is used
        // because we are not in an async context here; the channel is empty at this
        // point, so it cannot fail for lack of capacity.
        let _ = notification_sender.try_send(());

        let update_store = Arc::new(UpdateStore {
            env,
            pending,
            pending_meta,
            processed_meta,
            aborted_meta,
            notification_sender,
            failed_meta,
            processing,
        });

        // We need a weak reference so we can take ownership of the arc later when we
        // want to close the index.
        let update_store_weak = Arc::downgrade(&update_store);
        tokio::task::spawn(async move {
            // Block and wait for something to process.
            'outer: while let Some(_) = notification_receiver.recv().await {
                loop {
                    match update_store_weak.upgrade() {
                        Some(update_store) => {
                            let handler = update_handler.clone();
                            let res = tokio::task::spawn_blocking(move || {
                                update_store.process_pending_update(handler)
                            })
                            .await
                            .expect("Fatal error processing update.");
                            match res {
                                Ok(Some(_)) => (),
                                Ok(None) => break,
                                Err(e) => eprintln!("error while processing update: {}", e),
                            }
                        }
                        // The ownership of the arc has been taken back; we need to exit.
                        None => break 'outer,
                    }
                }
            }
        });

        Ok(update_store)
    }

    pub fn prepare_for_closing(self) -> heed::EnvClosingEvent {
        self.env.prepare_for_closing()
    }

    /// Returns the new biggest id to use to store the new update.
    fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
        let last_pending = self
            .pending_meta
            .remap_data_type::<DecodeIgnore>()
            .last(txn)?
            .map(|(k, _)| k.get());

        let last_processed = self
            .processed_meta
            .remap_data_type::<DecodeIgnore>()
            .last(txn)?
            .map(|(k, _)| k.get());

        let last_aborted = self
            .aborted_meta
            .remap_data_type::<DecodeIgnore>()
            .last(txn)?
            .map(|(k, _)| k.get());

        let last_update_id = [last_pending, last_processed, last_aborted]
            .iter()
            .copied()
            .flatten()
            .max();

        match last_update_id {
            Some(last_id) => Ok(last_id + 1),
            None => Ok(0),
        }
    }

    /// Registers the update content in the pending store and the meta
    /// into the pending-meta store. Returns the new unique update id.
    pub fn register_update(
        &self,
        meta: M,
        content: impl AsRef<Path>,
        index_uuid: Uuid,
    ) -> heed::Result<Pending<M>> {
        let mut wtxn = self.env.write_txn()?;

        // We ask the update store to give us a new update id. This is safe:
        // no other update can have the same id, because we open a write txn before
        // asking for the id and registering it, so any other update registration
        // is forced to wait for a new write txn.
        let update_id = self.new_update_id(&wtxn)?;
        let update_key = BEU64::new(update_id);

        let meta = Pending::new(meta, update_id, index_uuid);
        self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
        self.pending
            .put(&mut wtxn, &update_key, &content.as_ref().to_owned())?;

        wtxn.commit()?;

        self.notification_sender
            .blocking_send(())
            .expect("Update store loop exited.");
        Ok(meta)
    }

    /// Executes the user provided function on the next pending update (the one with the lowest id).
    /// It lets the user process the update while holding only a read-only txn, and
    /// only writes the result meta to the processed-meta store *after* it has been processed.
    fn process_pending_update<U>(&self, mut handler: U) -> anyhow::Result<Option<()>>
    where
        U: HandleUpdate<M, N, E>,
    {
        // Create a read transaction to be able to retrieve the pending updates in order.
        let rtxn = self.env.read_txn()?;
        let first_meta = self.pending_meta.first(&rtxn)?;

        // If there is a pending update, we process it while holding
        // only a reader, not a writer.
        match first_meta {
            Some((first_id, pending)) => {
                let content_path = self
                    .pending
                    .get(&rtxn, &first_id)?
                    .expect("associated update content");

                // We change the state of the update from pending to processing before we pass it
                // to the update handler. The processing store is not persistent, to be able to
                // recover from a failure.
                let processing = pending.processing();
                self.processing.write().unwrap().replace(processing.clone());
                let file = File::open(&content_path)?;
                // Process the pending update using the provided user function.
                let result = handler.handle_update(processing, file)?;
                drop(rtxn);

                // Once the pending update has been successfully processed,
                // we must remove the content from the pending and processing stores,
                // write the *new* meta to the processed-meta store, and commit.
                let mut wtxn = self.env.write_txn()?;
                self.processing.write().unwrap().take();
                self.pending_meta.delete(&mut wtxn, &first_id)?;
                remove_file(&content_path)?;
                self.pending.delete(&mut wtxn, &first_id)?;
                match result {
                    Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,
                    Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?,
                }
                wtxn.commit()?;

                Ok(Some(()))
            }
            None => Ok(None),
        }
    }

    /// Executes the user defined function with the meta-store iterators: the currently
    /// *processing* update if any, then the *processed*, *aborted*, *pending* and
    /// *failed* meta iterators, in that order.
    pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
    where
        F: for<'a> FnMut(
            Option<Processing<M>>,
            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<M>>>,
            heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
        ) -> heed::Result<T>,
    {
        let rtxn = self.env.read_txn()?;

        // We get the processed, aborted, pending and failed meta iterators.
        let processed_iter = self.processed_meta.iter(&rtxn)?;
        let aborted_iter = self.aborted_meta.iter(&rtxn)?;
        let pending_iter = self.pending_meta.iter(&rtxn)?;
        let processing = self.processing.read().unwrap().clone();
        let failed_iter = self.failed_meta.iter(&rtxn)?;

        // We execute the user defined function with the iterators.
        (f)(
            processing,
            processed_iter,
            aborted_iter,
            pending_iter,
            failed_iter,
        )
    }

    /// Returns the update associated meta or `None` if the update doesn't exist.
    pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<M, N, E>>> {
        let rtxn = self.env.read_txn()?;
        let key = BEU64::new(update_id);

        if let Some(ref meta) = *self.processing.read().unwrap() {
            if meta.id() == update_id {
                return Ok(Some(UpdateStatus::Processing(meta.clone())));
            }
        }

        if let Some(meta) = self.pending_meta.get(&rtxn, &key)? {
            return Ok(Some(UpdateStatus::Pending(meta)));
        }

        if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
            return Ok(Some(UpdateStatus::Processed(meta)));
        }

        if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
            return Ok(Some(UpdateStatus::Aborted(meta)));
        }

        if let Some(meta) = self.failed_meta.get(&rtxn, &key)? {
            return Ok(Some(UpdateStatus::Failed(meta)));
        }

        Ok(None)
    }

    /// Aborts an update: an aborted update's content is deleted and
    /// its meta is moved into the aborted updates database.
    ///
    /// Trying to abort an update that is currently being processed, that has
    /// already been processed, or that doesn't actually exist will
    /// return `None`.
    #[allow(dead_code)]
    pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>> {
        let mut wtxn = self.env.write_txn()?;
        let key = BEU64::new(update_id);

        // We cannot abort an update that is currently being processed.
        if self.pending_meta.first(&wtxn)?.map(|(key, _)| key.get()) == Some(update_id) {
            return Ok(None);
        }

        let pending = match self.pending_meta.get(&wtxn, &key)? {
            Some(meta) => meta,
            None => return Ok(None),
        };

        let aborted = pending.abort();

        self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
        self.pending_meta.delete(&mut wtxn, &key)?;
        self.pending.delete(&mut wtxn, &key)?;

        wtxn.commit()?;

        Ok(Some(aborted))
    }

    /// Aborts all the pending updates, but not the one currently being processed.
    /// Returns the update metas and ids that were successfully aborted.
    #[allow(dead_code)]
    pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>> {
        let mut wtxn = self.env.write_txn()?;
        let mut aborted_updates = Vec::new();

        // We skip the first pending update as it is currently being processed.
        for result in self.pending_meta.iter(&wtxn)?.skip(1) {
            let (key, pending) = result?;
            let id = key.get();
            aborted_updates.push((id, pending.abort()));
        }

        for (id, aborted) in &aborted_updates {
            let key = BEU64::new(*id);
            self.aborted_meta.put(&mut wtxn, &key, &aborted)?;
            self.pending_meta.delete(&mut wtxn, &key)?;
            self.pending.delete(&mut wtxn, &key)?;
        }

        wtxn.commit()?;

        Ok(aborted_updates)
    }
}

// The tests below are commented out: they still target the old two-argument
// `register_update` and the old handler signature.
//#[cfg(test)]
//mod tests {
//use super::*;
//use std::thread;
//use std::time::{Duration, Instant};

//#[test]
//fn simple() {
//let dir = tempfile::tempdir().unwrap();
//let mut options = EnvOpenOptions::new();
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir,
//|meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
//let new_meta = meta.meta().to_string() + " processed";
//let processed = meta.process(new_meta);
//Ok(processed)
//},
//)
//.unwrap();

//let meta = String::from("kiki");
//let update = update_store.register_update(meta, &[]).unwrap();
//thread::sleep(Duration::from_millis(100));
//let meta = update_store.meta(update.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "kiki processed");
//} else {
//panic!()
//}
//}

//#[test]
//#[ignore]
//fn long_running_update() {
//let dir = tempfile::tempdir().unwrap();
//let mut options = EnvOpenOptions::new();
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir,
//|meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
//thread::sleep(Duration::from_millis(400));
//let new_meta = meta.meta().to_string() + "processed";
//let processed = meta.process(new_meta);
//Ok(processed)
//},
//)
//.unwrap();

//let before_register = Instant::now();

//let meta = String::from("kiki");
//let update_kiki = update_store.register_update(meta, &[]).unwrap();
//assert!(before_register.elapsed() < Duration::from_millis(200));

//let meta = String::from("coco");
//let update_coco = update_store.register_update(meta, &[]).unwrap();
//assert!(before_register.elapsed() < Duration::from_millis(200));

//let meta = String::from("cucu");
//let update_cucu = update_store.register_update(meta, &[]).unwrap();
//assert!(before_register.elapsed() < Duration::from_millis(200));

//thread::sleep(Duration::from_millis(400 * 3 + 100));

//let meta = update_store.meta(update_kiki.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "kiki processed");
//} else {
//panic!()
//}

//let meta = update_store.meta(update_coco.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "coco processed");
//} else {
//panic!()
//}

//let meta = update_store.meta(update_cucu.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "cucu processed");
//} else {
//panic!()
//}
//}
//}
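The subtle part of UpdateStore::open is the background loop's use of a weak reference: registration only commits the pending entry and sends a wakeup, while the loop upgrades its Weak per batch so the last strong Arc can still be reclaimed when the index is closed. The sketch below is not part of the diff; it reproduces only that wakeup/weak-reference scheme with a toy Store type and illustrative timings, assuming a Tokio runtime.

use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::mpsc;

struct Store;

impl Store {
    fn process_one(&self) {
        println!("processed one pending update");
    }
}

#[tokio::main]
async fn main() {
    let store = Arc::new(Store);
    let (notify, mut wakeups) = mpsc::channel::<()>(10);

    // The worker keeps only a Weak, so the last strong reference can still be
    // reclaimed later (this is what get_arc_ownership_blocking relies on).
    let weak: Weak<Store> = Arc::downgrade(&store);
    tokio::spawn(async move {
        while wakeups.recv().await.is_some() {
            match weak.upgrade() {
                Some(store) => store.process_one(),
                None => break, // ownership was taken back; exit the loop
            }
        }
    });

    // Registering work is just a notification; the worker does the processing.
    let _ = notify.send(()).await;
    tokio::time::sleep(Duration::from_millis(50)).await;
}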
186 meilisearch-http/src/index_controller/updates.rs Normal file
@ -0,0 +1,186 @@
use chrono::{Utc, DateTime};
use serde::{Serialize, Deserialize};
use uuid::Uuid;

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Pending<M> {
    pub update_id: u64,
    pub meta: M,
    pub enqueued_at: DateTime<Utc>,
    pub index_uuid: Uuid,
}

impl<M> Pending<M> {
    pub fn new(meta: M, update_id: u64, index_uuid: Uuid) -> Self {
        Self {
            enqueued_at: Utc::now(),
            meta,
            update_id,
            index_uuid,
        }
    }

    pub fn processing(self) -> Processing<M> {
        Processing {
            from: self,
            started_processing_at: Utc::now(),
        }
    }

    pub fn abort(self) -> Aborted<M> {
        Aborted {
            from: self,
            aborted_at: Utc::now(),
        }
    }

    pub fn meta(&self) -> &M {
        &self.meta
    }

    pub fn id(&self) -> u64 {
        self.update_id
    }
}

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processed<M, N> {
    pub success: N,
    pub processed_at: DateTime<Utc>,
    #[serde(flatten)]
    pub from: Processing<M>,
}

impl<M, N> Processed<M, N> {
    pub fn id(&self) -> u64 {
        self.from.id()
    }
}

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processing<M> {
    #[serde(flatten)]
    pub from: Pending<M>,
    pub started_processing_at: DateTime<Utc>,
}

impl<M> Processing<M> {
    pub fn id(&self) -> u64 {
        self.from.id()
    }

    pub fn meta(&self) -> &M {
        self.from.meta()
    }

    pub fn index_uuid(&self) -> &Uuid {
        &self.from.index_uuid
    }

    pub fn process<N>(self, meta: N) -> Processed<M, N> {
        Processed {
            success: meta,
            from: self,
            processed_at: Utc::now(),
        }
    }

    pub fn fail<E>(self, error: E) -> Failed<M, E> {
        Failed {
            from: self,
            error,
            failed_at: Utc::now(),
        }
    }
}

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Aborted<M> {
    #[serde(flatten)]
    from: Pending<M>,
    aborted_at: DateTime<Utc>,
}

impl<M> Aborted<M> {
    pub fn id(&self) -> u64 {
        self.from.id()
    }
}

#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Failed<M, E> {
    #[serde(flatten)]
    from: Processing<M>,
    error: E,
    failed_at: DateTime<Utc>,
}

impl<M, E> Failed<M, E> {
    pub fn id(&self) -> u64 {
        self.from.id()
    }
}

#[derive(Debug, PartialEq, Eq, Hash, Serialize)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus<M, N, E> {
    Processing(Processing<M>),
    Pending(Pending<M>),
    Processed(Processed<M, N>),
    Aborted(Aborted<M>),
    Failed(Failed<M, E>),
}

impl<M, N, E> UpdateStatus<M, N, E> {
    pub fn id(&self) -> u64 {
        match self {
            UpdateStatus::Processing(u) => u.id(),
            UpdateStatus::Pending(u) => u.id(),
            UpdateStatus::Processed(u) => u.id(),
            UpdateStatus::Aborted(u) => u.id(),
            UpdateStatus::Failed(u) => u.id(),
        }
    }

    pub fn processed(&self) -> Option<&Processed<M, N>> {
        match self {
            UpdateStatus::Processed(p) => Some(p),
            _ => None,
        }
    }
}

impl<M, N, E> From<Pending<M>> for UpdateStatus<M, N, E> {
    fn from(other: Pending<M>) -> Self {
        Self::Pending(other)
    }
}

impl<M, N, E> From<Aborted<M>> for UpdateStatus<M, N, E> {
    fn from(other: Aborted<M>) -> Self {
        Self::Aborted(other)
    }
}

impl<M, N, E> From<Processed<M, N>> for UpdateStatus<M, N, E> {
    fn from(other: Processed<M, N>) -> Self {
        Self::Processed(other)
    }
}

impl<M, N, E> From<Processing<M>> for UpdateStatus<M, N, E> {
    fn from(other: Processing<M>) -> Self {
        Self::Processing(other)
    }
}

impl<M, N, E> From<Failed<M, E>> for UpdateStatus<M, N, E> {
    fn from(other: Failed<M, E>) -> Self {
        Self::Failed(other)
    }
}
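These types form a small state machine: Pending -> Processing -> Processed or Failed, with Aborted as a side exit from Pending, and each transition consuming the previous state and stamping a timestamp. The unit test below is a sketch, not part of the diff; its module, test name, and String metas are illustrative, and it would live inside this file so the types are in scope.

#[cfg(test)]
mod lifecycle_sketch {
    use super::*;
    use uuid::Uuid;

    #[test]
    fn one_update_walks_the_state_machine() {
        // enqueued -> processing
        let pending = Pending::new(String::from("add documents"), 0, Uuid::new_v4());
        let processing = pending.processing();
        assert_eq!(processing.id(), 0);

        // Success and failure both consume the Processing state and stamp a date.
        let failed = processing.clone().fail(String::from("index error"));
        let processed = processing.process(String::from("ok"));

        let status: UpdateStatus<String, String, String> = processed.into();
        assert_eq!(status.id(), 0);
        assert_eq!(failed.id(), 0);
    }
}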
219 meilisearch-http/src/index_controller/uuid_resolver.rs Normal file
@ -0,0 +1,219 @@
use log::{info, warn};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;

pub type Result<T> = std::result::Result<T, UuidError>;

#[derive(Debug)]
enum UuidResolveMsg {
    Resolve {
        name: String,
        ret: oneshot::Sender<Result<Uuid>>,
    },
    GetOrCreate {
        name: String,
        ret: oneshot::Sender<Result<Uuid>>,
    },
    Create {
        name: String,
        ret: oneshot::Sender<Result<Uuid>>,
    },
    Delete {
        name: String,
        ret: oneshot::Sender<Result<Uuid>>,
    },
    List {
        ret: oneshot::Sender<Result<Vec<(String, Uuid)>>>,
    },
}

struct UuidResolverActor<S> {
    inbox: mpsc::Receiver<UuidResolveMsg>,
    store: S,
}

impl<S: UuidStore> UuidResolverActor<S> {
    fn new(inbox: mpsc::Receiver<UuidResolveMsg>, store: S) -> Self {
        Self { inbox, store }
    }

    async fn run(mut self) {
        use UuidResolveMsg::*;

        info!("uuid resolver started");

        loop {
            match self.inbox.recv().await {
                Some(Create { name, ret }) => {
                    let _ = ret.send(self.handle_create(name).await);
                }
                Some(GetOrCreate { name, ret }) => {
                    let _ = ret.send(self.handle_get_or_create(name).await);
                }
                Some(Resolve { name, ret }) => {
                    let _ = ret.send(self.handle_resolve(name).await);
                }
                Some(Delete { name, ret }) => {
                    let _ = ret.send(self.handle_delete(name).await);
                }
                Some(List { ret }) => {
                    let _ = ret.send(self.handle_list().await);
                }
                // All senders have been dropped; we need to quit.
                None => break,
            }
        }

        warn!("exiting uuid resolver loop");
    }

    async fn handle_create(&self, name: String) -> Result<Uuid> {
        self.store.create_uuid(name, true).await
    }

    async fn handle_get_or_create(&self, name: String) -> Result<Uuid> {
        self.store.create_uuid(name, false).await
    }

    async fn handle_resolve(&self, name: String) -> Result<Uuid> {
        self.store
            .get_uuid(&name)
            .await?
            .ok_or(UuidError::UnexistingIndex(name))
    }

    async fn handle_delete(&self, name: String) -> Result<Uuid> {
        self.store
            .delete(&name)
            .await?
            .ok_or(UuidError::UnexistingIndex(name))
    }

    async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
        let result = self.store.list().await?;
        Ok(result)
    }
}

#[derive(Clone)]
pub struct UuidResolverHandle {
    sender: mpsc::Sender<UuidResolveMsg>,
}

impl UuidResolverHandle {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(100);
        let store = MapUuidStore(Arc::new(RwLock::new(HashMap::new())));
        let actor = UuidResolverActor::new(receiver, store);
        tokio::spawn(actor.run());
        Self { sender }
    }

    pub async fn resolve(&self, name: String) -> anyhow::Result<Uuid> {
        let (ret, receiver) = oneshot::channel();
        let msg = UuidResolveMsg::Resolve { name, ret };
        let _ = self.sender.send(msg).await;
        Ok(receiver
            .await
            .expect("Uuid resolver actor has been killed")?)
    }

    pub async fn get_or_create(&self, name: String) -> Result<Uuid> {
        let (ret, receiver) = oneshot::channel();
        let msg = UuidResolveMsg::GetOrCreate { name, ret };
        let _ = self.sender.send(msg).await;
        Ok(receiver
            .await
            .expect("Uuid resolver actor has been killed")?)
    }

    pub async fn create(&self, name: String) -> anyhow::Result<Uuid> {
        let (ret, receiver) = oneshot::channel();
        let msg = UuidResolveMsg::Create { name, ret };
        let _ = self.sender.send(msg).await;
        Ok(receiver
            .await
            .expect("Uuid resolver actor has been killed")?)
    }

    pub async fn delete(&self, name: String) -> anyhow::Result<Uuid> {
        let (ret, receiver) = oneshot::channel();
        let msg = UuidResolveMsg::Delete { name, ret };
        let _ = self.sender.send(msg).await;
        Ok(receiver
            .await
            .expect("Uuid resolver actor has been killed")?)
    }

    pub async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>> {
        let (ret, receiver) = oneshot::channel();
        let msg = UuidResolveMsg::List { ret };
        let _ = self.sender.send(msg).await;
        Ok(receiver
            .await
            .expect("Uuid resolver actor has been killed")?)
    }
}

#[derive(Clone, Debug, Error)]
pub enum UuidError {
    #[error("Name already exists.")]
    NameAlreadyExist,
    #[error("Index \"{0}\" doesn't exist.")]
    UnexistingIndex(String),
}

#[async_trait::async_trait]
trait UuidStore {
    // Create a new entry for `name`. Return an error if `err` is set and the entry already
    // exists; return the uuid otherwise.
    async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid>;
    async fn get_uuid(&self, name: &str) -> Result<Option<Uuid>>;
    async fn delete(&self, name: &str) -> Result<Option<Uuid>>;
    async fn list(&self) -> Result<Vec<(String, Uuid)>>;
}

struct MapUuidStore(Arc<RwLock<HashMap<String, Uuid>>>);

#[async_trait::async_trait]
impl UuidStore for MapUuidStore {
    async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
        match self.0.write().await.entry(name) {
            Entry::Occupied(entry) => {
                if err {
                    Err(UuidError::NameAlreadyExist)
                } else {
                    Ok(entry.get().clone())
                }
            }
            Entry::Vacant(entry) => {
                let uuid = Uuid::new_v4();
                let uuid = entry.insert(uuid);
                Ok(uuid.clone())
            }
        }
    }

    async fn get_uuid(&self, name: &str) -> Result<Option<Uuid>> {
        Ok(self.0.read().await.get(name).cloned())
    }

    async fn delete(&self, name: &str) -> Result<Option<Uuid>> {
        Ok(self.0.write().await.remove(name))
    }

    async fn list(&self) -> Result<Vec<(String, Uuid)>> {
        let list = self
            .0
            .read()
            .await
            .iter()
            .map(|(name, uuid)| (name.to_owned(), uuid.clone()))
            .collect();
        Ok(list)
    }
}
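The `err` flag on create_uuid is what distinguishes create (rejects an existing name) from get_or_create (returns the existing uuid). The unit test below is a sketch, not part of the diff; it would live inside this file so the private MapUuidStore is in scope, and it assumes the tokio test macro is available.

#[cfg(test)]
mod uuid_store_sketch {
    use super::*;

    #[tokio::test]
    async fn create_flag_controls_duplicates() {
        let store = MapUuidStore(Arc::new(RwLock::new(HashMap::new())));
        // First creation always succeeds.
        let uuid = store.create_uuid("movies".to_string(), true).await.unwrap();
        // err == true (handle_create) rejects duplicates...
        assert!(store.create_uuid("movies".to_string(), true).await.is_err());
        // ...while err == false (handle_get_or_create) returns the existing uuid.
        let same = store.create_uuid("movies".to_string(), false).await.unwrap();
        assert_eq!(uuid, same);
    }
}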