format code

This commit is contained in:
mpostma
2021-03-24 11:29:11 +01:00
parent 1f16c8d224
commit 4041d9dc48
18 changed files with 60 additions and 61 deletions

View File

@@ -2,7 +2,7 @@ use std::fs::{create_dir_all, File};
use std::io::Write; use std::io::Write;
use std::path::Path; use std::path::Path;
use flate2::{Compression, write::GzEncoder, read::GzDecoder}; use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use tar::{Archive, Builder}; use tar::{Archive, Builder};
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> { pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {

View File

@@ -12,11 +12,11 @@ use tokio::sync::mpsc;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use uuid::Uuid; use uuid::Uuid;
use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult};
use crate::index::{Document, SearchQuery, SearchResult, Settings}; use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::update_handler::UpdateHandler; use crate::index_controller::update_handler::UpdateHandler;
use crate::index_controller::{updates::Processing, UpdateMeta, get_arc_ownership_blocking}; use crate::index_controller::{get_arc_ownership_blocking, updates::Processing, UpdateMeta};
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use super::{IndexSettings, Result, IndexMsg, IndexStore, IndexError, UpdateResult, IndexMeta};
pub struct IndexActor<S> { pub struct IndexActor<S> {
read_receiver: Option<mpsc::Receiver<IndexMsg>>, read_receiver: Option<mpsc::Receiver<IndexMsg>>,

View File

@@ -1,12 +1,14 @@
use std::path::{PathBuf, Path}; use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use super::{
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
};
use crate::index::{Document, SearchQuery, SearchResult, Settings}; use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::IndexSettings; use crate::index_controller::IndexSettings;
use crate::index_controller::{updates::Processing, UpdateMeta}; use crate::index_controller::{updates::Processing, UpdateMeta};
use super::{IndexActorHandle, IndexMsg, IndexMeta, UpdateResult, Result, IndexActor, MapIndexStore};
#[derive(Clone)] #[derive(Clone)]
pub struct IndexActorHandleImpl { pub struct IndexActorHandleImpl {
@@ -102,11 +104,7 @@ impl IndexActorHandle for IndexActorHandleImpl {
Ok(receiver.await.expect("IndexActor has been killed")?) Ok(receiver.await.expect("IndexActor has been killed")?)
} }
async fn update_index( async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta> {
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::UpdateIndex { let msg = IndexMsg::UpdateIndex {
uuid, uuid,

View File

@@ -3,12 +3,9 @@ use std::path::PathBuf;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use uuid::Uuid; use uuid::Uuid;
use super::{IndexMeta, IndexSettings, Result, UpdateResult};
use crate::index::{Document, SearchQuery, SearchResult, Settings}; use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::{ use crate::index_controller::{updates::Processing, UpdateMeta};
updates::Processing,
UpdateMeta,
};
use super::{IndexSettings, IndexMeta, UpdateResult, Result};
pub enum IndexMsg { pub enum IndexMsg {
CreateIndex { CreateIndex {

View File

@@ -17,9 +17,9 @@ use crate::index_controller::{
updates::{Failed, Processed, Processing}, updates::{Failed, Processed, Processing},
UpdateMeta, UpdateMeta,
}; };
use actor::IndexActor;
use message::IndexMsg; use message::IndexMsg;
use store::{IndexStore, MapIndexStore}; use store::{IndexStore, MapIndexStore};
use actor::IndexActor;
pub use handle_impl::IndexActorHandleImpl; pub use handle_impl::IndexActorHandleImpl;
@@ -69,7 +69,6 @@ pub enum IndexError {
ExistingPrimaryKey, ExistingPrimaryKey,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, automock)] #[cfg_attr(test, automock)]
pub trait IndexActorHandle { pub trait IndexActorHandle {
@@ -97,11 +96,6 @@ pub trait IndexActorHandle {
) -> Result<Document>; ) -> Result<Document>;
async fn delete(&self, uuid: Uuid) -> Result<()>; async fn delete(&self, uuid: Uuid) -> Result<()>;
async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>; async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>;
async fn update_index( async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta>;
&self,
uuid: Uuid,
index_settings: IndexSettings,
) -> Result<IndexMeta>;
async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
} }

View File

@@ -1,12 +1,12 @@
use std::path::{PathBuf, Path};
use std::sync::Arc;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use uuid::Uuid; use heed::EnvOpenOptions;
use tokio::fs;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use tokio::fs; use uuid::Uuid;
use heed::EnvOpenOptions;
use super::{IndexError, Result}; use super::{IndexError, Result};
use crate::index::Index; use crate::index::Index;

View File

@@ -12,6 +12,7 @@ use std::time::Duration;
use actix_web::web::{Bytes, Payload}; use actix_web::web::{Bytes, Payload};
use anyhow::bail; use anyhow::bail;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use log::info;
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@@ -22,9 +23,9 @@ use crate::index::{Facets, Settings, UpdateResult};
use crate::option::Opt; use crate::option::Opt;
use index_actor::IndexActorHandle; use index_actor::IndexActorHandle;
use snapshot::load_snapshot;
use update_actor::UpdateActorHandle; use update_actor::UpdateActorHandle;
use uuid_resolver::UuidResolverHandle; use uuid_resolver::UuidResolverHandle;
use snapshot::load_snapshot;
use snapshot::SnapshotService; use snapshot::SnapshotService;
pub use updates::{Failed, Processed, Processing}; pub use updates::{Failed, Processed, Processing};
@@ -72,6 +73,7 @@ impl IndexController {
let update_store_size = options.max_udb_size.get_bytes() as usize; let update_store_size = options.max_udb_size.get_bytes() as usize;
if let Some(ref path) = options.import_snapshot { if let Some(ref path) = options.import_snapshot {
info!("Loading from snapshot {:?}", path);
load_snapshot( load_snapshot(
&options.db_path, &options.db_path,
path, path,

View File

@@ -39,7 +39,10 @@ where
} }
pub async fn run(self) { pub async fn run(self) {
info!("Snashot scheduled every {}s.", self.snapshot_period.as_secs()); info!(
"Snashot scheduled every {}s.",
self.snapshot_period.as_secs()
);
loop { loop {
sleep(self.snapshot_period).await; sleep(self.snapshot_period).await;
if let Err(e) = self.perform_snapshot().await { if let Err(e) = self.perform_snapshot().await {
@@ -49,17 +52,11 @@ where
} }
async fn perform_snapshot(&self) -> anyhow::Result<()> { async fn perform_snapshot(&self) -> anyhow::Result<()> {
if !self.snapshot_path.is_file() {
bail!("Invalid snapshot file path.");
}
info!("Performing snapshot."); info!("Performing snapshot.");
let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(".")).await??; let temp_snapshot_dir = spawn_blocking(move || tempfile::tempdir_in(".")).await??;
let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
fs::create_dir_all(&temp_snapshot_path).await?;
let uuids = self let uuids = self
.uuid_resolver_handle .uuid_resolver_handle
.snapshot(temp_snapshot_path.clone()) .snapshot(temp_snapshot_path.clone())

View File

@@ -2,15 +2,15 @@ use std::io::SeekFrom;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use log::info; use log::info;
use tokio::sync::mpsc;
use uuid::Uuid;
use oxidized_json_checker::JsonChecker; use oxidized_json_checker::JsonChecker;
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::sync::mpsc;
use uuid::Uuid;
use super::{PayloadData, UpdateError, UpdateMsg, UpdateStoreStore, Result}; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStoreStore};
use crate::index_controller::index_actor::IndexActorHandle; use crate::index_controller::index_actor::IndexActorHandle;
use crate::index_controller::{UpdateMeta, UpdateStatus, get_arc_ownership_blocking}; use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, S, I> { pub struct UpdateActor<D, S, I> {
path: PathBuf, path: PathBuf,

View File

@@ -1,9 +1,9 @@
use std::path::PathBuf; use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use tokio::sync::{oneshot, mpsc};
use super::{Result, PayloadData, UpdateStatus, UpdateMeta}; use super::{PayloadData, Result, UpdateMeta, UpdateStatus};
pub enum UpdateMsg<D> { pub enum UpdateMsg<D> {
Update { Update {

View File

@@ -1,7 +1,7 @@
mod actor; mod actor;
mod store;
mod message;
mod handle_impl; mod handle_impl;
mod message;
mod store;
mod update_store; mod update_store;
use std::path::PathBuf; use std::path::PathBuf;
@@ -15,7 +15,7 @@ use crate::index_controller::{UpdateMeta, UpdateStatus};
use actor::UpdateActor; use actor::UpdateActor;
use message::UpdateMsg; use message::UpdateMsg;
use store::{UpdateStoreStore, MapUpdateStoreStore}; use store::{MapUpdateStoreStore, UpdateStoreStore};
pub use handle_impl::UpdateActorHandleImpl; pub use handle_impl::UpdateActorHandleImpl;
@@ -51,5 +51,5 @@ pub trait UpdateActorHandle {
meta: UpdateMeta, meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>, data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid, uuid: Uuid,
) -> Result<UpdateStatus> ; ) -> Result<UpdateStatus>;
} }

View File

@@ -1,10 +1,10 @@
use std::fs::{remove_file, create_dir_all, copy}; use std::fs::{copy, create_dir_all, remove_file};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; use heed::types::{DecodeIgnore, OwnedType, SerdeJson};
use heed::{Database, Env, EnvOpenOptions, CompactionOption}; use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use parking_lot::{RwLock, Mutex}; use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fs::File; use std::fs::File;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@@ -379,7 +379,12 @@ where
Ok(aborted_updates) Ok(aborted_updates)
} }
pub fn snapshot(&self, txn: &mut heed::RwTxn, path: impl AsRef<Path>, uuid: Uuid) -> anyhow::Result<()> { pub fn snapshot(
&self,
txn: &mut heed::RwTxn,
path: impl AsRef<Path>,
uuid: Uuid,
) -> anyhow::Result<()> {
let update_path = path.as_ref().join("updates"); let update_path = path.as_ref().join("updates");
create_dir_all(&update_path)?; create_dir_all(&update_path)?;
@@ -389,7 +394,8 @@ where
snapshot_path.push("data.mdb"); snapshot_path.push("data.mdb");
// create db snapshot // create db snapshot
self.env.copy_to_path(&snapshot_path, CompactionOption::Enabled)?; self.env
.copy_to_path(&snapshot_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join("update_files"); let update_files_path = update_path.join("update_files");
create_dir_all(&update_files_path)?; create_dir_all(&update_files_path)?;

View File

@@ -4,7 +4,7 @@ use log::{info, warn};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use super::{UuidResolveMsg, UuidStore, Result, UuidError}; use super::{Result, UuidError, UuidResolveMsg, UuidStore};
pub struct UuidResolverActor<S> { pub struct UuidResolverActor<S> {
inbox: mpsc::Receiver<UuidResolveMsg>, inbox: mpsc::Receiver<UuidResolveMsg>,
@@ -91,4 +91,3 @@ fn is_index_uid_valid(uid: &str) -> bool {
uid.chars() uid.chars()
.all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_')
} }

View File

@@ -3,7 +3,7 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use super::{HeedUuidStore, UuidResolverActor, UuidResolveMsg, UuidResolverHandle, Result}; use super::{HeedUuidStore, Result, UuidResolveMsg, UuidResolverActor, UuidResolverHandle};
#[derive(Clone)] #[derive(Clone)]
pub struct UuidResolverHandleImpl { pub struct UuidResolverHandleImpl {
@@ -21,7 +21,7 @@ impl UuidResolverHandleImpl {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl UuidResolverHandle for UuidResolverHandleImpl { impl UuidResolverHandle for UuidResolverHandleImpl {
async fn resolve(&self, name: String) -> anyhow::Result<Uuid> { async fn resolve(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Resolve { uid: name, ret }; let msg = UuidResolveMsg::Resolve { uid: name, ret };

View File

@@ -3,11 +3,11 @@ use std::path::{Path, PathBuf};
use heed::{ use heed::{
types::{ByteSlice, Str}, types::{ByteSlice, Str},
Database, Env, EnvOpenOptions,CompactionOption CompactionOption, Database, Env, EnvOpenOptions,
}; };
use uuid::Uuid; use uuid::Uuid;
use super::{UUID_STORE_SIZE, UuidError, Result}; use super::{Result, UuidError, UUID_STORE_SIZE};
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait UuidStore { pub trait UuidStore {

View File

@@ -56,7 +56,7 @@ struct VersionResponse {
#[get("/version", wrap = "Authentication::Private")] #[get("/version", wrap = "Authentication::Private")]
async fn get_version() -> HttpResponse { async fn get_version() -> HttpResponse {
HttpResponse::Ok().json(VersionResponse { HttpResponse::Ok().json(VersionResponse {
commit_sha: env!("VERGEN_SHA").to_string(), commit_sha: env!("VERGEN_SHA").to_string(),
build_date: env!("VERGEN_BUILD_TIMESTAMP").to_string(), build_date: env!("VERGEN_BUILD_TIMESTAMP").to_string(),
pkg_version: env!("CARGO_PKG_VERSION").to_string(), pkg_version: env!("CARGO_PKG_VERSION").to_string(),

View File

@@ -27,14 +27,20 @@ impl Server {
let data = Data::new(opt).unwrap(); let data = Data::new(opt).unwrap();
let service = Service(data); let service = Service(data);
Server { service, _dir: Some(dir) } Server {
service,
_dir: Some(dir),
}
} }
pub async fn new_with_options(opt: Opt) -> Self { pub async fn new_with_options(opt: Opt) -> Self {
let data = Data::new(opt).unwrap(); let data = Data::new(opt).unwrap();
let service = Service(data); let service = Service(data);
Server { service, _dir: None } Server {
service,
_dir: None,
}
} }
/// Returns a view to an index. There is no guarantee that the index exists. /// Returns a view to an index. There is no guarantee that the index exists.

View File

@@ -18,4 +18,4 @@ async fn test_healthyness() {
let (response, status_code) = server.service.get("/health").await; let (response, status_code) = server.service.get("/health").await;
assert_eq!(status_code, 200); assert_eq!(status_code, 200);
assert_eq!(response["status"], "available"); assert_eq!(response["status"], "available");
} }