update tokio and disable all routes

This commit is contained in:
mpostma
2021-02-26 09:10:04 +01:00
parent 45d8f36f5e
commit 61ce749122
14 changed files with 680 additions and 461 deletions

View File

@ -9,7 +9,8 @@ use std::sync::Arc;
use sha2::Digest;
use crate::index_controller::{IndexController, LocalIndexController, IndexMetadata, Settings, IndexSettings};
use crate::index_controller::{IndexController, IndexMetadata, Settings, IndexSettings};
use crate::index_controller::actor_index_controller::ActorIndexController;
use crate::option::Opt;
#[derive(Clone)]
@ -27,7 +28,7 @@ impl Deref for Data {
#[derive(Clone)]
pub struct DataInner {
pub index_controller: Arc<LocalIndexController>,
pub index_controller: Arc<dyn IndexController + Send + Sync>,
pub api_keys: ApiKeys,
options: Opt,
}
@ -59,14 +60,9 @@ impl ApiKeys {
impl Data {
pub fn new(options: Opt) -> anyhow::Result<Data> {
let path = options.db_path.clone();
let indexer_opts = options.indexer_options.clone();
//let indexer_opts = options.indexer_options.clone();
create_dir_all(&path)?;
let index_controller = LocalIndexController::new(
&path,
indexer_opts,
options.max_mdb_size.get_bytes(),
options.max_udb_size.get_bytes(),
)?;
let index_controller = ActorIndexController::new();
let index_controller = Arc::new(index_controller);
let mut api_keys = ApiKeys {
@ -85,7 +81,7 @@ impl Data {
pub fn settings<S: AsRef<str>>(&self, index_uid: S) -> anyhow::Result<Settings> {
let index = self.index_controller
.index(&index_uid)?
.index(index_uid.as_ref().to_string())?
.ok_or_else(|| anyhow::anyhow!("Index {} does not exist.", index_uid.as_ref()))?;
let txn = index.read_txn()?;
@ -119,19 +115,20 @@ impl Data {
}
pub fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<IndexMetadata>> {
Ok(self
.list_indexes()?
.into_iter()
.find(|i| i.uid == name.as_ref()))
todo!()
//Ok(self
//.list_indexes()?
//.into_iter()
//.find(|i| i.uid == name.as_ref()))
}
pub fn create_index(&self, name: impl AsRef<str>, primary_key: Option<impl AsRef<str>>) -> anyhow::Result<IndexMetadata> {
pub async fn create_index(&self, name: impl AsRef<str>, primary_key: Option<impl AsRef<str>>) -> anyhow::Result<IndexMetadata> {
let settings = IndexSettings {
name: Some(name.as_ref().to_string()),
primary_key: primary_key.map(|s| s.as_ref().to_string()),
};
let meta = self.index_controller.create_index(settings)?;
let meta = self.index_controller.create_index(settings).await?;
Ok(meta)
}

View File

@ -2,7 +2,7 @@ use std::collections::{HashSet, BTreeMap};
use std::mem;
use std::time::Instant;
use anyhow::{bail, Context};
use anyhow::bail;
use either::Either;
use heed::RoTxn;
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use super::Data;
use crate::index_controller::IndexController;
pub const DEFAULT_SEARCH_LIMIT: usize = 20;
@ -202,107 +201,110 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
impl Data {
pub fn search<S: AsRef<str>>(
&self,
index: S,
search_query: SearchQuery,
_index: S,
_search_query: SearchQuery,
) -> anyhow::Result<SearchResult> {
match self.index_controller.index(&index)? {
Some(index) => Ok(search_query.perform(index)?),
None => bail!("index {:?} doesn't exists", index.as_ref()),
}
todo!()
//match self.index_controller.index(&index)? {
//Some(index) => Ok(search_query.perform(index)?),
//None => bail!("index {:?} doesn't exists", index.as_ref()),
//}
}
pub async fn retrieve_documents<S>(
&self,
index: impl AsRef<str> + Send + Sync + 'static,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<S>>,
_index: String,
_offset: usize,
_limit: usize,
_attributes_to_retrieve: Option<Vec<S>>,
) -> anyhow::Result<Vec<Map<String, Value>>>
where
S: AsRef<str> + Send + Sync + 'static,
{
let index_controller = self.index_controller.clone();
let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
let index = index_controller
.index(&index)?
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
todo!()
//let index_controller = self.index_controller.clone();
//let documents: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
//let index = index_controller
//.index(index.clone())?
//.with_context(|| format!("Index {:?} doesn't exist", index))?;
let txn = index.read_txn()?;
//let txn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&txn)?;
//let fields_ids_map = index.fields_ids_map(&txn)?;
let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f.as_ref()))
.collect::<Vec<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
//let attributes_to_retrieve_ids = match attributes_to_retrieve {
//Some(attrs) => attrs
//.iter()
//.filter_map(|f| fields_ids_map.id(f.as_ref()))
//.collect::<Vec<_>>(),
//None => fields_ids_map.iter().map(|(id, _)| id).collect(),
//};
let iter = index.documents.range(&txn, &(..))?.skip(offset).take(limit);
//let iter = index.documents.range(&txn, &(..))?.skip(offset).take(limit);
let mut documents = Vec::new();
//let mut documents = Vec::new();
for entry in iter {
let (_id, obkv) = entry?;
let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?;
documents.push(object);
}
//for entry in iter {
//let (_id, obkv) = entry?;
//let object = obkv_to_json(&attributes_to_retrieve_ids, &fields_ids_map, obkv)?;
//documents.push(object);
//}
Ok(documents)
})
.await?;
documents
//Ok(documents)
//})
//.await?;
//documents
}
pub async fn retrieve_document<S>(
&self,
index: impl AsRef<str> + Sync + Send + 'static,
document_id: impl AsRef<str> + Sync + Send + 'static,
attributes_to_retrieve: Option<Vec<S>>,
_index: impl AsRef<str> + Sync + Send + 'static,
_document_id: impl AsRef<str> + Sync + Send + 'static,
_attributes_to_retrieve: Option<Vec<S>>,
) -> anyhow::Result<Map<String, Value>>
where
S: AsRef<str> + Sync + Send + 'static,
{
let index_controller = self.index_controller.clone();
let document: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
let index = index_controller
.index(&index)?
.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
let txn = index.read_txn()?;
todo!()
//let index_controller = self.index_controller.clone();
//let document: anyhow::Result<_> = tokio::task::spawn_blocking(move || {
//let index = index_controller
//.index(&index)?
//.with_context(|| format!("Index {:?} doesn't exist", index.as_ref()))?;
//let txn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&txn)?;
//let fields_ids_map = index.fields_ids_map(&txn)?;
let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f.as_ref()))
.collect::<Vec<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
//let attributes_to_retrieve_ids = match attributes_to_retrieve {
//Some(attrs) => attrs
//.iter()
//.filter_map(|f| fields_ids_map.id(f.as_ref()))
//.collect::<Vec<_>>(),
//None => fields_ids_map.iter().map(|(id, _)| id).collect(),
//};
let internal_id = index
.external_documents_ids(&txn)?
.get(document_id.as_ref().as_bytes())
.with_context(|| format!("Document with id {} not found", document_id.as_ref()))?;
//let internal_id = index
//.external_documents_ids(&txn)?
//.get(document_id.as_ref().as_bytes())
//.with_context(|| format!("Document with id {} not found", document_id.as_ref()))?;
let document = index
.documents(&txn, std::iter::once(internal_id))?
.into_iter()
.next()
.map(|(_, d)| d);
//let document = index
//.documents(&txn, std::iter::once(internal_id))?
//.into_iter()
//.next()
//.map(|(_, d)| d);
match document {
Some(document) => Ok(obkv_to_json(
&attributes_to_retrieve_ids,
&fields_ids_map,
document,
)?),
None => bail!("Document with id {} not found", document_id.as_ref()),
}
})
.await?;
document
//match document {
//Some(document) => Ok(obkv_to_json(
//&attributes_to_retrieve_ids,
//&fields_ids_map,
//document,
//)?),
//None => bail!("Document with id {} not found", document_id.as_ref()),
//}
//})
//.await?;
//document
}
}

