jsonl support

This commit is contained in:
mpostma
2021-09-29 10:17:52 +02:00
parent 5bac65f8b8
commit 1f537e1b60
10 changed files with 121 additions and 41 deletions

1
Cargo.lock generated
View File

@@ -1779,6 +1779,7 @@ dependencies = [
[[package]] [[package]]
name = "milli" name = "milli"
version = "0.16.0" version = "0.16.0"
source = "git+https://github.com/meilisearch/milli.git?rev=f65153ad6454317213680e9a9a908ec78d5645a7#f65153ad6454317213680e9a9a908ec78d5645a7"
dependencies = [ dependencies = [
"bimap", "bimap",
"bincode", "bincode",

View File

@@ -35,6 +35,7 @@ macro_rules! guard_content_type {
guard_content_type!(guard_json, "application/json"); guard_content_type!(guard_json, "application/json");
guard_content_type!(guard_csv, "application/csv"); guard_content_type!(guard_csv, "application/csv");
guard_content_type!(guard_ndjson, "application/ndjson");
fn empty_application_type(head: &actix_web::dev::RequestHead) -> bool { fn empty_application_type(head: &actix_web::dev::RequestHead) -> bool {
head.headers.get("Content-Type").is_none() head.headers.get("Content-Type").is_none()
@@ -61,16 +62,26 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service( cfg.service(
web::resource("") web::resource("")
.route(web::get().to(get_all_documents)) .route(web::get().to(get_all_documents))
// replace documents routes
.route(web::post().guard(empty_application_type).to(|| HttpResponse::UnsupportedMediaType())) .route(
web::post()
.guard(empty_application_type)
.to(HttpResponse::UnsupportedMediaType),
)
.route(web::post().guard(guard_json).to(add_documents_json)) .route(web::post().guard(guard_json).to(add_documents_json))
.route(web::post().guard(guard_ndjson).to(add_documents_ndjson))
.route(web::post().guard(guard_csv).to(add_documents_csv)) .route(web::post().guard(guard_csv).to(add_documents_csv))
.route(web::post().to(|| HttpResponse::UnsupportedMediaType())) .route(web::post().to(HttpResponse::UnsupportedMediaType))
// update documents routes
.route(web::put().guard(empty_application_type).to(|| HttpResponse::UnsupportedMediaType())) .route(
web::put()
.guard(empty_application_type)
.to(HttpResponse::UnsupportedMediaType),
)
.route(web::put().guard(guard_json).to(update_documents_json)) .route(web::put().guard(guard_json).to(update_documents_json))
.route(web::put().guard(guard_ndjson).to(update_documents_ndjson))
.route(web::put().guard(guard_csv).to(update_documents_csv)) .route(web::put().guard(guard_csv).to(update_documents_csv))
.route(web::put().to(|| HttpResponse::UnsupportedMediaType())) .route(web::put().to(HttpResponse::UnsupportedMediaType))
.route(web::delete().to(clear_all_documents)), .route(web::delete().to(clear_all_documents)),
) )
// this route needs to be before the /documents/{document_id} to match properly // this route needs to be before the /documents/{document_id} to match properly
@@ -160,7 +171,32 @@ pub async fn add_documents_json(
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Json, IndexDocumentsMethod::ReplaceDocuments).await document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Json,
IndexDocumentsMethod::ReplaceDocuments,
)
.await
}
pub async fn add_documents_ndjson(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Ndjson,
IndexDocumentsMethod::ReplaceDocuments,
)
.await
} }
pub async fn add_documents_csv( pub async fn add_documents_csv(
@@ -169,7 +205,15 @@ pub async fn add_documents_csv(
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Csv, IndexDocumentsMethod::ReplaceDocuments).await document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Csv,
IndexDocumentsMethod::ReplaceDocuments,
)
.await
} }
pub async fn update_documents_json( pub async fn update_documents_json(
@@ -178,7 +222,32 @@ pub async fn update_documents_json(
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Json, IndexDocumentsMethod::UpdateDocuments).await document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Json,
IndexDocumentsMethod::UpdateDocuments,
)
.await
}
pub async fn update_documents_ndjson(
meilisearch: GuardedData<Private, MeiliSearch>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Ndjson,
IndexDocumentsMethod::UpdateDocuments,
)
.await
} }
pub async fn update_documents_csv( pub async fn update_documents_csv(
@@ -187,7 +256,15 @@ pub async fn update_documents_csv(
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
document_addition(meilisearch, path, params, body, DocumentAdditionFormat::Csv, IndexDocumentsMethod::UpdateDocuments).await document_addition(
meilisearch,
path,
params,
body,
DocumentAdditionFormat::Csv,
IndexDocumentsMethod::UpdateDocuments,
)
.await
} }
/// Route used when the payload type is "application/json" /// Route used when the payload type is "application/json"
/// Used to add or replace documents /// Used to add or replace documents

View File

@@ -31,7 +31,7 @@ log = "0.4.14"
meilisearch-error = { path = "../meilisearch-error" } meilisearch-error = { path = "../meilisearch-error" }
meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" } meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" }
memmap = "0.7.0" memmap = "0.7.0"
milli = { path = "../../milli/milli" } milli = { git = "https://github.com/meilisearch/milli.git", rev = "f65153ad6454317213680e9a9a908ec78d5645a7"}
mime = "0.3.16" mime = "0.3.16"
num_cpus = "1.13.0" num_cpus = "1.13.0"
once_cell = "1.8.0" once_cell = "1.8.0"

