Index the geo points

Irevoire
2021-08-23 18:41:48 +02:00
committed by Tamo
parent 8d9c2c4425
commit 44d6b6ae9e
5 changed files with 84 additions and 4 deletions


@ -0,0 +1,46 @@
use std::fs::File;
use std::io;

use concat_arrays::concat_arrays;
use log::warn;
use serde_json::Value;

use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
use crate::{FieldId, InternalError, Result};

/// Extracts the geographical coordinates contained in each document under the `_geo` field.
///
/// Returns the generated grenad reader, with each docid as key associated with its
/// (latitude, longitude) pair.
pub fn extract_geo_points<R: io::Read>(
    mut obkv_documents: grenad::Reader<R>,
    indexer: GrenadParameters,
    geo_field_id: Option<FieldId>, // when `None`, we return an empty grenad
) -> Result<grenad::Reader<File>> {
    let mut writer = tempfile::tempfile().and_then(|file| {
        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
    })?;

    // We never encountered any document with a `_geo` field:
    // we can skip this step entirely and return an empty grenad.
    let geo_field_id = match geo_field_id {
        Some(geo_field_id) => geo_field_id,
        None => return Ok(writer_into_reader(writer)?),
    };

    while let Some((docid_bytes, value)) = obkv_documents.next()? {
        // Each entry associates a document id with the obkv serialization of that document.
        let obkv = obkv::KvReader::new(value);
        let point = obkv.get(geo_field_id).unwrap(); // TODO(tamo): where should we handle this error?
        let point: Value = serde_json::from_slice(point).map_err(InternalError::SerdeJson)?;

        if let Some((lat, long)) = point["lat"].as_f64().zip(point["long"].as_f64()) {
            // This creates an array of 16 bytes (two 8-byte floats).
            let bytes: [u8; 16] = concat_arrays![lat.to_le_bytes(), long.to_le_bytes()];
            writer.insert(docid_bytes, bytes)?;
        } else {
            // TODO(tamo): improve this warning, e.g. by including the document id.
            warn!("malformed `_geo` field, expected an object with numeric `lat` and `long` fields");
            continue;
        }
    }

    Ok(writer_into_reader(writer)?)
}
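
Since each value written above stores the two coordinates as little-endian f64s concatenated into 16 bytes, a reader on the other side can split a value back apart. A minimal sketch of the inverse operation (the `decode_geo_point` helper is illustrative, not part of this commit):

use std::convert::TryInto;

/// Splits a 16-byte value produced by `extract_geo_points` back into its
/// (latitude, longitude) pair.
fn decode_geo_point(bytes: &[u8]) -> Option<(f64, f64)> {
    // The value must hold exactly two 8-byte little-endian floats.
    let lat = f64::from_le_bytes(bytes.get(0..8)?.try_into().ok()?);
    let long = f64::from_le_bytes(bytes.get(8..16)?.try_into().ok()?);
    Some((lat, long))
}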


@ -3,6 +3,7 @@ mod extract_facet_number_docids;
mod extract_facet_string_docids;
mod extract_fid_docid_facet_values;
mod extract_fid_word_count_docids;
mod extract_geo_points;
mod extract_word_docids;
mod extract_word_level_position_docids;
mod extract_word_pair_proximity_docids;
@ -19,6 +20,7 @@ use self::extract_facet_number_docids::extract_facet_number_docids;
use self::extract_facet_string_docids::extract_facet_string_docids;
use self::extract_fid_docid_facet_values::extract_fid_docid_facet_values;
use self::extract_fid_word_count_docids::extract_fid_word_count_docids;
use self::extract_geo_points::extract_geo_points;
use self::extract_word_docids::extract_word_docids;
use self::extract_word_level_position_docids::extract_word_level_position_docids;
use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids;
@ -37,6 +39,7 @@ pub(crate) fn data_from_obkv_documents(
    lmdb_writer_sx: Sender<Result<TypedChunk>>,
    searchable_fields: Option<HashSet<FieldId>>,
    faceted_fields: HashSet<FieldId>,
    geo_field_id: Option<FieldId>,
    stop_words: Option<fst::Set<&[u8]>>,
) -> Result<()> {
    let result: Result<(Vec<_>, (Vec<_>, Vec<_>))> = obkv_chunks
@ -54,7 +57,7 @@ pub(crate) fn data_from_obkv_documents(
        .collect();

    let (
        docid_word_positions_chunks,
        (docid_fid_facet_numbers_chunks, docid_fid_facet_strings_chunks),
    ) = result?;
@ -118,6 +121,16 @@ pub(crate) fn data_from_obkv_documents(
"field-id-facet-number-docids",
);
spawn_extraction_task(
documents_chunk,
indexer.clone(),
lmdb_writer_sx.clone(),
move |documents, indexer| extract_geo_points(documents, indexer, geo_field_id),
merge_cbo_roaring_bitmaps,
TypedChunk::GeoPoints,
"geo-points",
);
    Ok(())
}
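
For context, the new `geo_field_id` parameter is resolved by the caller before `data_from_obkv_documents` is invoked, by looking up the `_geo` field in the index's fields-ids map. A minimal sketch of that lookup, assuming milli's `FieldsIdsMap` API (the `resolve_geo_field_id` helper is illustrative, not part of this diff):

use crate::{FieldId, FieldsIdsMap};

/// Returns the id of the `_geo` field, if any indexed document has ever declared one.
fn resolve_geo_field_id(fields_ids_map: &FieldsIdsMap) -> Option<FieldId> {
    fields_ids_map.id("_geo")
}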