View File

@ -1,12 +1,13 @@
use std::ops::Deref;
use async_compression::tokio_02::write::GzipEncoder;
use futures_util::stream::StreamExt;
use milli::update::{IndexDocumentsMethod, UpdateFormat};
use tokio::io::AsyncWriteExt;
//use async_compression::tokio_02::write::GzipEncoder;
//use futures_util::stream::StreamExt;
//use milli::update::{IndexDocumentsMethod, UpdateFormat};
//use tokio::io::AsyncWriteExt;
use actix_web::web::Payload;
use crate::index_controller::UpdateStatus;
use crate::index_controller::{IndexController, Settings, IndexSettings, IndexMetadata};
use crate::index_controller::{Settings, IndexMetadata};
use super::Data;
impl Data {
@ -15,88 +16,68 @@ impl Data {
index: impl AsRef<str> + Send + Sync + 'static,
method: IndexDocumentsMethod,
format: UpdateFormat,
mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
stream: Payload,
primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus>
where
B: Deref<Target = [u8]>,
E: std::error::Error + Send + Sync + 'static,
{
let file = tokio::task::spawn_blocking(tempfile::tempfile).await?;
let file = tokio::fs::File::from_std(file?);
let mut encoder = GzipEncoder::new(file);
let mut empty_update = true;
while let Some(result) = stream.next().await {
empty_update = false;
let bytes = &*result?;
encoder.write_all(&bytes[..]).await?;
}
encoder.shutdown().await?;
let mut file = encoder.into_inner();
file.sync_all().await?;
let file = file.into_std().await;
let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move ||{
let mmap;
let bytes = if empty_update {
&[][..]
} else {
mmap = unsafe { memmap::Mmap::map(&file)? };
&mmap
};
index_controller.add_documents(index, method, format, &bytes, primary_key)
}).await??;
Ok(update.into())
let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, stream, primary_key).await?;
Ok(update_status)
}
pub async fn update_settings(
&self,
index: impl AsRef<str> + Send + Sync + 'static,
settings: Settings
_index: impl AsRef<str> + Send + Sync + 'static,
_settings: Settings
) -> anyhow::Result<UpdateStatus> {
let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move || index_controller.update_settings(index, settings)).await??;
Ok(update.into())
todo!()
//let index_controller = self.index_controller.clone();
//let update = tokio::task::spawn_blocking(move || index_controller.update_settings(index, settings)).await??;
//Ok(update.into())
}
pub async fn clear_documents(
&self,
index: impl AsRef<str> + Sync + Send + 'static,
_index: impl AsRef<str> + Sync + Send + 'static,
) -> anyhow::Result<UpdateStatus> {
let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move || index_controller.clear_documents(index)).await??;
Ok(update.into())
todo!()
//let index_controller = self.index_controller.clone();
//let update = tokio::task::spawn_blocking(move || index_controller.clear_documents(index)).await??;
//Ok(update.into())
}
pub async fn delete_documents(
&self,
index: impl AsRef<str> + Sync + Send + 'static,
document_ids: Vec<String>,
_index: impl AsRef<str> + Sync + Send + 'static,
_document_ids: Vec<String>,
) -> anyhow::Result<UpdateStatus> {
let index_controller = self.index_controller.clone();
let update = tokio::task::spawn_blocking(move || index_controller.delete_documents(index, document_ids)).await??;
Ok(update.into())
todo!()
//let index_controller = self.index_controller.clone();
//let update = tokio::task::spawn_blocking(move || index_controller.delete_documents(index, document_ids)).await??;
//Ok(update.into())
}
pub async fn delete_index(
&self,
index: impl AsRef<str> + Send + Sync + 'static,
_index: impl AsRef<str> + Send + Sync + 'static,
) -> anyhow::Result<()> {
let index_controller = self.index_controller.clone();
tokio::task::spawn_blocking(move || { index_controller.delete_index(index) }).await??;
Ok(())
todo!()
//let index_controller = self.index_controller.clone();
//tokio::task::spawn_blocking(move || { index_controller.delete_index(index) }).await??;
//Ok(())
}
#[inline]
pub fn get_update_status(&self, index: impl AsRef<str>, uid: u64) -> anyhow::Result<Option<UpdateStatus>> {
self.index_controller.update_status(index, uid)
todo!()
//self.index_controller.update_status(index, uid)
}
pub fn get_updates_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>> {
self.index_controller.all_update_status(index)
todo!()
//self.index_controller.all_update_status(index)
}
pub fn update_index(
@ -105,11 +86,12 @@ impl Data {
primary_key: Option<impl AsRef<str>>,
new_name: Option<impl AsRef<str>>
) -> anyhow::Result<IndexMetadata> {
let settings = IndexSettings {
name: new_name.map(|s| s.as_ref().to_string()),
primary_key: primary_key.map(|s| s.as_ref().to_string()),
};
todo!()
//let settings = IndexSettings {
//name: new_name.map(|s| s.as_ref().to_string()),
//primary_key: primary_key.map(|s| s.as_ref().to_string()),
//};
self.index_controller.update_index(name, settings)
//self.index_controller.update_index(name, settings)
}
}

