113: snapshots r=MarinPostma a=MarinPostma

 This pr adds support for snapshoting.

The snapshoting process for an index requires that no other update is processing at the same time. A mutex lock has been added to prevent a snapshot from occuring at the same time as an update, while still premitting updates to be pushed.

The list of the indexes to snapshot is first retrieved from the `UuidResolver` which also performs its snapshot.

This list is passed to the update store, which attempts to acquire a lock on the update store while it snaphots itself and it's associated index store.

 This means that a snapshot can only be completed once all indexes have finished their ongoing update.

This pr also adds refactoring of the code to allow unit testing and mocking, and unit test the snapshot creation.

Co-authored-by: mpostma <postma.marin@protonmail.com>
Co-authored-by: tamo <irevoire@protonmail.ch>
Co-authored-by: marin <postma.marin@protonmail.com>
Co-authored-by: Marin Postma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot]
2021-04-01 13:16:00 +00:00
committed by GitHub
34 changed files with 2249 additions and 1587 deletions

96
Cargo.lock generated
View File

@@ -937,6 +937,12 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1" checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1"
[[package]]
name = "difference"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.8.1" version = "0.8.1"
@@ -961,6 +967,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "downcast"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d"
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@@ -1066,6 +1078,15 @@ dependencies = [
"miniz_oxide", "miniz_oxide",
] ]
[[package]]
name = "float-cmp"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@@ -1082,6 +1103,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "fragile"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2"
[[package]] [[package]]
name = "fs_extra" name = "fs_extra"
version = "1.2.0" version = "1.2.0"
@@ -1822,6 +1849,7 @@ dependencies = [
"memmap", "memmap",
"milli", "milli",
"mime", "mime",
"mockall",
"once_cell", "once_cell",
"oxidized-json-checker", "oxidized-json-checker",
"parking_lot", "parking_lot",
@@ -2023,6 +2051,33 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "mockall"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18d614ad23f9bb59119b8b5670a85c7ba92c5e9adf4385c81ea00c51c8be33d5"
dependencies = [
"cfg-if 1.0.0",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd4234635bca06fc96c7368d038061e0aae1b00a764dc817e900dc974e3deea"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2 1.0.24",
"quote 1.0.9",
"syn 1.0.64",
]
[[package]] [[package]]
name = "net2" name = "net2"
version = "0.2.37" version = "0.2.37"
@@ -2046,6 +2101,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "normalize-line-endings"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]] [[package]]
name = "ntapi" name = "ntapi"
version = "0.3.6" version = "0.3.6"
@@ -2329,6 +2390,35 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "predicates"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeb433456c1a57cc93554dea3ce40b4c19c4057e41c55d4a0f3d84ea71c325aa"
dependencies = [
"difference",
"float-cmp",
"normalize-line-endings",
"predicates-core",
"regex",
]
[[package]]
name = "predicates-core"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
[[package]]
name = "predicates-tree"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f553275e5721409451eb85e15fd9a860a6e5ab4496eb215987502b5f5391f2"
dependencies = [
"predicates-core",
"treeline",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@@ -3438,6 +3528,12 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "treeline"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41"
[[package]] [[package]]
name = "trust-dns-proto" name = "trust-dns-proto"
version = "0.19.7" version = "0.19.7"

View File

@@ -80,10 +80,11 @@ version = "0.18.1"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.1.0"
assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" }
mockall = "0.9.1"
serde_url_params = "0.2.0" serde_url_params = "0.2.0"
tempdir = "0.3.7" tempdir = "0.3.7"
assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" }
actix-rt = "2.1.0"
urlencoding = "1.1.1" urlencoding = "1.1.1"
[features] [features]

View File

@@ -1,7 +1,6 @@
pub mod search; pub mod search;
mod updates; mod updates;
use std::fs::create_dir_all;
use std::ops::Deref; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
@@ -59,10 +58,7 @@ impl Data {
pub fn new(options: Opt) -> anyhow::Result<Data> { pub fn new(options: Opt) -> anyhow::Result<Data> {
let path = options.db_path.clone(); let path = options.db_path.clone();
create_dir_all(&path)?; let index_controller = IndexController::new(&path, &options)?;
let index_size = options.max_mdb_size.get_bytes() as usize;
let update_store_size = options.max_udb_size.get_bytes() as usize;
let index_controller = IndexController::new(&path, index_size, update_store_size)?;
let mut api_keys = ApiKeys { let mut api_keys = ApiKeys {
master: options.clone().master_key, master: options.clone().master_key,

View File

@@ -1,27 +1,26 @@
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path; use std::path::Path;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use tar::{Archive, Builder}; use tar::{Archive, Builder};
use crate::error::Error; pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?;
pub fn to_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> { let gz_encoder = GzEncoder::new(&mut f, Compression::default());
let f = File::create(dest)?;
let gz_encoder = GzEncoder::new(f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder); let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?; tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?; let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?; gz_encoder.finish()?;
f.flush()?;
Ok(()) Ok(())
} }
pub fn from_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> { pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let f = File::open(src)?; let f = File::open(&src)?;
let gz = GzDecoder::new(f); let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz); let mut ar = Archive::new(gz);
create_dir_all(dest)?; create_dir_all(&dest)?;
ar.unpack(dest)?; ar.unpack(&dest)?;
Ok(()) Ok(())
} }

View File

@@ -20,7 +20,6 @@ const fn default_search_limit() -> usize {
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)] #[serde(rename_all = "camelCase", deny_unknown_fields)]
#[allow(dead_code)]
pub struct SearchQuery { pub struct SearchQuery {
pub q: Option<String>, pub q: Option<String>,
pub offset: Option<usize>, pub offset: Option<usize>,

View File

@@ -1,612 +0,0 @@
use std::collections::HashMap;
use std::fs::{create_dir_all, File};
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_stream::stream;
use chrono::{DateTime, Utc};
use futures::pin_mut;
use futures::stream::StreamExt;
use heed::EnvOpenOptions;
use log::debug;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs::remove_dir_all;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::update_handler::UpdateHandler;
use super::{get_arc_ownership_blocking, IndexSettings};
use crate::index::UpdateResult as UResult;
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{
updates::{Failed, Processed, Processing},
UpdateMeta,
};
use crate::option::IndexerOpts;
pub type Result<T> = std::result::Result<T, IndexError>;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
primary_key: Option<String>,
}
impl IndexMeta {
fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(&txn)?;
let updated_at = index.updated_at(&txn)?;
let primary_key = index.primary_key(&txn)?.map(String::from);
Ok(Self {
primary_key,
updated_at,
created_at,
})
}
}
enum IndexMsg {
CreateIndex {
uuid: Uuid,
primary_key: Option<String>,
ret: oneshot::Sender<Result<IndexMeta>>,
},
Update {
meta: Processing<UpdateMeta>,
data: std::fs::File,
ret: oneshot::Sender<Result<UpdateResult>>,
},
Search {
uuid: Uuid,
query: SearchQuery,
ret: oneshot::Sender<anyhow::Result<SearchResult>>,
},
Settings {
uuid: Uuid,
ret: oneshot::Sender<Result<Settings>>,
},
Documents {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
offset: usize,
limit: usize,
ret: oneshot::Sender<Result<Vec<Document>>>,
},
Document {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
doc_id: String,
ret: oneshot::Sender<Result<Document>>,
},
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
GetMeta {
uuid: Uuid,
ret: oneshot::Sender<Result<IndexMeta>>,
},
UpdateIndex {
uuid: Uuid,
index_settings: IndexSettings,
ret: oneshot::Sender<Result<IndexMeta>>,
},
}
struct IndexActor<S> {
read_receiver: Option<mpsc::Receiver<IndexMsg>>,
write_receiver: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>,
store: S,
}
#[derive(Error, Debug)]
pub enum IndexError {
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
#[error("index already exists")]
IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
#[error("Existing primary key")]
ExistingPrimaryKey,
}
#[async_trait::async_trait]
trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
}
impl<S: IndexStore + Sync + Send> IndexActor<S> {
fn new(
read_receiver: mpsc::Receiver<IndexMsg>,
write_receiver: mpsc::Receiver<IndexMsg>,
store: S,
) -> Result<Self> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?;
let update_handler = Arc::new(update_handler);
let read_receiver = Some(read_receiver);
let write_receiver = Some(write_receiver);
Ok(Self {
read_receiver,
write_receiver,
store,
update_handler,
})
}
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send
/// through the read channel are processed concurrently, the messages sent through the write
/// channel are processed one at a time.
async fn run(mut self) {
let mut read_receiver = self
.read_receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let read_stream = stream! {
loop {
match read_receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
let mut write_receiver = self
.write_receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let write_stream = stream! {
loop {
match write_receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
pin_mut!(write_stream);
pin_mut!(read_stream);
let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg));
let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg));
let fut1: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut1);
let fut2: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut2);
tokio::join!(fut1, fut2);
}
async fn handle_message(&self, msg: IndexMsg) {
use IndexMsg::*;
match msg {
CreateIndex {
uuid,
primary_key,
ret,
} => {
let _ = ret.send(self.handle_create_index(uuid, primary_key).await);
}
Update { ret, meta, data } => {
let _ = ret.send(self.handle_update(meta, data).await);
}
Search { ret, query, uuid } => {
let _ = ret.send(self.handle_search(uuid, query).await);
}
Settings { ret, uuid } => {
let _ = ret.send(self.handle_settings(uuid).await);
}
Documents {
ret,
uuid,
attributes_to_retrieve,
offset,
limit,
} => {
let _ = ret.send(
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve)
.await,
);
}
Document {
uuid,
attributes_to_retrieve,
doc_id,
ret,
} => {
let _ = ret.send(
self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve)
.await,
);
}
Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await);
}
GetMeta { uuid, ret } => {
let _ = ret.send(self.handle_get_meta(uuid).await);
}
UpdateIndex {
uuid,
index_settings,
ret,
} => {
let _ = ret.send(self.handle_update_index(uuid, index_settings).await);
}
}
}
async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result<SearchResult> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.perform_search(query)).await?
}
async fn handle_create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
Ok(meta)
}
async fn handle_update(
&self,
meta: Processing<UpdateMeta>,
data: File,
) -> Result<UpdateResult> {
log::info!("Processing update {}", meta.id());
let uuid = meta.index_uuid();
let update_handler = self.update_handler.clone();
let index = match self.store.get(*uuid).await? {
Some(index) => index,
None => self.store.create(*uuid, None).await?,
};
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
}
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.settings().map_err(IndexError::Error))
.await
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_fetch_documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_fetch_document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_document(doc_id, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let index = self.store.delete(uuid).await?;
if let Some(index) = index {
tokio::task::spawn(async move {
let index = index.0;
let store = get_arc_ownership_blocking(index).await;
spawn_blocking(move || {
store.prepare_for_closing().wait();
debug!("Index closed");
});
});
}
Ok(())
}
async fn handle_get_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
match self.store.get(uuid).await? {
Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
Ok(meta)
}
None => Err(IndexError::UnexistingIndex),
}
}
async fn handle_update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || match index_settings.primary_key {
Some(ref primary_key) => {
let mut txn = index.write_txn()?;
if index.primary_key(&txn)?.is_some() {
return Err(IndexError::ExistingPrimaryKey);
}
index.put_primary_key(&mut txn, primary_key)?;
let meta = IndexMeta::new_txn(&index, &txn)?;
txn.commit()?;
Ok(meta)
}
None => {
let meta = IndexMeta::new(&index)?;
Ok(meta)
}
})
.await
.map_err(|e| IndexError::Error(e.into()))?
}
}
#[derive(Clone)]
pub struct IndexActorHandle {
read_sender: mpsc::Sender<IndexMsg>,
write_sender: mpsc::Sender<IndexMsg>,
}
impl IndexActorHandle {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
let (read_sender, read_receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100);
let store = HeedIndexStore::new(path, index_size);
let actor = IndexActor::new(read_receiver, write_receiver, store)?;
tokio::task::spawn(actor.run());
Ok(Self {
read_sender,
write_sender,
})
}
pub async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex {
ret,
uuid,
primary_key,
};
let _ = self.read_sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
pub async fn update(
&self,
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update { ret, meta, data };
let _ = self.write_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Search { uuid, query, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
pub async fn settings(&self, uuid: Uuid) -> Result<Settings> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Settings { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
pub async fn documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Documents {
uuid,
ret,
offset,
attributes_to_retrieve,
limit,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
pub async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Document {
uuid,
ret,
doc_id,
attributes_to_retrieve,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
pub async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Delete { uuid, ret };
let _ = self.write_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
pub async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
pub async fn update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::UpdateIndex {
uuid,
index_settings,
ret,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
}
struct HeedIndexStore {
index_store: AsyncMap<Uuid, Index>,
path: PathBuf,
index_size: usize,
}
impl HeedIndexStore {
fn new(path: impl AsRef<Path>, index_size: usize) -> Self {
let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new()));
Self {
index_store,
path,
index_size,
}
}
}
#[async_trait::async_trait]
impl IndexStore for HeedIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexError::IndexAlreadyExists);
}
let index_size = self.index_size;
let index = spawn_blocking(move || -> Result<Index> {
let index = open_index(&path, index_size)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
index.put_primary_key(&mut txn, &primary_key)?;
txn.commit()?;
}
Ok(index)
})
.await
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(index)
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
None => {
// drop the guard here so we can perform the write after without deadlocking;
drop(guard);
let path = self.path.join(format!("index-{}", uuid));
if !path.exists() {
return Ok(None);
}
let index_size = self.index_size;
let index = spawn_blocking(move || open_index(path, index_size))
.await
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index))
}
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
remove_dir_all(db_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> Result<Index> {
create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path).map_err(IndexError::Error)?;
Ok(Index(Arc::new(index)))
}

