nested fields

Irevoire
2022-03-23 17:28:41 +01:00
committed by Tamo
parent 4ae7aea3b2
commit 4f3ce6d9cd
22 changed files with 1197 additions and 367 deletions
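
The headline change: to index nested fields, documents are flattened into dot-separated top-level keys before extraction, so the pipeline below now carries two streams, the original obkv chunks and the flattened ones. A small illustrative sketch of what flattening means here (the flatten helper is hypothetical, not code from this commit):

use serde_json::{json, Map, Value};

// Nested objects become dot-separated top-level keys; scalars and arrays are
// stored under the path built up so far.
fn flatten(prefix: Option<&str>, value: &Value, out: &mut Map<String, Value>) {
    match value {
        Value::Object(map) => {
            for (key, value) in map {
                let key = match prefix {
                    Some(prefix) => format!("{}.{}", prefix, key),
                    None => key.clone(),
                };
                flatten(Some(&key), value, out);
            }
        }
        other => {
            out.insert(prefix.unwrap_or_default().to_owned(), other.clone());
        }
    }
}

fn main() {
    let document = json!({ "id": 1, "_geo": { "lat": 48.86, "lng": 2.35 } });
    let mut flat = Map::new();
    flatten(None, &document, &mut flat);
    assert_eq!(flat["_geo.lat"], json!(48.86));
    assert_eq!(flat["_geo.lng"], json!(2.35));
}

Once _geo is flattened this way, latitude and longitude are addressable as two ordinary field ids, which is exactly the switch the first file below makes.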

View File

@@ -2,7 +2,6 @@ use std::fs::File;
 use std::io;
 
 use concat_arrays::concat_arrays;
-use serde_json::Value;
 
 use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
 use crate::{FieldId, InternalError, Result, UserError};
@@ -14,7 +13,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     primary_key_id: FieldId,
-    geo_field_id: FieldId,
+    (lat_fid, lng_fid): (FieldId, FieldId),
 ) -> Result<grenad::Reader<File>> {
     let mut writer = create_writer(
         indexer.chunk_compression_type,
@@ -25,22 +24,18 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
     let mut cursor = obkv_documents.into_cursor()?;
     while let Some((docid_bytes, value)) = cursor.move_on_next()? {
         let obkv = obkv::KvReader::new(value);
-        let point: Value = match obkv.get(geo_field_id) {
-            Some(point) => serde_json::from_slice(point).map_err(InternalError::SerdeJson)?,
-            None => continue,
-        };
-
-        if let Some((lat, lng)) = point["lat"].as_f64().zip(point["lng"].as_f64()) {
-            // this will create an array of 16 bytes (two 8 bytes floats)
-            let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
-            writer.insert(docid_bytes, bytes)?;
-        } else {
+        let (lat, lng) = obkv.get(lat_fid).zip(obkv.get(lng_fid)).ok_or_else(|| {
             // All document must have a primary key so we can unwrap safely here
             let primary_key = obkv.get(primary_key_id).unwrap();
-            let primary_key =
-                serde_json::from_slice(primary_key).map_err(InternalError::SerdeJson)?;
-            Err(UserError::InvalidGeoField { document_id: primary_key, object: point })?
-        }
+            let primary_key = serde_json::from_slice(primary_key).unwrap();
+            UserError::InvalidGeoField { document_id: primary_key }
+        })?;
+        let (lat, lng): (f64, f64) = (
+            serde_json::from_slice(lat).map_err(InternalError::SerdeJson)?,
+            serde_json::from_slice(lng).map_err(InternalError::SerdeJson)?,
+        );
+        let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()];
+        writer.insert(docid_bytes, bytes)?;
     }
 
     Ok(writer_into_reader(writer)?)
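
The reworked extractor no longer parses a _geo JSON object: it reads the two coordinates as raw JSON slices and packs them into a fixed 16-byte value. A standalone sketch of that packing step, assuming only serde_json and with explicit copies in place of the concat_arrays! macro:

// Parse each coordinate from its raw JSON bytes, then lay the two f64 values
// side by side in a 16-byte buffer, i.e. what
// concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()] produces.
fn pack_geo_point(lat: &[u8], lng: &[u8]) -> Result<[u8; 16], serde_json::Error> {
    let lat: f64 = serde_json::from_slice(lat)?;
    let lng: f64 = serde_json::from_slice(lng)?;
    let mut bytes = [0u8; 16];
    bytes[..8].copy_from_slice(&lat.to_ne_bytes());
    bytes[8..].copy_from_slice(&lng.to_ne_bytes());
    Ok(bytes)
}

fn main() {
    let bytes = pack_geo_point(b"48.8566", b"2.3522").unwrap();
    let lat = f64::from_ne_bytes(bytes[..8].try_into().unwrap());
    assert_eq!(lat, 48.8566);
}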

View File

@@ -34,28 +34,36 @@ use crate::{FieldId, Result};
 /// Extract data for each databases from obkv documents in parallel.
 /// Send data in grenad file over provided Sender.
 pub(crate) fn data_from_obkv_documents(
-    obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
+    original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
+    flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: Option<HashSet<FieldId>>,
     faceted_fields: HashSet<FieldId>,
     primary_key_id: FieldId,
-    geo_field_id: Option<FieldId>,
+    geo_fields_ids: Option<(FieldId, FieldId)>,
     stop_words: Option<fst::Set<&[u8]>>,
     max_positions_per_attributes: Option<u32>,
     exact_attributes: HashSet<FieldId>,
 ) -> Result<()> {
-    let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = obkv_chunks
+    original_obkv_chunks
         .par_bridge()
-        .map(|result| {
-            extract_documents_data(
-                result,
+        .map(|original_documents_chunk| {
+            send_original_documents_data(original_documents_chunk, lmdb_writer_sx.clone())
+        })
+        .collect::<Result<()>>()?;
+
+    let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = flattened_obkv_chunks
+        .par_bridge()
+        .map(|flattened_obkv_chunks| {
+            send_and_extract_flattened_documents_data(
+                flattened_obkv_chunks,
                 indexer,
                 lmdb_writer_sx.clone(),
                 &searchable_fields,
                 &faceted_fields,
                 primary_key_id,
-                geo_field_id,
+                geo_fields_ids,
                 &stop_words,
                 max_positions_per_attributes,
             )
@@ -170,36 +178,48 @@ fn spawn_extraction_task<FE, FS, M>(
     });
 }
 
-/// Extract chuncked data and send it into lmdb_writer_sx sender:
+/// Extract chunked data and send it into lmdb_writer_sx sender:
+/// - documents
+fn send_original_documents_data(
+    original_documents_chunk: Result<grenad::Reader<File>>,
+    lmdb_writer_sx: Sender<Result<TypedChunk>>,
+) -> Result<()> {
+    let original_documents_chunk =
+        original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
+
+    // TODO: create a custom internal error
+    lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk))).unwrap();
+    Ok(())
+}
+
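
send_original_documents_data has one job: forward the cloneable documents chunk to the single LMDB writer. The pipeline around it is a many-producers, one-consumer channel; a minimal sketch of that shape, using std::sync::mpsc as a stand-in for the Sender type used here:

use std::sync::mpsc;
use std::thread;

// Stand-in for TypedChunk; the real code sends grenad readers.
enum Chunk {
    Documents(Vec<u8>),
}

fn main() {
    let (tx, rx) = mpsc::channel::<Result<Chunk, String>>();

    // extractor threads send; dropping the last sender ends the writer loop
    let extractor = thread::spawn(move || {
        tx.send(Ok(Chunk::Documents(vec![1, 2, 3]))).unwrap();
    });

    // the single writer drains the channel and owns the database transaction
    for message in rx {
        match message {
            Ok(Chunk::Documents(bytes)) => println!("writing {} bytes", bytes.len()),
            Err(error) => eprintln!("extraction failed: {error}"),
        }
    }

    extractor.join().unwrap();
}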
+/// Extract chunked data and send it into lmdb_writer_sx sender:
 /// - documents_ids
 /// - docid_word_positions
 /// - docid_fid_facet_numbers
 /// - docid_fid_facet_strings
-fn extract_documents_data(
-    documents_chunk: Result<grenad::Reader<File>>,
+fn send_and_extract_flattened_documents_data(
+    flattened_documents_chunk: Result<grenad::Reader<File>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     searchable_fields: &Option<HashSet<FieldId>>,
     faceted_fields: &HashSet<FieldId>,
     primary_key_id: FieldId,
-    geo_field_id: Option<FieldId>,
+    geo_fields_ids: Option<(FieldId, FieldId)>,
     stop_words: &Option<fst::Set<&[u8]>>,
     max_positions_per_attributes: Option<u32>,
 ) -> Result<(
     grenad::Reader<CursorClonableMmap>,
     (grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>),
 )> {
-    let documents_chunk = documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
-
-    let _ = lmdb_writer_sx.send(Ok(TypedChunk::Documents(documents_chunk.clone())));
+    let flattened_documents_chunk =
+        flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
 
-    if let Some(geo_field_id) = geo_field_id {
-        let documents_chunk_cloned = documents_chunk.clone();
+    if let Some(geo_fields_ids) = geo_fields_ids {
+        let documents_chunk_cloned = flattened_documents_chunk.clone();
         let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
         rayon::spawn(move || {
             let result =
-                extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_field_id);
+                extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_fields_ids);
             let _ = match result {
                 Ok(geo_points) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoPoints(geo_points))),
                 Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
@@ -211,7 +231,7 @@ fn extract_documents_data(
     rayon::join(
         || {
             let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions(
-                documents_chunk.clone(),
+                flattened_documents_chunk.clone(),
                 indexer.clone(),
                 searchable_fields,
                 stop_words.as_ref(),
@@ -232,7 +252,7 @@ fn extract_documents_data(
         || {
             let (docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk) =
                 extract_fid_docid_facet_values(
-                    documents_chunk.clone(),
+                    flattened_documents_chunk.clone(),
                     indexer.clone(),
                     faceted_fields,
                 )?;
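
Both remaining extractions read the flattened chunk and run side by side under rayon::join, which returns once the two closures finish. A toy sketch of that two-way split, with throwaway closures standing in for extract_docid_word_positions and extract_fid_docid_facet_values:

// Requires the rayon crate; both closures borrow `documents` and run in parallel.
fn main() {
    let documents = vec!["hello world", "nested fields"];

    let (word_positions, facet_sizes) = rayon::join(
        // stand-in for extract_docid_word_positions
        || {
            documents
                .iter()
                .enumerate()
                .flat_map(|(docid, doc)| {
                    doc.split_whitespace().enumerate().map(move |(pos, word)| (docid, word, pos))
                })
                .collect::<Vec<_>>()
        },
        // stand-in for extract_fid_docid_facet_values
        || documents.iter().map(|doc| doc.len()).collect::<Vec<_>>(),
    );

    println!("{word_positions:?}");
    println!("{facet_sizes:?}");
}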