View File

@@ -1,5 +1,5 @@
use std::io::{self, Read, Result as IoResult, Seek, Write};
use std::fmt; use std::fmt;
use std::io::{self, Read, Result as IoResult, Seek, Write};
use csv::{Reader as CsvReader, StringRecordsIntoIter}; use csv::{Reader as CsvReader, StringRecordsIntoIter};
use milli::documents::DocumentBatchBuilder; use milli::documents::DocumentBatchBuilder;
@@ -9,7 +9,7 @@ type Result<T> = std::result::Result<T, DocumentFormatError>;
#[derive(Debug)] #[derive(Debug)]
pub enum PayloadType { pub enum PayloadType {
Jsonl, Ndjson,
Json, Json,
Csv, Csv,
} }
@@ -17,7 +17,7 @@ pub enum PayloadType {
impl fmt::Display for PayloadType { impl fmt::Display for PayloadType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
PayloadType::Jsonl => write!(f, "ndjson"), PayloadType::Ndjson => write!(f, "ndjson"),
PayloadType::Json => write!(f, "json"), PayloadType::Json => write!(f, "json"),
PayloadType::Csv => write!(f, "csv"), PayloadType::Csv => write!(f, "csv"),
} }
@@ -56,14 +56,13 @@ pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<()> {
Ok(()) Ok(())
} }
/// read jsonl from input and write an obkv batch to writer. /// read jsonl from input and write an obkv batch to writer.
pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> { pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer)?; let mut builder = DocumentBatchBuilder::new(writer)?;
let stream = Deserializer::from_reader(input).into_iter::<Map<String, Value>>(); let stream = Deserializer::from_reader(input).into_iter::<Map<String, Value>>();
for value in stream { for value in stream {
let value = malformed!(PayloadType::Jsonl, value)?; let value = malformed!(PayloadType::Ndjson, value)?;
builder.add_documents(&value)?; builder.add_documents(&value)?;
} }
@@ -84,7 +83,6 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> {
Ok(()) Ok(())
} }
enum AllowedType { enum AllowedType {
String, String,
Number, Number,
@@ -141,12 +139,12 @@ impl<R: Read> Iterator for CsvDocumentIter<R> {
for ((field_name, field_type), value) in for ((field_name, field_type), value) in
self.headers.iter().zip(csv_document.into_iter()) self.headers.iter().zip(csv_document.into_iter())
{ {
let parsed_value = (|| match field_type { let parsed_value = match field_type {
AllowedType::Number => malformed!(PayloadType::Csv, value AllowedType::Number => {
.parse::<f64>() malformed!(PayloadType::Csv, value.parse::<f64>().map(Value::from))
.map(Value::from)), }
AllowedType::String => Ok(Value::String(value.to_string())), AllowedType::String => Ok(Value::String(value.to_string())),
})(); };
match parsed_value { match parsed_value {
Ok(value) => drop(document.insert(field_name.to_string(), value)), Ok(value) => drop(document.insert(field_name.to_string(), value)),
@@ -156,7 +154,10 @@ impl<R: Read> Iterator for CsvDocumentIter<R> {
Some(Ok(document)) Some(Ok(document))
} }
Err(e) => Some(Err(DocumentFormatError::MalformedPayload(Box::new(e), PayloadType::Csv))), Err(e) => Some(Err(DocumentFormatError::MalformedPayload(
Box::new(e),
PayloadType::Csv,
))),
} }
} }
} }