View File

@@ -0,0 +1,331 @@
use std::fs::File;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use async_stream::stream;
use futures::pin_mut;
use futures::stream::StreamExt;
use heed::CompactionOption;
use log::debug;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult};
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::update_handler::UpdateHandler;
use crate::index_controller::{get_arc_ownership_blocking, updates::Processing, UpdateMeta};
use crate::option::IndexerOpts;
pub struct IndexActor<S> {
read_receiver: Option<mpsc::Receiver<IndexMsg>>,
write_receiver: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>,
store: S,
}
impl<S: IndexStore + Sync + Send> IndexActor<S> {
pub fn new(
read_receiver: mpsc::Receiver<IndexMsg>,
write_receiver: mpsc::Receiver<IndexMsg>,
store: S,
) -> Result<Self> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?;
let update_handler = Arc::new(update_handler);
let read_receiver = Some(read_receiver);
let write_receiver = Some(write_receiver);
Ok(Self {
read_receiver,
write_receiver,
store,
update_handler,
})
}
/// `run` poll the write_receiver and read_receiver concurrently, but while messages send
/// through the read channel are processed concurrently, the messages sent through the write
/// channel are processed one at a time.
pub async fn run(mut self) {
let mut read_receiver = self
.read_receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let read_stream = stream! {
loop {
match read_receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
let mut write_receiver = self
.write_receiver
.take()
.expect("Index Actor must have a inbox at this point.");
let write_stream = stream! {
loop {
match write_receiver.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
pin_mut!(write_stream);
pin_mut!(read_stream);
let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg));
let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg));
let fut1: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut1);
let fut2: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut2);
tokio::join!(fut1, fut2);
}
async fn handle_message(&self, msg: IndexMsg) {
use IndexMsg::*;
match msg {
CreateIndex {
uuid,
primary_key,
ret,
} => {
let _ = ret.send(self.handle_create_index(uuid, primary_key).await);
}
Update { ret, meta, data } => {
let _ = ret.send(self.handle_update(meta, data).await);
}
Search { ret, query, uuid } => {
let _ = ret.send(self.handle_search(uuid, query).await);
}
Settings { ret, uuid } => {
let _ = ret.send(self.handle_settings(uuid).await);
}
Documents {
ret,
uuid,
attributes_to_retrieve,
offset,
limit,
} => {
let _ = ret.send(
self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve)
.await,
);
}
Document {
uuid,
attributes_to_retrieve,
doc_id,
ret,
} => {
let _ = ret.send(
self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve)
.await,
);
}
Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await);
}
GetMeta { uuid, ret } => {
let _ = ret.send(self.handle_get_meta(uuid).await);
}
UpdateIndex {
uuid,
index_settings,
ret,
} => {
let _ = ret.send(self.handle_update_index(uuid, index_settings).await);
}
Snapshot { uuid, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
}
}
async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result<SearchResult> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.perform_search(query)).await?
}
async fn handle_create_index(
&self,
uuid: Uuid,
primary_key: Option<String>,
) -> Result<IndexMeta> {
let index = self.store.create(uuid, primary_key).await?;
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
Ok(meta)
}
async fn handle_update(
&self,
meta: Processing<UpdateMeta>,
data: File,
) -> Result<UpdateResult> {
debug!("Processing update {}", meta.id());
let uuid = meta.index_uuid();
let update_handler = self.update_handler.clone();
let index = match self.store.get(*uuid).await? {
Some(index) => index,
None => self.store.create(*uuid, None).await?,
};
spawn_blocking(move || update_handler.handle_update(meta, data, index))
.await
.map_err(|e| IndexError::Error(e.into()))
}
async fn handle_settings(&self, uuid: Uuid) -> Result<Settings> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || index.settings().map_err(IndexError::Error))
.await
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_fetch_documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_documents(offset, limit, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_fetch_document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || {
index
.retrieve_document(doc_id, attributes_to_retrieve)
.map_err(IndexError::Error)
})
.await
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let index = self.store.delete(uuid).await?;
if let Some(index) = index {
tokio::task::spawn(async move {
let index = index.0;
let store = get_arc_ownership_blocking(index).await;
spawn_blocking(move || {
store.prepare_for_closing().wait();
debug!("Index closed");
});
});
}
Ok(())
}
async fn handle_get_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
match self.store.get(uuid).await? {
Some(index) => {
let meta = spawn_blocking(move || IndexMeta::new(&index))
.await
.map_err(|e| IndexError::Error(e.into()))??;
Ok(meta)
}
None => Err(IndexError::UnexistingIndex),
}
}
async fn handle_update_index(
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
let index = self
.store
.get(uuid)
.await?
.ok_or(IndexError::UnexistingIndex)?;
spawn_blocking(move || match index_settings.primary_key {
Some(ref primary_key) => {
let mut txn = index.write_txn()?;
if index.primary_key(&txn)?.is_some() {
return Err(IndexError::ExistingPrimaryKey);
}
index.put_primary_key(&mut txn, primary_key)?;
let meta = IndexMeta::new_txn(&index, &txn)?;
txn.commit()?;
Ok(meta)
}
None => {
let meta = IndexMeta::new(&index)?;
Ok(meta)
}
})
.await
.map_err(|e| IndexError::Error(e.into()))?
}
async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> {
use tokio::fs::create_dir_all;
path.push("indexes");
create_dir_all(&path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
if let Some(index) = self.store.get(uuid).await? {
let mut index_path = path.join(format!("index-{}", uuid));
create_dir_all(&index_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
index_path.push("data.mdb");
spawn_blocking(move || -> anyhow::Result<()> {
// Get write txn to wait for ongoing write transaction before snapshot.
let _txn = index.write_txn()?;
index
.env
.copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(())
})
.await
.map_err(|e| IndexError::Error(e.into()))?
.map_err(IndexError::Error)?;
}
Ok(())
}
}

View File

@@ -0,0 +1,139 @@
use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::{
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
};
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::IndexSettings;
use crate::index_controller::{updates::Processing, UpdateMeta};
#[derive(Clone)]
pub struct IndexActorHandleImpl {
read_sender: mpsc::Sender<IndexMsg>,
write_sender: mpsc::Sender<IndexMsg>,
}
#[async_trait::async_trait]
impl IndexActorHandle for IndexActorHandleImpl {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex {
ret,
uuid,
primary_key,
};
let _ = self.read_sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
async fn update(
&self,
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update { ret, meta, data };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Search { uuid, query, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn settings(&self, uuid: Uuid) -> Result<Settings> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Settings { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Documents {
uuid,
ret,
offset,
attributes_to_retrieve,
limit,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Document {
uuid,
ret,
doc_id,
attributes_to_retrieve,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Delete { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::UpdateIndex {
uuid,
index_settings,
ret,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Snapshot { uuid, path, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
}
impl IndexActorHandleImpl {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
let (read_sender, read_receiver) = mpsc::channel(100);
let (write_sender, write_receiver) = mpsc::channel(100);
let store = MapIndexStore::new(path, index_size);
let actor = IndexActor::new(read_receiver, write_receiver, store)?;
tokio::task::spawn(actor.run());
Ok(Self {
read_sender,
write_sender,
})
}
}

View File

@@ -0,0 +1,61 @@
use std::path::PathBuf;
use tokio::sync::oneshot;
use uuid::Uuid;
use super::{IndexMeta, IndexSettings, Result, UpdateResult};
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{updates::Processing, UpdateMeta};
pub enum IndexMsg {
CreateIndex {
uuid: Uuid,
primary_key: Option<String>,
ret: oneshot::Sender<Result<IndexMeta>>,
},
Update {
meta: Processing<UpdateMeta>,
data: std::fs::File,
ret: oneshot::Sender<Result<UpdateResult>>,
},
Search {
uuid: Uuid,
query: SearchQuery,
ret: oneshot::Sender<anyhow::Result<SearchResult>>,
},
Settings {
uuid: Uuid,
ret: oneshot::Sender<Result<Settings>>,
},
Documents {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
offset: usize,
limit: usize,
ret: oneshot::Sender<Result<Vec<Document>>>,
},
Document {
uuid: Uuid,
attributes_to_retrieve: Option<Vec<String>>,
doc_id: String,
ret: oneshot::Sender<Result<Document>>,
},
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
GetMeta {
uuid: Uuid,
ret: oneshot::Sender<Result<IndexMeta>>,
},
UpdateIndex {
uuid: Uuid,
index_settings: IndexSettings,
ret: oneshot::Sender<Result<IndexMeta>>,
},
Snapshot {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
}

View File

@@ -0,0 +1,101 @@
mod actor;
mod handle_impl;
mod message;
mod store;
use std::path::PathBuf;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
use super::IndexSettings;
use crate::index::UpdateResult as UResult;
use crate::index::{Document, Index, SearchQuery, SearchResult, Settings};
use crate::index_controller::{
updates::{Failed, Processed, Processing},
UpdateMeta,
};
use actor::IndexActor;
use message::IndexMsg;
use store::{IndexStore, MapIndexStore};
pub use handle_impl::IndexActorHandleImpl;
#[cfg(test)]
use mockall::automock;
pub type Result<T> = std::result::Result<T, IndexError>;
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
primary_key: Option<String>,
}
impl IndexMeta {
fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(&txn)?;
let updated_at = index.updated_at(&txn)?;
let primary_key = index.primary_key(&txn)?.map(String::from);
Ok(Self {
primary_key,
updated_at,
created_at,
})
}
}
#[derive(Error, Debug)]
pub enum IndexError {
#[error("error with index: {0}")]
Error(#[from] anyhow::Error),
#[error("index already exists")]
IndexAlreadyExists,
#[error("Index doesn't exists")]
UnexistingIndex,
#[error("Heed error: {0}")]
HeedError(#[from] heed::Error),
#[error("Existing primary key")]
ExistingPrimaryKey,
}
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait IndexActorHandle {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
async fn update(
&self,
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult>;
async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult>;
async fn settings(&self, uuid: Uuid) -> Result<Settings>;
async fn documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>>;
async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>;
async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
}

View File

@@ -0,0 +1,105 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use heed::EnvOpenOptions;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::{IndexError, Result};
use crate::index::Index;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[async_trait::async_trait]
pub trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
}
pub struct MapIndexStore {
index_store: AsyncMap<Uuid, Index>,
path: PathBuf,
index_size: usize,
}
impl MapIndexStore {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> Self {
let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new()));
Self {
index_store,
path,
index_size,
}
}
}
#[async_trait::async_trait]
impl IndexStore for MapIndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
let path = self.path.join(format!("index-{}", uuid));
if path.exists() {
return Err(IndexError::IndexAlreadyExists);
}
let index_size = self.index_size;
let index = spawn_blocking(move || -> Result<Index> {
let index = open_index(&path, index_size)?;
if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?;
index.put_primary_key(&mut txn, &primary_key)?;
txn.commit()?;
}
Ok(index)
})
.await
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(index)
}
async fn get(&self, uuid: Uuid) -> Result<Option<Index>> {
let guard = self.index_store.read().await;
match guard.get(&uuid) {
Some(index) => Ok(Some(index.clone())),
None => {
// drop the guard here so we can perform the write after without deadlocking;
drop(guard);
let path = self.path.join(format!("index-{}", uuid));
if !path.exists() {
return Ok(None);
}
let index_size = self.index_size;
let index = spawn_blocking(move || open_index(path, index_size))
.await
.map_err(|e| IndexError::Error(e.into()))??;
self.index_store.write().await.insert(uuid, index.clone());
Ok(Some(index))
}
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
fs::remove_dir_all(db_path)
.await
.map_err(|e| IndexError::Error(e.into()))?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)
}
}
fn open_index(path: impl AsRef<Path>, size: usize) -> Result<Index> {
std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &path).map_err(IndexError::Error)?;
Ok(Index(Arc::new(index)))
}

View File

@@ -1,7 +1,7 @@
mod index_actor; mod index_actor;
mod snapshot;
mod update_actor; mod update_actor;
mod update_handler; mod update_handler;
mod update_store;
mod updates; mod updates;
mod uuid_resolver; mod uuid_resolver;
@@ -12,6 +12,7 @@ use std::time::Duration;
use actix_web::web::{Bytes, Payload}; use actix_web::web::{Bytes, Payload};
use anyhow::bail; use anyhow::bail;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::info;
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@@ -20,6 +21,14 @@ use uuid::Uuid;
use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Document, SearchQuery, SearchResult};
use crate::index::{Facets, Settings, UpdateResult}; use crate::index::{Facets, Settings, UpdateResult};
use crate::option::Opt;
use index_actor::IndexActorHandle;
use snapshot::load_snapshot;
use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle;
use snapshot::SnapshotService;
pub use updates::{Failed, Processed, Processing}; pub use updates::{Failed, Processed, Processing};
use uuid_resolver::UuidError; use uuid_resolver::UuidError;
@@ -55,24 +64,54 @@ pub struct IndexSettings {
} }
pub struct IndexController { pub struct IndexController {
uuid_resolver: uuid_resolver::UuidResolverHandle, uuid_resolver: uuid_resolver::UuidResolverHandleImpl,
index_handle: index_actor::IndexActorHandle, index_handle: index_actor::IndexActorHandleImpl,
update_handle: update_actor::UpdateActorHandle<Bytes>, update_handle: update_actor::UpdateActorHandleImpl<Bytes>,
} }
impl IndexController { impl IndexController {
pub fn new( pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
path: impl AsRef<Path>, let index_size = options.max_mdb_size.get_bytes() as usize;
index_size: usize, let update_store_size = options.max_udb_size.get_bytes() as usize;
update_store_size: usize,
) -> anyhow::Result<Self> { if let Some(ref path) = options.import_snapshot {
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; info!("Loading from snapshot {:?}", path);
let index_actor = index_actor::IndexActorHandle::new(&path, index_size)?; load_snapshot(
let update_handle = &options.db_path,
update_actor::UpdateActorHandle::new(index_actor.clone(), &path, update_store_size)?; path,
options.ignore_snapshot_if_db_exists,
options.ignore_missing_snapshot,
)?;
}
std::fs::create_dir_all(&path)?;
let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&path)?;
let index_handle = index_actor::IndexActorHandleImpl::new(&path, index_size)?;
let update_handle = update_actor::UpdateActorHandleImpl::new(
index_handle.clone(),
&path,
update_store_size,
)?;
if options.schedule_snapshot {
let snapshot_service = SnapshotService::new(
uuid_resolver.clone(),
update_handle.clone(),
Duration::from_secs(options.snapshot_interval_sec),
options.snapshot_dir.clone(),
options.db_path
.file_name()
.map(|n| n.to_owned().into_string().expect("invalid path"))
.unwrap_or_else(|| String::from("data.ms")),
);
tokio::task::spawn(snapshot_service.run());
}
Ok(Self { Ok(Self {
uuid_resolver, uuid_resolver,
index_handle: index_actor, index_handle,
update_handle, update_handle,
}) })
} }

View File

@@ -0,0 +1,260 @@
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::bail;
use log::{error, info};
use tokio::fs;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use super::update_actor::UpdateActorHandle;
use super::uuid_resolver::UuidResolverHandle;
use crate::helpers::compression;
pub struct SnapshotService<U, R> {
uuid_resolver_handle: R,
update_handle: U,
snapshot_period: Duration,
snapshot_path: PathBuf,
db_name: String,
}
impl<U, R> SnapshotService<U, R>
where
U: UpdateActorHandle,
R: UuidResolverHandle,
{
pub fn new(
uuid_resolver_handle: R,
update_handle: U,
snapshot_period: Duration,
snapshot_path: PathBuf,
db_name: String,
) -> Self {
Self {
uuid_resolver_handle,
update_handle,
snapshot_period,
snapshot_path,
db_name,
}
}
pub async fn run(self) {
info!(
"Snapshot scheduled every {}s.",
self.snapshot_period.as_secs()
);
loop {
if let Err(e) = self.perform_snapshot().await {
error!("{}", e);
}
sleep(self.snapshot_period).await;
}
}
async fn perform_snapshot(&self) -> anyhow::Result<()> {
info!("Performing snapshot.");
let snapshot_dir = self.snapshot_path.clone();
fs::create_dir_all(&snapshot_dir).await?;
let temp_snapshot_dir =
spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??;
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
let uuids = self
.uuid_resolver_handle
.snapshot(temp_snapshot_path.clone())
.await?;
if uuids.is_empty() {
return Ok(());
}
let tasks = uuids
.iter()
.map(|&uuid| {
self.update_handle
.snapshot(uuid, temp_snapshot_path.clone())
})
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await?;
let snapshot_dir = self.snapshot_path.clone();
let snapshot_path = self
.snapshot_path
.join(format!("{}.snapshot", self.db_name));
let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
temp_snapshot_file.persist(&snapshot_path)?;
Ok(snapshot_path)
})
.await??;
info!("Created snapshot in {:?}.", snapshot_path);
Ok(())
}
}
pub fn load_snapshot(
db_path: impl AsRef<Path>,
snapshot_path: impl AsRef<Path>,
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool,
) -> anyhow::Result<()> {
if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() {
match compression::from_tar_gz(snapshot_path, &db_path) {
Ok(()) => Ok(()),
Err(e) => {
// clean created db folder
std::fs::remove_dir_all(&db_path)?;
Err(e)
}
}
} else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists {
bail!(
"database already exists at {:?}, try to delete it or rename it",
db_path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| db_path.as_ref().to_owned())
)
} else if !snapshot_path.as_ref().exists() && !ignore_missing_snapshot {
bail!(
"snapshot doesn't exist at {:?}",
snapshot_path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| snapshot_path.as_ref().to_owned())
)
} else {
Ok(())
}
}
#[cfg(test)]
mod test {
use futures::future::{err, ok};
use rand::Rng;
use tokio::time::timeout;
use uuid::Uuid;
use super::*;
use crate::index_controller::update_actor::{MockUpdateActorHandle, UpdateError};
use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError};
#[actix_rt::test]
async fn test_normal() {
let mut rng = rand::thread_rng();
let uuids_num = rng.gen_range(5, 10);
let uuids = (0..uuids_num).map(|_| Uuid::new_v4()).collect::<Vec<_>>();
let mut uuid_resolver = MockUuidResolverHandle::new();
let uuids_clone = uuids.clone();
uuid_resolver
.expect_snapshot()
.times(1)
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
let mut update_handle = MockUpdateActorHandle::new();
let uuids_clone = uuids.clone();
update_handle
.expect_snapshot()
.withf(move |uuid, _path| uuids_clone.contains(uuid))
.times(uuids_num)
.returning(move |_, _| Box::pin(ok(())));
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
snapshot_service.perform_snapshot().await.unwrap();
}
#[actix_rt::test]
async fn error_performing_uuid_snapshot() {
let mut uuid_resolver = MockUuidResolverHandle::new();
uuid_resolver
.expect_snapshot()
.times(1)
// abitrary error
.returning(|_| Box::pin(err(UuidError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
assert!(snapshot_service.perform_snapshot().await.is_err());
// Nothing was written to the file
assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
}
#[actix_rt::test]
async fn error_performing_index_snapshot() {
let uuid = Uuid::new_v4();
let mut uuid_resolver = MockUuidResolverHandle::new();
uuid_resolver
.expect_snapshot()
.times(1)
.returning(move |_| Box::pin(ok(vec![uuid])));
let mut update_handle = MockUpdateActorHandle::new();
update_handle
.expect_snapshot()
// abitrary error
.returning(|_, _| Box::pin(err(UpdateError::UnexistingUpdate(0))));
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
assert!(snapshot_service.perform_snapshot().await.is_err());
// Nothing was written to the file
assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
}
#[actix_rt::test]
async fn test_loop() {
let mut uuid_resolver = MockUuidResolverHandle::new();
uuid_resolver
.expect_snapshot()
// we expect the funtion to be called between 2 and 3 time in the given interval.
.times(2..4)
// abitrary error, to short-circuit the function
.returning(move |_| Box::pin(err(UuidError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await;
}
}

View File

@@ -1,399 +0,0 @@
use std::collections::{hash_map::Entry, HashMap};
use std::io::SeekFrom;
use std::fs::{create_dir_all, remove_dir_all};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use log::info;
use oxidized_json_checker::JsonChecker;
use super::index_actor::IndexActorHandle;
use thiserror::Error;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncWriteExt, AsyncSeekExt};
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
use super::get_arc_ownership_blocking;
use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus};
pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>;
#[derive(Debug, Error)]
pub enum UpdateError {
#[error("error with update: {0}")]
Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Index {0} doesn't exist.")]
UnexistingIndex(Uuid),
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
}
enum UpdateMsg<D> {
Update {
uuid: Uuid,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<D>>,
ret: oneshot::Sender<Result<UpdateStatus>>,
},
ListUpdates {
uuid: Uuid,
ret: oneshot::Sender<Result<Vec<UpdateStatus>>>,
},
GetUpdate {
uuid: Uuid,
ret: oneshot::Sender<Result<UpdateStatus>>,
id: u64,
},
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
Create {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
}
struct UpdateActor<D, S> {
path: PathBuf,
store: S,
inbox: mpsc::Receiver<UpdateMsg<D>>,
}
#[async_trait::async_trait]
trait UpdateStoreStore {
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
}
impl<D, S> UpdateActor<D, S>
where
D: AsRef<[u8]> + Sized + 'static,
S: UpdateStoreStore,
{
fn new(
store: S,
inbox: mpsc::Receiver<UpdateMsg<D>>,
path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned().join("update_files");
create_dir_all(&path)?;
assert!(path.exists());
Ok(Self { store, inbox, path })
}
async fn run(mut self) {
use UpdateMsg::*;
info!("Started update actor.");
loop {
match self.inbox.recv().await {
Some(Update {
uuid,
meta,
data,
ret,
}) => {
let _ = ret.send(self.handle_update(uuid, meta, data).await);
}
Some(ListUpdates { uuid, ret }) => {
let _ = ret.send(self.handle_list_updates(uuid).await);
}
Some(GetUpdate { uuid, ret, id }) => {
let _ = ret.send(self.handle_get_update(uuid, id).await);
}
Some(Delete { uuid, ret }) => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Some(Create { uuid, ret }) => {
let _ = ret.send(self.handle_create(uuid).await);
}
None => break,
}
}
}
async fn handle_update(
&self,
uuid: Uuid,
meta: UpdateMeta,
mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> {
let update_store = self.store.get_or_create(uuid).await?;
let update_file_id = uuid::Uuid::new_v4();
let path = self.path.join(format!("update_{}", update_file_id));
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
while let Some(bytes) = payload.recv().await {
match bytes {
Ok(bytes) => {
file.write_all(bytes.as_ref())
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
Err(e) => {
return Err(UpdateError::Error(e));
}
}
}
file.flush()
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let mut file = file.into_std().await;
tokio::task::spawn_blocking(move || {
use std::io::{BufReader, sink, copy, Seek};
// If the payload is empty, ignore the check.
if file.metadata().map_err(|e| UpdateError::Error(Box::new(e)))?.len() > 0 {
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader);
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
// The json file is invalid, we use Serde to get a nice error message:
file.seek(SeekFrom::Start(0))
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
}
// The payload is valid, we can register it to the update store.
update_store
.register_update(meta, path, uuid)
.map(UpdateStatus::Pending)
.map_err(|e| UpdateError::Error(Box::new(e)))
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.get(uuid).await?;
tokio::task::spawn_blocking(move || {
let result = update_store
.ok_or(UpdateError::UnexistingIndex(uuid))?
.list()
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(result)
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
}
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self
.store
.get(uuid)
.await?
.ok_or(UpdateError::UnexistingIndex(uuid))?;
let result = store
.meta(id)
.map_err(|e| UpdateError::Error(Box::new(e)))?
.ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result)
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.delete(uuid).await?;
if let Some(store) = store {
tokio::task::spawn(async move {
let store = get_arc_ownership_blocking(store).await;
tokio::task::spawn_blocking(move || {
store.prepare_for_closing().wait();
info!("Update store {} was closed.", uuid);
});
});
}
Ok(())
}
async fn handle_create(&self, uuid: Uuid) -> Result<()> {
let _ = self.store.get_or_create(uuid).await?;
Ok(())
}
}
#[derive(Clone)]
pub struct UpdateActorHandle<D> {
sender: mpsc::Sender<UpdateMsg<D>>,
}
impl<D> UpdateActorHandle<D>
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
pub fn new(
index_handle: IndexActorHandle,
path: impl AsRef<Path>,
update_store_size: usize,
) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned().join("updates");
let (sender, receiver) = mpsc::channel(100);
let store = MapUpdateStoreStore::new(index_handle, &path, update_store_size);
let actor = UpdateActor::new(store, receiver, path)?;
tokio::task::spawn(actor.run());
Ok(Self { sender })
}
pub async fn update(
&self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<D>>,
uuid: Uuid,
) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update {
uuid,
data,
meta,
ret,
};
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
pub async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetUpdate { uuid, id, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
pub async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Delete { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
pub async fn create(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Create { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
}
struct MapUpdateStoreStore {
db: Arc<RwLock<HashMap<Uuid, Arc<UpdateStore>>>>,
index_handle: IndexActorHandle,
path: PathBuf,
update_store_size: usize,
}
impl MapUpdateStoreStore {
fn new(
index_handle: IndexActorHandle,
path: impl AsRef<Path>,
update_store_size: usize,
) -> Self {
let db = Arc::new(RwLock::new(HashMap::new()));
let path = path.as_ref().to_owned();
Self {
db,
index_handle,
path,
update_store_size,
}
}
}
#[async_trait::async_trait]
impl UpdateStoreStore for MapUpdateStoreStore {
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>> {
match self.db.write().await.entry(uuid) {
Entry::Vacant(e) => {
let mut options = heed::EnvOpenOptions::new();
let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let path = self.path.clone().join(format!("updates-{}", e.key()));
create_dir_all(&path).unwrap();
let index_handle = self.index_handle.clone();
let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file))
})
.map_err(|e| UpdateError::Error(e.into()))?;
let store = e.insert(store);
Ok(store.clone())
}
Entry::Occupied(e) => Ok(e.get().clone()),
}
}
async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let guard = self.db.read().await;
match guard.get(&uuid) {
Some(uuid) => Ok(Some(uuid.clone())),
None => {
// The index is not found in the found in the loaded indexes, so we attempt to load
// it from disk. We need to acquire a write lock **before** attempting to open the
// index, because someone could be trying to open it at the same time as us.
drop(guard);
let path = self.path.clone().join(format!("updates-{}", uuid));
if path.exists() {
let mut guard = self.db.write().await;
match guard.entry(uuid) {
Entry::Vacant(entry) => {
// We can safely load the index
let index_handle = self.index_handle.clone();
let mut options = heed::EnvOpenOptions::new();
let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file))
})
.map_err(|e| UpdateError::Error(e.into()))?;
let store = entry.insert(store);
Ok(Some(store.clone()))
}
Entry::Occupied(entry) => {
// The index was loaded while we attempted to to iter
Ok(Some(entry.get().clone()))
}
}
} else {
Ok(None)
}
}
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let store = self.db.write().await.remove(&uuid);
let path = self.path.clone().join(format!("updates-{}", uuid));
if store.is_some() || path.exists() {
remove_dir_all(path).unwrap();
}
Ok(store)
}
}

View File

@@ -0,0 +1,226 @@
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use log::info;
use oxidized_json_checker::JsonChecker;
use tokio::fs;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStoreStore};
use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, S, I> {
path: PathBuf,
store: S,
inbox: mpsc::Receiver<UpdateMsg<D>>,
index_handle: I,
}
impl<D, S, I> UpdateActor<D, S, I>
where
D: AsRef<[u8]> + Sized + 'static,
S: UpdateStoreStore,
I: IndexActorHandle + Clone + Send + Sync + 'static,
{
pub fn new(
store: S,
inbox: mpsc::Receiver<UpdateMsg<D>>,
path: impl AsRef<Path>,
index_handle: I,
) -> anyhow::Result<Self> {
let path = path.as_ref().to_owned();
std::fs::create_dir_all(path.join("update_files"))?;
assert!(path.exists());
Ok(Self {
store,
inbox,
path,
index_handle,
})
}
pub async fn run(mut self) {
use UpdateMsg::*;
info!("Started update actor.");
loop {
match self.inbox.recv().await {
Some(Update {
uuid,
meta,
data,
ret,
}) => {
let _ = ret.send(self.handle_update(uuid, meta, data).await);
}
Some(ListUpdates { uuid, ret }) => {
let _ = ret.send(self.handle_list_updates(uuid).await);
}
Some(GetUpdate { uuid, ret, id }) => {
let _ = ret.send(self.handle_get_update(uuid, id).await);
}
Some(Delete { uuid, ret }) => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Some(Create { uuid, ret }) => {
let _ = ret.send(self.handle_create(uuid).await);
}
Some(Snapshot { uuid, path, ret }) => {
let _ = ret.send(self.handle_snapshot(uuid, path).await);
}
None => break,
}
}
}
async fn handle_update(
&self,
uuid: Uuid,
meta: UpdateMeta,
mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> {
let update_store = self.store.get_or_create(uuid).await?;
let update_file_id = uuid::Uuid::new_v4();
let path = self
.path
.join(format!("update_files/update_{}", update_file_id));
let mut file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
while let Some(bytes) = payload.recv().await {
match bytes {
Ok(bytes) => {
file.write_all(bytes.as_ref())
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
Err(e) => {
return Err(UpdateError::Error(e));
}
}
}
file.flush()
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
file.seek(SeekFrom::Start(0))
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let mut file = file.into_std().await;
tokio::task::spawn_blocking(move || {
use std::io::{copy, sink, BufReader, Seek};
// If the payload is empty, ignore the check.
if file
.metadata()
.map_err(|e| UpdateError::Error(Box::new(e)))?
.len()
> 0
{
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader);
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
// The json file is invalid, we use Serde to get a nice error message:
file.seek(SeekFrom::Start(0))
.map_err(|e| UpdateError::Error(Box::new(e)))?;
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateError::Error(Box::new(e)))?;
}
}
// The payload is valid, we can register it to the update store.
update_store
.register_update(meta, path, uuid)
.map(UpdateStatus::Pending)
.map_err(|e| UpdateError::Error(Box::new(e)))
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.get(uuid).await?;
tokio::task::spawn_blocking(move || {
let result = update_store
.ok_or(UpdateError::UnexistingIndex(uuid))?
.list()
.map_err(|e| UpdateError::Error(e.into()))?;
Ok(result)
})
.await
.map_err(|e| UpdateError::Error(Box::new(e)))?
}
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let store = self
.store
.get(uuid)
.await?
.ok_or(UpdateError::UnexistingIndex(uuid))?;
let result = store
.meta(id)
.map_err(|e| UpdateError::Error(Box::new(e)))?
.ok_or(UpdateError::UnexistingUpdate(id))?;
Ok(result)
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
let store = self.store.delete(uuid).await?;
if let Some(store) = store {
tokio::task::spawn(async move {
let store = get_arc_ownership_blocking(store).await;
tokio::task::spawn_blocking(move || {
store.prepare_for_closing().wait();
info!("Update store {} was closed.", uuid);
});
});
}
Ok(())
}
async fn handle_create(&self, uuid: Uuid) -> Result<()> {
let _ = self.store.get_or_create(uuid).await?;
Ok(())
}
async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
if let Some(update_store) = self.store.get(uuid).await? {
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
// acquire write lock to prevent further writes during snapshot
// the update lock must be acquired BEFORE the write lock to prevent dead lock
let _lock = update_store.update_lock.lock();
let mut txn = update_store.env.write_txn()?;
// create db snapshot
update_store.snapshot(&mut txn, &path, uuid)?;
futures::executor::block_on(
async move { index_handle.snapshot(uuid, path).await },
)?;
Ok(())
})
.await
.map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?;
}
Ok(())
}
}

