mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 04:36:32 +00:00
Compare commits
9 Commits
v1.24.0
...
add-docume
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
de99e52474 | ||
|
|
1d9caa11fd | ||
|
|
b7724c65b8 | ||
|
|
35c75c2e34 | ||
|
|
c147724384 | ||
|
|
5572f0c2c8 | ||
|
|
669a3ff85f | ||
|
|
75c7fd6afa | ||
|
|
6d3be4e923 |
15
Cargo.lock
generated
15
Cargo.lock
generated
@@ -408,15 +408,6 @@ version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "199edb7b90631283b10c2422e6a0bc8b7d987bf732995ba1de53b576c97e51a8"
|
||||
|
||||
[[package]]
|
||||
name = "bimap"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bc0455254eb5c6964c4545d8bac815e1a1be4f3afe0ae695ea539c12d728d44b"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bincode"
|
||||
version = "1.3.3"
|
||||
@@ -1124,7 +1115,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "filter-parser"
|
||||
version = "0.29.3"
|
||||
source = "git+https://github.com/meilisearch/milli.git?tag=v0.29.3#f1d848bb9add86b9414d110a083dfa0462d5d636"
|
||||
dependencies = [
|
||||
"nom",
|
||||
"nom_locate",
|
||||
@@ -1149,7 +1139,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "flatten-serde-json"
|
||||
version = "0.29.3"
|
||||
source = "git+https://github.com/meilisearch/milli.git?tag=v0.29.3#f1d848bb9add86b9414d110a083dfa0462d5d636"
|
||||
dependencies = [
|
||||
"serde_json",
|
||||
]
|
||||
@@ -1662,7 +1651,6 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "json-depth-checker"
|
||||
version = "0.29.3"
|
||||
source = "git+https://github.com/meilisearch/milli.git?tag=v0.29.3#f1d848bb9add86b9414d110a083dfa0462d5d636"
|
||||
dependencies = [
|
||||
"serde_json",
|
||||
]
|
||||
@@ -2157,6 +2145,7 @@ name = "meilisearch-types"
|
||||
version = "0.28.0"
|
||||
dependencies = [
|
||||
"actix-web",
|
||||
"milli",
|
||||
"proptest",
|
||||
"proptest-derive",
|
||||
"serde",
|
||||
@@ -2190,9 +2179,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "milli"
|
||||
version = "0.29.3"
|
||||
source = "git+https://github.com/meilisearch/milli.git?tag=v0.29.3#f1d848bb9add86b9414d110a083dfa0462d5d636"
|
||||
dependencies = [
|
||||
"bimap",
|
||||
"bincode",
|
||||
"bstr",
|
||||
"byteorder",
|
||||
|
||||
@@ -8,7 +8,7 @@ base64 = "0.13.0"
|
||||
enum-iterator = "0.7.0"
|
||||
hmac = "0.12.1"
|
||||
meilisearch-types = { path = "../meilisearch-types" }
|
||||
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.29.3" }
|
||||
milli = { path = "../../milli/milli" }
|
||||
rand = "0.8.4"
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
serde_json = { version = "1.0.79", features = ["preserve_order"] }
|
||||
|
||||
@@ -1,17 +0,0 @@
|
||||
use meilisearch_lib::heed::Env;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
pub trait EnvSizer {
|
||||
fn size(&self) -> u64;
|
||||
}
|
||||
|
||||
impl EnvSizer for Env {
|
||||
fn size(&self) -> u64 {
|
||||
WalkDir::new(self.path())
|
||||
.into_iter()
|
||||
.filter_map(|entry| entry.ok())
|
||||
.filter_map(|entry| entry.metadata().ok())
|
||||
.filter(|metadata| metadata.is_file())
|
||||
.fold(0, |acc, m| acc + m.len())
|
||||
}
|
||||
}
|
||||
@@ -1,3 +0,0 @@
|
||||
mod env;
|
||||
|
||||
pub use env::EnvSizer;
|
||||
@@ -5,7 +5,6 @@ pub mod analytics;
|
||||
pub mod task;
|
||||
#[macro_use]
|
||||
pub mod extractors;
|
||||
pub mod helpers;
|
||||
pub mod option;
|
||||
pub mod routes;
|
||||
|
||||
@@ -84,17 +83,19 @@ pub fn configure_data(
|
||||
web::JsonConfig::default()
|
||||
.content_type(|mime| mime == mime::APPLICATION_JSON)
|
||||
.error_handler(|err, req: &HttpRequest| match err {
|
||||
JsonPayloadError::ContentType => match req.headers().get(CONTENT_TYPE) {
|
||||
Some(content_type) => MeilisearchHttpError::InvalidContentType(
|
||||
content_type.to_str().unwrap_or("unknown").to_string(),
|
||||
vec![mime::APPLICATION_JSON.to_string()],
|
||||
)
|
||||
.into(),
|
||||
None => MeilisearchHttpError::MissingContentType(vec![
|
||||
mime::APPLICATION_JSON.to_string(),
|
||||
])
|
||||
.into(),
|
||||
},
|
||||
JsonPayloadError::ContentType => {
|
||||
match req.headers().get(CONTENT_TYPE) {
|
||||
Some(content_type) => MeilisearchHttpError::InvalidContentType(
|
||||
content_type.to_str().unwrap_or("unknown").to_string(),
|
||||
vec![mime::APPLICATION_JSON.to_string()],
|
||||
)
|
||||
.into(),
|
||||
None => MeilisearchHttpError::MissingContentType(vec![
|
||||
mime::APPLICATION_JSON.to_string(),
|
||||
])
|
||||
.into(),
|
||||
}
|
||||
}
|
||||
err => PayloadError::from(err).into(),
|
||||
}),
|
||||
)
|
||||
|
||||
@@ -326,7 +326,7 @@ async fn error_add_malformed_json_documents() {
|
||||
assert_eq!(
|
||||
response["message"],
|
||||
json!(
|
||||
r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...890123456789", expected a documents, or a sequence of documents. at line 1 column 102`."#
|
||||
r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...7890123456789", expected a document, or a sequence of documents. at line 1 column 102`."#
|
||||
)
|
||||
);
|
||||
assert_eq!(response["code"], json!("malformed_payload"));
|
||||
@@ -349,9 +349,7 @@ async fn error_add_malformed_json_documents() {
|
||||
assert_eq!(status_code, 400);
|
||||
assert_eq!(
|
||||
response["message"],
|
||||
json!(
|
||||
r#"The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string "0123456789012345678901234567...90123456789m", expected a documents, or a sequence of documents. at line 1 column 103`."#
|
||||
)
|
||||
json!("The `json` payload provided is malformed. `Couldn't serialize document value: invalid type: string \"0123456789012345678901234567...890123456789m\", expected a document, or a sequence of documents. at line 1 column 103`.")
|
||||
);
|
||||
assert_eq!(response["code"], json!("malformed_payload"));
|
||||
assert_eq!(response["type"], json!("invalid_request"));
|
||||
@@ -388,7 +386,7 @@ async fn error_add_malformed_ndjson_documents() {
|
||||
assert_eq!(
|
||||
response["message"],
|
||||
json!(
|
||||
r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 1 column 2`."#
|
||||
r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`."#
|
||||
)
|
||||
);
|
||||
assert_eq!(response["code"], json!("malformed_payload"));
|
||||
@@ -411,9 +409,7 @@ async fn error_add_malformed_ndjson_documents() {
|
||||
assert_eq!(status_code, 400);
|
||||
assert_eq!(
|
||||
response["message"],
|
||||
json!(
|
||||
r#"The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 1 column 2`."#
|
||||
)
|
||||
json!("The `ndjson` payload provided is malformed. `Couldn't serialize document value: key must be a string at line 2 column 2`.")
|
||||
);
|
||||
assert_eq!(response["code"], json!("malformed_payload"));
|
||||
assert_eq!(response["type"], json!("invalid_request"));
|
||||
@@ -1020,7 +1016,7 @@ async fn add_documents_invalid_geo_field() {
|
||||
index.wait_task(2).await;
|
||||
let (response, code) = index.get_task(2).await;
|
||||
assert_eq!(code, 200);
|
||||
assert_eq!(response["status"], "succeeded");
|
||||
assert_eq!(response["status"], "failed");
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
|
||||
@@ -708,9 +708,7 @@ async fn faceting_max_values_per_facet() {
|
||||
}),
|
||||
|response, code| {
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
let numbers = dbg!(&response)["facetDistribution"]["number"]
|
||||
.as_object()
|
||||
.unwrap();
|
||||
let numbers = &response["facetDistribution"]["number"].as_object().unwrap();
|
||||
assert_eq!(numbers.len(), 10_000);
|
||||
},
|
||||
)
|
||||
|
||||
@@ -30,7 +30,7 @@ lazy_static = "1.4.0"
|
||||
log = "0.4.14"
|
||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||
meilisearch-types = { path = "../meilisearch-types" }
|
||||
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.29.3" }
|
||||
milli = { path = "../../milli/milli" }
|
||||
mime = "0.3.16"
|
||||
num_cpus = "1.13.1"
|
||||
obkv = "0.2.0"
|
||||
|
||||
@@ -1,129 +0,0 @@
|
||||
use std::borrow::Borrow;
|
||||
use std::fmt::{self, Debug, Display};
|
||||
use std::io::{self, BufRead, BufReader, BufWriter, Cursor, Read, Seek, Write};
|
||||
|
||||
use meilisearch_types::error::{Code, ErrorCode};
|
||||
use meilisearch_types::internal_error;
|
||||
use milli::documents::DocumentBatchBuilder;
|
||||
|
||||
type Result<T> = std::result::Result<T, DocumentFormatError>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum PayloadType {
|
||||
Ndjson,
|
||||
Json,
|
||||
Csv,
|
||||
}
|
||||
|
||||
impl fmt::Display for PayloadType {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
PayloadType::Ndjson => write!(f, "ndjson"),
|
||||
PayloadType::Json => write!(f, "json"),
|
||||
PayloadType::Csv => write!(f, "csv"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DocumentFormatError {
|
||||
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
|
||||
MalformedPayload(Box<milli::documents::Error>, PayloadType),
|
||||
}
|
||||
|
||||
impl Display for DocumentFormatError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Internal(e) => write!(f, "An internal error has occurred: `{}`.", e),
|
||||
Self::MalformedPayload(me, b) => match me.borrow() {
|
||||
milli::documents::Error::JsonError(se) => {
|
||||
// https://github.com/meilisearch/meilisearch/issues/2107
|
||||
// The user input maybe insanely long. We need to truncate it.
|
||||
let mut serde_msg = se.to_string();
|
||||
let ellipsis = "...";
|
||||
if serde_msg.len() > 100 + ellipsis.len() {
|
||||
serde_msg.replace_range(50..serde_msg.len() - 85, ellipsis);
|
||||
}
|
||||
|
||||
write!(
|
||||
f,
|
||||
"The `{}` payload provided is malformed. `Couldn't serialize document value: {}`.",
|
||||
b, serde_msg
|
||||
)
|
||||
}
|
||||
_ => write!(f, "The `{}` payload provided is malformed: `{}`.", b, me),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DocumentFormatError {}
|
||||
|
||||
impl From<(PayloadType, milli::documents::Error)> for DocumentFormatError {
|
||||
fn from((ty, error): (PayloadType, milli::documents::Error)) -> Self {
|
||||
match error {
|
||||
milli::documents::Error::Io(e) => Self::Internal(Box::new(e)),
|
||||
e => Self::MalformedPayload(Box::new(e), ty),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ErrorCode for DocumentFormatError {
|
||||
fn error_code(&self) -> Code {
|
||||
match self {
|
||||
DocumentFormatError::Internal(_) => Code::Internal,
|
||||
DocumentFormatError::MalformedPayload(_, _) => Code::MalformedPayload,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal_error!(DocumentFormatError: io::Error);
|
||||
|
||||
/// reads csv from input and write an obkv batch to writer.
|
||||
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||
let writer = BufWriter::new(writer);
|
||||
let builder =
|
||||
DocumentBatchBuilder::from_csv(input, writer).map_err(|e| (PayloadType::Csv, e))?;
|
||||
|
||||
let count = builder.finish().map_err(|e| (PayloadType::Csv, e))?;
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// reads jsonl from input and write an obkv batch to writer.
|
||||
pub fn read_ndjson(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||
let mut reader = BufReader::new(input);
|
||||
let writer = BufWriter::new(writer);
|
||||
|
||||
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Ndjson, e))?;
|
||||
let mut buf = String::new();
|
||||
|
||||
while reader.read_line(&mut buf)? > 0 {
|
||||
// skip empty lines
|
||||
if buf == "\n" {
|
||||
buf.clear();
|
||||
continue;
|
||||
}
|
||||
builder
|
||||
.extend_from_json(Cursor::new(&buf.as_bytes()))
|
||||
.map_err(|e| (PayloadType::Ndjson, e))?;
|
||||
buf.clear();
|
||||
}
|
||||
|
||||
let count = builder.finish().map_err(|e| (PayloadType::Ndjson, e))?;
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// reads json from input and write an obkv batch to writer.
|
||||
pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<usize> {
|
||||
let writer = BufWriter::new(writer);
|
||||
let mut builder = DocumentBatchBuilder::new(writer).map_err(|e| (PayloadType::Json, e))?;
|
||||
builder
|
||||
.extend_from_json(input)
|
||||
.map_err(|e| (PayloadType::Json, e))?;
|
||||
|
||||
let count = builder.finish().map_err(|e| (PayloadType::Json, e))?;
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
@@ -11,11 +11,12 @@ pub enum DumpError {
|
||||
#[error("An internal error has occurred. `{0}`.")]
|
||||
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
|
||||
#[error("{0}")]
|
||||
IndexResolver(#[from] IndexResolverError),
|
||||
IndexResolver(#[from] Box<IndexResolverError>),
|
||||
}
|
||||
|
||||
internal_error!(
|
||||
DumpError: milli::heed::Error,
|
||||
IndexResolverError,
|
||||
std::io::Error,
|
||||
tokio::task::JoinError,
|
||||
tokio::sync::oneshot::error::RecvError,
|
||||
|
||||
@@ -32,7 +32,9 @@ impl ErrorCode for MilliError<'_> {
|
||||
UserError::AttributeLimitReached => Code::MaxFieldsLimitExceeded,
|
||||
UserError::InvalidFilter(_) => Code::Filter,
|
||||
UserError::MissingDocumentId { .. } => Code::MissingDocumentId,
|
||||
UserError::InvalidDocumentId { .. } => Code::InvalidDocumentId,
|
||||
UserError::InvalidDocumentId { .. } | UserError::TooManyDocumentIds { .. } => {
|
||||
Code::InvalidDocumentId
|
||||
}
|
||||
UserError::MissingPrimaryKey => Code::MissingPrimaryKey,
|
||||
UserError::PrimaryKeyCannotBeChanged(_) => Code::PrimaryKeyAlreadyPresent,
|
||||
UserError::SortRankingRuleMissing => Code::Sort,
|
||||
|
||||
@@ -4,13 +4,13 @@ use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
use indexmap::IndexMap;
|
||||
use milli::documents::DocumentBatchReader;
|
||||
use milli::documents::DocumentsBatchReader;
|
||||
use milli::heed::{EnvOpenOptions, RoTxn};
|
||||
use milli::update::{IndexDocumentsConfig, IndexerConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::document_formats::read_ndjson;
|
||||
use crate::index::updates::apply_settings_to_builder;
|
||||
use milli::documents::document_formats::read_ndjson;
|
||||
|
||||
use super::error::Result;
|
||||
use super::{index::Index, Settings, Unchecked};
|
||||
@@ -27,7 +27,7 @@ const DATA_FILE_NAME: &str = "documents.jsonl";
|
||||
impl Index {
|
||||
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
// acquire write txn make sure any ongoing write is finished before we start.
|
||||
let txn = self.env.write_txn()?;
|
||||
let txn = self.write_txn()?;
|
||||
let path = path.as_ref().join(format!("indexes/{}", self.uuid));
|
||||
|
||||
create_dir_all(&path)?;
|
||||
@@ -135,19 +135,20 @@ impl Index {
|
||||
if !empty {
|
||||
tmp_doc_file.seek(SeekFrom::Start(0))?;
|
||||
|
||||
let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
|
||||
let documents_reader = DocumentsBatchReader::from_reader(tmp_doc_file)?;
|
||||
|
||||
//If the document file is empty, we don't perform the document addition, to prevent
|
||||
//a primary key error to be thrown.
|
||||
let config = IndexDocumentsConfig::default();
|
||||
let mut builder = milli::update::IndexDocuments::new(
|
||||
let builder = milli::update::IndexDocuments::new(
|
||||
&mut txn,
|
||||
&index,
|
||||
indexer_config,
|
||||
config,
|
||||
|_| (),
|
||||
)?;
|
||||
builder.add_documents(documents_reader)?;
|
||||
let (builder, user_error) = builder.add_documents(documents_reader)?;
|
||||
user_error?;
|
||||
builder.execute()?;
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,12 @@ impl ErrorCode for IndexError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<milli::UserError> for IndexError {
|
||||
fn from(error: milli::UserError) -> IndexError {
|
||||
IndexError::Milli(error.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FacetError {
|
||||
#[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))]
|
||||
|
||||
@@ -6,16 +6,16 @@ use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use fst::IntoStreamer;
|
||||
use milli::heed::{EnvOpenOptions, RoTxn};
|
||||
use milli::heed::{CompactionOption, EnvOpenOptions, RoTxn};
|
||||
use milli::update::{IndexerConfig, Setting};
|
||||
use milli::{obkv_to_json, FieldDistribution, DEFAULT_VALUES_PER_FACET};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Map, Value};
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::index::search::DEFAULT_PAGINATION_LIMITED_TO;
|
||||
use crate::EnvSizer;
|
||||
|
||||
use super::error::IndexError;
|
||||
use super::error::Result;
|
||||
@@ -245,11 +245,8 @@ impl Index {
|
||||
let fields_ids_map = self.fields_ids_map(&txn)?;
|
||||
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
|
||||
|
||||
let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
|
||||
|
||||
let mut documents = Vec::new();
|
||||
|
||||
for entry in iter {
|
||||
for entry in self.all_documents(&txn)?.skip(offset).take(limit) {
|
||||
let (_id, obkv) = entry?;
|
||||
let document = obkv_to_json(&all_fields, &fields_ids_map, obkv)?;
|
||||
let document = match &attributes_to_retrieve {
|
||||
@@ -302,7 +299,12 @@ impl Index {
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.env.size()
|
||||
WalkDir::new(self.path())
|
||||
.into_iter()
|
||||
.filter_map(|entry| entry.ok())
|
||||
.filter_map(|entry| entry.metadata().ok())
|
||||
.filter(|metadata| metadata.is_file())
|
||||
.fold(0, |acc, m| acc + m.len())
|
||||
}
|
||||
|
||||
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
@@ -310,9 +312,8 @@ impl Index {
|
||||
create_dir_all(&dst)?;
|
||||
dst.push("data.mdb");
|
||||
let _txn = self.write_txn()?;
|
||||
self.inner
|
||||
.env
|
||||
.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
|
||||
self.inner.copy_to_path(dst, CompactionOption::Enabled)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,12 +24,12 @@ pub use test::MockIndex as Index;
|
||||
/// code for unit testing, in places where an index would normally be used.
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use milli::update::IndexerConfig;
|
||||
use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod};
|
||||
use milli::update::{
|
||||
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, IndexerConfig,
|
||||
};
|
||||
use nelson::Mocker;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -162,7 +162,7 @@ pub mod test {
|
||||
primary_key: Option<String>,
|
||||
file_store: UpdateFileStore,
|
||||
contents: impl Iterator<Item = Uuid>,
|
||||
) -> Result<DocumentAdditionResult> {
|
||||
) -> Result<Vec<Result<DocumentAdditionResult>>> {
|
||||
match self {
|
||||
MockIndex::Real(index) => {
|
||||
index.update_documents(method, primary_key, file_store, contents)
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use log::{debug, info, trace};
|
||||
use milli::documents::DocumentBatchReader;
|
||||
use milli::documents::DocumentsBatchReader;
|
||||
use milli::update::{
|
||||
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
|
||||
Setting,
|
||||
@@ -11,7 +11,7 @@ use milli::update::{
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::error::Result;
|
||||
use super::error::{IndexError, Result};
|
||||
use super::index::{Index, IndexMeta};
|
||||
use crate::update_file_store::UpdateFileStore;
|
||||
|
||||
@@ -299,7 +299,7 @@ impl Index {
|
||||
primary_key: Option<String>,
|
||||
file_store: UpdateFileStore,
|
||||
contents: impl IntoIterator<Item = Uuid>,
|
||||
) -> Result<DocumentAdditionResult> {
|
||||
) -> Result<Vec<Result<DocumentAdditionResult>>> {
|
||||
trace!("performing document addition");
|
||||
let mut txn = self.write_txn()?;
|
||||
|
||||
@@ -323,19 +323,34 @@ impl Index {
|
||||
indexing_callback,
|
||||
)?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
for content_uuid in contents.into_iter() {
|
||||
let content_file = file_store.get_update(content_uuid)?;
|
||||
let reader = DocumentBatchReader::from_reader(content_file)?;
|
||||
builder.add_documents(reader)?;
|
||||
let reader = DocumentsBatchReader::from_reader(content_file)?;
|
||||
let (new_builder, user_result) = builder.add_documents(reader)?;
|
||||
builder = new_builder;
|
||||
|
||||
let user_result = match user_result {
|
||||
Ok(count) => {
|
||||
let addition = DocumentAdditionResult {
|
||||
indexed_documents: count,
|
||||
number_of_documents: count,
|
||||
};
|
||||
info!("document addition done: {:?}", addition);
|
||||
Ok(addition)
|
||||
}
|
||||
Err(e) => Err(IndexError::from(e)),
|
||||
};
|
||||
|
||||
results.push(user_result);
|
||||
}
|
||||
|
||||
let addition = builder.execute()?;
|
||||
if results.iter().any(Result::is_ok) {
|
||||
let _addition = builder.execute()?;
|
||||
txn.commit()?;
|
||||
}
|
||||
|
||||
txn.commit()?;
|
||||
|
||||
info!("document addition done: {:?}", addition);
|
||||
|
||||
Ok(addition)
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
|
||||
|
||||
@@ -6,11 +6,11 @@ use meilisearch_types::internal_error;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
use super::DocumentAdditionFormat;
|
||||
use crate::document_formats::DocumentFormatError;
|
||||
use crate::dump::error::DumpError;
|
||||
use crate::index::error::IndexError;
|
||||
use crate::tasks::error::TaskError;
|
||||
use crate::update_file_store::UpdateFileStoreError;
|
||||
use milli::documents::document_formats::DocumentFormatError;
|
||||
|
||||
use crate::index_resolver::error::IndexResolverError;
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use meilisearch_auth::SearchRules;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::io::Cursor;
|
||||
use std::io::{BufWriter, Cursor};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -20,7 +20,6 @@ use tokio::task::spawn_blocking;
|
||||
use tokio::time::sleep;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::document_formats::{read_csv, read_json, read_ndjson};
|
||||
use crate::dump::{self, load_dump, DumpHandler};
|
||||
use crate::index::{
|
||||
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
|
||||
@@ -34,6 +33,7 @@ use crate::tasks::{
|
||||
BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore,
|
||||
};
|
||||
use error::Result;
|
||||
use milli::documents::document_formats::{read_csv, read_json, read_ndjson};
|
||||
|
||||
use self::error::IndexControllerError;
|
||||
use crate::index_resolver::index_store::{IndexStore, MapIndexStore};
|
||||
@@ -392,6 +392,7 @@ where
|
||||
}
|
||||
let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
|
||||
let documents_count = tokio::task::spawn_blocking(move || -> Result<_> {
|
||||
let writer = BufWriter::new(&mut *update_file);
|
||||
// check if the payload is empty, and return an error
|
||||
if buffer.is_empty() {
|
||||
return Err(IndexControllerError::MissingPayload(format));
|
||||
@@ -399,9 +400,9 @@ where
|
||||
|
||||
let reader = Cursor::new(buffer);
|
||||
let count = match format {
|
||||
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
|
||||
DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?,
|
||||
DocumentAdditionFormat::Ndjson => read_ndjson(reader, &mut *update_file)?,
|
||||
DocumentAdditionFormat::Json => read_json(reader, writer)?,
|
||||
DocumentAdditionFormat::Csv => read_csv(reader, writer)?,
|
||||
DocumentAdditionFormat::Ndjson => read_ndjson(reader, writer)?,
|
||||
};
|
||||
|
||||
update_file.persist()?;
|
||||
|
||||
@@ -150,25 +150,34 @@ mod real {
|
||||
})
|
||||
.await;
|
||||
|
||||
let event = match result {
|
||||
Ok(Ok(result)) => TaskEvent::Succeeded {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
result: TaskResult::DocumentAddition {
|
||||
indexed_documents: result.indexed_documents,
|
||||
},
|
||||
},
|
||||
Ok(Err(e)) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: e.into(),
|
||||
},
|
||||
Err(e) => TaskEvent::Failed {
|
||||
timestamp: OffsetDateTime::now_utc(),
|
||||
error: IndexResolverError::from(e).into(),
|
||||
},
|
||||
};
|
||||
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
match result {
|
||||
Ok(Ok(results)) => {
|
||||
for (task, result) in tasks.iter_mut().zip(results) {
|
||||
let event = match result {
|
||||
Ok(addition) => {
|
||||
TaskEvent::succeeded(TaskResult::DocumentAddition {
|
||||
indexed_documents: addition.indexed_documents,
|
||||
})
|
||||
}
|
||||
Err(error) => {
|
||||
TaskEvent::failed(IndexResolverError::from(error))
|
||||
}
|
||||
};
|
||||
task.events.push(event);
|
||||
}
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
let event = TaskEvent::failed(e);
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let event = TaskEvent::failed(IndexResolverError::from(e));
|
||||
for task in tasks.iter_mut() {
|
||||
task.events.push(event.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => panic!("invalid batch!"),
|
||||
|
||||
@@ -18,7 +18,6 @@ pub use milli;
|
||||
pub use milli::heed;
|
||||
|
||||
mod compression;
|
||||
pub mod document_formats;
|
||||
|
||||
use walkdir::WalkDir;
|
||||
|
||||
|
||||
@@ -181,9 +181,7 @@ impl SnapshotJob {
|
||||
let mut options = milli::heed::EnvOpenOptions::new();
|
||||
options.map_size(self.index_size);
|
||||
let index = milli::Index::new(options, entry.path())?;
|
||||
index
|
||||
.env
|
||||
.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
|
||||
index.copy_to_path(dst, milli::heed::CompactionOption::Enabled)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -3,8 +3,7 @@ use std::io::{self, BufReader, BufWriter, Write};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use milli::documents::DocumentBatchReader;
|
||||
use serde_json::Map;
|
||||
use milli::documents::DocumentsBatchReader;
|
||||
use tempfile::{NamedTempFile, PersistError};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -15,7 +14,7 @@ pub use test::MockUpdateFileStore as UpdateFileStore;
|
||||
|
||||
const UPDATE_FILES_PATH: &str = "updates/updates_files";
|
||||
|
||||
use crate::document_formats::read_ndjson;
|
||||
use milli::documents::document_formats::read_ndjson;
|
||||
|
||||
pub struct UpdateFile {
|
||||
path: PathBuf,
|
||||
@@ -44,7 +43,8 @@ into_update_store_error!(
|
||||
PersistError,
|
||||
io::Error,
|
||||
serde_json::Error,
|
||||
milli::documents::Error
|
||||
milli::documents::Error,
|
||||
milli::documents::DocumentsBatchCursorError
|
||||
);
|
||||
|
||||
impl UpdateFile {
|
||||
@@ -149,23 +149,14 @@ mod store {
|
||||
|
||||
let update_file = File::open(update_file_path)?;
|
||||
let mut dst_file = NamedTempFile::new_in(&dump_path)?;
|
||||
let mut document_reader = DocumentBatchReader::from_reader(update_file)?;
|
||||
let mut document_cursor = DocumentsBatchReader::from_reader(update_file)?.into_cursor();
|
||||
|
||||
let mut document_buffer = Map::new();
|
||||
// TODO: we need to find a way to do this more efficiently. (create a custom serializer
|
||||
// for jsonl for example...)
|
||||
while let Some((index, document)) = document_reader.next_document_with_index()? {
|
||||
for (field_id, content) in document.iter() {
|
||||
if let Some(field_name) = index.name(field_id) {
|
||||
let content = serde_json::from_slice(content)?;
|
||||
document_buffer.insert(field_name.to_string(), content);
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::to_writer(&mut dst_file, &document_buffer)?;
|
||||
dst_file.write_all(b"\n")?;
|
||||
document_buffer.clear();
|
||||
let mut dst_file_buf_writer = BufWriter::new(&mut dst_file);
|
||||
while let Some(document) = document_cursor.next_document()? {
|
||||
serde_json::to_writer(&mut dst_file_buf_writer, &document)?;
|
||||
dst_file_buf_writer.write_all(b"\n")?;
|
||||
}
|
||||
drop(dst_file_buf_writer);
|
||||
|
||||
dst_file.persist(dst)?;
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ proptest = { version = "1.0.0", optional = true }
|
||||
proptest-derive = { version = "0.3.0", optional = true }
|
||||
serde = { version = "1.0.136", features = ["derive"] }
|
||||
serde_json = "1.0.79"
|
||||
milli = { path = "../../milli/milli" }
|
||||
|
||||
[features]
|
||||
test-traits = ["proptest", "proptest-derive"]
|
||||
|
||||
@@ -94,6 +94,17 @@ pub trait ErrorCode: std::error::Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl ErrorCode for milli::documents::document_formats::DocumentFormatError {
|
||||
fn error_code(&self) -> Code {
|
||||
match self {
|
||||
milli::documents::document_formats::DocumentFormatError::Internal(_) => Code::Internal,
|
||||
milli::documents::document_formats::DocumentFormatError::MalformedPayload(_, _) => {
|
||||
Code::MalformedPayload
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum ErrorType {
|
||||
InternalError,
|
||||
|
||||
@@ -49,7 +49,7 @@ fn contained_in(selector: &str, key: &str) -> bool {
|
||||
/// map_leaf_values(
|
||||
/// value.as_object_mut().unwrap(),
|
||||
/// ["jean.race.name"],
|
||||
/// |key, value| match (value, dbg!(key)) {
|
||||
/// |key, value| match (value, key) {
|
||||
/// (Value::String(name), "jean.race.name") => *name = "patou".to_string(),
|
||||
/// _ => unreachable!(),
|
||||
/// },
|
||||
@@ -729,7 +729,7 @@ mod tests {
|
||||
map_leaf_values(
|
||||
value.as_object_mut().unwrap(),
|
||||
["jean.race.name"],
|
||||
|key, value| match (value, dbg!(key)) {
|
||||
|key, value| match (value, key) {
|
||||
(Value::String(name), "jean.race.name") => *name = S("patou"),
|
||||
_ => unreachable!(),
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user