Start change http server; finish document endpoint

Quentin de Quelen
2020-04-07 17:47:35 +02:00
committed by qdequele
parent 6cc80d2565
commit 6d6c8e8fb2
10 changed files with 1125 additions and 813 deletions

Cargo.lock (generated, 1167 changed lines): file diff suppressed because it is too large.

View File

@@ -756,16 +756,16 @@ mod tests {
         update_reader.abort();
         let reader = db.main_read_txn().unwrap();
-        let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
+        let document: Option<IgnoredAny> = index.document::<_, String>(&reader, None, DocumentId(25)).unwrap();
         assert!(document.is_none());
         let document: Option<IgnoredAny> = index
-            .document(&reader, None, DocumentId(7_900_334_843_754_999_545))
+            .document::<_, String>(&reader, None, DocumentId(7_900_334_843_754_999_545))
             .unwrap();
         assert!(document.is_some());
         let document: Option<IgnoredAny> = index
-            .document(&reader, None, DocumentId(8_367_468_610_878_465_872))
+            .document::<_, String>(&reader, None, DocumentId(8_367_468_610_878_465_872))
             .unwrap();
         assert!(document.is_some());
     }
@@ -836,16 +836,16 @@ mod tests {
         update_reader.abort();
         let reader = db.main_read_txn().unwrap();
-        let document: Option<IgnoredAny> = index.document(&reader, None, DocumentId(25)).unwrap();
+        let document: Option<IgnoredAny> = index.document::<_, String>(&reader, None, DocumentId(25)).unwrap();
         assert!(document.is_none());
         let document: Option<IgnoredAny> = index
-            .document(&reader, None, DocumentId(7_900_334_843_754_999_545))
+            .document::<_, String>(&reader, None, DocumentId(7_900_334_843_754_999_545))
             .unwrap();
         assert!(document.is_some());
         let document: Option<IgnoredAny> = index
-            .document(&reader, None, DocumentId(8_367_468_610_878_465_872))
+            .document::<_, String>(&reader, None, DocumentId(8_367_468_610_878_465_872))
             .unwrap();
         assert!(document.is_some());
@@ -882,7 +882,7 @@ mod tests {
         let reader = db.main_read_txn().unwrap();
         let document: Option<serde_json::Value> = index
-            .document(&reader, None, DocumentId(7_900_334_843_754_999_545))
+            .document::<_, String>(&reader, None, DocumentId(7_900_334_843_754_999_545))
             .unwrap();
         let new_doc1 = serde_json::json!({
@@ -893,7 +893,7 @@ mod tests {
         assert_eq!(document, Some(new_doc1));
         let document: Option<serde_json::Value> = index
-            .document(&reader, None, DocumentId(8_367_468_610_878_465_872))
+            .document::<_, String>(&reader, None, DocumentId(8_367_468_610_878_465_872))
             .unwrap();
         let new_doc2 = serde_json::json!({

View File

@@ -214,17 +214,17 @@ pub struct Index {
 }
 impl Index {
-    pub fn document<T: de::DeserializeOwned>(
+    pub fn document<T: de::DeserializeOwned, R: AsRef<str>>(
         &self,
         reader: &heed::RoTxn<MainT>,
-        attributes: Option<&HashSet<&str>>,
+        attributes: Option<HashSet<R>>,
         document_id: DocumentId,
     ) -> MResult<Option<T>> {
         let schema = self.main.schema(reader)?;
         let schema = schema.ok_or(Error::SchemaMissing)?;
         let attributes = match attributes {
-            Some(attributes) => Some(attributes.iter().filter_map(|name| schema.id(*name)).collect()),
+            Some(attributes) => Some(attributes.iter().filter_map(|name| schema.id(name.as_ref())).collect()),
             None => None,
         };
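Note on the new `Index::document` signature: `attributes` is now an owned `Option<HashSet<R>>` with `R: AsRef<str>`, so a call that passes `None` gives the compiler nothing to infer `R` from, which is why the test hunks above spell out `document::<_, String>(...)`. A minimal sketch of both call styles, assuming an `index`, a read transaction `reader`, a `DocumentId` named `doc_id`, and `Document = IndexMap<String, Value>` as in the HTTP routes (the field names below are illustrative):

    use std::collections::HashSet;

    // No attribute filter: `R` cannot be inferred from `None`, so it has to be named; `String` works.
    let doc: Option<Document> = index.document::<Document, String>(&reader, None, doc_id)?;

    // With a filter: `R` is inferred from the element type of the set that is passed in.
    let attrs: HashSet<String> = vec!["title".to_string(), "description".to_string()].into_iter().collect();
    let doc: Option<Document> = index.document(&reader, Some(attrs), doc_id)?;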

View File

@@ -14,7 +14,7 @@ name = "meilisearch"
 path = "src/main.rs"

 [dependencies]
-async-std = { version = "1.5.0", features = ["attributes"] }
+# async-std = { version = "1.5.0", features = ["attributes"] }
 chrono = { version = "0.4.11", features = ["serde"] }
 crossbeam-channel = "0.4.2"
 env_logger = "0.7.1"
@@ -38,11 +38,15 @@ sha2 = "0.8.1"
 siphasher = "0.3.2"
 structopt = "0.3.12"
 sysinfo = "0.12.0"
-tide = "0.6.0"
+# tide = "0.6.0"
 ureq = { version = "0.12.0", features = ["tls"], default-features = false }
 walkdir = "2.3.1"
 whoami = "0.8.1"
 slice-group-by = "0.2.6"
+actix-rt = "1"
+actix-web = "2"
+actix-http = "1"
+tokio = { version = "0.2.0", features = ["macros"] }

 [dev-dependencies]
 http-service = "0.4.0"

View File

@@ -10,7 +10,7 @@ use sha2::Digest;
 use sysinfo::Pid;

 use crate::option::Opt;
-use crate::routes::index::index_update_callback;
+// use crate::routes::index::index_update_callback;

 const LAST_UPDATE_KEY: &str = "last-update";
@@ -155,7 +155,7 @@ impl Data {
         let callback_context = data.clone();
         db.set_update_callback(Box::new(move |index_uid, status| {
-            index_update_callback(&index_uid, &callback_context, status);
+            // index_update_callback(&index_uid, &callback_context, status);
         }));

         data

View File

@@ -1,16 +1,14 @@
-use std::fmt::Display;
+use std::fmt;

-use http::status::StatusCode;
-use log::{error, warn};
 use meilisearch_core::{FstError, HeedError};
-use serde::{Deserialize, Serialize};
-use tide::IntoResponse;
-use tide::Response;
+use serde_json::json;
+use actix_http::{ResponseBuilder, Response};
+use actix_web::http::StatusCode;
+use actix_web::*;
+use futures::future::{ok, Ready};

-use crate::helpers::meilisearch::Error as SearchError;
+// use crate::helpers::meilisearch::Error as SearchError;

-pub type SResult<T> = Result<T, ResponseError>;
-
+#[derive(Debug)]
 pub enum ResponseError {
     Internal(String),
     BadRequest(String),
@@ -23,169 +21,113 @@ pub enum ResponseError {
     BadParameter(String, String),
     OpenIndex(String),
     CreateIndex(String),
+    CreateTransaction,
+    CommitTransaction,
+    Schema,
+    InferPrimaryKey,
     InvalidIndexUid,
     Maintenance,
 }

-impl ResponseError {
-    pub fn internal(message: impl Display) -> ResponseError {
-        ResponseError::Internal(message.to_string())
-    }
-
-    pub fn bad_request(message: impl Display) -> ResponseError {
-        ResponseError::BadRequest(message.to_string())
-    }
-
-    pub fn invalid_token(message: impl Display) -> ResponseError {
-        ResponseError::InvalidToken(message.to_string())
-    }
-
-    pub fn not_found(message: impl Display) -> ResponseError {
-        ResponseError::NotFound(message.to_string())
-    }
-
-    pub fn index_not_found(message: impl Display) -> ResponseError {
-        ResponseError::IndexNotFound(message.to_string())
-    }
-
-    pub fn document_not_found(message: impl Display) -> ResponseError {
-        ResponseError::DocumentNotFound(message.to_string())
-    }
-
-    pub fn missing_header(message: impl Display) -> ResponseError {
-        ResponseError::MissingHeader(message.to_string())
-    }
-
-    pub fn bad_parameter(name: impl Display, message: impl Display) -> ResponseError {
-        ResponseError::BadParameter(name.to_string(), message.to_string())
-    }
-
-    pub fn open_index(message: impl Display) -> ResponseError {
-        ResponseError::OpenIndex(message.to_string())
-    }
-
-    pub fn create_index(message: impl Display) -> ResponseError {
-        ResponseError::CreateIndex(message.to_string())
-    }
-}
-
-impl IntoResponse for ResponseError {
-    fn into_response(self) -> Response {
-        match self {
-            ResponseError::Internal(err) => {
-                error!("internal server error: {}", err);
-                error("Internal server error".to_string(),
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                )
-            }
-            ResponseError::FilterParsing(err) => {
-                warn!("error paring filter: {}", err);
-                error(format!("parsing error: {}", err),
-                    StatusCode::BAD_REQUEST)
-            }
-            ResponseError::BadRequest(err) => {
-                warn!("bad request: {}", err);
-                error(err, StatusCode::BAD_REQUEST)
-            }
-            ResponseError::InvalidToken(err) => {
-                error(format!("Invalid API key: {}", err), StatusCode::FORBIDDEN)
-            }
-            ResponseError::NotFound(err) => error(err, StatusCode::NOT_FOUND),
-            ResponseError::IndexNotFound(index) => {
-                error(format!("Index {} not found", index), StatusCode::NOT_FOUND)
-            }
-            ResponseError::DocumentNotFound(id) => error(
-                format!("Document with id {} not found", id),
-                StatusCode::NOT_FOUND,
-            ),
-            ResponseError::MissingHeader(header) => error(
-                format!("Header {} is missing", header),
-                StatusCode::UNAUTHORIZED,
-            ),
-            ResponseError::BadParameter(param, e) => error(
-                format!("Url parameter {} error: {}", param, e),
-                StatusCode::BAD_REQUEST,
-            ),
-            ResponseError::CreateIndex(err) => error(
-                format!("Impossible to create index; {}", err),
-                StatusCode::BAD_REQUEST,
-            ),
-            ResponseError::OpenIndex(err) => error(
-                format!("Impossible to open index; {}", err),
-                StatusCode::BAD_REQUEST,
-            ),
-            ResponseError::InvalidIndexUid => error(
-                "Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_).".to_string(),
-                StatusCode::BAD_REQUEST,
-            ),
-            ResponseError::Maintenance => error(
-                String::from("Server is in maintenance, please try again later"),
-                StatusCode::SERVICE_UNAVAILABLE,
-            ),
+impl fmt::Display for ResponseError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Internal(err) => write!(f, "Internal server error: {}", err),
+            Self::BadRequest(err) => write!(f, "Bad request: {}", err),
+            Self::InvalidToken(err) => write!(f, "Invalid API key: {}", err),
+            Self::NotFound(err) => write!(f, "{} not found", err),
+            Self::IndexNotFound(index_uid) => write!(f, "Index {} not found", index_uid),
+            Self::DocumentNotFound(document_id) => write!(f, "Document with id {} not found", document_id),
+            Self::MissingHeader(header) => write!(f, "Header {} is missing", header),
+            Self::BadParameter(param, err) => write!(f, "Url parameter {} error: {}", param, err),
+            Self::OpenIndex(err) => write!(f, "Impossible to open index; {}", err),
+            Self::CreateIndex(err) => write!(f, "Impossible to create index; {}", err),
+            Self::CreateTransaction => write!(f, "Impossible to create transaction"),
+            Self::CommitTransaction => write!(f, "Impossible to commit transaction"),
+            Self::Schema => write!(f, "Internal schema is innaccessible"),
+            Self::InferPrimaryKey => write!(f, "Could not infer primary key"),
+            Self::InvalidIndexUid => write!(f, "Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_)."),
+            Self::Maintenance => write!(f, "Server is in maintenance, please try again later"),
+            Self::FilterParsing(err) => write!(f, "parsing error: {}", err),
         }
     }
 }

-#[derive(Serialize, Deserialize)]
-struct ErrorMessage {
-    message: String,
-}
-
-fn error(message: String, status: StatusCode) -> Response {
-    let message = ErrorMessage { message };
-    tide::Response::new(status.as_u16())
-        .body_json(&message)
-        .unwrap()
-}
+impl error::ResponseError for ResponseError {
+    fn error_response(&self) -> HttpResponse {
+        ResponseBuilder::new(self.status_code()).json(json!({
+            "message": self.to_string(),
+        }))
+    }
+
+    fn status_code(&self) -> StatusCode {
+        match *self {
+            Self::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::BadRequest(_) => StatusCode::BAD_REQUEST,
+            Self::InvalidToken(_) => StatusCode::FORBIDDEN,
+            Self::NotFound(_) => StatusCode::NOT_FOUND,
+            Self::IndexNotFound(_) => StatusCode::NOT_FOUND,
+            Self::DocumentNotFound(_) => StatusCode::NOT_FOUND,
+            Self::MissingHeader(_) => StatusCode::UNAUTHORIZED,
+            Self::BadParameter(_, _) => StatusCode::BAD_REQUEST,
+            Self::OpenIndex(_) => StatusCode::BAD_REQUEST,
+            Self::CreateIndex(_) => StatusCode::BAD_REQUEST,
+            Self::CreateTransaction => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::CommitTransaction => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::Schema => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::InferPrimaryKey => StatusCode::BAD_REQUEST,
+            Self::InvalidIndexUid => StatusCode::BAD_REQUEST,
+            Self::Maintenance => StatusCode::SERVICE_UNAVAILABLE,
+            Self::FilterParsing(_) => StatusCode::BAD_REQUEST,
+        }
+    }
+}
+
+// impl Responder for ResponseError {
+//     type Error = Error;
+//     type Future = Ready<Result<Response, Error>>;
+//
+//     #[inline]
+//     fn respond_to(self, req: &HttpRequest) -> Self::Future {
+//         ok(self.error_response())
+//     }
+// }

 impl From<serde_json::Error> for ResponseError {
     fn from(err: serde_json::Error) -> ResponseError {
-        ResponseError::internal(err)
+        ResponseError::Internal(err.to_string())
     }
 }

 impl From<meilisearch_core::Error> for ResponseError {
     fn from(err: meilisearch_core::Error) -> ResponseError {
-        ResponseError::internal(err)
+        ResponseError::Internal(err.to_string())
     }
 }

 impl From<HeedError> for ResponseError {
     fn from(err: HeedError) -> ResponseError {
-        ResponseError::internal(err)
+        ResponseError::Internal(err.to_string())
     }
 }

 impl From<FstError> for ResponseError {
     fn from(err: FstError) -> ResponseError {
-        ResponseError::internal(err)
+        ResponseError::Internal(err.to_string())
     }
 }

-impl From<SearchError> for ResponseError {
-    fn from(err: SearchError) -> ResponseError {
-        match err {
-            SearchError::FilterParsing(s) => ResponseError::FilterParsing(s),
-            _ => ResponseError::internal(err),
-        }
-    }
-}
+// impl From<SearchError> for ResponseError {
+//     fn from(err: SearchError) -> ResponseError {
+//         match err {
+//             SearchError::FilterParsing(s) => ResponseError::FilterParsing(s),
+//             _ => ResponseError::Internal(err),
+//         }
+//     }
+// }

 impl From<meilisearch_core::settings::RankingRuleConversionError> for ResponseError {
     fn from(err: meilisearch_core::settings::RankingRuleConversionError) -> ResponseError {
-        ResponseError::internal(err)
+        ResponseError::Internal(err.to_string())
     }
 }
-
-pub trait IntoInternalError<T> {
-    fn into_internal_error(self) -> SResult<T>;
-}
-
-impl<T> IntoInternalError<T> for Option<T> {
-    fn into_internal_error(self) -> SResult<T> {
-        match self {
-            Some(value) => Ok(value),
-            None => Err(ResponseError::internal("Heed cannot find requested value")),
-        }
-    }
-}
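Because `ResponseError` now implements actix-web's `error::ResponseError` trait, handlers can bubble it up with `?` or return it directly, and actix-web answers with the status from `status_code()` and the JSON body built in `error_response()`. A minimal illustrative sketch, not taken from this commit (the route path and handler name are invented):

    use actix_web::{get, web, HttpResponse};

    // Returning Err(ResponseError::...) is enough: actix-web turns it into an HTTP
    // response such as a 404 with body {"message": "Index movies not found"}.
    #[get("/indexes/{index_uid}")]
    async fn get_index(path: web::Path<String>) -> Result<HttpResponse, ResponseError> {
        Err(ResponseError::IndexNotFound(path.into_inner()))
    }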

View File

@@ -1,2 +1,2 @@
-pub mod meilisearch;
-pub mod tide;
+// pub mod meilisearch;
+// pub mod tide;

View File

@@ -1,16 +1,14 @@
 use std::{env, thread};

-use async_std::task;
 use log::info;
 use main_error::MainError;
 use structopt::StructOpt;
-use tide::middleware::{Cors, RequestLogger, Origin};
-use http::header::HeaderValue;
+use actix_web::middleware::Logger;
+use actix_web::{post, web, App, HttpServer, HttpResponse, Responder};

 use meilisearch_http::data::Data;
 use meilisearch_http::option::Opt;
 use meilisearch_http::routes;
-use meilisearch_http::routes::index::index_update_callback;
+// use meilisearch_http::routes::index::index_update_callback;

 mod analytics;
@@ -18,8 +16,11 @@ mod analytics;
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

-pub fn main() -> Result<(), MainError> {
+#[tokio::main]
+async fn main() -> Result<(), MainError> {
     let opt = Opt::from_args();
+    let local = tokio::task::LocalSet::new();
+    let _sys = actix_rt::System::run_in_tokio("server", &local);

     match opt.env.as_ref() {
         "production" => {
@@ -29,8 +30,7 @@ pub fn main() -> Result<(), MainError> {
                         .into(),
                 );
             }
-            env_logger::init();
-        }
+        },
         "development" => {
             env_logger::from_env(env_logger::Env::default().default_filter_or("info")).init();
         }
@@ -45,22 +45,27 @@ pub fn main() -> Result<(), MainError> {
     let data_cloned = data.clone();
     data.db.set_update_callback(Box::new(move |name, status| {
-        index_update_callback(name, &data_cloned, status);
+        // index_update_callback(name, &data_cloned, status);
     }));

     print_launch_resume(&opt, &data);

-    let mut app = tide::with_state(data);
-
-    app.middleware(Cors::new()
-        .allow_methods(HeaderValue::from_static("GET, POST, PUT, DELETE, OPTIONS"))
-        .allow_headers(HeaderValue::from_static("X-Meili-API-Key"))
-        .allow_origin(Origin::from("*")));
-    app.middleware(RequestLogger::new());
-    routes::load_routes(&mut app);
-    task::block_on(app.listen(opt.http_addr))?;
+    HttpServer::new(move ||
+        App::new()
+            .wrap(Logger::default())
+            .app_data(web::Data::new(data.clone()))
+            .service(routes::document::get_document)
+            .service(routes::document::delete_document)
+            .service(routes::document::get_all_documents)
+            .service(routes::document::add_documents)
+            .service(routes::document::update_documents)
+            .service(routes::document::delete_documents)
+            .service(routes::document::clear_all_documents)
+    )
+    .bind(opt.http_addr)?
+    .run()
+    .await?;

     Ok(())
 }
@@ -76,7 +81,7 @@ pub fn print_launch_resume(opt: &Opt, data: &Data) {
 888 888 "Y8888 888 888 888 "Y8888P" "Y8888 "Y888888 888 "Y8888P 888 888
     "#;

-    println!("{}", ascii_name);
+    info!("{}", ascii_name);
     info!("Database path: {:?}", opt.db_path);
     info!("Start server on: {:?}", opt.http_addr);

View File

@@ -1,111 +1,104 @@
-use std::collections::{BTreeSet, HashSet};
+use std::collections::BTreeSet;

 use indexmap::IndexMap;
-use serde::{Deserialize, Serialize};
+use serde::Deserialize;
 use serde_json::Value;
-use tide::{Request, Response};
+use actix_web::*;

-use crate::error::{ResponseError, SResult};
-use crate::helpers::tide::RequestExt;
-use crate::helpers::tide::ACL::*;
+use crate::error::ResponseError;
 use crate::Data;
+use crate::routes::IndexUpdateResponse;

-pub async fn get_document(ctx: Request<Data>) -> SResult<Response> {
-    ctx.is_allowed(Public)?;
-
-    let index = ctx.index()?;
-
-    let original_document_id = ctx.document_id()?;
-    let document_id = meilisearch_core::serde::compute_document_id(original_document_id.clone());
-
-    let db = &ctx.state().db;
-    let reader = db.main_read_txn()?;
-
-    let response = index
-        .document::<IndexMap<String, Value>>(&reader, None, document_id)?
-        .ok_or(ResponseError::document_not_found(&original_document_id))?;
-
-    if response.is_empty() {
-        return Err(ResponseError::document_not_found(&original_document_id));
-    }
-
-    Ok(tide::Response::new(200).body_json(&response)?)
-}
-
-#[derive(Default, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct IndexUpdateResponse {
-    pub update_id: u64,
-}
+type Document = IndexMap<String, Value>;

-pub async fn delete_document(ctx: Request<Data>) -> SResult<Response> {
-    ctx.is_allowed(Private)?;
-
-    let index = ctx.index()?;
-
-    let document_id = ctx.document_id()?;
-    let document_id = meilisearch_core::serde::compute_document_id(document_id);
-
-    let db = &ctx.state().db;
-    let mut update_writer = db.update_write_txn()?;
+#[get("/indexes/{index_uid}/documents/{document_id}")]
+pub async fn get_document(
+    data: web::Data<Data>,
+    path: web::Path<(String, String)>,
+) -> Result<HttpResponse> {
+    let index = data.db.open_index(&path.0)
+        .ok_or(ResponseError::IndexNotFound(path.0.clone()))?;
+    let document_id = meilisearch_core::serde::compute_document_id(path.1.clone());
+
+    let reader = data.db.main_read_txn()
+        .map_err(|_| ResponseError::CreateTransaction)?;
+
+    let response = index.document::<Document, String>(&reader, None, document_id)
+        .map_err(|_| ResponseError::DocumentNotFound(path.1.clone()))?
+        .ok_or(ResponseError::DocumentNotFound(path.1.clone()))?;
+
+    Ok(HttpResponse::Ok().json(response))
+}
+
+#[delete("/indexes/{index_uid}/documents/{document_id}")]
+pub async fn delete_document(
+    data: web::Data<Data>,
+    path: web::Path<(String, String)>,
+) -> Result<HttpResponse> {
+    let index = data.db.open_index(&path.0)
+        .ok_or(ResponseError::IndexNotFound(path.0.clone()))?;
+    let document_id = meilisearch_core::serde::compute_document_id(path.1.clone());
+
+    let mut update_writer = data.db.update_write_txn()
+        .map_err(|_| ResponseError::CreateTransaction)?;

     let mut documents_deletion = index.documents_deletion();
     documents_deletion.delete_document_by_id(document_id);
-    let update_id = documents_deletion.finalize(&mut update_writer)?;
-    update_writer.commit()?;
-
-    let response_body = IndexUpdateResponse { update_id };
-    Ok(tide::Response::new(202).body_json(&response_body)?)
+
+    let update_id = documents_deletion.finalize(&mut update_writer)
+        .map_err(|_| ResponseError::Internal(path.1.clone()))?;
+
+    update_writer.commit()
+        .map_err(|_| ResponseError::CommitTransaction)?;
+
+    Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)))
 }

 #[derive(Default, Deserialize)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
-struct BrowseQuery {
+pub struct BrowseQuery {
     offset: Option<usize>,
     limit: Option<usize>,
     attributes_to_retrieve: Option<String>,
 }

-pub async fn get_all_documents(ctx: Request<Data>) -> SResult<Response> {
-    ctx.is_allowed(Private)?;
-
-    let index = ctx.index()?;
-    let query: BrowseQuery = ctx.query().unwrap_or_default();
-
-    let offset = query.offset.unwrap_or(0);
-    let limit = query.limit.unwrap_or(20);
-
-    let db = &ctx.state().db;
-    let reader = db.main_read_txn()?;
+#[get("/indexes/{index_uid}/documents")]
+pub async fn get_all_documents(
+    data: web::Data<Data>,
+    path: web::Path<String>,
+    params: web::Query<BrowseQuery>,
+) -> Result<HttpResponse> {
+    let index = data.db.open_index(path.clone())
+        .ok_or(ResponseError::IndexNotFound(path.clone()))?;
+
+    let offset = params.offset.unwrap_or(0);
+    let limit = params.limit.unwrap_or(20);
+
+    let reader = data.db.main_read_txn()
+        .map_err(|_| ResponseError::CreateTransaction)?;

     let documents_ids: Result<BTreeSet<_>, _> = index
         .documents_fields_counts
-        .documents_ids(&reader)?
+        .documents_ids(&reader)
+        .map_err(|_| ResponseError::Internal(path.clone()))?
         .skip(offset)
         .take(limit)
         .collect();

-    let documents_ids = match documents_ids {
-        Ok(documents_ids) => documents_ids,
-        Err(e) => return Err(ResponseError::internal(e)),
-    };
+    let documents_ids = documents_ids.map_err(|err| ResponseError::Internal(err.to_string()))?;
+
+    let attributes = params.attributes_to_retrieve.clone()
+        .map(|a| a.split(',').map(|a| a.to_string()).collect());

     let mut response_body = Vec::<IndexMap<String, Value>>::new();
-
-    if let Some(attributes) = query.attributes_to_retrieve {
-        let attributes = attributes.split(',').collect::<HashSet<&str>>();
-        for document_id in documents_ids {
-            if let Ok(Some(document)) = index.document(&reader, Some(&attributes), document_id) {
-                response_body.push(document);
-            }
-        }
-    } else {
-        for document_id in documents_ids {
-            if let Ok(Some(document)) = index.document(&reader, None, document_id) {
-                response_body.push(document);
-            }
-        }
+    for document_id in documents_ids {
+        if let Ok(Some(document)) = index.document(&reader, attributes.clone(), document_id) {
+            response_body.push(document);
+        }
     }

-    Ok(tide::Response::new(200).body_json(&response_body)?)
+    Ok(HttpResponse::Ok().json(response_body))
 }

 fn find_primary_key(document: &IndexMap<String, Value>) -> Option<String> {
@@ -119,40 +112,49 @@ fn find_primary_key(document: &IndexMap<String, Value>) -> Option<String> {
 #[derive(Default, Deserialize)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
-struct UpdateDocumentsQuery {
+pub struct UpdateDocumentsQuery {
     primary_key: Option<String>,
 }

-async fn update_multiple_documents(mut ctx: Request<Data>, is_partial: bool) -> SResult<Response> {
-    ctx.is_allowed(Private)?;
-
-    let index = ctx.index()?;
-
-    let data: Vec<IndexMap<String, Value>> =
-        ctx.body_json().await.map_err(ResponseError::bad_request)?;
-    let query: UpdateDocumentsQuery = ctx.query().unwrap_or_default();
-
-    let db = &ctx.state().db;
-    let reader = db.main_read_txn()?;
+async fn update_multiple_documents(
+    data: web::Data<Data>,
+    path: web::Path<String>,
+    params: web::Query<UpdateDocumentsQuery>,
+    body: web::Json<Vec<Document>>,
+    is_partial: bool
+) -> Result<HttpResponse> {
+    let index = data.db.open_index(path.clone())
+        .ok_or(ResponseError::IndexNotFound(path.clone()))?;
+
+    let reader = data.db.main_read_txn()
+        .map_err(|_| ResponseError::CreateTransaction)?;

     let mut schema = index
         .main
-        .schema(&reader)?
-        .ok_or(ResponseError::internal("schema not found"))?;
+        .schema(&reader)
+        .map_err(|_| ResponseError::Schema)?
+        .ok_or(ResponseError::Schema)?;

     if schema.primary_key().is_none() {
-        let id = match query.primary_key {
+        let id = match params.primary_key.clone() {
             Some(id) => id,
-            None => match data.first().and_then(|docs| find_primary_key(docs)) {
-                Some(id) => id,
-                None => return Err(ResponseError::bad_request("Could not infer a primary key")),
-            },
+            None => {
+                body.first()
+                    .and_then(|docs| find_primary_key(docs))
+                    .ok_or(ResponseError::InferPrimaryKey)?
+            }
         };

-        let mut writer = db.main_write_txn()?;
-        schema.set_primary_key(&id).map_err(ResponseError::bad_request)?;
-        index.main.put_schema(&mut writer, &schema)?;
-        writer.commit()?;
+        let mut writer = data.db.main_write_txn()
+            .map_err(|_| ResponseError::CreateTransaction)?;
+
+        schema.set_primary_key(&id)
+            .map_err(|e| ResponseError::Internal(e.to_string()))?;
+        index.main.put_schema(&mut writer, &schema)
+            .map_err(|e| ResponseError::Internal(e.to_string()))?;
+        writer.commit()
+            .map_err(|_| ResponseError::CommitTransaction)?;
     }

     let mut document_addition = if is_partial {
@@ -161,63 +163,88 @@ async fn update_multiple_documents(mut ctx: Request<Data>, is_partial: bool) ->
         index.documents_addition()
     };

-    for document in data {
+    for document in body.into_inner() {
         document_addition.update_document(document);
     }

-    let mut update_writer = db.update_write_txn()?;
-    let update_id = document_addition.finalize(&mut update_writer)?;
-    update_writer.commit()?;
+    let mut update_writer = data.db.update_write_txn()
+        .map_err(|_| ResponseError::CreateTransaction)?;
+    let update_id = document_addition.finalize(&mut update_writer)
+        .map_err(|e| ResponseError::Internal(e.to_string()))?;
+    update_writer.commit()
+        .map_err(|_| ResponseError::CommitTransaction)?;

-    let response_body = IndexUpdateResponse { update_id };
-    Ok(tide::Response::new(202).body_json(&response_body)?)
+    Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)))
 }

-pub async fn add_or_replace_multiple_documents(ctx: Request<Data>) -> SResult<Response> {
-    update_multiple_documents(ctx, false).await
+#[post("/indexes/{index_uid}/documents")]
+pub async fn add_documents(
+    data: web::Data<Data>,
+    path: web::Path<String>,
+    params: web::Query<UpdateDocumentsQuery>,
+    body: web::Json<Vec<Document>>
+) -> Result<HttpResponse> {
+    update_multiple_documents(data, path, params, body, false).await
 }

-pub async fn add_or_update_multiple_documents(ctx: Request<Data>) -> SResult<Response> {
-    update_multiple_documents(ctx, true).await
+#[put("/indexes/{index_uid}/documents")]
+pub async fn update_documents(
+    data: web::Data<Data>,
+    path: web::Path<String>,
+    params: web::Query<UpdateDocumentsQuery>,
+    body: web::Json<Vec<Document>>
+) -> Result<HttpResponse> {
+    update_multiple_documents(data, path, params, body, true).await
 }

-pub async fn delete_multiple_documents(mut ctx: Request<Data>) -> SResult<Response> {
-    ctx.is_allowed(Private)?;
-
-    let data: Vec<Value> = ctx.body_json().await.map_err(ResponseError::bad_request)?;
-    let index = ctx.index()?;
-
-    let db = &ctx.state().db;
-    let mut writer = db.update_write_txn()?;
+#[post("/indexes/{index_uid}/documents/delete-batch")]
+pub async fn delete_documents(
+    data: web::Data<Data>,
+    path: web::Path<String>,
+    body: web::Json<Vec<Value>>
+) -> Result<HttpResponse> {
+    let index = data.db.open_index(path.clone())
+        .ok_or(ResponseError::IndexNotFound(path.clone()))?;
+
+    let mut writer = data.db.update_write_txn()
+        .map_err(|_| ResponseError::CreateTransaction)?;

     let mut documents_deletion = index.documents_deletion();

-    for document_id in data {
+    for document_id in body.into_inner() {
         if let Some(document_id) = meilisearch_core::serde::value_to_string(&document_id) {
             documents_deletion
                 .delete_document_by_id(meilisearch_core::serde::compute_document_id(document_id));
         }
     }

-    let update_id = documents_deletion.finalize(&mut writer)?;
-
-    writer.commit()?;
+    let update_id = documents_deletion.finalize(&mut writer)
+        .map_err(|e| ResponseError::Internal(e.to_string()))?;
+
+    writer.commit()
+        .map_err(|_| ResponseError::CommitTransaction)?;

-    let response_body = IndexUpdateResponse { update_id };
-    Ok(tide::Response::new(202).body_json(&response_body)?)
+    Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)))
 }

-pub async fn clear_all_documents(ctx: Request<Data>) -> SResult<Response> {
-    ctx.is_allowed(Private)?;
-
-    let index = ctx.index()?;
-
-    let db = &ctx.state().db;
-    let mut writer = db.update_write_txn()?;
-    let update_id = index.clear_all(&mut writer)?;
-    writer.commit()?;
+#[delete("/indexes/{index_uid}/documents")]
+pub async fn clear_all_documents(
+    data: web::Data<Data>,
+    path: web::Path<String>,
+) -> Result<HttpResponse> {
+    let index = data.db.open_index(path.clone())
+        .ok_or(ResponseError::IndexNotFound(path.clone()))?;
+
+    let mut writer = data.db.update_write_txn()
+        .map_err(|_| ResponseError::CreateTransaction)?;
+    let update_id = index.clear_all(&mut writer)
+        .map_err(|e| ResponseError::Internal(e.to_string()))?;
+    writer.commit()
+        .map_err(|_| ResponseError::CommitTransaction)?;

-    let response_body = IndexUpdateResponse { update_id };
-    Ok(tide::Response::new(202).body_json(&response_body)?)
+    Ok(HttpResponse::Accepted().json(IndexUpdateResponse::with_id(update_id)))
 }

View File

@@ -1,130 +1,135 @@
-use crate::data::Data;
-use std::future::Future;
-use tide::IntoResponse;
-use tide::Response;
+use serde::Serialize;

 pub mod document;
-pub mod health;
-pub mod index;
-pub mod key;
-pub mod search;
-pub mod setting;
-pub mod stats;
-pub mod stop_words;
-pub mod synonym;
+// pub mod health;
+// pub mod index;
+// pub mod key;
+// pub mod search;
+// pub mod setting;
+// pub mod stats;
+// pub mod stop_words;
+// pub mod synonym;

-async fn into_response<T: IntoResponse, U: IntoResponse>(
-    x: impl Future<Output = Result<T, U>>,
-) -> Response {
-    match x.await {
-        Ok(resp) => resp.into_response(),
-        Err(resp) => resp.into_response(),
+#[derive(Default, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct IndexUpdateResponse {
+    pub update_id: u64,
+    pub see_more: String,
+}
+
+impl IndexUpdateResponse {
+    pub fn with_id(update_id: u64) -> Self {
+        Self {
+            update_id,
+            see_more: "https://docs.meilisearch.com/guides/advanced_guides/asynchronous_updates.html".to_string()
+        }
     }
 }

-pub fn load_routes(app: &mut tide::Server<Data>) {
-    app.at("/").get(|_| async {
-        tide::Response::new(200)
-            .body_string(include_str!("../../public/interface.html").to_string())
-            .set_mime(mime::TEXT_HTML_UTF_8)
-    });
-    app.at("/bulma.min.css").get(|_| async {
-        tide::Response::new(200)
-            .body_string(include_str!("../../public/bulma.min.css").to_string())
-            .set_mime(mime::TEXT_CSS_UTF_8)
-    });
-    app.at("/indexes")
-        .get(|ctx| into_response(index::list_indexes(ctx)))
-        .post(|ctx| into_response(index::create_index(ctx)));
-    app.at("/indexes/search")
-        .post(|ctx| into_response(search::search_multi_index(ctx)));
-    app.at("/indexes/:index")
-        .get(|ctx| into_response(index::get_index(ctx)))
-        .put(|ctx| into_response(index::update_index(ctx)))
-        .delete(|ctx| into_response(index::delete_index(ctx)));
-    app.at("/indexes/:index/search")
-        .get(|ctx| into_response(search::search_with_url_query(ctx)));
-    app.at("/indexes/:index/updates")
-        .get(|ctx| into_response(index::get_all_updates_status(ctx)));
-    app.at("/indexes/:index/updates/:update_id")
-        .get(|ctx| into_response(index::get_update_status(ctx)));
-    app.at("/indexes/:index/documents")
-        .get(|ctx| into_response(document::get_all_documents(ctx)))
-        .post(|ctx| into_response(document::add_or_replace_multiple_documents(ctx)))
-        .put(|ctx| into_response(document::add_or_update_multiple_documents(ctx)))
-        .delete(|ctx| into_response(document::clear_all_documents(ctx)));
-    app.at("/indexes/:index/documents/:document_id")
-        .get(|ctx| into_response(document::get_document(ctx)))
-        .delete(|ctx| into_response(document::delete_document(ctx)));
-    app.at("/indexes/:index/documents/delete-batch")
-        .post(|ctx| into_response(document::delete_multiple_documents(ctx)));
-    app.at("/indexes/:index/settings")
-        .get(|ctx| into_response(setting::get_all(ctx)))
-        .post(|ctx| into_response(setting::update_all(ctx)))
-        .delete(|ctx| into_response(setting::delete_all(ctx)));
-    app.at("/indexes/:index/settings/ranking-rules")
-        .get(|ctx| into_response(setting::get_rules(ctx)))
-        .post(|ctx| into_response(setting::update_rules(ctx)))
-        .delete(|ctx| into_response(setting::delete_rules(ctx)));
-    app.at("/indexes/:index/settings/distinct-attribute")
-        .get(|ctx| into_response(setting::get_distinct(ctx)))
-        .post(|ctx| into_response(setting::update_distinct(ctx)))
-        .delete(|ctx| into_response(setting::delete_distinct(ctx)));
-    app.at("/indexes/:index/settings/searchable-attributes")
-        .get(|ctx| into_response(setting::get_searchable(ctx)))
-        .post(|ctx| into_response(setting::update_searchable(ctx)))
-        .delete(|ctx| into_response(setting::delete_searchable(ctx)));
-    app.at("/indexes/:index/settings/displayed-attributes")
-        .get(|ctx| into_response(setting::displayed(ctx)))
-        .post(|ctx| into_response(setting::update_displayed(ctx)))
-        .delete(|ctx| into_response(setting::delete_displayed(ctx)));
-    app.at("/indexes/:index/settings/accept-new-fields")
-        .get(|ctx| into_response(setting::get_accept_new_fields(ctx)))
-        .post(|ctx| into_response(setting::update_accept_new_fields(ctx)));
-    app.at("/indexes/:index/settings/synonyms")
-        .get(|ctx| into_response(synonym::get(ctx)))
-        .post(|ctx| into_response(synonym::update(ctx)))
-        .delete(|ctx| into_response(synonym::delete(ctx)));
-    app.at("/indexes/:index/settings/stop-words")
-        .get(|ctx| into_response(stop_words::get(ctx)))
-        .post(|ctx| into_response(stop_words::update(ctx)))
-        .delete(|ctx| into_response(stop_words::delete(ctx)));
-    app.at("/indexes/:index/stats")
-        .get(|ctx| into_response(stats::index_stats(ctx)));
-    app.at("/keys").get(|ctx| into_response(key::list(ctx)));
-    app.at("/health")
-        .get(|ctx| into_response(health::get_health(ctx)))
-        .put(|ctx| into_response(health::change_healthyness(ctx)));
-    app.at("/stats")
-        .get(|ctx| into_response(stats::get_stats(ctx)));
-    app.at("/version")
-        .get(|ctx| into_response(stats::get_version(ctx)));
-    app.at("/sys-info")
-        .get(|ctx| into_response(stats::get_sys_info(ctx)));
-    app.at("/sys-info/pretty")
-        .get(|ctx| into_response(stats::get_sys_info_pretty(ctx)));
-}
+// pub fn load_routes(app: &mut tide::Server<Data>) {
+//     app.at("/").get(|_| async {
+//         tide::Response::new(200)
+//             .body_string(include_str!("../../public/interface.html").to_string())
+//             .set_mime(mime::TEXT_HTML_UTF_8)
+//     });
+//     app.at("/bulma.min.css").get(|_| async {
+//         tide::Response::new(200)
+//             .body_string(include_str!("../../public/bulma.min.css").to_string())
+//             .set_mime(mime::TEXT_CSS_UTF_8)
+//     });
+//     app.at("/indexes")
+//         .get(|ctx| into_response(index::list_indexes(ctx)))
+//         .post(|ctx| into_response(index::create_index(ctx)));
+//     app.at("/indexes/search")
+//         .post(|ctx| into_response(search::search_multi_index(ctx)));
+//     app.at("/indexes/:index")
+//         .get(|ctx| into_response(index::get_index(ctx)))
+//         .put(|ctx| into_response(index::update_index(ctx)))
+//         .delete(|ctx| into_response(index::delete_index(ctx)));
+//     app.at("/indexes/:index/search")
+//         .get(|ctx| into_response(search::search_with_url_query(ctx)));
+//     app.at("/indexes/:index/updates")
+//         .get(|ctx| into_response(index::get_all_updates_status(ctx)));
+//     app.at("/indexes/:index/updates/:update_id")
+//         .get(|ctx| into_response(index::get_update_status(ctx)));
+//     app.at("/indexes/:index/documents")
+//         .get(|ctx| into_response(document::get_all_documents(ctx)))
+//         .post(|ctx| into_response(document::add_or_replace_multiple_documents(ctx)))
+//         .put(|ctx| into_response(document::add_or_update_multiple_documents(ctx)))
+//         .delete(|ctx| into_response(document::clear_all_documents(ctx)));
+//     app.at("/indexes/:index/documents/:document_id")
+//         .get(|ctx| into_response(document::get_document(ctx)))
+//         .delete(|ctx| into_response(document::delete_document(ctx)));
+//     app.at("/indexes/:index/documents/delete-batch")
+//         .post(|ctx| into_response(document::delete_multiple_documents(ctx)));
+//     app.at("/indexes/:index/settings")
+//         .get(|ctx| into_response(setting::get_all(ctx)))
+//         .post(|ctx| into_response(setting::update_all(ctx)))
+//         .delete(|ctx| into_response(setting::delete_all(ctx)));
+//     app.at("/indexes/:index/settings/ranking-rules")
+//         .get(|ctx| into_response(setting::get_rules(ctx)))
+//         .post(|ctx| into_response(setting::update_rules(ctx)))
+//         .delete(|ctx| into_response(setting::delete_rules(ctx)));
+//     app.at("/indexes/:index/settings/distinct-attribute")
+//         .get(|ctx| into_response(setting::get_distinct(ctx)))
+//         .post(|ctx| into_response(setting::update_distinct(ctx)))
+//         .delete(|ctx| into_response(setting::delete_distinct(ctx)));
+//     app.at("/indexes/:index/settings/searchable-attributes")
+//         .get(|ctx| into_response(setting::get_searchable(ctx)))
+//         .post(|ctx| into_response(setting::update_searchable(ctx)))
+//         .delete(|ctx| into_response(setting::delete_searchable(ctx)));
+//     app.at("/indexes/:index/settings/displayed-attributes")
+//         .get(|ctx| into_response(setting::displayed(ctx)))
+//         .post(|ctx| into_response(setting::update_displayed(ctx)))
+//         .delete(|ctx| into_response(setting::delete_displayed(ctx)));
+//     app.at("/indexes/:index/settings/accept-new-fields")
+//         .get(|ctx| into_response(setting::get_accept_new_fields(ctx)))
+//         .post(|ctx| into_response(setting::update_accept_new_fields(ctx)));
+//     app.at("/indexes/:index/settings/synonyms")
+//         .get(|ctx| into_response(synonym::get(ctx)))
+//         .post(|ctx| into_response(synonym::update(ctx)))
+//         .delete(|ctx| into_response(synonym::delete(ctx)));
+//     app.at("/indexes/:index/settings/stop-words")
+//         .get(|ctx| into_response(stop_words::get(ctx)))
+//         .post(|ctx| into_response(stop_words::update(ctx)))
+//         .delete(|ctx| into_response(stop_words::delete(ctx)));
+//     app.at("/indexes/:index/stats")
+//         .get(|ctx| into_response(stats::index_stats(ctx)));
+//     app.at("/keys").get(|ctx| into_response(key::list(ctx)));
+//     app.at("/health")
+//         .get(|ctx| into_response(health::get_health(ctx)))
+//         .put(|ctx| into_response(health::change_healthyness(ctx)));
+//     app.at("/stats")
+//         .get(|ctx| into_response(stats::get_stats(ctx)));
+//     app.at("/version")
+//         .get(|ctx| into_response(stats::get_version(ctx)));
+//     app.at("/sys-info")
+//         .get(|ctx| into_response(stats::get_sys_info(ctx)));
+//     app.at("/sys-info/pretty")
+//         .get(|ctx| into_response(stats::get_sys_info_pretty(ctx)));
+// }