View File

@@ -0,0 +1,96 @@
use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::{
MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta,
UpdateMsg, UpdateStatus,
};
use crate::index_controller::IndexActorHandle;
#[derive(Clone)]
pub struct UpdateActorHandleImpl<D> {
sender: mpsc::Sender<UpdateMsg<D>>,
}
impl<D> UpdateActorHandleImpl<D>
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
pub fn new<I>(
index_handle: I,
path: impl AsRef<Path>,
update_store_size: usize,
) -> anyhow::Result<Self>
where
I: IndexActorHandle + Clone + Send + Sync + 'static,
{
let path = path.as_ref().to_owned().join("updates");
let (sender, receiver) = mpsc::channel(100);
let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size);
let actor = UpdateActor::new(store, receiver, path, index_handle)?;
tokio::task::spawn(actor.run());
Ok(Self { sender })
}
}
#[async_trait::async_trait]
impl<D> UpdateActorHandle for UpdateActorHandleImpl<D>
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
type Data = D;
async fn update(
&self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid,
) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update {
uuid,
data,
meta,
ret,
};
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetUpdate { uuid, id, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Delete { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn create(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Create { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Snapshot { uuid, path, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
}

View File

@@ -0,0 +1,37 @@
use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::{PayloadData, Result, UpdateMeta, UpdateStatus};
pub enum UpdateMsg<D> {
Update {
uuid: Uuid,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<D>>,
ret: oneshot::Sender<Result<UpdateStatus>>,
},
ListUpdates {
uuid: Uuid,
ret: oneshot::Sender<Result<Vec<UpdateStatus>>>,
},
GetUpdate {
uuid: Uuid,
ret: oneshot::Sender<Result<UpdateStatus>>,
id: u64,
},
Delete {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
Create {
uuid: Uuid,
ret: oneshot::Sender<Result<()>>,
},
Snapshot {
uuid: Uuid,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
}

View File

@@ -0,0 +1,55 @@
mod actor;
mod handle_impl;
mod message;
mod store;
mod update_store;
use std::path::PathBuf;
use thiserror::Error;
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus};
use actor::UpdateActor;
use message::UpdateMsg;
use store::{MapUpdateStoreStore, UpdateStoreStore};
pub use handle_impl::UpdateActorHandleImpl;
pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
type PayloadData<D> = std::result::Result<D, Box<dyn std::error::Error + Sync + Send + 'static>>;
#[cfg(test)]
use mockall::automock;
#[derive(Debug, Error)]
pub enum UpdateError {
#[error("error with update: {0}")]
Error(Box<dyn std::error::Error + Sync + Send + 'static>),
#[error("Index {0} doesn't exist.")]
UnexistingIndex(Uuid),
#[error("Update {0} doesn't exist.")]
UnexistingUpdate(u64),
}
#[async_trait::async_trait]
#[cfg_attr(test, automock(type Data=Vec<u8>;))]
pub trait UpdateActorHandle {
type Data: AsRef<[u8]> + Sized + 'static + Sync + Send;
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>>;
async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus>;
async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn create(&self, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
async fn update(
&self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid,
) -> Result<UpdateStatus>;
}

View File

@@ -0,0 +1,111 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::RwLock;
use uuid::Uuid;
use super::{Result, UpdateError, UpdateStore};
use crate::index_controller::IndexActorHandle;
#[async_trait::async_trait]
pub trait UpdateStoreStore {
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>>;
}
pub struct MapUpdateStoreStore<I> {
db: Arc<RwLock<HashMap<Uuid, Arc<UpdateStore>>>>,
index_handle: I,
path: PathBuf,
update_store_size: usize,
}
impl<I: IndexActorHandle> MapUpdateStoreStore<I> {
pub fn new(index_handle: I, path: impl AsRef<Path>, update_store_size: usize) -> Self {
let db = Arc::new(RwLock::new(HashMap::new()));
let path = path.as_ref().to_owned();
Self {
db,
index_handle,
path,
update_store_size,
}
}
}
#[async_trait::async_trait]
impl<I> UpdateStoreStore for MapUpdateStoreStore<I>
where
I: IndexActorHandle + Clone + Send + Sync + 'static,
{
async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>> {
match self.db.write().await.entry(uuid) {
Entry::Vacant(e) => {
let mut options = heed::EnvOpenOptions::new();
let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let path = self.path.clone().join(format!("updates-{}", e.key()));
fs::create_dir_all(&path).await.unwrap();
let index_handle = self.index_handle.clone();
let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file))
})
.map_err(|e| UpdateError::Error(e.into()))?;
let store = e.insert(store);
Ok(store.clone())
}
Entry::Occupied(e) => Ok(e.get().clone()),
}
}
async fn get(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let guard = self.db.read().await;
match guard.get(&uuid) {
Some(uuid) => Ok(Some(uuid.clone())),
None => {
// The index is not found in the found in the loaded indexes, so we attempt to load
// it from disk. We need to acquire a write lock **before** attempting to open the
// index, because someone could be trying to open it at the same time as us.
drop(guard);
let path = self.path.clone().join(format!("updates-{}", uuid));
if path.exists() {
let mut guard = self.db.write().await;
match guard.entry(uuid) {
Entry::Vacant(entry) => {
// We can safely load the index
let index_handle = self.index_handle.clone();
let mut options = heed::EnvOpenOptions::new();
let update_store_size = self.update_store_size;
options.map_size(update_store_size);
let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle.update(meta, file))
})
.map_err(|e| UpdateError::Error(e.into()))?;
let store = entry.insert(store);
Ok(Some(store.clone()))
}
Entry::Occupied(entry) => {
// The index was loaded while we attempted to to iter
Ok(Some(entry.get().clone()))
}
}
} else {
Ok(None)
}
}
}
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Arc<UpdateStore>>> {
let store = self.db.write().await.remove(&uuid);
let path = self.path.clone().join(format!("updates-{}", uuid));
if store.is_some() || path.exists() {
fs::remove_dir_all(path).await.unwrap();
}
Ok(store)
}
}

View File

@@ -1,10 +1,10 @@
use std::fs::remove_file; use std::fs::{copy, create_dir_all, remove_file};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
use heed::{Database, Env, EnvOpenOptions}; use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use parking_lot::RwLock; use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::File; use std::fs::File;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@@ -16,7 +16,7 @@ type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
#[derive(Clone)] #[derive(Clone)]
pub struct UpdateStore<M, N, E> { pub struct UpdateStore<M, N, E> {
env: Env, pub env: Env,
pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>, pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>,
pending: Database<OwnedType<BEU64>, SerdeJson<PathBuf>>, pending: Database<OwnedType<BEU64>, SerdeJson<PathBuf>>,
processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>, processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
@@ -24,6 +24,9 @@ pub struct UpdateStore<M, N, E> {
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>, aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
processing: Arc<RwLock<Option<Processing<M>>>>, processing: Arc<RwLock<Option<Processing<M>>>>,
notification_sender: mpsc::Sender<()>, notification_sender: mpsc::Sender<()>,
/// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is
/// processing, while not preventing writes all together during an update
pub update_lock: Arc<Mutex<()>>,
} }
pub trait HandleUpdate<M, N, E> { pub trait HandleUpdate<M, N, E> {
@@ -76,6 +79,8 @@ where
// Send a first notification to trigger the process. // Send a first notification to trigger the process.
let _ = notification_sender.send(()); let _ = notification_sender.send(());
let update_lock = Arc::new(Mutex::new(()));
let update_store = Arc::new(UpdateStore { let update_store = Arc::new(UpdateStore {
env, env,
pending, pending,
@@ -85,6 +90,7 @@ where
notification_sender, notification_sender,
failed_meta, failed_meta,
processing, processing,
update_lock,
}); });
// We need a weak reference so we can take ownership on the arc later when we // We need a weak reference so we can take ownership on the arc later when we
@@ -190,6 +196,7 @@ where
where where
U: HandleUpdate<M, N, E>, U: HandleUpdate<M, N, E>,
{ {
let _lock = self.update_lock.lock();
// Create a read transaction to be able to retrieve the pending update in order. // Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let first_meta = self.pending_meta.first(&rtxn)?; let first_meta = self.pending_meta.first(&rtxn)?;
@@ -371,94 +378,35 @@ where
Ok(aborted_updates) Ok(aborted_updates)
} }
pub fn snapshot(
&self,
txn: &mut heed::RwTxn,
path: impl AsRef<Path>,
uuid: Uuid,
) -> anyhow::Result<()> {
let update_path = path.as_ref().join("updates");
create_dir_all(&update_path)?;
let mut snapshot_path = update_path.join(format!("update-{}", uuid));
// acquire write lock to prevent further writes during snapshot
create_dir_all(&snapshot_path)?;
snapshot_path.push("data.mdb");
// create db snapshot
self.env
.copy_to_path(&snapshot_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join("update_files");
create_dir_all(&update_files_path)?;
for path in self.pending.iter(&txn)? {
let (_, path) = path?;
let name = path.file_name().unwrap();
let to = update_files_path.join(name);
copy(path, to)?;
}
Ok(())
}
} }
//#[cfg(test)]
//mod tests {
//use super::*;
//use std::thread;
//use std::time::{Duration, Instant};
//#[test]
//fn simple() {
//let dir = tempfile::tempdir().unwrap();
//let mut options = EnvOpenOptions::new();
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir,
//|meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
//let new_meta = meta.meta().to_string() + " processed";
//let processed = meta.process(new_meta);
//Ok(processed)
//},
//)
//.unwrap();
//let meta = String::from("kiki");
//let update = update_store.register_update(meta, &[]).unwrap();
//thread::sleep(Duration::from_millis(100));
//let meta = update_store.meta(update.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "kiki processed");
//} else {
//panic!()
//}
//}
//#[test]
//#[ignore]
//fn long_running_update() {
//let dir = tempfile::tempdir().unwrap();
//let mut options = EnvOpenOptions::new();
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir,
//|meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
//thread::sleep(Duration::from_millis(400));
//let new_meta = meta.meta().to_string() + "processed";
//let processed = meta.process(new_meta);
//Ok(processed)
//},
//)
//.unwrap();
//let before_register = Instant::now();
//let meta = String::from("kiki");
//let update_kiki = update_store.register_update(meta, &[]).unwrap();
//assert!(before_register.elapsed() < Duration::from_millis(200));
//let meta = String::from("coco");
//let update_coco = update_store.register_update(meta, &[]).unwrap();
//assert!(before_register.elapsed() < Duration::from_millis(200));
//let meta = String::from("cucu");
//let update_cucu = update_store.register_update(meta, &[]).unwrap();
//assert!(before_register.elapsed() < Duration::from_millis(200));
//thread::sleep(Duration::from_millis(400 * 3 + 100));
//let meta = update_store.meta(update_kiki.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "kiki processed");
//} else {
//panic!()
//}
//let meta = update_store.meta(update_coco.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "coco processed");
//} else {
//panic!()
//}
//let meta = update_store.meta(update_cucu.id()).unwrap().unwrap();
//if let UpdateStatus::Processed(Processed { success, .. }) = meta {
//assert_eq!(success, "cucu processed");
//} else {
//panic!()
//}
//}
//}

View File

@@ -1,310 +0,0 @@
use std::{fs::create_dir_all, path::Path};
use heed::{
types::{ByteSlice, Str},
Database, Env, EnvOpenOptions,
};
use log::{info, warn};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
pub type Result<T> = std::result::Result<T, UuidError>;
#[derive(Debug)]
enum UuidResolveMsg {
Get {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Create {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Delete {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
List {
ret: oneshot::Sender<Result<Vec<(String, Uuid)>>>,
},
Insert {
uuid: Uuid,
name: String,
ret: oneshot::Sender<Result<()>>,
}
}
struct UuidResolverActor<S> {
inbox: mpsc::Receiver<UuidResolveMsg>,
store: S,
}
impl<S: UuidStore> UuidResolverActor<S> {
fn new(inbox: mpsc::Receiver<UuidResolveMsg>, store: S) -> Self {
Self { inbox, store }
}
async fn run(mut self) {
use UuidResolveMsg::*;
info!("uuid resolver started");
loop {
match self.inbox.recv().await {
Some(Create { uid: name, ret }) => {
let _ = ret.send(self.handle_create(name).await);
}
Some(Get { uid: name, ret }) => {
let _ = ret.send(self.handle_get(name).await);
}
Some(Delete { uid: name, ret }) => {
let _ = ret.send(self.handle_delete(name).await);
}
Some(List { ret }) => {
let _ = ret.send(self.handle_list().await);
}
Some(Insert { ret, uuid, name }) => {
let _ = ret.send(self.handle_insert(name, uuid).await);
}
// all senders have been dropped, need to quit.
None => break,
}
}
warn!("exiting uuid resolver loop");
}
async fn handle_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, true).await
}
async fn handle_get(&self, uid: String) -> Result<Uuid> {
self.store
.get_uuid(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
}
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
self.store
.delete(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
}
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
let result = self.store.list().await?;
Ok(result)
}
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
}
self.store.insert(uid, uuid).await?;
Ok(())
}
}
fn is_index_uid_valid(uid: &str) -> bool {
uid.chars()
.all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_')
}
#[derive(Clone)]
pub struct UuidResolverHandle {
sender: mpsc::Sender<UuidResolveMsg>,
}
impl UuidResolverHandle {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let (sender, reveiver) = mpsc::channel(100);
let store = HeedUuidStore::new(path)?;
let actor = UuidResolverActor::new(reveiver, store);
tokio::spawn(actor.run());
Ok(Self { sender })
}
pub async fn get(&self, name: String) -> Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Get { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
pub async fn create(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Create { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
pub async fn delete(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Delete { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
pub async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::List { ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
pub async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Insert { ret, name, uuid };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
}
#[derive(Debug, Error)]
pub enum UuidError {
#[error("Name already exist.")]
NameAlreadyExist,
#[error("Index \"{0}\" doesn't exist.")]
UnexistingIndex(String),
#[error("Error performing task: {0}")]
TokioTask(#[from] tokio::task::JoinError),
#[error("Database error: {0}")]
Heed(#[from] heed::Error),
#[error("Uuid error: {0}")]
Uuid(#[from] uuid::Error),
#[error("Badly formatted index uid: {0}")]
BadlyFormatted(String),
}
#[async_trait::async_trait]
trait UuidStore {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>;
async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
}
struct HeedUuidStore {
env: Env,
db: Database<Str, ByteSlice>,
}
impl HeedUuidStore {
fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref().join("index_uuids");
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(1_073_741_824); // 1GB
let env = options.open(path)?;
let db = env.create_database(None)?;
Ok(Self { env, db })
}
}
#[async_trait::async_trait]
impl UuidStore for HeedUuidStore {
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
}
}
None => {
let uuid = Uuid::new_v4();
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(uuid)
}
}
})
.await?
}
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
}
async fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
db.delete(&mut txn, &uid)?;
txn.commit()?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
}
async fn list(&self) -> Result<Vec<(String, Uuid)>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (name, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push((name.to_owned(), uuid))
}
Ok(entries)
})
.await?
}
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
})
.await?
}
}

