Compare commits


1 Commit

Author          SHA1         Message               Date
Louis Dureuil   cd58a71f57   panic on serde json   2025-01-29 10:13:02 +01:00
22 changed files with 104 additions and 115 deletions
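
The code change is mechanical across the Rust sources: every fallible serde_json call that previously wrapped its error in milli's InternalError::SerdeJson and propagated it with `?` now ends in `.unwrap()` (or, in one spot, an explicit `panic!`), so a malformed document aborts the process with a backtrace instead of surfacing as a recoverable error. A minimal sketch of the before/after shape (the error enum below is a simplified stand-in for milli's InternalError, not the real definition):

    use serde_json::Value;

    #[derive(Debug)]
    enum InternalError {
        SerdeJson(serde_json::Error),
    }

    // Before: wrap the error and let the caller decide.
    fn parse(bytes: &[u8]) -> Result<Value, InternalError> {
        serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)
    }

    // After: same call site, but any JSON failure now panics with a
    // backtrace, which is the point of this debugging commit.
    fn parse_or_panic(bytes: &[u8]) -> Value {
        serde_json::from_slice(bytes).map_err(InternalError::SerdeJson).unwrap()
    }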

Cargo.lock (generated, 34 changed lines)
View File

@@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
 [[package]]
 name = "benchmarks"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "anyhow",
  "bumpalo",
@@ -689,7 +689,7 @@ dependencies = [
 [[package]]
 name = "build-info"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "anyhow",
  "time",
@@ -1664,7 +1664,7 @@ dependencies = [
 [[package]]
 name = "dump"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "anyhow",
  "big_s",
@@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
 [[package]]
 name = "file-store"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "tempfile",
  "thiserror",
@@ -1898,7 +1898,7 @@ dependencies = [
 [[package]]
 name = "filter-parser"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "insta",
  "nom",
@@ -1918,7 +1918,7 @@ dependencies = [
 [[package]]
 name = "flatten-serde-json"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "criterion",
  "serde_json",
@@ -2057,7 +2057,7 @@ dependencies = [
 [[package]]
 name = "fuzzers"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "arbitrary",
  "bumpalo",
@@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
 [[package]]
 name = "index-scheduler"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "anyhow",
  "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2822,7 +2822,7 @@ dependencies = [
 [[package]]
 name = "json-depth-checker"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "criterion",
  "serde_json",
@@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
 [[package]]
 name = "meili-snap"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "insta",
  "md5",
@@ -3450,7 +3450,7 @@ dependencies = [
 [[package]]
 name = "meilisearch"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "actix-cors",
  "actix-http",
@@ -3540,7 +3540,7 @@ dependencies = [
 [[package]]
 name = "meilisearch-auth"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "base64 0.22.1",
  "enum-iterator",
@@ -3559,7 +3559,7 @@ dependencies = [
 [[package]]
 name = "meilisearch-types"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "actix-web",
  "anyhow",
@@ -3592,7 +3592,7 @@ dependencies = [
 [[package]]
 name = "meilitool"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "anyhow",
  "arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
@@ -3627,7 +3627,7 @@ dependencies = [
 [[package]]
 name = "milli"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "allocator-api2",
  "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 [[package]]
 name = "permissive-json-pointer"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "big_s",
  "serde_json",
@@ -6486,7 +6486,7 @@ dependencies = [
 [[package]]
 name = "xtask"
-version = "1.12.8"
+version = "1.12.7"
 dependencies = [
  "anyhow",
  "build-info",

View File

@@ -22,7 +22,7 @@ members = [
 ]

 [workspace.package]
-version = "1.12.8"
+version = "1.12.7"
 authors = [
     "Quentin de Quelen <quentin@dequelen.me>",
     "Clément Renault <clement@meilisearch.com>",

View File

@@ -839,12 +839,14 @@ impl IndexScheduler {
     for document in
         serde_json::de::Deserializer::from_reader(content_file).into_iter()
     {
-        let document = document.map_err(|e| {
-            Error::from_milli(
-                milli::InternalError::SerdeJson(e).into(),
-                None,
-            )
-        })?;
+        let document = document
+            .map_err(|e| {
+                Error::from_milli(
+                    milli::InternalError::SerdeJson(e).into(),
+                    None,
+                )
+            })
+            .unwrap();
         dump_content_file.push_document(&document)?;
     }
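
For context, serde_json's Deserializer::from_reader(...).into_iter() lazily yields one Result per JSON value in the stream, which is why each document has to be unwrapped (or propagated) individually inside the loop. A self-contained sketch of that pattern, with from_slice standing in for the content-file reader:

    use serde_json::{Deserializer, Value};

    fn main() {
        // Two concatenated JSON documents, as in a dump content file.
        let stream = br#"{"id": 1} {"id": 2}"#;

        // Each iteration yields a Result<Value, serde_json::Error>.
        for document in Deserializer::from_slice(stream).into_iter::<Value>() {
            // On this branch, a malformed document panics here instead of
            // being mapped into Error::from_milli and propagated with `?`.
            let document = document.unwrap();
            println!("{document}");
        }
    }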

View File

@@ -305,9 +305,6 @@ fn create_or_open_index(
     let mut options = EnvOpenOptions::new();
     options.map_size(clamp_to_page_size(map_size));
-    // You can find more details about this experimental
-    // environment variable on the following GitHub discussion:
-    // <https://github.com/orgs/meilisearch/discussions/806>
     let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
         Ok(value) => u32::from_str(&value).unwrap(),
         Err(VarError::NotPresent) => 1024,
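
Only the explanatory comment is removed in this hunk; the lookup itself is unchanged. It follows the usual optional-env-var-with-default shape, roughly as below (the excerpt is cut off before the last match arm, so the NotUnicode handling here is an assumption):

    use std::env::{self, VarError};
    use std::str::FromStr;

    fn index_max_readers() -> u32 {
        match env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
            // A value that is set must parse as a u32; otherwise this panics.
            Ok(value) => u32::from_str(&value).unwrap(),
            // Unset: fall back to the default of 1024 readers per index.
            Err(VarError::NotPresent) => 1024,
            // Assumed arm: treat non-UTF-8 values as a hard error.
            Err(err @ VarError::NotUnicode(_)) => panic!("{err}"),
        }
    }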

View File

@@ -2024,9 +2024,11 @@ impl<'a> Dump<'a> {
     let mut writer = io::BufWriter::new(file);
     for doc in content_file {
         let doc = doc?;
-        serde_json::to_writer(&mut writer, &doc).map_err(|e| {
-            Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
-        })?;
+        serde_json::to_writer(&mut writer, &doc)
+            .map_err(|e| {
+                Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
+            })
+            .unwrap();
     }
     let file = writer.into_inner().map_err(|e| e.into_error())?;
     file.persist()?;
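
This loop streams documents through a BufWriter and only recovers the inner file at the end; into_inner() is what surfaces any buffered I/O error, via the e.into_error() call. A runnable sketch of that shape, writing to an in-memory buffer instead of a temp file:

    use std::io::{self, BufWriter, Write};

    fn main() -> io::Result<()> {
        let docs = [serde_json::json!({"id": 1}), serde_json::json!({"id": 2})];

        let mut writer = BufWriter::new(Vec::new());
        for doc in &docs {
            // On this branch, a serialization failure panics instead of
            // being mapped into Error::from_milli and propagated.
            serde_json::to_writer(&mut writer, doc).unwrap();
            // The original concatenates values; the newline here only keeps
            // the sketch's output readable.
            writer.write_all(b"\n")?;
        }

        // Flushes the buffer and hands back the underlying writer, turning a
        // deferred write error into an io::Error, like `e.into_error()` above.
        let bytes = writer.into_inner().map_err(|e| e.into_error())?;
        print!("{}", String::from_utf8_lossy(&bytes));
        Ok(())
    }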

View File

@@ -1337,7 +1337,7 @@ impl<'a> HitMaker<'a> {
     ExplicitVectors { embeddings: Some(vector.into()), regenerate: !user_provided };
     vectors.insert(
         name,
-        serde_json::to_value(embeddings).map_err(InternalError::SerdeJson)?,
+        serde_json::to_value(embeddings).map_err(InternalError::SerdeJson).unwrap(),
     );
 }
 document.insert("_vectors".into(), vectors.into());
@@ -1717,7 +1717,7 @@ fn make_document(
     // recreate the original json
     for (key, value) in obkv.iter() {
-        let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
+        let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
         let key = field_ids_map.name(key).expect("Missing field name").to_string();
         document.insert(key, value);

View File

@@ -33,7 +33,7 @@ pub fn obkv_to_object(obkv: &KvReader<FieldId>, index: &DocumentsBatchIndex) ->
     let field_name = index
         .name(field_id)
         .ok_or(FieldIdMapMissingEntry::FieldId { field_id, process: "obkv_to_object" })?;
-    let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
+    let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
     Ok((field_name.to_string(), value))
 })
 .collect()
@@ -84,7 +84,8 @@ impl DocumentsBatchIndex {
     let key =
         self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone();
     let value = serde_json::from_slice::<serde_json::Value>(v)
-        .map_err(crate::error::InternalError::SerdeJson)?;
+        .map_err(crate::error::InternalError::SerdeJson)
+        .unwrap();
     map.insert(key, value);
 }

View File

@@ -92,7 +92,8 @@ impl<'a> PrimaryKey<'a> {
     PrimaryKey::Flat { name: _, field_id } => match document.get(*field_id) {
         Some(document_id_bytes) => {
             let document_id = serde_json::from_slice(document_id_bytes)
-                .map_err(InternalError::SerdeJson)?;
+                .map_err(InternalError::SerdeJson)
+                .unwrap();
             match validate_document_id_value(document_id) {
                 Ok(document_id) => Ok(Ok(document_id)),
                 Err(user_error) => {
@@ -108,7 +109,8 @@ impl<'a> PrimaryKey<'a> {
     if let Some(field_id) = fields.id(first_level_name) {
         if let Some(value_bytes) = document.get(field_id) {
             let object = serde_json::from_slice(value_bytes)
-                .map_err(InternalError::SerdeJson)?;
+                .map_err(InternalError::SerdeJson)
+                .unwrap();
             fetch_matching_values(object, right, &mut matching_documents_ids);

             if matching_documents_ids.len() >= 2 {
@@ -151,11 +153,12 @@ impl<'a> PrimaryKey<'a> {
     };
     let document_id: &RawValue =
-        serde_json::from_slice(document_id).map_err(InternalError::SerdeJson)?;
+        serde_json::from_slice(document_id).map_err(InternalError::SerdeJson).unwrap();
     let document_id = document_id
         .deserialize_any(crate::update::new::indexer::de::DocumentIdVisitor(indexer))
-        .map_err(InternalError::SerdeJson)?;
+        .map_err(InternalError::SerdeJson)
+        .unwrap();
     let external_document_id = match document_id {
         Ok(document_id) => Ok(document_id),
@@ -173,7 +176,7 @@ impl<'a> PrimaryKey<'a> {
     let Some(value) = document.get(fid) else { continue };
     let value: &RawValue =
-        serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
+        serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
     match match_component(first_level, right, value, indexer, &mut docid) {
         ControlFlow::Continue(()) => continue,
         ControlFlow::Break(Ok(_)) => {
@@ -183,7 +186,7 @@ impl<'a> PrimaryKey<'a> {
             .into())
         }
         ControlFlow::Break(Err(err)) => {
-            return Err(InternalError::SerdeJson(err).into())
+            panic!("{err}");
         }
     }
 }
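
The last hunk is the one place where the commit reaches for panic! directly, because the error is produced inside a ControlFlow-driven loop rather than behind a `?`. A simplified, self-contained model of that control flow (this match_component is a hypothetical stand-in, not milli's):

    use std::ops::ControlFlow;

    // Stand-in: keep scanning on empty input, stop with a parsed id,
    // or stop with the serde_json error.
    fn match_component(raw: &str) -> ControlFlow<Result<u32, serde_json::Error>> {
        if raw.is_empty() {
            return ControlFlow::Continue(());
        }
        match serde_json::from_str::<u32>(raw) {
            Ok(docid) => ControlFlow::Break(Ok(docid)),
            Err(err) => ControlFlow::Break(Err(err)),
        }
    }

    fn main() {
        for raw in ["", "42", "not json"] {
            match match_component(raw) {
                ControlFlow::Continue(()) => continue,
                ControlFlow::Break(Ok(docid)) => println!("matched document id {docid}"),
                // Before this commit: return Err(InternalError::SerdeJson(err).into())
                ControlFlow::Break(Err(err)) => panic!("{err}"),
            }
        }
    }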

View File

@@ -228,7 +228,8 @@ pub fn obkv_to_json(
         field_id: id,
         process: "obkv_to_json",
     })?;
-    let value = serde_json::from_slice(value).map_err(error::InternalError::SerdeJson)?;
+    let value =
+        serde_json::from_slice(value).map_err(error::InternalError::SerdeJson).unwrap();
     Ok((name.to_owned(), value))
 })
 .collect()

View File

@@ -123,7 +123,8 @@ pub fn enrich_documents_batch<R: Read + Seek>(
         }
     }
-    let document_id = serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson)?;
+    let document_id =
+        serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson).unwrap();
     external_ids.insert(count.to_be_bytes(), document_id)?;
     count += 1;
@@ -237,7 +238,7 @@ pub fn validate_geo_from_json(id: &DocumentId, bytes: &[u8]) -> Result<StdResult
     let debug_id = || {
         serde_json::from_slice(id.value().as_bytes()).unwrap_or_else(|_| Value::from(id.debug()))
     };
-    match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
+    match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson).unwrap() {
         Value::Object(mut object) => match (object.remove("lat"), object.remove("lng")) {
             (Some(lat), Some(lng)) => {
                 match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {

View File

@@ -206,7 +206,7 @@ fn tokens_from_document<'a>(
     if let Some(field_bytes) = KvReaderDelAdd::from_slice(field_bytes).get(del_add) {
         // parse json.
         let value =
-            serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
+            serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson).unwrap();

         // prepare writing destination.
         buffers.obkv_positions_buffer.clear();

View File

@@ -86,7 +86,7 @@ impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
     let res = (|| {
         let value =
-            serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
+            serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson).unwrap();
         Ok((name, value))
     })();
@@ -139,7 +139,7 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
         return Ok(None);
     };
     let Some(value) = self.content.get(fid) else { return Ok(None) };
-    Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson)?))
+    Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap()))
     }
 }

View File

@@ -27,7 +27,7 @@ pub fn extract_document_facets<'doc>(
     let selection = perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]);
     if selection != perm_json_p::Selection::Skip {
         // parse json.
-        match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? {
+        match serde_json::value::to_value(value).map_err(InternalError::SerdeJson).unwrap() {
             Value::Object(object) => {
                 perm_json_p::seek_leaf_values_in_object(
                     &object,

View File

@@ -256,15 +256,16 @@ pub fn extract_geo_coordinates(
     external_id: &str,
     raw_value: &RawValue,
 ) -> Result<Option<[f64; 2]>> {
-    let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
-        Value::Null => return Ok(None),
-        Value::Object(map) => map,
-        value => {
-            return Err(
-                GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
-            )
-        }
-    };
+    let mut geo =
+        match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson).unwrap() {
+            Value::Null => return Ok(None),
+            Value::Object(map) => map,
+            value => {
+                return Err(
+                    GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
+                )
+            }
+        };
     let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
         (Some(lat), Some(lng)) => {

View File

@@ -94,7 +94,7 @@ impl<'a> DocumentTokenizer<'a> {
     };
     // parse json.
-    match serde_json::to_value(value).map_err(InternalError::SerdeJson)? {
+    match serde_json::to_value(value).map_err(InternalError::SerdeJson).unwrap() {
         Value::Object(object) => seek_leaf_values_in_object(
             &object,
             None,

View File

@@ -158,7 +158,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
     let mut previous_offset = 0;
     let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
-    while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? {
+    while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson).unwrap() {
         *bytes = previous_offset as u64;
         // Only guess the primary key if it is the first document

View File

@@ -78,7 +78,8 @@ where
     let external_document_id = external_document_id.to_de();
     let document = RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
-        .map_err(InternalError::SerdeJson)?;
+        .map_err(InternalError::SerdeJson)
+        .unwrap();
     let insertion = Insertion::create(docid, external_document_id, Versions::single(document));
     Ok(Some(DocumentChange::Insertion(insertion)))

View File

@@ -58,9 +58,9 @@ impl UpdateByFunction {
     let ast = engine.compile(code).map_err(UserError::DocumentEditionCompilationError)?;
     let context = match context {
-        Some(context) => {
-            Some(serde_json::from_value(context.into()).map_err(InternalError::SerdeJson)?)
-        }
+        Some(context) => Some(
+            serde_json::from_value(context.into()).map_err(InternalError::SerdeJson).unwrap(),
+        ),
         None => None,
     };
@@ -137,9 +137,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
     Some(new_rhai_document) => {
         let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
         serde_json::to_writer(&mut buffer, &new_rhai_document)
-            .map_err(InternalError::SerdeJson)?;
+            .map_err(InternalError::SerdeJson)
+            .unwrap();
         let raw_new_doc = serde_json::from_slice(buffer.into_bump_slice())
-            .map_err(InternalError::SerdeJson)?;
+            .map_err(InternalError::SerdeJson)
+            .unwrap();

         // Note: This condition is not perfect. Sometimes it detect changes
         // like with floating points numbers and consider updating
@@ -166,7 +168,8 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
         FxBuildHasher,
         doc_alloc,
     )
-    .map_err(InternalError::SerdeJson)?;
+    .map_err(InternalError::SerdeJson)
+    .unwrap();

     Ok(Some(DocumentChange::Update(Update::create(
         docid,
@@ -200,7 +203,7 @@ fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Res
         field_id: id,
         process: "all_obkv_to_rhaimap",
     })?;
-    let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
+    let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
     Ok((name.into(), value))
 })
 .collect();

View File

@@ -105,7 +105,8 @@ impl<'t> VectorDocumentFromDb<'t> {
     let vectors_field = match vectors {
         Some(vectors) => Some(
             RawMap::from_raw_value_and_hasher(vectors, FxBuildHasher, doc_alloc)
-                .map_err(InternalError::SerdeJson)?,
+                .map_err(InternalError::SerdeJson)
+                .unwrap(),
         ),
         None => None,
     };

View File

@@ -98,20 +98,14 @@ impl Embedder {
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
     ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
-        // This condition helps reduce the number of active rayon jobs
-        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
-        if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
-        } else {
-            threads
-                .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
-                })
-                .map_err(|error| EmbedError {
-                    kind: EmbedErrorKind::PanicInThreadPool(error),
-                    fault: FaultSource::Bug,
-                })?
-        }
+        threads
+            .install(move || {
+                text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+            })
+            .map_err(|error| EmbedError {
+                kind: EmbedErrorKind::PanicInThreadPool(error),
+                fault: FaultSource::Bug,
+            })?
     }

     pub(crate) fn embed_chunks_ref(
@@ -119,8 +113,6 @@
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result<Vec<Vec<f32>>, EmbedError> {
-        // This condition helps reduce the number of active rayon jobs
-        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())
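
All three embedder backends in this compare lose the same surrounding guard: on the other side of the diff, embed_chunks first checks threads.active_operations() against REQUEST_PARALLELISM and embeds sequentially when the pool is saturated (per the deleted comment, to avoid exhausting LMDB read transactions and overflowing the stack), while this side always installs the batch into the pool. A sketch of that guard using plain rayon; ThreadPoolNoAbort is replaced by rayon::ThreadPool, the embedder is a placeholder, and the constant's value is an assumption:

    use rayon::prelude::*;

    const REQUEST_PARALLELISM: usize = 10; // assumed value

    // Placeholder embedder: one "dimension" per text, its length.
    fn embed(chunk: &[String]) -> Vec<usize> {
        chunk.iter().map(|text| text.len()).collect()
    }

    fn embed_chunks(
        pool: &rayon::ThreadPool,
        active_operations: usize,
        text_chunks: Vec<Vec<String>>,
    ) -> Vec<Vec<usize>> {
        if active_operations >= REQUEST_PARALLELISM {
            // Saturated: run sequentially so no new rayon jobs pile up.
            text_chunks.into_iter().map(|chunk| embed(&chunk)).collect()
        } else {
            // Otherwise run the whole batch inside the pool.
            pool.install(|| text_chunks.into_par_iter().map(|chunk| embed(&chunk)).collect())
        }
    }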

View File

@@ -255,20 +255,14 @@ impl Embedder {
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
     ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
-        // This condition helps reduce the number of active rayon jobs
-        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
-        if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
-        } else {
-            threads
-                .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
-                })
-                .map_err(|error| EmbedError {
-                    kind: EmbedErrorKind::PanicInThreadPool(error),
-                    fault: FaultSource::Bug,
-                })?
-        }
+        threads
+            .install(move || {
+                text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+            })
+            .map_err(|error| EmbedError {
+                kind: EmbedErrorKind::PanicInThreadPool(error),
+                fault: FaultSource::Bug,
+            })?
    }

     pub(crate) fn embed_chunks_ref(
@@ -276,8 +270,6 @@
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result<Vec<Vec<f32>>, EmbedError> {
-        // This condition helps reduce the number of active rayon jobs
-        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())

View File

@@ -188,20 +188,14 @@ impl Embedder {
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
     ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
-        // This condition helps reduce the number of active rayon jobs
-        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
-        if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
-        } else {
-            threads
-                .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
-                })
-                .map_err(|error| EmbedError {
-                    kind: EmbedErrorKind::PanicInThreadPool(error),
-                    fault: FaultSource::Bug,
-                })?
-        }
+        threads
+            .install(move || {
+                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
+            })
+            .map_err(|error| EmbedError {
+                kind: EmbedErrorKind::PanicInThreadPool(error),
+                fault: FaultSource::Bug,
+            })?
     }

     pub(crate) fn embed_chunks_ref(
@@ -209,8 +203,6 @@
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result<Vec<Embedding>, EmbedError> {
-        // This condition helps reduce the number of active rayon jobs
-        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
        if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())