View File

@@ -9,7 +9,7 @@ use milli::documents::DocumentBatchReader;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use crate::document_formats::read_jsonl; use crate::document_formats::read_ndjson;
use crate::index::update_handler::UpdateHandler; use crate::index::update_handler::UpdateHandler;
use crate::index::updates::apply_settings_to_builder; use crate::index::updates::apply_settings_to_builder;
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
@@ -142,7 +142,7 @@ impl Index {
let mut tmp_doc_file = tempfile::tempfile()?; let mut tmp_doc_file = tempfile::tempfile()?;
read_jsonl(reader, &mut tmp_doc_file)?; read_ndjson(reader, &mut tmp_doc_file)?;
tmp_doc_file.seek(SeekFrom::Start(0))?; tmp_doc_file.seek(SeekFrom::Start(0))?;

View File

@@ -11,7 +11,7 @@ use milli::update::Setting;
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::document_formats::read_jsonl; use crate::document_formats::read_ndjson;
use crate::index::apply_settings_to_builder; use crate::index::apply_settings_to_builder;
use crate::index::update_handler::UpdateHandler; use crate::index::update_handler::UpdateHandler;
use crate::index_controller::index_resolver::uuid_store::HeedUuidStore; use crate::index_controller::index_resolver::uuid_store::HeedUuidStore;
@@ -124,7 +124,7 @@ fn load_index(
let mut tmp_doc_file = tempfile::tempfile()?; let mut tmp_doc_file = tempfile::tempfile()?;
read_jsonl(reader, &mut tmp_doc_file)?; read_ndjson(reader, &mut tmp_doc_file)?;
tmp_doc_file.seek(SeekFrom::Start(0))?; tmp_doc_file.seek(SeekFrom::Start(0))?;
@@ -213,7 +213,7 @@ impl From<Settings> for index_controller::Settings<Unchecked> {
} }
} }
// /// Extract Settings from `settings.json` file present at provided `dir_path` /// Extract Settings from `settings.json` file present at provided `dir_path`
fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> { fn import_settings(dir_path: impl AsRef<Path>) -> anyhow::Result<Settings> {
let path = dir_path.as_ref().join("settings.json"); let path = dir_path.as_ref().join("settings.json");
let file = File::open(path)?; let file = File::open(path)?;

View File

@@ -72,6 +72,7 @@ pub struct IndexController {
pub enum DocumentAdditionFormat { pub enum DocumentAdditionFormat {
Json, Json,
Csv, Csv,
Ndjson,
} }
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]

View File

@@ -10,7 +10,7 @@ use uuid::Uuid;
const UPDATE_FILES_PATH: &str = "updates/updates_files"; const UPDATE_FILES_PATH: &str = "updates/updates_files";
use crate::document_formats::read_jsonl; use crate::document_formats::read_ndjson;
pub struct UpdateFile { pub struct UpdateFile {
path: PathBuf, path: PathBuf,
@@ -86,7 +86,7 @@ impl UpdateFileStore {
.ok_or_else(|| anyhow::anyhow!("invalid update file name"))?; .ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
let dst_path = dst_update_files_path.join(file_uuid); let dst_path = dst_update_files_path.join(file_uuid);
let dst_file = BufWriter::new(File::create(dst_path)?); let dst_file = BufWriter::new(File::create(dst_path)?);
read_jsonl(update_file, dst_file)?; read_ndjson(update_file, dst_file)?;
} }
Ok(()) Ok(())
@@ -98,9 +98,9 @@ impl UpdateFileStore {
Ok(Self { path }) Ok(Self { path })
} }
/// Created a new temporary update file. /// Creates a new temporary update file.
/// ///
/// A call to persist is needed to persist in the database. /// A call to `persist` is needed to persist the file in the database.
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
let file = NamedTempFile::new()?; let file = NamedTempFile::new()?;
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
@@ -110,14 +110,14 @@ impl UpdateFileStore {
Ok((uuid, update_file)) Ok((uuid, update_file))
} }
/// Returns a the file corresponding to the requested uuid. /// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<File> { pub fn get_update(&self, uuid: Uuid) -> Result<File> {
let path = self.path.join(uuid.to_string()); let path = self.path.join(uuid.to_string());
let file = File::open(path)?; let file = File::open(path)?;
Ok(file) Ok(file)
} }
/// Copies the content of the update file poited to by uuid to dst directory. /// Copies the content of the update file pointed to by `uuid` to the `dst` directory.
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> { pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
let src = self.path.join(uuid.to_string()); let src = self.path.join(uuid.to_string());
let mut dst = dst.as_ref().join(UPDATE_FILES_PATH); let mut dst = dst.as_ref().join(UPDATE_FILES_PATH);
@@ -127,7 +127,7 @@ impl UpdateFileStore {
Ok(()) Ok(())
} }
/// Peform a dump of the given update file uuid into the provided snapshot path. /// Peforms a dump of the given update file uuid into the provided dump path.
pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> { pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
let uuid_string = uuid.to_string(); let uuid_string = uuid.to_string();
let update_file_path = self.path.join(&uuid_string); let update_file_path = self.path.join(&uuid_string);
@@ -140,7 +140,8 @@ impl UpdateFileStore {
let mut document_reader = DocumentBatchReader::from_reader(update_file)?; let mut document_reader = DocumentBatchReader::from_reader(update_file)?;
let mut document_buffer = Map::new(); let mut document_buffer = Map::new();
// TODO: we need to find a way to do this more efficiently. (create a custom serializer to // TODO: we need to find a way to do this more efficiently. (create a custom serializer
// for
// jsonl for example...) // jsonl for example...)
while let Some((index, document)) = document_reader.next_document_with_index()? { while let Some((index, document)) = document_reader.next_document_with_index()? {
for (field_id, content) in document.iter() { for (field_id, content) in document.iter() {

View File

@@ -17,8 +17,6 @@ pub enum UpdateLoopError {
UnexistingUpdate(u64), UnexistingUpdate(u64),
#[error("Internal error: {0}")] #[error("Internal error: {0}")]
Internal(Box<dyn Error + Send + Sync + 'static>), Internal(Box<dyn Error + Send + Sync + 'static>),
//#[error("{0}")]
//IndexActor(#[from] IndexActorError),
#[error( #[error(
"update store was shut down due to a fatal error, please check your logs for more info." "update store was shut down due to a fatal error, please check your logs for more info."
)] )]

View File

@@ -21,7 +21,7 @@ use uuid::Uuid;
use self::error::{Result, UpdateLoopError}; use self::error::{Result, UpdateLoopError};
pub use self::message::UpdateMsg; pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo}; use self::store::{UpdateStore, UpdateStoreInfo};
use crate::document_formats::{read_csv, read_json}; use crate::document_formats::{read_csv, read_json, read_ndjson};
use crate::index::{Index, Settings, Unchecked}; use crate::index::{Index, Settings, Unchecked};
use crate::index_controller::update_file_store::UpdateFileStore; use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus; use status::UpdateStatus;
@@ -40,7 +40,7 @@ pub fn create_update_handler(
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?; let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?;
tokio::task::spawn_local(actor.run()); tokio::task::spawn(actor.run());
Ok(sender) Ok(sender)
} }
@@ -197,6 +197,7 @@ impl UpdateLoop {
match format { match format {
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?, DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?, DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?,
DocumentAdditionFormat::Ndjson => read_ndjson(reader, &mut *update_file)?,
} }
update_file.persist()?; update_file.persist()?;