View File

@@ -0,0 +1,94 @@
use std::path::PathBuf;
use log::{info, warn};
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{Result, UuidError, UuidResolveMsg, UuidStore};
pub struct UuidResolverActor<S> {
inbox: mpsc::Receiver<UuidResolveMsg>,
store: S,
}
impl<S: UuidStore> UuidResolverActor<S> {
pub fn new(inbox: mpsc::Receiver<UuidResolveMsg>, store: S) -> Self {
Self { inbox, store }
}
pub async fn run(mut self) {
use UuidResolveMsg::*;
info!("uuid resolver started");
loop {
match self.inbox.recv().await {
Some(Create { uid: name, ret }) => {
let _ = ret.send(self.handle_create(name).await);
}
Some(Get { uid: name, ret }) => {
let _ = ret.send(self.handle_get(name).await);
}
Some(Delete { uid: name, ret }) => {
let _ = ret.send(self.handle_delete(name).await);
}
Some(List { ret }) => {
let _ = ret.send(self.handle_list().await);
}
Some(Insert { ret, uuid, name }) => {
let _ = ret.send(self.handle_insert(name, uuid).await);
}
Some(SnapshotRequest { path, ret }) => {
let _ = ret.send(self.handle_snapshot(path).await);
}
// all senders have been dropped, need to quit.
None => break,
}
}
warn!("exiting uuid resolver loop");
}
async fn handle_create(&self, uid: String) -> Result<Uuid> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
}
self.store.create_uuid(uid, true).await
}
async fn handle_get(&self, uid: String) -> Result<Uuid> {
self.store
.get_uuid(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
}
async fn handle_delete(&self, uid: String) -> Result<Uuid> {
self.store
.delete(uid.clone())
.await?
.ok_or(UuidError::UnexistingIndex(uid))
}
async fn handle_list(&self) -> Result<Vec<(String, Uuid)>> {
let result = self.store.list().await?;
Ok(result)
}
async fn handle_snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
self.store.snapshot(path).await
}
async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> {
if !is_index_uid_valid(&uid) {
return Err(UuidError::BadlyFormatted(uid));
}
self.store.insert(uid, uuid).await?;
Ok(())
}
}
fn is_index_uid_valid(uid: &str) -> bool {
uid.chars()
.all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_')
}

