mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-06 04:36:32 +00:00
add an extractor for cellulite in the new pipeline
This commit is contained in:
184
Cargo.lock
generated
184
Cargo.lock
generated
@ -421,6 +421,15 @@ version = "0.13.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "approx"
|
||||||
|
version = "0.5.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
|
||||||
|
dependencies = [
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "arbitrary"
|
name = "arbitrary"
|
||||||
version = "1.4.1"
|
version = "1.4.1"
|
||||||
@ -1053,6 +1062,21 @@ dependencies = [
|
|||||||
"smallvec",
|
"smallvec",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cellulite"
|
||||||
|
version = "0.1.0"
|
||||||
|
source = "git+https://github.com/irevoire/cellulite?branch=main#3654348942bdc67393e9a7b3c655f7e9a98dee50"
|
||||||
|
dependencies = [
|
||||||
|
"geo",
|
||||||
|
"geo-types",
|
||||||
|
"geojson",
|
||||||
|
"h3o",
|
||||||
|
"heed",
|
||||||
|
"ordered-float 5.0.0",
|
||||||
|
"roaring",
|
||||||
|
"thiserror 2.0.12",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cexpr"
|
name = "cexpr"
|
||||||
version = "0.6.0"
|
version = "0.6.0"
|
||||||
@ -1815,6 +1839,16 @@ dependencies = [
|
|||||||
"bytemuck",
|
"bytemuck",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "earcutr"
|
||||||
|
version = "0.4.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "79127ed59a85d7687c409e9978547cffb7dc79675355ed22da6b66fd5f6ead01"
|
||||||
|
dependencies = [
|
||||||
|
"itertools 0.11.0",
|
||||||
|
"num-traits",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "either"
|
name = "either"
|
||||||
version = "1.15.0"
|
version = "1.15.0"
|
||||||
@ -2055,6 +2089,18 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "float_eq"
|
||||||
|
version = "1.0.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "float_next_after"
|
||||||
|
version = "1.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flume"
|
name = "flume"
|
||||||
version = "0.11.1"
|
version = "0.11.1"
|
||||||
@ -2477,6 +2523,59 @@ dependencies = [
|
|||||||
"version_check",
|
"version_check",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "geo"
|
||||||
|
version = "0.30.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4416397671d8997e9a3e7ad99714f4f00a22e9eaa9b966a5985d2194fc9e02e1"
|
||||||
|
dependencies = [
|
||||||
|
"earcutr",
|
||||||
|
"float_next_after",
|
||||||
|
"geo-types",
|
||||||
|
"geographiclib-rs",
|
||||||
|
"i_overlay",
|
||||||
|
"log",
|
||||||
|
"num-traits",
|
||||||
|
"robust",
|
||||||
|
"rstar",
|
||||||
|
"spade",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "geo-types"
|
||||||
|
version = "0.7.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "62ddb1950450d67efee2bbc5e429c68d052a822de3aad010d28b351fbb705224"
|
||||||
|
dependencies = [
|
||||||
|
"approx",
|
||||||
|
"num-traits",
|
||||||
|
"rayon",
|
||||||
|
"rstar",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "geographiclib-rs"
|
||||||
|
version = "0.2.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f611040a2bb37eaa29a78a128d1e92a378a03e0b6e66ae27398d42b1ba9a7841"
|
||||||
|
dependencies = [
|
||||||
|
"libm",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "geojson"
|
||||||
|
version = "0.24.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e26f3c45b36fccc9cf2805e61d4da6bc4bbd5a3a9589b01afa3a40eff703bd79"
|
||||||
|
dependencies = [
|
||||||
|
"geo-types",
|
||||||
|
"log",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"thiserror 2.0.12",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "geoutils"
|
name = "geoutils"
|
||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
@ -2586,6 +2685,26 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "h3o"
|
||||||
|
version = "0.8.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "fd8f6bbd82fcf88ec958095a97201044bc36307ad0ac3dba72106c973e8873a9"
|
||||||
|
dependencies = [
|
||||||
|
"ahash 0.8.12",
|
||||||
|
"either",
|
||||||
|
"float_eq",
|
||||||
|
"geo",
|
||||||
|
"h3o-bit",
|
||||||
|
"libm",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "h3o-bit"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "half"
|
name = "half"
|
||||||
version = "2.6.0"
|
version = "2.6.0"
|
||||||
@ -2670,6 +2789,7 @@ dependencies = [
|
|||||||
"lmdb-master-sys",
|
"lmdb-master-sys",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"page_size",
|
"page_size",
|
||||||
|
"serde",
|
||||||
"synchronoise",
|
"synchronoise",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
@ -2850,6 +2970,50 @@ dependencies = [
|
|||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "i_float"
|
||||||
|
version = "1.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "85df3a416829bb955fdc2416c7b73680c8dcea8d731f2c7aa23e1042fe1b8343"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "i_key_sort"
|
||||||
|
version = "0.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "347c253b4748a1a28baf94c9ce133b6b166f08573157e05afe718812bc599fcd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "i_overlay"
|
||||||
|
version = "2.0.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0542dfef184afdd42174a03dcc0625b6147fb73e1b974b1a08a2a42ac35cee49"
|
||||||
|
dependencies = [
|
||||||
|
"i_float",
|
||||||
|
"i_key_sort",
|
||||||
|
"i_shape",
|
||||||
|
"i_tree",
|
||||||
|
"rayon",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "i_shape"
|
||||||
|
version = "1.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0a38f5a42678726718ff924f6d4a0e79b129776aeed298f71de4ceedbd091bce"
|
||||||
|
dependencies = [
|
||||||
|
"i_float",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "i_tree"
|
||||||
|
version = "0.8.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "155181bc97d770181cf9477da51218a19ee92a8e5be642e796661aee2b601139"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "icu_collections"
|
name = "icu_collections"
|
||||||
version = "2.0.0"
|
version = "2.0.0"
|
||||||
@ -3934,6 +4098,7 @@ dependencies = [
|
|||||||
"candle-core",
|
"candle-core",
|
||||||
"candle-nn",
|
"candle-nn",
|
||||||
"candle-transformers",
|
"candle-transformers",
|
||||||
|
"cellulite",
|
||||||
"charabia",
|
"charabia",
|
||||||
"concat-arrays",
|
"concat-arrays",
|
||||||
"convert_case 0.8.0",
|
"convert_case 0.8.0",
|
||||||
@ -3947,6 +4112,7 @@ dependencies = [
|
|||||||
"flume",
|
"flume",
|
||||||
"fst",
|
"fst",
|
||||||
"fxhash",
|
"fxhash",
|
||||||
|
"geojson",
|
||||||
"geoutils",
|
"geoutils",
|
||||||
"grenad",
|
"grenad",
|
||||||
"hashbrown 0.15.4",
|
"hashbrown 0.15.4",
|
||||||
@ -5221,6 +5387,12 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "robust"
|
||||||
|
version = "1.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4e27ee8bb91ca0adcf0ecb116293afa12d393f9c2b9b9cd54d33e8078fe19839"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rstar"
|
name = "rstar"
|
||||||
version = "0.12.2"
|
version = "0.12.2"
|
||||||
@ -5753,6 +5925,18 @@ dependencies = [
|
|||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "spade"
|
||||||
|
version = "2.14.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a14e31a007e9f85c32784b04f89e6e194bb252a4d41b4a8ccd9e77245d901c8c"
|
||||||
|
dependencies = [
|
||||||
|
"hashbrown 0.15.4",
|
||||||
|
"num-traits",
|
||||||
|
"robust",
|
||||||
|
"smallvec",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "spin"
|
name = "spin"
|
||||||
version = "0.5.2"
|
version = "0.5.2"
|
||||||
|
@ -10,7 +10,9 @@ use serde_json::value::RawValue;
|
|||||||
|
|
||||||
use super::vector_document::VectorDocument;
|
use super::vector_document::VectorDocument;
|
||||||
use super::{KvReaderFieldId, KvWriterFieldId};
|
use super::{KvReaderFieldId, KvWriterFieldId};
|
||||||
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
|
use crate::constants::{
|
||||||
|
RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME,
|
||||||
|
};
|
||||||
use crate::documents::FieldIdMapper;
|
use crate::documents::FieldIdMapper;
|
||||||
use crate::update::del_add::KvReaderDelAdd;
|
use crate::update::del_add::KvReaderDelAdd;
|
||||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||||
@ -26,6 +28,7 @@ use crate::{
|
|||||||
///
|
///
|
||||||
/// The 'doc lifetime is meant to live sufficiently for the document to be handled by the extractors.
|
/// The 'doc lifetime is meant to live sufficiently for the document to be handled by the extractors.
|
||||||
pub trait Document<'doc> {
|
pub trait Document<'doc> {
|
||||||
|
fn geojson_field(&self) -> Result<Option<&'doc RawValue>>;
|
||||||
/// Iterate over all **top-level** fields of the document, returning their name and raw JSON value.
|
/// Iterate over all **top-level** fields of the document, returning their name and raw JSON value.
|
||||||
///
|
///
|
||||||
/// - The returned values *may* contain nested fields.
|
/// - The returned values *may* contain nested fields.
|
||||||
@ -113,6 +116,10 @@ impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
|
|||||||
self.field(RESERVED_GEO_FIELD_NAME)
|
self.field(RESERVED_GEO_FIELD_NAME)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn geojson_field(&self) -> Result<Option<&'t RawValue>> {
|
||||||
|
self.field(RESERVED_GEOJSON_FIELD_NAME)
|
||||||
|
}
|
||||||
|
|
||||||
fn top_level_fields_count(&self) -> usize {
|
fn top_level_fields_count(&self) -> usize {
|
||||||
let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
|
let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
|
||||||
let has_geo_field = self.geo_field().unwrap_or(None).is_some();
|
let has_geo_field = self.geo_field().unwrap_or(None).is_some();
|
||||||
@ -177,6 +184,10 @@ impl<'doc> Document<'doc> for DocumentFromVersions<'_, 'doc> {
|
|||||||
Ok(self.versions.geo_field())
|
Ok(self.versions.geo_field())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn geojson_field(&self) -> Result<Option<&'doc RawValue>> {
|
||||||
|
Ok(self.versions.geojson_field())
|
||||||
|
}
|
||||||
|
|
||||||
fn top_level_fields_count(&self) -> usize {
|
fn top_level_fields_count(&self) -> usize {
|
||||||
let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
|
let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
|
||||||
let has_geo_field = self.geo_field().unwrap_or(None).is_some();
|
let has_geo_field = self.geo_field().unwrap_or(None).is_some();
|
||||||
@ -265,6 +276,16 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
|
|||||||
db.geo_field()
|
db.geo_field()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn geojson_field(&self) -> Result<Option<&'d RawValue>> {
|
||||||
|
if let Some(geojson) = self.new_doc.geojson_field()? {
|
||||||
|
return Ok(Some(geojson));
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(db) = self.db else { return Ok(None) };
|
||||||
|
|
||||||
|
db.geojson_field()
|
||||||
|
}
|
||||||
|
|
||||||
fn top_level_fields_count(&self) -> usize {
|
fn top_level_fields_count(&self) -> usize {
|
||||||
self.iter_top_level_fields().count()
|
self.iter_top_level_fields().count()
|
||||||
}
|
}
|
||||||
@ -296,6 +317,10 @@ where
|
|||||||
D::geo_field(self)
|
D::geo_field(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn geojson_field(&self) -> Result<Option<&'doc RawValue>> {
|
||||||
|
D::geojson_field(self)
|
||||||
|
}
|
||||||
|
|
||||||
fn top_level_fields_count(&self) -> usize {
|
fn top_level_fields_count(&self) -> usize {
|
||||||
D::top_level_fields_count(self)
|
D::top_level_fields_count(self)
|
||||||
}
|
}
|
||||||
@ -454,6 +479,10 @@ impl<'doc> Versions<'doc> {
|
|||||||
self.data.get(RESERVED_GEO_FIELD_NAME)
|
self.data.get(RESERVED_GEO_FIELD_NAME)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn geojson_field(&self) -> Option<&'doc RawValue> {
|
||||||
|
self.data.get(RESERVED_GEOJSON_FIELD_NAME)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.data.len()
|
self.data.len()
|
||||||
}
|
}
|
||||||
@ -572,6 +601,10 @@ impl<'a, Mapper: FieldIdMapper> Document<'a> for KvDelAddDocument<'a, Mapper> {
|
|||||||
fn geo_field(&self) -> Result<Option<&'a RawValue>> {
|
fn geo_field(&self) -> Result<Option<&'a RawValue>> {
|
||||||
self.get(RESERVED_GEO_FIELD_NAME)
|
self.get(RESERVED_GEO_FIELD_NAME)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn geojson_field(&self) -> Result<Option<&'a RawValue>> {
|
||||||
|
self.get(RESERVED_GEOJSON_FIELD_NAME)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct DocumentIdentifiers<'doc> {
|
pub struct DocumentIdentifiers<'doc> {
|
||||||
|
336
crates/milli/src/update/new/extract/geo/cellulite.rs
Normal file
336
crates/milli/src/update/new/extract/geo/cellulite.rs
Normal file
@ -0,0 +1,336 @@
|
|||||||
|
use std::cell::RefCell;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::{iter, mem, result};
|
||||||
|
|
||||||
|
use bumpalo::Bump;
|
||||||
|
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
|
||||||
|
use geojson::GeoJson;
|
||||||
|
use heed::RoTxn;
|
||||||
|
use serde_json::value::RawValue;
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
use crate::error::GeoError;
|
||||||
|
use crate::update::new::document::{Document, DocumentContext};
|
||||||
|
use crate::update::new::indexer::document_changes::Extractor;
|
||||||
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
|
use crate::update::new::thread_local::MostlySend;
|
||||||
|
use crate::update::new::DocumentChange;
|
||||||
|
use crate::update::GrenadParameters;
|
||||||
|
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result, UserError};
|
||||||
|
|
||||||
|
pub struct GeoJsonExtractor {
|
||||||
|
grenad_parameters: GrenadParameters,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GeoJsonExtractor {
|
||||||
|
pub fn new(
|
||||||
|
rtxn: &RoTxn,
|
||||||
|
index: &Index,
|
||||||
|
grenad_parameters: GrenadParameters,
|
||||||
|
) -> Result<Option<Self>> {
|
||||||
|
if index.is_geojson_enabled(rtxn)? {
|
||||||
|
Ok(Some(GeoJsonExtractor { grenad_parameters }))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
#[derive(Pod, Zeroable, Copy, Clone)]
|
||||||
|
#[repr(C, packed)]
|
||||||
|
pub struct ExtractedGeoPoint {
|
||||||
|
pub docid: DocumentId,
|
||||||
|
pub lat_lng: [f64; 2],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ExtractedGeoPoint> for GeoPoint {
|
||||||
|
/// Converts the latitude and longitude back to an xyz GeoPoint.
|
||||||
|
fn from(value: ExtractedGeoPoint) -> Self {
|
||||||
|
let [lat, lng] = value.lat_lng;
|
||||||
|
let point = [lat, lng];
|
||||||
|
let xyz_point = lat_lng_to_xyz(&point);
|
||||||
|
GeoPoint::new(xyz_point, (value.docid, point))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
pub struct GeoJsonExtractorData<'extractor> {
|
||||||
|
/// The set of documents ids that were removed. If a document sees its geo
|
||||||
|
/// point being updated, we first put it in the deleted and then in the inserted.
|
||||||
|
removed: bumpalo::collections::Vec<'extractor, GeoJson>,
|
||||||
|
inserted: bumpalo::collections::Vec<'extractor, GeoJson>,
|
||||||
|
/// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points
|
||||||
|
/// data structures if we have spilled to disk.
|
||||||
|
spilled_removed: Option<BufWriter<File>>,
|
||||||
|
/// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points
|
||||||
|
/// data structures if we have spilled to disk.
|
||||||
|
spilled_inserted: Option<BufWriter<File>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'extractor> GeoJsonExtractorData<'extractor> {
|
||||||
|
pub fn freeze(self) -> Result<FrozenGeoJsonExtractorData<'extractor>> {
|
||||||
|
let GeoJsonExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
|
||||||
|
|
||||||
|
Ok(FrozenGeoJsonExtractorData {
|
||||||
|
removed: removed.into_bump_slice(),
|
||||||
|
inserted: inserted.into_bump_slice(),
|
||||||
|
spilled_removed: spilled_removed
|
||||||
|
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
|
||||||
|
.transpose()?,
|
||||||
|
spilled_inserted: spilled_inserted
|
||||||
|
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
|
||||||
|
.transpose()?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE(review): `MostlySend` is asserted manually here. Its safety contract is
// defined in `crate::update::new::thread_local` and is not visible in this file;
// confirm that every field of `GeoJsonExtractorData` upholds it.
unsafe impl MostlySend for GeoJsonExtractorData<'_> {}
|
||||||
|
|
||||||
|
pub struct FrozenGeoJsonExtractorData<'extractor> {
|
||||||
|
pub removed: &'extractor [GeoJson],
|
||||||
|
pub inserted: &'extractor [GeoJson],
|
||||||
|
pub spilled_removed: Option<BufReader<File>>,
|
||||||
|
pub spilled_inserted: Option<BufReader<File>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FrozenGeoJsonExtractorData<'_> {
|
||||||
|
pub fn iter_and_clear_removed(
|
||||||
|
&mut self,
|
||||||
|
) -> io::Result<impl IntoIterator<Item = Result<GeoJson, serde_json::Error>> + '_> {
|
||||||
|
Ok(mem::take(&mut self.removed)
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(Ok)
|
||||||
|
.chain(iterator_over_spilled_geojsons(&mut self.spilled_removed)?))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn iter_and_clear_inserted(
|
||||||
|
&mut self,
|
||||||
|
) -> io::Result<impl IntoIterator<Item = Result<GeoJson, serde_json::Error>> + '_> {
|
||||||
|
Ok(mem::take(&mut self.inserted)
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(Ok)
|
||||||
|
.chain(iterator_over_spilled_geojsons(&mut self.spilled_inserted)?))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn iterator_over_spilled_geojsons(
|
||||||
|
spilled: &mut Option<BufReader<File>>,
|
||||||
|
) -> io::Result<impl IntoIterator<Item = Result<GeoJson, serde_json::Error>> + '_> {
|
||||||
|
let mut spilled = spilled.take();
|
||||||
|
if let Some(spilled) = &mut spilled {
|
||||||
|
spilled.rewind()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(iter::from_fn(move || match &mut spilled {
|
||||||
|
Some(file) => {
|
||||||
|
match GeoJson::from_reader(file) {
|
||||||
|
Ok(geojson) => Some(Ok(geojson)),
|
||||||
|
Err(e) if e.is_eof() => None,
|
||||||
|
Err(e) => Some(Err(e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
|
||||||
|
type Data = RefCell<GeoJsonExtractorData<'extractor>>;
|
||||||
|
|
||||||
|
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||||
|
Ok(RefCell::new(GeoJsonExtractorData {
|
||||||
|
removed: bumpalo::collections::Vec::new_in(extractor_alloc),
|
||||||
|
inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
|
||||||
|
spilled_inserted: None,
|
||||||
|
spilled_removed: None,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process<'doc>(
|
||||||
|
&'doc self,
|
||||||
|
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
|
||||||
|
context: &'doc DocumentContext<Self::Data>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let rtxn = &context.rtxn;
|
||||||
|
let index = context.index;
|
||||||
|
let max_memory = self.grenad_parameters.max_memory_by_thread();
|
||||||
|
let db_fields_ids_map = context.db_fields_ids_map;
|
||||||
|
let mut data_ref = context.data.borrow_mut_or_yield();
|
||||||
|
|
||||||
|
for change in changes {
|
||||||
|
if data_ref.spilled_removed.is_none()
|
||||||
|
&& max_memory.is_some_and(|mm| context.extractor_alloc.allocated_bytes() >= mm)
|
||||||
|
{
|
||||||
|
// We must spill as we allocated too much memory
|
||||||
|
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
||||||
|
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
match change? {
|
||||||
|
DocumentChange::Deletion(deletion) => {
|
||||||
|
let docid = deletion.docid();
|
||||||
|
let external_id = deletion.external_document_id();
|
||||||
|
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
|
||||||
|
|
||||||
|
if let Some(geojson) = current.geojson_field()? {
|
||||||
|
match &mut data_ref.spilled_removed {
|
||||||
|
Some(file) => file.write_all(geojson.get().as_bytes())?,
|
||||||
|
None => data_ref.removed.push(
|
||||||
|
// TODO: The error type is wrong here. It should be an internal error.
|
||||||
|
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DocumentChange::Update(update) => {
|
||||||
|
let current = update.current(rtxn, index, db_fields_ids_map)?;
|
||||||
|
let external_id = update.external_document_id();
|
||||||
|
let docid = update.docid();
|
||||||
|
|
||||||
|
let current_geo = current.geojson_field()?;
|
||||||
|
|
||||||
|
let updated_geo =
|
||||||
|
update.merged(rtxn, index, db_fields_ids_map)?.geojson_field()?;
|
||||||
|
|
||||||
|
if current_geo.map(|c| c.get()) != updated_geo.map(|u| u.get()) {
|
||||||
|
// If the current and new geo points are different it means that
|
||||||
|
// we need to replace the current by the new point and therefore
|
||||||
|
// delete the current point from cellulite.
|
||||||
|
if let Some(geojson) = current_geo {
|
||||||
|
match &mut data_ref.spilled_removed {
|
||||||
|
Some(file) => file.write_all(geojson.get().as_bytes())?,
|
||||||
|
// TODO: Should be an internal error
|
||||||
|
None => data_ref.removed.push(
|
||||||
|
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(geojson) = updated_geo {
|
||||||
|
match &mut data_ref.spilled_inserted {
|
||||||
|
Some(file) => file.write_all(geojson.get().as_bytes())?,
|
||||||
|
// TODO: Is the error type correct here? Shouldn't it be an internal error?
|
||||||
|
None => data_ref.inserted.push(
|
||||||
|
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DocumentChange::Insertion(insertion) => {
|
||||||
|
let external_id = insertion.external_document_id();
|
||||||
|
let docid = insertion.docid();
|
||||||
|
|
||||||
|
let inserted_geo = insertion.inserted().geojson_field()?;
|
||||||
|
|
||||||
|
if let Some(geojson) = inserted_geo {
|
||||||
|
match &mut data_ref.spilled_inserted {
|
||||||
|
Some(file) => file.write_all(geojson.get().as_bytes())?,
|
||||||
|
// TODO: Is the error type correct here? Shouldn't it be an internal error?
|
||||||
|
None => data_ref.inserted.push(
|
||||||
|
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extracts and validates the latitude and latitude from a document geo field.
|
||||||
|
///
|
||||||
|
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
|
||||||
|
pub fn extract_geo_coordinates(
|
||||||
|
external_id: &str,
|
||||||
|
raw_value: &RawValue,
|
||||||
|
) -> Result<Option<[f64; 2]>> {
|
||||||
|
let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
|
||||||
|
Value::Null => return Ok(None),
|
||||||
|
Value::Object(map) => map,
|
||||||
|
value => {
|
||||||
|
return Err(Box::new(GeoError::NotAnObject {
|
||||||
|
document_id: Value::from(external_id),
|
||||||
|
value,
|
||||||
|
})
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
|
||||||
|
(Some(lat), Some(lng)) => {
|
||||||
|
if geo.is_empty() {
|
||||||
|
[lat, lng]
|
||||||
|
} else {
|
||||||
|
return Err(Box::new(GeoError::UnexpectedExtraFields {
|
||||||
|
document_id: Value::from(external_id),
|
||||||
|
value: Value::from(geo),
|
||||||
|
})
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(Some(_), None) => {
|
||||||
|
return Err(Box::new(GeoError::MissingLongitude {
|
||||||
|
document_id: Value::from(external_id),
|
||||||
|
})
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
(None, Some(_)) => {
|
||||||
|
return Err(Box::new(GeoError::MissingLatitude {
|
||||||
|
document_id: Value::from(external_id),
|
||||||
|
})
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
(None, None) => {
|
||||||
|
return Err(Box::new(GeoError::MissingLatitudeAndLongitude {
|
||||||
|
document_id: Value::from(external_id),
|
||||||
|
})
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
|
||||||
|
(Ok(lat), Ok(lng)) => Ok(Some([lat, lng])),
|
||||||
|
(Ok(_), Err(value)) => {
|
||||||
|
Err(Box::new(GeoError::BadLongitude { document_id: Value::from(external_id), value })
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
(Err(value), Ok(_)) => {
|
||||||
|
Err(Box::new(GeoError::BadLatitude { document_id: Value::from(external_id), value })
|
||||||
|
.into())
|
||||||
|
}
|
||||||
|
(Err(lat), Err(lng)) => Err(Box::new(GeoError::BadLatitudeAndLongitude {
|
||||||
|
document_id: Value::from(external_id),
|
||||||
|
lat,
|
||||||
|
lng,
|
||||||
|
})
|
||||||
|
.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extracts and validate that a serde JSON Value is actually a finite f64.
|
||||||
|
pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
|
||||||
|
let number = match value {
|
||||||
|
Value::Number(ref n) => match n.as_f64() {
|
||||||
|
Some(number) => number,
|
||||||
|
None => return Err(value),
|
||||||
|
},
|
||||||
|
Value::String(ref s) => match s.parse::<f64>() {
|
||||||
|
Ok(number) => number,
|
||||||
|
Err(_) => return Err(value),
|
||||||
|
},
|
||||||
|
value => return Err(value),
|
||||||
|
};
|
||||||
|
|
||||||
|
if number.is_finite() {
|
||||||
|
Ok(number)
|
||||||
|
} else {
|
||||||
|
Err(value)
|
||||||
|
}
|
||||||
|
}
|
@ -18,6 +18,8 @@ use crate::update::new::DocumentChange;
|
|||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
|
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
|
||||||
|
|
||||||
|
mod cellulite;
|
||||||
|
|
||||||
pub struct GeoExtractor {
|
pub struct GeoExtractor {
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: GrenadParameters,
|
||||||
}
|
}
|
||||||
|
@ -630,7 +630,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
|
|||||||
|
|
||||||
settings_delta.try_for_each_fragment_diff(
|
settings_delta.try_for_each_fragment_diff(
|
||||||
session.embedder_name(),
|
session.embedder_name(),
|
||||||
|fragment_diff| {
|
|fragment_diff| -> Result<()> {
|
||||||
let extractor = RequestFragmentExtractor::new(fragment_diff.new, doc_alloc)
|
let extractor = RequestFragmentExtractor::new(fragment_diff.new, doc_alloc)
|
||||||
.ignore_errors();
|
.ignore_errors();
|
||||||
let old = if full_reindex {
|
let old = if full_reindex {
|
||||||
|
@ -193,7 +193,7 @@ impl WordPrefixIntegerDocids {
|
|||||||
// We access this HashMap in parallel to compute the *union* of all
|
// We access this HashMap in parallel to compute the *union* of all
|
||||||
// of them and *serialize* them into files. There is one file by CPU.
|
// of them and *serialize* them into files. There is one file by CPU.
|
||||||
let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
|
let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
|
prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| -> Result<()> {
|
||||||
let refcell = local_entries.get_or(|| {
|
let refcell = local_entries.get_or(|| {
|
||||||
let file = BufWriter::new(spooled_tempfile(
|
let file = BufWriter::new(spooled_tempfile(
|
||||||
self.max_memory_by_thread.unwrap_or(usize::MAX),
|
self.max_memory_by_thread.unwrap_or(usize::MAX),
|
||||||
|
Reference in New Issue
Block a user