386: fix obkv document r=curquiza a=MarinPostma

When serializing a document, the serializer resolved the field_id of the current field and immediately added it to the obkv document under construction. The issue with that is that obkv expects the fields to be inserted in order, and when a document with out of order fields was added, obkv failed to insert the field.

The current fix first resolves each field_id, and adds all the fields to a temporary `BTreeMap`, until `end` is called on the map serializer, where all the fields are added to the obkv at once, and in order.


Co-authored-by: mpostma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot]
2021-10-11 13:45:04 +00:00
committed by GitHub
2 changed files with 23 additions and 6 deletions

View File

@@ -230,4 +230,12 @@ mod test {
let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap(); let nested: Value = serde_json::from_slice(doc.get(0).unwrap()).unwrap();
assert_eq!(nested, json!({ "toto": ["hello"] })); assert_eq!(nested, json!({ "toto": ["hello"] }));
} }
#[test]
fn out_of_order_fields() {
let _documents = documents!([
{"id": 1,"b": 0},
{"id": 2,"a": 0,"b": 0},
]);
}
} }

View File

@@ -1,9 +1,12 @@
use std::collections::BTreeMap;
use std::convert::TryInto; use std::convert::TryInto;
use std::io::Cursor;
use std::{fmt, io}; use std::{fmt, io};
use byteorder::{BigEndian, WriteBytesExt}; use byteorder::{BigEndian, WriteBytesExt};
use obkv::KvWriter; use obkv::KvWriter;
use serde::ser::{Impossible, Serialize, SerializeMap, SerializeSeq, Serializer}; use serde::ser::{Impossible, Serialize, SerializeMap, SerializeSeq, Serializer};
use serde_json::Value;
use super::{ByteCounter, DocumentsBatchIndex, Error}; use super::{ByteCounter, DocumentsBatchIndex, Error};
use crate::FieldId; use crate::FieldId;
@@ -36,7 +39,7 @@ impl<'a, W: io::Write> Serializer for &'a mut DocumentSerializer<W> {
map: KvWriter::new(cursor), map: KvWriter::new(cursor),
index: &mut self.index, index: &mut self.index,
writer: &mut self.writer, writer: &mut self.writer,
buffer: Vec::new(), mapped_documents: BTreeMap::new(),
}; };
Ok(map_serializer) Ok(map_serializer)
@@ -226,7 +229,7 @@ pub struct MapSerializer<'a, W> {
map: KvWriter<io::Cursor<&'a mut Vec<u8>>, FieldId>, map: KvWriter<io::Cursor<&'a mut Vec<u8>>, FieldId>,
index: &'a mut DocumentsBatchIndex, index: &'a mut DocumentsBatchIndex,
writer: W, writer: W,
buffer: Vec<u8>, mapped_documents: BTreeMap<FieldId, Value>,
} }
/// This implementation of SerializeMap uses serilialize_entry instead of seriliaze_key and /// This implementation of SerializeMap uses serilialize_entry instead of seriliaze_key and
@@ -244,6 +247,14 @@ impl<'a, W: io::Write> SerializeMap for MapSerializer<'a, W> {
} }
fn end(mut self) -> Result<Self::Ok, Self::Error> { fn end(mut self) -> Result<Self::Ok, Self::Error> {
let mut buf = Vec::new();
for (key, value) in self.mapped_documents {
buf.clear();
let mut cursor = Cursor::new(&mut buf);
serde_json::to_writer(&mut cursor, &value).map_err(Error::JsonError)?;
self.map.insert(key, cursor.into_inner()).map_err(Error::Io)?;
}
let data = self.map.into_inner().map_err(Error::Io)?.into_inner(); let data = self.map.into_inner().map_err(Error::Io)?.into_inner();
let data_len: u32 = data.len().try_into().map_err(|_| Error::DocumentTooLarge)?; let data_len: u32 = data.len().try_into().map_err(|_| Error::DocumentTooLarge)?;
@@ -265,11 +276,9 @@ impl<'a, W: io::Write> SerializeMap for MapSerializer<'a, W> {
let field_serializer = FieldSerializer { index: &mut self.index }; let field_serializer = FieldSerializer { index: &mut self.index };
let field_id: FieldId = key.serialize(field_serializer)?; let field_id: FieldId = key.serialize(field_serializer)?;
self.buffer.clear(); let value = serde_json::to_value(value).map_err(Error::JsonError)?;
let mut cursor = io::Cursor::new(&mut self.buffer);
serde_json::to_writer(&mut cursor, value).map_err(Error::JsonError)?;
self.map.insert(field_id, cursor.into_inner()).map_err(Error::Io)?; self.mapped_documents.insert(field_id, value);
Ok(()) Ok(())
} }