View File

@@ -0,0 +1,78 @@
use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use super::{HeedUuidStore, Result, UuidResolveMsg, UuidResolverActor, UuidResolverHandle};
#[derive(Clone)]
pub struct UuidResolverHandleImpl {
sender: mpsc::Sender<UuidResolveMsg>,
}
impl UuidResolverHandleImpl {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let (sender, reveiver) = mpsc::channel(100);
let store = HeedUuidStore::new(path)?;
let actor = UuidResolverActor::new(reveiver, store);
tokio::spawn(actor.run());
Ok(Self { sender })
}
}
#[async_trait::async_trait]
impl UuidResolverHandle for UuidResolverHandleImpl {
async fn get(&self, name: String) -> Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Get { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn create(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Create { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn delete(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Delete { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::List { ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Insert { ret, name, uuid };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::SnapshotRequest { path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
}

View File

@@ -0,0 +1,32 @@
use std::path::PathBuf;
use tokio::sync::oneshot;
use uuid::Uuid;
use super::Result;
pub enum UuidResolveMsg {
Get {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Create {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Delete {
uid: String,
ret: oneshot::Sender<Result<Uuid>>,
},
List {
ret: oneshot::Sender<Result<Vec<(String, Uuid)>>>,
},
Insert {
uuid: Uuid,
name: String,
ret: oneshot::Sender<Result<()>>,
},
SnapshotRequest {
path: PathBuf,
ret: oneshot::Sender<Result<Vec<Uuid>>>,
},
}

View File

@@ -0,0 +1,49 @@
mod actor;
mod handle_impl;
mod message;
mod store;
use std::path::PathBuf;
use thiserror::Error;
use uuid::Uuid;
use actor::UuidResolverActor;
use message::UuidResolveMsg;
use store::{HeedUuidStore, UuidStore};
#[cfg(test)]
use mockall::automock;
pub use handle_impl::UuidResolverHandleImpl;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
pub type Result<T> = std::result::Result<T, UuidError>;
#[async_trait::async_trait]
#[cfg_attr(test, automock)]
pub trait UuidResolverHandle {
async fn get(&self, name: String) -> Result<Uuid>;
async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>;
async fn create(&self, name: String) -> anyhow::Result<Uuid>;
async fn delete(&self, name: String) -> anyhow::Result<Uuid>;
async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>>;
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
}
#[derive(Debug, Error)]
pub enum UuidError {
#[error("Name already exist.")]
NameAlreadyExist,
#[error("Index \"{0}\" doesn't exist.")]
UnexistingIndex(String),
#[error("Error performing task: {0}")]
TokioTask(#[from] tokio::task::JoinError),
#[error("Database error: {0}")]
Heed(#[from] heed::Error),
#[error("Uuid error: {0}")]
Uuid(#[from] uuid::Error),
#[error("Badly formatted index uid: {0}")]
BadlyFormatted(String),
}

View File

@@ -0,0 +1,154 @@
use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use heed::{
types::{ByteSlice, Str},
CompactionOption, Database, Env, EnvOpenOptions,
};
use uuid::Uuid;
use super::{Result, UuidError, UUID_STORE_SIZE};
#[async_trait::async_trait]
pub trait UuidStore {
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn create_uuid(&self, uid: String, err: bool) -> Result<Uuid>;
async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>;
async fn delete(&self, uid: String) -> Result<Option<Uuid>>;
async fn list(&self) -> Result<Vec<(String, Uuid)>>;
async fn insert(&self, name: String, uuid: Uuid) -> Result<()>;
async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>;
}
pub struct HeedUuidStore {
env: Env,
db: Database<Str, ByteSlice>,
}
impl HeedUuidStore {
pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref().join("index_uuids");
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(UUID_STORE_SIZE); // 1GB
let env = options.open(path)?;
let db = env.create_database(None)?;
Ok(Self { env, db })
}
}
#[async_trait::async_trait]
impl UuidStore for HeedUuidStore {
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
if err {
Err(UuidError::NameAlreadyExist)
} else {
let uuid = Uuid::from_slice(uuid)?;
Ok(uuid)
}
}
None => {
let uuid = Uuid::new_v4();
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(uuid)
}
}
})
.await?
}
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
match db.get(&txn, &name)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
}
async fn delete(&self, uid: String) -> Result<Option<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
match db.get(&txn, &uid)? {
Some(uuid) => {
let uuid = Uuid::from_slice(uuid)?;
db.delete(&mut txn, &uid)?;
txn.commit()?;
Ok(Some(uuid))
}
None => Ok(None),
}
})
.await?
}
async fn list(&self) -> Result<Vec<(String, Uuid)>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let txn = env.read_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (name, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push((name.to_owned(), uuid))
}
Ok(entries)
})
.await?
}
async fn insert(&self, name: String, uuid: Uuid) -> Result<()> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
let mut txn = env.write_txn()?;
db.put(&mut txn, &name, uuid.as_bytes())?;
txn.commit()?;
Ok(())
})
.await?
}
async fn snapshot(&self, mut path: PathBuf) -> Result<Vec<Uuid>> {
let env = self.env.clone();
let db = self.db;
tokio::task::spawn_blocking(move || {
// Write transaction to acquire a lock on the database.
let txn = env.write_txn()?;
let mut entries = Vec::new();
for entry in db.iter(&txn)? {
let (_, uuid) = entry?;
let uuid = Uuid::from_slice(uuid)?;
entries.push(uuid)
}
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push("index_uuids");
create_dir_all(&path).unwrap();
path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries)
})
.await?
}
}