View File

@ -1,7 +1,8 @@
use std::error;
use std::fmt;
use actix_http::ResponseBuilder;
use actix_web::dev::HttpResponseBuilder;
use actix_web::http::Error as HttpError;
use actix_web as aweb;
use actix_web::error::{JsonPayloadError, QueryPayloadError};
use actix_web::http::StatusCode;
@ -66,7 +67,7 @@ impl Serialize for ResponseError {
impl aweb::error::ResponseError for ResponseError {
fn error_response(&self) -> aweb::HttpResponse {
ResponseBuilder::new(self.status_code()).json(&self)
HttpResponseBuilder::new(self.status_code()).json(&self)
}
fn status_code(&self) -> StatusCode {
@ -260,8 +261,8 @@ impl From<std::io::Error> for Error {
}
}
impl From<actix_http::Error> for Error {
fn from(err: actix_http::Error) -> Error {
impl From<HttpError> for Error {
fn from(err: HttpError) -> Error {
Error::Internal(err.to_string())
}
}

View File

@ -3,27 +3,26 @@ use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_service::{Service, Transform};
use actix_web::{dev::ServiceRequest, dev::ServiceResponse, web};
use actix_web::dev::{Transform, Service, ServiceResponse, ServiceRequest};
use actix_web::web;
use futures::future::{err, ok, Future, Ready};
use crate::error::{Error, ResponseError};
use crate::Data;
#[derive(Clone)]
#[derive(Clone, Copy)]
pub enum Authentication {
Public,
Private,
Admin,
}
impl<S: 'static, B> Transform<S> for Authentication
impl<S: 'static, B> Transform<S, ServiceRequest> for Authentication
where
S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static,
B: 'static,
{
type Request = ServiceRequest;
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type InitError = ();
@ -32,7 +31,7 @@ where
fn new_transform(&self, service: S) -> Self::Future {
ok(LoggingMiddleware {
acl: self.clone(),
acl: *self,
service: Rc::new(RefCell::new(service)),
})
}
@ -44,23 +43,22 @@ pub struct LoggingMiddleware<S> {
}
#[allow(clippy::type_complexity)]
impl<S, B> Service for LoggingMiddleware<S>
impl<S, B> Service<ServiceRequest> for LoggingMiddleware<S>
where
S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Request = ServiceRequest;
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: ServiceRequest) -> Self::Future {
let mut svc = self.service.clone();
fn call(&self, req: ServiceRequest) -> Self::Future {
let svc = self.service.clone();
// This unwrap is left because this error should never appear. If that's the case, then
// it means that actix-web has an issue or someone changes the type `Data`.
let data = req.app_data::<web::Data<Data>>().unwrap();

View File

@ -1,5 +1,5 @@
/// From https://docs.rs/actix-web/3.0.0-alpha.2/src/actix_web/middleware/normalize.rs.html#34
use actix_http::Error;
use actix_web::http::Error;
use actix_service::{Service, Transform};
use actix_web::{
dev::ServiceRequest,

View File

@ -1,8 +1,7 @@
mod local_index_controller;
pub mod actor_index_controller;
//mod local_index_controller;
mod updates;
pub use local_index_controller::LocalIndexController;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
@ -13,6 +12,7 @@ use milli::Index;
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
use serde::{Serialize, Deserialize, de::Deserializer};
use uuid::Uuid;
use actix_web::web::Payload;
pub use updates::{Processed, Processing, Failed};
@ -21,7 +21,6 @@ pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMetadata {
pub uid: String,
uuid: Uuid,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
@ -114,6 +113,7 @@ pub struct IndexSettings {
/// be provided. This allows the implementer to define the behaviour of write accesses to the
/// indices, and abstract the scheduling of the updates. The implementer must be able to provide an
/// instance of `IndexStore`
#[async_trait::async_trait]
pub trait IndexController {
/*
@ -126,59 +126,47 @@ pub trait IndexController {
/// Perform document addition on the database. If the provided index does not exist, it will be
/// created when the addition is applied to the index.
fn add_documents<S: AsRef<str>>(
async fn add_documents(
&self,
index: S,
index: String,
method: IndexDocumentsMethod,
format: UpdateFormat,
data: &[u8],
data: Payload,
primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus>;
/// Clear all documents in the given index.
fn clear_documents(&self, index: impl AsRef<str>) -> anyhow::Result<UpdateStatus>;
fn clear_documents(&self, index: String) -> anyhow::Result<UpdateStatus>;
/// Delete all documents in `document_ids`.
fn delete_documents(&self, index: impl AsRef<str>, document_ids: Vec<String>) -> anyhow::Result<UpdateStatus>;
fn delete_documents(&self, index: String, document_ids: Vec<String>) -> anyhow::Result<UpdateStatus>;
/// Updates an index settings. If the index does not exist, it will be created when the update
/// is applied to the index.
fn update_settings<S: AsRef<str>>(&self, index_uid: S, settings: Settings) -> anyhow::Result<UpdateStatus>;
fn update_settings(&self, index_uid: String, settings: Settings) -> anyhow::Result<UpdateStatus>;
/// Create an index with the given `index_uid`.
fn create_index(&self, index_settings: IndexSettings) -> Result<IndexMetadata>;
async fn create_index(&self, index_settings: IndexSettings) -> Result<IndexMetadata>;
/// Delete index with the given `index_uid`, attempting to close it beforehand.
fn delete_index<S: AsRef<str>>(&self, index_uid: S) -> Result<()>;
fn delete_index(&self, index_uid: String) -> Result<()>;
/// Swap two indexes, concretely, it simply swaps the index the names point to.
fn swap_indices<S1: AsRef<str>, S2: AsRef<str>>(&self, index1_uid: S1, index2_uid: S2) -> Result<()>;
/// Apply an update to the given index. This method can be called when an update is ready to be
/// processed
fn handle_update<S: AsRef<str>>(
&self,
_index: S,
_update_id: u64,
_meta: Processing<UpdateMeta>,
_content: &[u8]
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
todo!()
}
fn swap_indices(&self, index1_uid: String, index2_uid: String) -> Result<()>;
/// Returns, if it exists, the `Index` with the povided name.
fn index(&self, name: impl AsRef<str>) -> anyhow::Result<Option<Arc<Index>>>;
fn index(&self, name: String) -> anyhow::Result<Option<Arc<Index>>>;
/// Returns the udpate status an update
fn update_status(&self, index: impl AsRef<str>, id: u64) -> anyhow::Result<Option<UpdateStatus>>;
fn update_status(&self, index: String, id: u64) -> anyhow::Result<Option<UpdateStatus>>;
/// Returns all the udpate status for an index
fn all_update_status(&self, index: impl AsRef<str>) -> anyhow::Result<Vec<UpdateStatus>>;
fn all_update_status(&self, index: String) -> anyhow::Result<Vec<UpdateStatus>>;
/// List all the indexes
fn list_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>>;
fn update_index(&self, name: impl AsRef<str>, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata>;
fn update_index(&self, name: String, index_settings: IndexSettings) -> anyhow::Result<IndexMetadata>;
}

View File

@ -1,4 +1,7 @@
#![allow(clippy::or_fun_call)]
#![allow(unused_must_use)]
#![allow(unused_variables)]
#![allow(dead_code)]
pub mod data;
pub mod error;
@ -7,54 +10,5 @@ pub mod option;
pub mod routes;
mod index_controller;
use actix_http::Error;
use actix_service::ServiceFactory;
use actix_web::{dev, web, App};
pub use option::Opt;
pub use self::data::Data;
use self::error::payload_error_handler;
pub fn create_app(
data: &Data,
enable_frontend: bool,
) -> App<
impl ServiceFactory<
Config = (),
Request = dev::ServiceRequest,
Response = dev::ServiceResponse<actix_http::body::Body>,
Error = Error,
InitError = (),
>,
actix_http::body::Body,
> {
let app = App::new()
.data(data.clone())
.app_data(
web::JsonConfig::default()
.limit(data.http_payload_size_limit())
.content_type(|_mime| true) // Accept all mime types
.error_handler(|err, _req| payload_error_handler(err).into()),
)
.app_data(
web::QueryConfig::default()
.error_handler(|err, _req| payload_error_handler(err).into())
)
.configure(routes::document::services)
.configure(routes::index::services)
.configure(routes::search::services)
.configure(routes::settings::services)
.configure(routes::stop_words::services)
.configure(routes::synonym::services)
.configure(routes::health::services)
.configure(routes::stats::services)
.configure(routes::key::services);
//.configure(routes::dump::services);
if enable_frontend {
app
.service(routes::load_html)
.service(routes::load_css)
} else {
app
}
}

View File

@ -1,11 +1,13 @@
use std::env;
use actix_cors::Cors;
use actix_web::{middleware, HttpServer};
use actix_web::{middleware, HttpServer, web, web::ServiceConfig};
use main_error::MainError;
use meilisearch_http::helpers::NormalizePath;
use meilisearch_http::{create_app, Data, Opt};
use meilisearch::{Data, Opt};
use structopt::StructOpt;
use actix_web::App;
use meilisearch::error::payload_error_handler;
use actix_web::middleware::TrailingSlash;
//mod analytics;
@ -74,9 +76,34 @@ async fn main() -> Result<(), MainError> {
print_launch_resume(&opt, &data);
let enable_frontend = opt.env != "production";
run_http(data, opt, enable_frontend).await?;
Ok(())
}
async fn run_http(data: Data, opt: Opt, enable_frontend: bool) -> Result<(), Box<dyn std::error::Error>> {
let http_server = HttpServer::new(move || {
create_app(&data, enable_frontend)
.wrap(
let app = App::new()
.configure(|c| configure_data(c, &data))
.configure(meilisearch::routes::document::services)
.configure(meilisearch::routes::index::services)
.configure(meilisearch::routes::search::services)
.configure(meilisearch::routes::settings::services)
.configure(meilisearch::routes::stop_words::services)
.configure(meilisearch::routes::synonym::services)
.configure(meilisearch::routes::health::services)
.configure(meilisearch::routes::stats::services)
.configure(meilisearch::routes::key::services);
//.configure(routes::dump::services);
let app = if enable_frontend {
app
.service(meilisearch::routes::load_html)
.service(meilisearch::routes::load_css)
} else {
app
};
app.wrap(
Cors::default()
.send_wildcard()
.allowed_headers(vec!["content-type", "x-meili-api-key"])
@ -84,7 +111,7 @@ async fn main() -> Result<(), MainError> {
)
.wrap(middleware::Logger::default())
.wrap(middleware::Compress::default())
.wrap(NormalizePath)
.wrap(middleware::NormalizePath::new(TrailingSlash::Trim))
});
if let Some(config) = opt.get_ssl_config()? {
@ -95,10 +122,24 @@ async fn main() -> Result<(), MainError> {
} else {
http_server.bind(opt.http_addr)?.run().await?;
}
Ok(())
}
fn configure_data(config: &mut ServiceConfig, data: &Data) {
config
.data(data.clone())
.app_data(
web::JsonConfig::default()
.limit(data.http_payload_size_limit())
.content_type(|_mime| true) // Accept all mime types
.error_handler(|err, _req| payload_error_handler(err).into()),
)
.app_data(
web::QueryConfig::default()
.error_handler(|err, _req| payload_error_handler(err).into())
);
}
pub fn print_launch_resume(opt: &Opt, data: &Data) {
let ascii_name = r#"
888b d888 d8b 888 d8b .d8888b. 888

View File

@ -3,7 +3,7 @@ use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use indexmap::IndexMap;
use log::error;
use milli::update::{IndexDocumentsMethod, UpdateFormat};
//use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::Deserialize;
use serde_json::Value;
@ -142,25 +142,26 @@ async fn add_documents_json(
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
let addition_result = data
.add_documents(
path.into_inner().index_uid,
IndexDocumentsMethod::ReplaceDocuments,
UpdateFormat::Json,
body,
params.primary_key.clone(),
).await;
todo!()
//let addition_result = data
//.add_documents(
//path.into_inner().index_uid,
//IndexDocumentsMethod::ReplaceDocuments,
//UpdateFormat::Json,
//body,
//params.primary_key.clone(),
//).await;
match addition_result {
Ok(update) => {
let value = serde_json::to_string(&update).unwrap();
let response = HttpResponse::Ok().body(value);
Ok(response)
}
Err(e) => {
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
}
}
//match addition_result {
//Ok(update) => {
//let value = serde_json::to_string(&update).unwrap();
//let response = HttpResponse::Ok().body(value);
//Ok(response)
//}
//Err(e) => {
//Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
//}
//}
}
@ -199,25 +200,26 @@ async fn update_documents(
params: web::Query<UpdateDocumentsQuery>,
body: web::Payload,
) -> Result<HttpResponse, ResponseError> {
let addition_result = data
.add_documents(
path.into_inner().index_uid,
IndexDocumentsMethod::UpdateDocuments,
UpdateFormat::Json,
body,
params.primary_key.clone(),
).await;
todo!()
//let addition_result = data
//.add_documents(
//path.into_inner().index_uid,
//IndexDocumentsMethod::UpdateDocuments,
//UpdateFormat::Json,
//body,
//params.primary_key.clone(),
//).await;
match addition_result {
Ok(update) => {
let value = serde_json::to_string(&update).unwrap();
let response = HttpResponse::Ok().body(value);
Ok(response)
}
Err(e) => {
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
}
}
//match addition_result {
//Ok(update) => {
//let value = serde_json::to_string(&update).unwrap();
//let response = HttpResponse::Ok().body(value);
//Ok(response)
//}
//Err(e) => {
//Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
//}
//}
}
#[post(
@ -229,20 +231,21 @@ async fn delete_documents(
path: web::Path<IndexParam>,
body: web::Json<Vec<Value>>,
) -> Result<HttpResponse, ResponseError> {
let ids = body
.iter()
.map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string()))
.collect();
todo!()
//let ids = body
//.iter()
//.map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string()))
//.collect();
match data.delete_documents(path.index_uid.clone(), ids).await {
Ok(result) => {
let json = serde_json::to_string(&result).unwrap();
Ok(HttpResponse::Ok().body(json))
}
Err(e) => {
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
}
}
//match data.delete_documents(path.index_uid.clone(), ids).await {
//Ok(result) => {
//let json = serde_json::to_string(&result).unwrap();
//Ok(HttpResponse::Ok().body(json))
//}
//Err(e) => {
//Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
//}
//}
}
#[delete("/indexes/{index_uid}/documents", wrap = "Authentication::Private")]
@ -250,13 +253,14 @@ async fn clear_all_documents(
data: web::Data<Data>,
path: web::Path<IndexParam>,
) -> Result<HttpResponse, ResponseError> {
match data.clear_documents(path.index_uid.clone()).await {
Ok(update) => {
let json = serde_json::to_string(&update).unwrap();
Ok(HttpResponse::Ok().body(json))
}
Err(e) => {
Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
}
}
todo!()
//match data.clear_documents(path.index_uid.clone()).await {
//Ok(update) => {
//let json = serde_json::to_string(&update).unwrap();
//Ok(HttpResponse::Ok().body(json))
//}
//Err(e) => {
//Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
//}
//}
}

View File

@ -61,7 +61,7 @@ async fn create_index(
data: web::Data<Data>,
body: web::Json<IndexCreateRequest>,
) -> Result<HttpResponse, ResponseError> {
match data.create_index(&body.uid, body.primary_key.clone()) {
match data.create_index(&body.uid, body.primary_key.clone()).await {
Ok(meta) => {
let json = serde_json::to_string(&meta).unwrap();
Ok(HttpResponse::Ok().body(json))

View File

@ -19,7 +19,7 @@ struct KeysResponse {
#[get("/keys", wrap = "Authentication::Admin")]
async fn list(data: web::Data<Data>) -> HttpResponse {
let api_keys = data.api_keys.clone();
HttpResponse::Ok().json(KeysResponse {
HttpResponse::Ok().json(&KeysResponse {
private: api_keys.private,
public: api_keys.public,
})