View File

@@ -191,8 +191,8 @@ pub struct Opt {
pub schedule_snapshot: bool, pub schedule_snapshot: bool,
/// Defines time interval, in seconds, between each snapshot creation. /// Defines time interval, in seconds, between each snapshot creation.
#[structopt(long, env = "MEILI_SNAPSHOT_INTERVAL_SEC")] #[structopt(long, env = "MEILI_SNAPSHOT_INTERVAL_SEC", default_value = "86400")] // 24h
pub snapshot_interval_sec: Option<u64>, pub snapshot_interval_sec: u64,
/// Folder where dumps are created when the dump route is called. /// Folder where dumps are created when the dump route is called.
#[structopt(long, env = "MEILI_DUMPS_DIR", default_value = "dumps/")] #[structopt(long, env = "MEILI_DUMPS_DIR", default_value = "dumps/")]

View File

@@ -1,96 +0,0 @@
use crate::Data;
use crate::error::Error;
use crate::helpers::compression;
use log::error;
use std::fs::create_dir_all;
use std::path::Path;
use std::thread;
use std::time::{Duration};
use tempfile::TempDir;
pub fn load_snapshot(
db_path: &str,
snapshot_path: &Path,
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool
) -> Result<(), Error> {
let db_path = Path::new(db_path);
if !db_path.exists() && snapshot_path.exists() {
compression::from_tar_gz(snapshot_path, db_path)
} else if db_path.exists() && !ignore_snapshot_if_db_exists {
Err(Error::Internal(format!("database already exists at {:?}, try to delete it or rename it", db_path.canonicalize().unwrap_or(db_path.into()))))
} else if !snapshot_path.exists() && !ignore_missing_snapshot {
Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path.canonicalize().unwrap_or(snapshot_path.into()))))
} else {
Ok(())
}
}
pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> {
let tmp_dir = TempDir::new()?;
data.db.copy_and_compact_to_path(tmp_dir.path())?;
compression::to_tar_gz(tmp_dir.path(), snapshot_path).map_err(|e| Error::Internal(format!("something went wrong during snapshot compression: {}", e)))
}
pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> {
if snapshot_dir.file_name().is_none() {
return Err(Error::Internal("invalid snapshot file path".to_string()));
}
let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?;
create_dir_all(snapshot_dir)?;
let snapshot_path = snapshot_dir.join(format!("{}.snapshot", db_name.to_str().unwrap_or("data.ms")));
thread::spawn(move || loop {
if let Err(e) = create_snapshot(&data, &snapshot_path) {
error!("Unsuccessful snapshot creation: {}", e);
}
thread::sleep(Duration::from_secs(time_gap_s));
});
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::prelude::*;
use std::fs;
#[test]
fn test_pack_unpack() {
let tempdir = TempDir::new().unwrap();
let test_dir = tempdir.path();
let src_dir = test_dir.join("src");
let dest_dir = test_dir.join("complex/destination/path/");
let archive_path = test_dir.join("archive.snapshot");
let file_1_relative = Path::new("file1.txt");
let subdir_relative = Path::new("subdir/");
let file_2_relative = Path::new("subdir/file2.txt");
create_dir_all(src_dir.join(subdir_relative)).unwrap();
fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap();
fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap();
assert!(compression::to_tar_gz(&src_dir, &archive_path).is_ok());
assert!(archive_path.exists());
assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok());
assert!(dest_dir.exists());
assert!(dest_dir.join(file_1_relative).exists());
assert!(dest_dir.join(subdir_relative).exists());
assert!(dest_dir.join(file_2_relative).exists());
let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap();
assert_eq!(contents, "Hello_file_1");
let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap();
assert_eq!(contents, "Hello_file_2");
}
}

View File

@@ -96,7 +96,6 @@ impl Index<'_> {
self.service.get(url).await self.service.get(url).await
} }
#[allow(dead_code)]
pub async fn list_updates(&self) -> (Value, StatusCode) { pub async fn list_updates(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}/updates", self.uid); let url = format!("/indexes/{}/updates", self.uid);
self.service.get(url).await self.service.get(url).await

View File

@@ -1,6 +1,6 @@
mod index; pub mod index;
mod server; pub mod server;
mod service; pub mod service;
pub use index::{GetAllDocumentsOptions, GetDocumentOptions}; pub use index::{GetAllDocumentsOptions, GetDocumentOptions};
pub use server::Server; pub use server::Server;

View File

@@ -1,3 +1,5 @@
use std::path::Path;
use actix_web::http::StatusCode; use actix_web::http::StatusCode;
use byte_unit::{Byte, ByteUnit}; use byte_unit::{Byte, ByteUnit};
use serde_json::Value; use serde_json::Value;
@@ -12,50 +14,33 @@ use super::service::Service;
pub struct Server { pub struct Server {
pub service: Service, pub service: Service,
// hod ownership to the tempdir while we use the server instance. // hold ownership to the tempdir while we use the server instance.
_dir: tempdir::TempDir, _dir: Option<tempdir::TempDir>,
} }
impl Server { impl Server {
pub async fn new() -> Self { pub async fn new() -> Self {
let dir = TempDir::new("meilisearch").unwrap(); let dir = TempDir::new("meilisearch").unwrap();
let opt = Opt { let opt = default_settings(dir.path());
db_path: dir.path().join("db"),
dumps_dir: dir.path().join("dump"),
dump_batch_size: 16,
http_addr: "127.0.0.1:7700".to_owned(),
master_key: None,
env: "development".to_owned(),
no_analytics: true,
max_mdb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(),
max_udb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(),
http_payload_size_limit: Byte::from_unit(10.0, ByteUnit::MiB).unwrap(),
ssl_cert_path: None,
ssl_key_path: None,
ssl_auth_path: None,
ssl_ocsp_path: None,
ssl_require_auth: false,
ssl_resumption: false,
ssl_tickets: false,
import_snapshot: None,
ignore_missing_snapshot: false,
ignore_snapshot_if_db_exists: false,
snapshot_dir: ".".into(),
schedule_snapshot: false,
snapshot_interval_sec: None,
import_dump: None,
indexer_options: IndexerOpts::default(),
#[cfg(all(not(debug_assertions), feature = "sentry"))]
sentry_dsn: String::from(""),
#[cfg(all(not(debug_assertions), feature = "sentry"))]
no_sentry: true,
};
let data = Data::new(opt).unwrap(); let data = Data::new(opt).unwrap();
let service = Service(data); let service = Service(data);
Server { service, _dir: dir } Server {
service,
_dir: Some(dir),
}
}
pub async fn new_with_options(opt: Opt) -> Self {
let data = Data::new(opt).unwrap();
let service = Service(data);
Server {
service,
_dir: None,
}
} }
/// Returns a view to an index. There is no guarantee that the index exists. /// Returns a view to an index. There is no guarantee that the index exists.
@@ -74,3 +59,37 @@ impl Server {
self.service.get("/version").await self.service.get("/version").await
} }
} }
pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
Opt {
db_path: dir.as_ref().join("db"),
dumps_dir: dir.as_ref().join("dump"),
dump_batch_size: 16,
http_addr: "127.0.0.1:7700".to_owned(),
master_key: None,
env: "development".to_owned(),
no_analytics: true,
max_mdb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(),
max_udb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(),
http_payload_size_limit: Byte::from_unit(10.0, ByteUnit::MiB).unwrap(),
ssl_cert_path: None,
ssl_key_path: None,
ssl_auth_path: None,
ssl_ocsp_path: None,
ssl_require_auth: false,
ssl_resumption: false,
ssl_tickets: false,
import_snapshot: None,
ignore_missing_snapshot: false,
ignore_snapshot_if_db_exists: false,
snapshot_dir: ".".into(),
schedule_snapshot: false,
snapshot_interval_sec: 0,
import_dump: None,
indexer_options: IndexerOpts::default(),
#[cfg(all(not(debug_assertions), feature = "sentry"))]
sentry_dsn: String::from(""),
#[cfg(all(not(debug_assertions), feature = "sentry"))]
no_sentry: true,
}
}

View File

@@ -3,8 +3,9 @@ mod documents;
mod index; mod index;
mod search; mod search;
mod settings; mod settings;
mod updates; mod snapshot;
mod stats; mod stats;
mod updates;
// Tests are isolated by features in different modules to allow better readability, test // Tests are isolated by features in different modules to allow better readability, test
// targetability, and improved incremental compilation times. // targetability, and improved incremental compilation times.

View File

@@ -0,0 +1,53 @@
use std::time::Duration;
use crate::common::server::default_settings;
use crate::common::GetAllDocumentsOptions;
use crate::common::Server;
use tokio::time::sleep;
use meilisearch_http::Opt;
#[ignore]
#[actix_rt::test]
async fn perform_snapshot() {
let temp = tempfile::tempdir_in(".").unwrap();
let snapshot_dir = tempfile::tempdir_in(".").unwrap();
let options = Opt {
snapshot_dir: snapshot_dir.path().to_owned(),
snapshot_interval_sec: 1,
schedule_snapshot: true,
..default_settings(temp.path())
};
let server = Server::new_with_options(options).await;
let index = server.index("test");
index.load_test_set().await;
let (response, _) = index
.get_all_documents(GetAllDocumentsOptions::default())
.await;
sleep(Duration::from_secs(2)).await;
let temp = tempfile::tempdir_in(".").unwrap();
let snapshot_path = snapshot_dir
.path()
.to_owned()
.join(format!("db.snapshot"));
let options = Opt {
import_snapshot: Some(snapshot_path),
..default_settings(temp.path())
};
let server = Server::new_with_options(options).await;
let index = server.index("test");
let (response_from_snapshot, _) = index
.get_all_documents(GetAllDocumentsOptions::default())
.await;
assert_eq!(response, response_from_snapshot);
}

View File

@@ -18,4 +18,4 @@ async fn test_healthyness() {
let (response, status_code) = server.service.get("/health").await; let (response, status_code) = server.service.get("/health").await;
assert_eq!(status_code, 200); assert_eq!(status_code, 200);
assert_eq!(response["status"], "available"); assert_eq!(response["status"], "available");
} }