Compare commits

...

18 Commits

Author  SHA1        Message                                                                                      Date
Tamo    e7be4ca103  improve deletion                                                                             2025-07-24 18:32:57 +02:00
Tamo    c3d356c3a3  upgrade version of cellulite                                                                 2025-07-24 18:20:31 +02:00
Tamo    7db6d76851  remove useless log                                                                           2025-07-24 18:11:49 +02:00
Tamo    ebb4865b95  fix the cellulite spilling bug                                                               2025-07-24 18:11:16 +02:00
Tamo    bad5406095  prepare for release                                                                          2025-07-23 18:21:26 +02:00
Tamo    4af3659179  fix the cellulite integration                                                                2025-07-23 15:25:38 +02:00
Tamo    3a6505fcaa  plug in the document deletion in cellulite                                                   2025-07-22 16:41:43 +02:00
Tamo    e923154c90  update to the latest version of cellulite and steppe                                         2025-07-22 16:24:49 +02:00
Tamo    44dc64accb  add the deletion in the new indexer                                                          2025-07-17 18:54:37 +02:00
Tamo    d80edead01  fix the old indexer                                                                          2025-07-17 18:21:48 +02:00
Tamo    e510d4a8a3  add a new _geoPolygon filter to query the cellulite database                                2025-07-16 00:50:03 +02:00
Tamo    05a13f662d  fmt                                                                                          2025-07-16 00:49:41 +02:00
Tamo    3f00f56f9f  finish plugin cellulite to the new indexer                                                   2025-07-16 00:10:40 +02:00
Tamo    a921ee31ce  Cellulite is almost in the new indexer. We must add the documentID to the geojson pipeline  2025-07-15 23:48:14 +02:00
Tamo    b00a1dcc00  add an extractor for cellulite in the new pipeline                                           2025-07-15 23:16:08 +02:00
Tamo    14a93d65a4  add cellulite to the old pipeline, it probably doesn't works                                 2025-07-15 23:15:48 +02:00
Tamo    3dd4f0587d  add a few helpers                                                                            2025-07-15 23:14:48 +02:00
Tamo    73eb64242d  add cellulite to the index                                                                   2025-07-15 23:14:06 +02:00
34 changed files with 1886 additions and 40 deletions

Cargo.lock (generated, 313 changed lines)
View File

@@ -421,6 +421,15 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "approx"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
dependencies = [
"num-traits",
]
[[package]]
name = "arbitrary"
version = "1.4.1"
@@ -458,7 +467,7 @@ dependencies = [
"page_size",
"rand 0.8.5",
"rayon",
"roaring",
"roaring 0.10.12",
"tempfile",
"thiserror 2.0.12",
"tracing",
@@ -595,7 +604,7 @@ dependencies = [
"rand 0.8.5",
"rand_chacha 0.3.1",
"reqwest",
"roaring",
"roaring 0.10.12",
"serde_json",
"tempfile",
]
@@ -1053,6 +1062,24 @@ dependencies = [
"smallvec",
]
[[package]]
name = "cellulite"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a736cdbd78849f818b36d7c62e6226145ea18aa53de1ec7fc1669631521e3142"
dependencies = [
"geo",
"geo-types",
"geojson",
"h3o",
"heed",
"ordered-float 5.0.0",
"roaring 0.11.1",
"steppe",
"thiserror 2.0.12",
"zerometry",
]
[[package]]
name = "cexpr"
version = "0.6.0"
@@ -1212,6 +1239,17 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "colored_json"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e35980a1b846f8e3e359fd18099172a0857140ba9230affc4f71348081e039b6"
dependencies = [
"serde",
"serde_json",
"yansi",
]
[[package]]
name = "concat-arrays"
version = "0.1.2"
@@ -1785,7 +1823,7 @@ dependencies = [
"meilisearch-types",
"once_cell",
"regex",
"roaring",
"roaring 0.10.12",
"serde",
"serde_json",
"tar",
@@ -1815,6 +1853,16 @@ dependencies = [
"bytemuck",
]
[[package]]
name = "earcutr"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79127ed59a85d7687c409e9978547cffb7dc79675355ed22da6b66fd5f6ead01"
dependencies = [
"itertools 0.11.0",
"num-traits",
]
[[package]]
name = "either"
version = "1.15.0"
@@ -2055,6 +2103,18 @@ dependencies = [
"serde_json",
]
[[package]]
name = "float_eq"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853"
[[package]]
name = "float_next_after"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flume"
version = "0.11.1"
@@ -2477,6 +2537,59 @@ dependencies = [
"version_check",
]
[[package]]
name = "geo"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4416397671d8997e9a3e7ad99714f4f00a22e9eaa9b966a5985d2194fc9e02e1"
dependencies = [
"earcutr",
"float_next_after",
"geo-types",
"geographiclib-rs",
"i_overlay",
"log",
"num-traits",
"robust",
"rstar",
"spade",
]
[[package]]
name = "geo-types"
version = "0.7.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62ddb1950450d67efee2bbc5e429c68d052a822de3aad010d28b351fbb705224"
dependencies = [
"approx",
"num-traits",
"rayon",
"rstar",
"serde",
]
[[package]]
name = "geographiclib-rs"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f611040a2bb37eaa29a78a128d1e92a378a03e0b6e66ae27398d42b1ba9a7841"
dependencies = [
"libm",
]
[[package]]
name = "geojson"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e26f3c45b36fccc9cf2805e61d4da6bc4bbd5a3a9589b01afa3a40eff703bd79"
dependencies = [
"geo-types",
"log",
"serde",
"serde_json",
"thiserror 2.0.12",
]
[[package]]
name = "geoutils"
version = "0.5.1"
@@ -2586,6 +2699,26 @@ dependencies = [
"tracing",
]
[[package]]
name = "h3o"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd8f6bbd82fcf88ec958095a97201044bc36307ad0ac3dba72106c973e8873a9"
dependencies = [
"ahash 0.8.12",
"either",
"float_eq",
"geo",
"h3o-bit",
"libm",
]
[[package]]
name = "h3o-bit"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320"
[[package]]
name = "half"
version = "2.6.0"
@@ -2670,6 +2803,7 @@ dependencies = [
"lmdb-master-sys",
"once_cell",
"page_size",
"serde",
"synchronoise",
"url",
]
@@ -2850,6 +2984,50 @@ dependencies = [
"tracing",
]
[[package]]
name = "i_float"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85df3a416829bb955fdc2416c7b73680c8dcea8d731f2c7aa23e1042fe1b8343"
dependencies = [
"serde",
]
[[package]]
name = "i_key_sort"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "347c253b4748a1a28baf94c9ce133b6b166f08573157e05afe718812bc599fcd"
[[package]]
name = "i_overlay"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0542dfef184afdd42174a03dcc0625b6147fb73e1b974b1a08a2a42ac35cee49"
dependencies = [
"i_float",
"i_key_sort",
"i_shape",
"i_tree",
"rayon",
]
[[package]]
name = "i_shape"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a38f5a42678726718ff924f6d4a0e79b129776aeed298f71de4ceedbd091bce"
dependencies = [
"i_float",
"serde",
]
[[package]]
name = "i_tree"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "155181bc97d770181cf9477da51218a19ee92a8e5be642e796661aee2b601139"
[[package]]
name = "icu_collections"
version = "2.0.0"
@@ -3020,7 +3198,7 @@ dependencies = [
"memmap2",
"page_size",
"rayon",
"roaring",
"roaring 0.10.12",
"serde",
"serde_json",
"synchronoise",
@@ -3034,9 +3212,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.9.0"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e"
checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661"
dependencies = [
"equivalent",
"hashbrown 0.15.4",
@@ -3208,6 +3386,47 @@ dependencies = [
"regex",
]
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"jiff-tzdb-platform",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
"windows-sys 0.59.0",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "jiff-tzdb"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1283705eb0a21404d2bfd6eef2a7593d240bc42a0bdb39db0ad6fa2ec026524"
[[package]]
name = "jiff-tzdb-platform"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8"
dependencies = [
"jiff-tzdb",
]
[[package]]
name = "jobserver"
version = "0.1.33"
@@ -3791,7 +4010,7 @@ dependencies = [
"rayon",
"regex",
"reqwest",
"roaring",
"roaring 0.10.12",
"rustls",
"rustls-pemfile",
"rustls-pki-types",
@@ -3838,7 +4057,7 @@ dependencies = [
"maplit",
"meilisearch-types",
"rand 0.8.5",
"roaring",
"roaring 0.10.12",
"serde",
"serde_json",
"sha2",
@@ -3868,7 +4087,7 @@ dependencies = [
"meili-snap",
"memmap2",
"milli",
"roaring",
"roaring 0.10.12",
"rustc-hash 2.1.1",
"serde",
"serde-cs",
@@ -3934,6 +4153,7 @@ dependencies = [
"candle-core",
"candle-nn",
"candle-transformers",
"cellulite",
"charabia",
"concat-arrays",
"convert_case 0.8.0",
@@ -3947,6 +4167,8 @@ dependencies = [
"flume",
"fst",
"fxhash",
"geo-types",
"geojson",
"geoutils",
"grenad",
"hashbrown 0.15.4",
@@ -3971,7 +4193,7 @@ dependencies = [
"rand 0.8.5",
"rayon",
"rhai",
"roaring",
"roaring 0.10.12",
"rstar",
"rustc-hash 2.1.1",
"serde",
@@ -3980,6 +4202,7 @@ dependencies = [
"smallstr",
"smallvec",
"smartstring",
"steppe",
"tempfile",
"thiserror 2.0.12",
"thread_local",
@@ -3991,6 +4214,7 @@ dependencies = [
"url",
"utoipa",
"uuid",
"zerometry",
]
[[package]]
@@ -4660,6 +4884,15 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "potential_utf"
version = "0.1.2"
@@ -5221,6 +5454,22 @@ dependencies = [
"serde",
]
[[package]]
name = "roaring"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18f79304aff09c245934bb9558a215e53a4cfbbe6aa8ac2a79847be551264979"
dependencies = [
"bytemuck",
"byteorder",
]
[[package]]
name = "robust"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e27ee8bb91ca0adcf0ecb116293afa12d393f9c2b9b9cd54d33e8078fe19839"
[[package]]
name = "rstar"
version = "0.12.2"
@@ -5753,6 +6002,18 @@ dependencies = [
"winapi",
]
[[package]]
name = "spade"
version = "2.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a14e31a007e9f85c32784b04f89e6e194bb252a4d41b4a8ccd9e77245d901c8c"
dependencies = [
"hashbrown 0.15.4",
"num-traits",
"robust",
"smallvec",
]
[[package]]
name = "spin"
version = "0.5.2"
@@ -5803,6 +6064,20 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "steppe"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dead99cdf718f37bcd1d22dda9b498f35c5aa22894b755bfd94bf8c2daec9427"
dependencies = [
"colored_json",
"convert_case 0.8.0",
"indexmap",
"jiff",
"serde",
"serde_json",
]
[[package]]
name = "strsim"
version = "0.10.0"
@@ -7285,6 +7560,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aed111bd9e48a802518765906cbdadf0b45afb72b9c81ab049a3b86252adffdd"
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]]
name = "yaup"
version = "0.3.1"
@@ -7405,6 +7686,18 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "zerometry"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "681f08f3f4ef27d3021a128eb6d8df1cd781e4c9c797c3971c1f85316374f977"
dependencies = [
"bytemuck",
"byteorder",
"geo",
"geo-types",
]
[[package]]
name = "zerotrie"
version = "0.2.2"

View File

@@ -19,6 +19,7 @@
//! word = (alphanumeric | _ | - | .)+
//! geoRadius = "_geoRadius(" WS* float WS* "," WS* float WS* "," float WS* ")"
//! geoBoundingBox = "_geoBoundingBox([" WS * float WS* "," WS* float WS* "], [" WS* float WS* "," WS* float WS* "]")
//! geoPolygon = "_geoPolygon([[" WS* float WS* "," WS* float WS* "],+])"
//! ```
//!
//! Other BNF grammar used to handle some specific errors:
@@ -145,6 +146,7 @@ pub enum FilterCondition<'a> {
And(Vec<Self>),
GeoLowerThan { point: [Token<'a>; 2], radius: Token<'a> },
GeoBoundingBox { top_right_point: [Token<'a>; 2], bottom_left_point: [Token<'a>; 2] },
GeoPolygon { points: Vec<[Token<'a>; 2]> },
}
pub enum TraversedElement<'a> {
@@ -175,6 +177,7 @@ impl<'a> FilterCondition<'a> {
}
FilterCondition::GeoLowerThan { .. }
| FilterCondition::GeoBoundingBox { .. }
| FilterCondition::GeoPolygon { .. }
| FilterCondition::In { .. } => None,
}
}
@@ -422,6 +425,38 @@ fn parse_geo_bounding_box(input: Span) -> IResult<FilterCondition> {
Ok((input, res))
}
/// geoPolygon = "_geoPolygon([[" WS* float WS* "," WS* float WS* "],+])"
/// If we parse `_geoPolygon` we MUST parse the rest of the expression.
fn parse_geo_polygon(input: Span) -> IResult<FilterCondition> {
// we want to allow space BEFORE the _geoPolygon but not after
let parsed = preceded(
tuple((multispace0, word_exact("_geoPolygon"))),
// if we were able to parse `_geoPolygon` and can't parse the rest of the input we return a failure
cut(delimited(
char('('),
separated_list1(
tag(","),
ws(delimited(char('['), separated_list1(tag(","), ws(recognize_float)), char(']'))),
),
char(')'),
)),
)(input)
.map_err(|e| e.map(|_| Error::new_from_kind(input, ErrorKind::GeoBoundingBox)));
let (input, args) = parsed?;
// TODO: Return a more specific error
if args.len() <= 2 || args.iter().any(|a| a.len() != 2) {
return Err(nom::Err::Failure(Error::new_from_kind(input, ErrorKind::GeoBoundingBox)));
}
let res = FilterCondition::GeoPolygon {
points: args.into_iter().map(|a| [a[0].into(), a[1].into()]).collect(),
};
Ok((input, res))
}
/// geoPoint = WS* "_geoPoint(float WS* "," WS* float WS* "," WS* float)
fn parse_geo_point(input: Span) -> IResult<FilterCondition> {
// we want to forbid space BEFORE the _geoPoint but not after
@@ -491,8 +526,8 @@ fn parse_primary(input: Span, depth: usize) -> IResult<FilterCondition> {
Error::new_from_kind(input, ErrorKind::MissingClosingDelimiter(c.char()))
}),
),
parse_geo_radius,
parse_geo_bounding_box,
// The geo parsers are grouped in a nested alt because the outer alt reached its maximum number of branches
alt((parse_geo_radius, parse_geo_bounding_box, parse_geo_polygon)),
parse_in,
parse_not_in,
parse_condition,
@@ -573,6 +608,13 @@ impl std::fmt::Display for FilterCondition<'_> {
bottom_right_point[1]
)
}
FilterCondition::GeoPolygon { points } => {
write!(f, "_geoPolygon([")?;
for (i, point) in points.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "[{}, {}]", point[0], point[1])?;
}
write!(f, "])")
}
}
}
}
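As a quick sanity check of the new grammar, here is a minimal sketch of how the polygon filter should behave through the parser. It assumes `FilterCondition::parse` is the filter-parser crate's public entry point; treat the exact signature as an assumption:

```rust
// Sketch: exercising the `_geoPolygon` grammar added above.
// Assumes `FilterCondition::parse` is the filter-parser entry point.
use filter_parser::FilterCondition;

fn main() {
    // Three or more coordinate pairs parse into FilterCondition::GeoPolygon.
    let ok = FilterCondition::parse("_geoPolygon([3.11, 50.63], [3.12, 50.62], [3.10, 50.61])");
    assert!(ok.is_ok());

    // Two pairs cannot form a polygon: the parser returns a failure
    // (currently reported as a GeoBoundingBox error, per the TODO above).
    let err = FilterCondition::parse("_geoPolygon([0, 0], [1, 1])");
    assert!(err.is_err());
}
```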

View File

@@ -484,7 +484,9 @@ impl ErrorCode for milli::Error {
Code::InvalidFacetSearchFacetName
}
UserError::CriterionError(_) => Code::InvalidSettingsRankingRules,
UserError::InvalidGeoField { .. } => Code::InvalidDocumentGeoField,
UserError::InvalidGeoField { .. } | UserError::GeoJsonError(_) => {
Code::InvalidDocumentGeoField
}
UserError::InvalidVectorDimensions { .. }
| UserError::InvalidIndexingVectorDimensions { .. } => {
Code::InvalidVectorDimensions
@@ -507,6 +509,7 @@ impl ErrorCode for milli::Error {
| UserError::DocumentEditionCompilationError(_) => {
Code::EditDocumentsByFunctionError
}
UserError::CelluliteError(_) => Code::Internal,
}
}
}

View File

@@ -0,0 +1,547 @@
{
"type": "Polygon",
"coordinates": [
[
[
3.11681,
50.63646
],
[
3.11945,
50.63488
],
[
3.12134,
50.63504
],
[
3.12064,
50.63127
],
[
3.12203,
50.62785
],
[
3.12389,
50.6262
],
[
3.12161,
50.62358
],
[
3.12547,
50.62114
],
[
3.12447,
50.61874
],
[
3.12288,
50.61988
],
[
3.12054,
50.61846
],
[
3.11846,
50.61754
],
[
3.11482,
50.6207
],
[
3.11232,
50.6188
],
[
3.10936,
50.61727
],
[
3.10822,
50.61765
],
[
3.10603,
50.61536
],
[
3.1041,
50.61596
],
[
3.10017,
50.6186
],
[
3.09688,
50.61714
],
[
3.09575,
50.61795
],
[
3.0891,
50.61532
],
[
3.08625,
50.61792
],
[
3.07948,
50.61428
],
[
3.07146,
50.6066
],
[
3.06819,
50.60918
],
[
3.06502,
50.61046
],
[
3.06223,
50.61223
],
[
3.05925,
50.60659
],
[
3.05463,
50.60077
],
[
3.04906,
50.6008
],
[
3.04726,
50.6035
],
[
3.04328,
50.60667
],
[
3.04155,
50.60417
],
[
3.03767,
50.60456
],
[
3.03528,
50.60538
],
[
3.03239,
50.60725
],
[
3.0254,
50.6111
],
[
3.02387,
50.6125
],
[
3.0248,
50.61344
],
[
3.02779,
50.61418
],
[
3.02414,
50.6169
],
[
3.02312,
50.61975
],
[
3.02172,
50.62082
],
[
3.01953,
50.62484
],
[
3.01811,
50.62529
],
[
3.01313,
50.62558
],
[
3.01385,
50.62695
],
[
3.00844,
50.62717
],
[
3.0056,
50.6267
],
[
3.00229,
50.62557
],
[
3.00119,
50.62723
],
[
2.99769,
50.62901
],
[
2.99391,
50.62732
],
[
2.98971,
50.63036
],
[
2.9862,
50.63328
],
[
2.98178,
50.63404
],
[
2.97917,
50.63499
],
[
2.97284,
50.63429
],
[
2.97174,
50.63365
],
[
2.97002,
50.63366
],
[
2.96956,
50.63506
],
[
2.97046,
50.6365
],
[
2.96878,
50.63833
],
[
2.97039,
50.6395
],
[
2.97275,
50.64183
],
[
2.97225,
50.64381
],
[
2.9745,
50.64442
],
[
2.97474,
50.64648
],
[
2.97091,
50.65108
],
[
2.96975,
50.65361
],
[
2.97061,
50.65513
],
[
2.96929,
50.65739
],
[
2.97072,
50.6581
],
[
2.97973,
50.66048
],
[
2.98369,
50.66123
],
[
2.9865,
50.65959
],
[
2.9896,
50.65845
],
[
2.9963,
50.65666
],
[
2.99903,
50.65552
],
[
3.00274,
50.65235
],
[
3.00714,
50.64887
],
[
3.01088,
50.64845
],
[
3.01318,
50.64541
],
[
3.01974,
50.63972
],
[
3.02317,
50.63813
],
[
3.02639,
50.63613
],
[
3.029,
50.63521
],
[
3.03414,
50.6382
],
[
3.03676,
50.63888
],
[
3.03686,
50.64147
],
[
3.03791,
50.64379
],
[
3.0409,
50.64577
],
[
3.04582,
50.64807
],
[
3.05132,
50.64866
],
[
3.05055,
50.64949
],
[
3.05244,
50.65055
],
[
3.05784,
50.64927
],
[
3.0596,
50.65105
],
[
3.06414,
50.65041
],
[
3.06705,
50.64936
],
[
3.07023,
50.64706
],
[
3.07203,
50.64355
],
[
3.07526,
50.64188
],
[
3.0758,
50.64453
],
[
3.07753,
50.64381
],
[
3.07861,
50.64542
],
[
3.08299,
50.64725
],
[
3.08046,
50.64912
],
[
3.08349,
50.65082
],
[
3.08354,
50.65155
],
[
3.08477,
50.65312
],
[
3.08542,
50.65654
],
[
3.08753,
50.65687
],
[
3.09032,
50.65602
],
[
3.09018,
50.65142
],
[
3.09278,
50.65086
],
[
3.09402,
50.64982
],
[
3.09908,
50.65146
],
[
3.10316,
50.65227
],
[
3.09726,
50.64723
],
[
3.09387,
50.64358
],
[
3.09357,
50.64095
],
[
3.09561,
50.64133
],
[
3.09675,
50.64018
],
[
3.09454,
50.63891
],
[
3.09627,
50.63693
],
[
3.09795,
50.63713
],
[
3.09919,
50.63576
],
[
3.10324,
50.6351
],
[
3.10613,
50.63532
],
[
3.10649,
50.63434
],
[
3.1109,
50.63525
],
[
3.11502,
50.63504
],
[
3.11681,
50.63646
]
]
]
}

View File

@@ -0,0 +1,283 @@
use crate::{common::Server, json};
use meili_snap::{json_string, snapshot};
const LILLE: &str = include_str!("assets/lille.geojson");
#[actix_rt::test]
async fn basic_add_settings_and_geojson_documents() {
let server = Server::new_shared();
let index = server.unique_index();
let (task, _status_code) =
index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"hits": [],
"query": "",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 0
}
"#);
let lille: serde_json::Value = serde_json::from_str(LILLE).unwrap();
let documents = json!([
{
"id": "missing",
},
{
"id": "point",
"_geojson": { "type": "Point", "coordinates": [1, 1] },
},
{
"id": "lille",
"_geojson": lille,
},
]);
let (task, _status_code) = index.add_documents(documents, None).await;
let response = server.wait_task(task.uid()).await.succeeded();
snapshot!(json_string!(response, { ".uid" => "[uid]", ".batchUid" => "[batch_uid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 3,
"indexedDocuments": 3
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (response, code) = index.get_all_documents_raw("?ids=missing,point").await;
snapshot!(code, @"200 OK");
snapshot!(response,
@r#"
{
"results": [
{
"id": "missing"
},
{
"id": "point",
"_geojson": {
"type": "Point",
"coordinates": [
1,
1
]
}
}
],
"offset": 0,
"limit": 20,
"total": 2
}
"#);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"hits": [
{
"id": "point",
"_geojson": {
"type": "Point",
"coordinates": [
1,
1
]
}
}
],
"query": "",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1
}
"#);
}
#[actix_rt::test]
async fn basic_add_geojson_documents_and_settings() {
let server = Server::new_shared();
let index = server.unique_index();
let lille: serde_json::Value = serde_json::from_str(LILLE).unwrap();
let documents = json!([
{
"id": "missing",
},
{
"id": "point",
"_geojson": { "type": "Point", "coordinates": [1, 1] },
},
{
"id": "lille",
"_geojson": lille,
},
]);
let (task, _status_code) = index.add_documents(documents, None).await;
let response = server.wait_task(task.uid()).await.succeeded();
snapshot!(response,
@r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 3,
"indexedDocuments": 3
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"message": "Index `[uuid]`: Attribute `_geojson` is not filterable. This index does not have configured filterable attributes.\n14:15 _geoPolygon([0,0],[2,0],[2,2],[0,2])",
"code": "invalid_search_filter",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_search_filter"
}
"#);
let (task, _status_code) =
index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"hits": [
{
"id": "point",
"_geojson": {
"type": "Point",
"coordinates": [
1,
1
]
}
}
],
"query": "",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1
}
"#);
}
#[actix_rt::test]
async fn add_and_remove_geojson() {
let server = Server::new_shared();
let index = server.unique_index();
index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
let documents = json!([
{
"id": "missing",
},
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [1, 1] },
}
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) =
index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (task, _) = index.delete_document(0).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) =
index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
// add it back
let documents = json!([
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [1, 1] },
}
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) =
index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
}
#[actix_rt::test]
async fn partial_update_geojson() {
let server = Server::new_shared();
let index = server.unique_index();
let (task, _) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let documents = json!([
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [1, 1] },
}
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) =
index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let documents = json!([
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [0.5, 0.5] },
}
]);
let (task, _status_code) = index.update_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) =
index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (response, _code) =
index.search_get("?filter=_geoPolygon([0.9,0.9],[2,0.9],[2,2],[0.9,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
}

View File

@@ -1,5 +1,6 @@
mod add_documents;
mod delete_documents;
mod errors;
mod geojson;
mod get_documents;
mod update_documents;

View File

@@ -18,6 +18,9 @@ bincode = "1.3.3"
bstr = "1.12.0"
bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] }
byteorder = "1.5.0"
# cellulite = { path = "../../../cellulite" }
cellulite = "0.1.1"
steppe = "0.4.0"
charabia = { version = "0.9.6", default-features = false }
concat-arrays = "0.1.2"
convert_case = "0.8.0"
@@ -27,6 +30,7 @@ either = { version = "1.15.0", features = ["serde"] }
flatten-serde-json = { path = "../flatten-serde-json" }
fst = "0.4.7"
fxhash = "0.2.1"
geojson = "0.24.2"
geoutils = "0.5.1"
grenad = { version = "0.5.0", default-features = false, features = [
"rayon",
@@ -109,6 +113,8 @@ utoipa = { version = "5.4.0", features = [
"openapi_extensions",
] }
lru = "0.14.0"
geo-types = "0.7.16"
zerometry = "0.1.0"
[dev-dependencies]
mimalloc = { version = "0.1.47", default-features = false }

View File

@@ -11,3 +11,4 @@ const fn parse_u32(s: &str) -> u32 {
pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors";
pub const RESERVED_GEO_FIELD_NAME: &str = "_geo";
pub const RESERVED_GEOJSON_FIELD_NAME: &str = "_geojson";

View File

@@ -78,6 +78,8 @@ pub enum InternalError {
#[error(transparent)]
ArroyError(#[from] arroy::Error),
#[error(transparent)]
CelluliteError(#[from] cellulite::Error),
#[error(transparent)]
VectorEmbeddingError(#[from] crate::vector::Error),
}
@@ -97,6 +99,12 @@ pub enum SerializationError {
InvalidNumberSerialization,
}
impl From<cellulite::Error> for Error {
fn from(error: cellulite::Error) -> Self {
Self::UserError(UserError::CelluliteError(error))
}
}
#[derive(Error, Debug)]
pub enum FieldIdMapMissingEntry {
#[error("unknown field id {field_id} coming from the {process} process")]
@@ -107,6 +115,8 @@ pub enum FieldIdMapMissingEntry {
#[derive(Error, Debug)]
pub enum UserError {
#[error(transparent)]
CelluliteError(#[from] cellulite::Error),
#[error("A document cannot contain more than 65,535 fields.")]
AttributeLimitReached,
#[error(transparent)]
@@ -151,6 +161,8 @@ and can not be more than 511 bytes.", .document_id.to_string()
},
#[error(transparent)]
InvalidGeoField(#[from] Box<GeoError>),
#[error(transparent)]
GeoJsonError(#[from] geojson::Error),
#[error("Invalid vector dimensions: expected: `{}`, found: `{}`.", .expected, .found)]
InvalidVectorDimensions { expected: usize, found: usize },
#[error("Invalid vector dimensions in document with id `{document_id}` in `._vectors.{embedder_name}`.\n - note: embedding #{embedding_index} has dimensions {found}\n - note: embedder `{embedder_name}` requires {expected}")]

View File

@@ -6,7 +6,9 @@ use heed::RoTxn;
use super::FieldsIdsMap;
use crate::attribute_patterns::{match_field_legacy, PatternMatch};
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
use crate::constants::{
RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME,
};
use crate::{
is_faceted_by, FieldId, FilterableAttributesFeatures, FilterableAttributesRule, Index,
LocalizedAttributesRule, Result, Weight,
@@ -24,6 +26,8 @@ pub struct Metadata {
pub asc_desc: bool,
/// The field is a geo field (`_geo`, `_geo.lat`, `_geo.lng`).
pub geo: bool,
/// The field is a geo json field (`_geojson`).
pub geo_json: bool,
/// The id of the localized attributes rule if the field is localized.
pub localized_attributes_rule_id: Option<NonZeroU16>,
/// The id of the filterable attributes rule if the field is filterable.
@@ -269,6 +273,7 @@ impl MetadataBuilder {
distinct: false,
asc_desc: false,
geo: false,
geo_json: false,
localized_attributes_rule_id: None,
filterable_attributes_rule_id: None,
};
@@ -295,6 +300,20 @@ impl MetadataBuilder {
distinct: false,
asc_desc: false,
geo: true,
geo_json: false,
localized_attributes_rule_id: None,
filterable_attributes_rule_id,
};
}
if match_field_legacy(RESERVED_GEOJSON_FIELD_NAME, field) == PatternMatch::Match {
debug_assert!(!sortable, "geojson fields should not be sortable");
return Metadata {
searchable: None,
sortable,
distinct: false,
asc_desc: false,
geo: false,
geo_json: true,
localized_attributes_rule_id: None,
filterable_attributes_rule_id,
};
@@ -328,6 +347,7 @@ impl MetadataBuilder {
distinct,
asc_desc,
geo: false,
geo_json: false,
localized_attributes_rule_id,
filterable_attributes_rule_id,
}

View File

@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use crate::attribute_patterns::{match_distinct_field, match_field_legacy, PatternMatch};
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME};
use crate::AttributePatterns;
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, ToSchema)]
@@ -34,6 +34,10 @@ impl FilterableAttributesRule {
matches!(self, FilterableAttributesRule::Field(field_name) if field_name == RESERVED_GEO_FIELD_NAME)
}
pub fn has_geojson(&self) -> bool {
matches!(self, FilterableAttributesRule::Field(field_name) if field_name == RESERVED_GEOJSON_FIELD_NAME)
}
/// Get the features of the rule.
pub fn features(&self) -> FilterableAttributesFeatures {
match self {

View File

@@ -5,6 +5,7 @@ use std::fmt;
use std::fs::File;
use std::path::Path;
use cellulite::Cellulite;
use deserr::Deserr;
use heed::types::*;
use heed::{CompactionOption, Database, DatabaseStat, RoTxn, RwTxn, Unspecified, WithoutTls};
@@ -114,9 +115,10 @@ pub mod db_name {
pub const FIELD_ID_DOCID_FACET_STRINGS: &str = "field-id-docid-facet-strings";
pub const VECTOR_EMBEDDER_CATEGORY_ID: &str = "vector-embedder-category-id";
pub const VECTOR_ARROY: &str = "vector-arroy";
pub const CELLULITE: &str = "cellulite";
pub const DOCUMENTS: &str = "documents";
}
const NUMBER_OF_DBS: u32 = 25;
const NUMBER_OF_DBS: u32 = 25 + Cellulite::nb_dbs();
#[derive(Clone)]
pub struct Index {
@@ -182,6 +184,9 @@ pub struct Index {
/// Vector store based on arroy™.
pub vector_arroy: arroy::Database<Unspecified>,
/// Geo store based on cellulite™.
pub cellulite: Cellulite,
/// Maps the document id to the document as an obkv store.
pub(crate) documents: Database<BEU32, ObkvCodec>,
}
@@ -238,6 +243,7 @@ impl Index {
let embedder_category_id =
env.create_database(&mut wtxn, Some(VECTOR_EMBEDDER_CATEGORY_ID))?;
let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?;
let cellulite = cellulite::Cellulite::create_from_env(&env, &mut wtxn)?;
let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;
@@ -266,6 +272,7 @@ impl Index {
field_id_docid_facet_strings,
vector_arroy,
embedder_category_id,
cellulite,
documents,
};
if this.get_version(&wtxn)?.is_none() && creation {
@@ -1023,6 +1030,13 @@ impl Index {
Ok(geo_filter)
}
/// Returns true if the geojson filtering feature is enabled.
pub fn is_geojson_enabled(&self, rtxn: &RoTxn<'_>) -> Result<bool> {
let geojson_filter =
self.filterable_attributes_rules(rtxn)?.iter().any(|field| field.has_geojson());
Ok(geojson_filter)
}
pub fn asc_desc_fields(&self, rtxn: &RoTxn<'_>) -> Result<HashSet<String>> {
let asc_desc_fields = self
.criteria(rtxn)?
@@ -1842,6 +1856,7 @@ impl Index {
field_id_docid_facet_strings,
vector_arroy,
embedder_category_id,
cellulite: _,
documents,
} = self;

View File

@@ -85,7 +85,7 @@ pub use self::search::{
};
pub use self::update::ChannelCongestion;
pub type Result<T> = std::result::Result<T, error::Error>;
pub type Result<T, E = error::Error> = std::result::Result<T, E>;
pub type Attribute = u32;
pub type BEU16 = heed::types::U16<heed::byteorder::BE>;
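The widened alias keeps every existing `Result<T>` working while letting call sites such as the polygon filter collect into `Result<_, filter_parser::Error>`. One side effect, visible later in this diff, is that closures whose error type was previously inferred now need an explicit `-> Result<()>` annotation. A self-contained sketch of the pattern:

```rust
// Sketch of the alias pattern: one alias, defaulted error type.
#[derive(Debug)]
struct DefaultError;
#[derive(Debug)]
struct ParserError;

type Result<T, E = DefaultError> = std::result::Result<T, E>;

fn uses_default() -> Result<()> {
    Ok(()) // error type defaults to DefaultError
}

fn uses_parser_error() -> Result<u32, ParserError> {
    Ok(42) // same alias, explicit error type
}

fn main() {
    uses_default().unwrap();
    assert_eq!(uses_parser_error().unwrap(), 42);
}
```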

View File

@@ -317,3 +317,27 @@ impl Step for arroy::SubStep {
self.max
}
}
// Integration with steppe
impl steppe::Progress for Progress {
fn update(&self, sub_progress: impl steppe::Step) {
self.update_progress(Compat(sub_progress));
}
}
struct Compat<T: steppe::Step>(T);
impl<T: steppe::Step> Step for Compat<T> {
fn name(&self) -> Cow<'static, str> {
self.0.name().into()
}
fn current(&self) -> u32 {
self.0.current().try_into().unwrap_or(u32::MAX)
}
fn total(&self) -> u32 {
self.0.total().try_into().unwrap_or(u32::MAX)
}
}
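One detail worth noting: `Compat` clamps rather than panics when a steppe counter does not fit in the `u32` this trait expects (the `try_into` suggests steppe reports wider integers; treat that as an assumption). The conversion in isolation:

```rust
// The saturating conversion used by `Compat::current`/`Compat::total`:
// out-of-range counters clamp to u32::MAX instead of panicking.
fn saturate(n: u64) -> u32 {
    n.try_into().unwrap_or(u32::MAX)
}

fn main() {
    assert_eq!(saturate(7), 7);
    assert_eq!(saturate(u64::from(u32::MAX) + 1), u32::MAX);
}
```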

View File

@@ -11,7 +11,7 @@ use roaring::{MultiOps, RoaringBitmap};
use serde_json::Value;
use super::facet_range_search;
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME};
use crate::error::{Error, UserError};
use crate::filterable_attributes_rules::{filtered_matching_patterns, matching_features};
use crate::heed_codec::facet::{
@@ -778,6 +778,39 @@ impl<'a> Filter<'a> {
))?
}
}
FilterCondition::GeoPolygon { points } => {
if index.is_geojson_enabled(rtxn)? {
let polygon = geo_types::Polygon::new(
geo_types::LineString(
points
.iter()
.map(|p| {
Ok(geo_types::Coord {
x: p[0].parse_finite_float()?,
y: p[1].parse_finite_float()?,
})
})
.collect::<Result<_, filter_parser::Error>>()?,
),
Vec::new(),
);
let result = index
.cellulite
.in_shape(rtxn, &polygon.into(), &mut |_| ())
.map_err(InternalError::CelluliteError)?;
// TODO: Remove once we update roaring
let result = roaring::RoaringBitmap::from_iter(result.into_iter());
Ok(result)
} else {
Err(points[0][0].as_external_error(FilterError::AttributeNotFilterable {
attribute: RESERVED_GEOJSON_FIELD_NAME,
filterable_patterns: filtered_matching_patterns(
filterable_attribute_rules,
&|features| features.is_filterable(),
),
}))?
}
}
}
}
}
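For intuition, the conversion above builds a `geo_types::Polygon` with an empty interior-rings vector, i.e. no holes. A standalone sketch of the same construction; note that geo-types closes an open exterior ring on construction, which is presumably why the filter does not require the first and last points to match:

```rust
// Sketch: how parsed filter points become the polygon handed to cellulite.
use geo_types::{Coord, LineString, Polygon};

fn main() {
    // The same shape as the tests' `_geoPolygon([0,0],[2,0],[2,2],[0,2])`.
    let points = [[0.0, 0.0], [2.0, 0.0], [2.0, 2.0], [0.0, 2.0]];
    let exterior = LineString(points.iter().map(|p| Coord { x: p[0], y: p[1] }).collect());
    let polygon = Polygon::new(exterior, Vec::new()); // Vec::new() = no interior rings

    // geo-types closed the ring for us: 4 input points, 5 stored coords.
    assert_eq!(polygon.exterior().0.len(), 5);
}
```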

View File

@@ -47,6 +47,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
field_id_docid_facet_strings,
vector_arroy,
embedder_category_id: _,
cellulite,
documents,
} = self.index;
@@ -89,6 +90,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
field_id_docid_facet_strings.clear(self.wtxn)?;
// vector
vector_arroy.clear(self.wtxn)?;
cellulite.clear(self.wtxn)?;
documents.clear(self.wtxn)?;

View File

@@ -2,6 +2,7 @@ use std::fs::File;
use std::io::{self, BufReader};
use concat_arrays::concat_arrays;
use geojson::GeoJson;
use serde_json::Value;
use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
@@ -9,7 +10,7 @@ use crate::error::GeoError;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::extract_finite_float_from_value;
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::{FieldId, InternalError, Result};
use crate::{DocumentId, FieldId, InternalError, Result};
/// Extracts the geographical coordinates contained in each document under the `_geo` field.
///
@@ -107,3 +108,77 @@ fn extract_lat_lng(
None => Ok(None),
}
}
/// Extracts the GeoJSON geometry contained in each document under the `_geojson` field.
///
/// Returns the generated grenad reader containing the docid as key associated to the raw GeoJSON value
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
pub fn extract_geojson<R: io::Read + io::Seek>(
obkv_documents: grenad::Reader<R>,
indexer: GrenadParameters,
primary_key_id: FieldId,
settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
let mut cursor = obkv_documents.into_cursor()?;
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
let obkv = obkv::KvReader::from_slice(value);
// since we only need the primary key when we throw an error
// we create this getter to lazily get it when needed
let document_id = || -> Value {
let reader = KvReaderDelAdd::from_slice(obkv.get(primary_key_id).unwrap());
let document_id =
reader.get(DelAdd::Deletion).or(reader.get(DelAdd::Addition)).unwrap();
serde_json::from_slice(document_id).unwrap()
};
// extract old version
let del_geojson =
extract_geojson_field(obkv, &settings_diff.old, DelAdd::Deletion, document_id)?;
// extract new version
let add_geojson =
extract_geojson_field(obkv, &settings_diff.new, DelAdd::Addition, document_id)?;
if del_geojson != add_geojson {
let mut obkv = KvWriterDelAdd::memory();
if del_geojson.is_some() {
#[allow(clippy::drop_non_drop)]
// We don't need to store the geojson, we'll just delete it by id
obkv.insert(DelAdd::Deletion, [])?;
}
if let Some(geojson) = add_geojson {
#[allow(clippy::drop_non_drop)]
obkv.insert(DelAdd::Addition, geojson.to_string().as_bytes())?;
}
let bytes = obkv.into_inner()?;
writer.insert(&docid_bytes[0..std::mem::size_of::<DocumentId>()], bytes)?;
}
}
writer_into_reader(writer)
}
fn extract_geojson_field(
obkv: &obkv::KvReader<FieldId>,
settings: &InnerIndexSettings,
deladd: DelAdd,
_document_id: impl Fn() -> Value,
) -> Result<Option<GeoJson>> {
match settings.geojson_fid {
Some(fid) if settings.filterable_attributes_rules.iter().any(|rule| rule.has_geojson()) => {
let value = obkv.get(fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd));
// TODO: That's a user error, not an internal error
Ok(value
.map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson))
.transpose()?)
}
_ => Ok(None),
}
}

View File

@@ -31,6 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk};
use crate::progress::EmbedderStats;
use crate::update::index_documents::extract::extract_geo_points::extract_geojson;
use crate::update::index_documents::extract::extract_vector_points::extract_embeddings_from_fragments;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::db::EmbedderInfo;
@@ -62,6 +63,7 @@ pub(crate) fn data_from_obkv_documents(
original_documents_chunk,
indexer,
lmdb_writer_sx.clone(),
primary_key_id,
settings_diff.clone(),
embedder_info.clone(),
possible_embedding_mistakes.clone(),
@@ -232,6 +234,7 @@ fn send_original_documents_data(
original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
primary_key_id: FieldId,
settings_diff: Arc<InnerIndexSettingsDiff>,
embedder_info: Arc<Vec<(String, EmbedderInfo)>>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
@@ -240,6 +243,22 @@ fn send_original_documents_data(
let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
tracing::warn!("Do we have a geojson");
if settings_diff.reindex_geojson() {
tracing::warn!("Yes we do");
let documents_chunk_cloned = original_documents_chunk.clone();
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
let settings_diff = settings_diff.clone();
rayon::spawn(move || {
let result =
extract_geojson(documents_chunk_cloned, indexer, primary_key_id, &settings_diff);
let _ = match result {
Ok(geojson) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoJson(geojson))),
Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
};
});
}
let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
// no point in indexing vectors without embedders
&& (!settings_diff.new.runtime_embedders.inner_as_ref().is_empty());

View File

@@ -522,7 +522,7 @@ where
.is_some_and(|conf| conf.is_quantized);
let is_quantizing = embedder_config.is_some_and(|action| action.is_being_quantized);
pool.install(|| {
pool.install(|| -> Result<()> {
let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
writer.build_and_quantize(
wtxn,
@@ -539,6 +539,8 @@ where
.map_err(InternalError::from)??;
}
self.index.cellulite.build(self.wtxn, &Progress::default())?;
self.execute_prefix_databases(
word_docids.map(MergerBuilder::build),
exact_word_docids.map(MergerBuilder::build),

View File

@@ -820,19 +820,20 @@ impl<'a, 'i> Transform<'a, 'i> {
let documents_count = documents_ids.len() as usize;
// We initialize the sorter with the user indexing settings.
let mut original_sorter = if settings_diff.reindex_vectors() {
Some(create_sorter(
grenad::SortAlgorithm::Stable,
KeepFirst,
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
true,
))
} else {
None
};
let mut original_sorter =
if settings_diff.reindex_vectors() || settings_diff.reindex_geojson() {
Some(create_sorter(
grenad::SortAlgorithm::Stable,
KeepFirst,
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory.map(|mem| mem / 2),
true,
))
} else {
None
};
let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
.embedding_config_updates

View File

@@ -30,7 +30,7 @@ use crate::vector::db::{EmbeddingStatusDelta, IndexEmbeddingConfig};
use crate::vector::ArroyWrapper;
use crate::{
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
Result, SerializationError, U8StrStrCodec,
Result, SerializationError, U8StrStrCodec, UserError,
};
/// This struct accumulates and group the TypedChunks
@@ -85,6 +85,7 @@ pub(crate) enum TypedChunk {
FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
GeoPoints(grenad::Reader<BufReader<File>>),
GeoJson(grenad::Reader<BufReader<File>>),
VectorPoints {
remove_vectors: grenad::Reader<BufReader<File>>,
// docid -> vector
@@ -614,6 +615,42 @@ pub(crate) fn write_typed_chunk_into_index(
index.put_geo_rtree(wtxn, &rtree)?;
index.put_geo_faceted_documents_ids(wtxn, &geo_faceted_docids)?;
}
TypedChunk::GeoJson(_) => {
let span = tracing::trace_span!(target: "indexing::write_db", "geo_json");
let _entered = span.enter();
let mut builder = MergerBuilder::new(KeepFirst);
for typed_chunk in typed_chunks {
let TypedChunk::GeoJson(chunk) = typed_chunk else {
unreachable!();
};
builder.push(chunk.into_cursor()?);
}
let merger = builder.build();
let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? {
// convert the key back to a u32 (4 bytes)
tracing::warn!("Key: {:?}, length: {}", key, key.len());
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
let deladd_obkv = KvReaderDelAdd::from_slice(value);
if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
index.cellulite.delete(wtxn, docid)?;
}
if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
tracing::warn!("Adding one geojson to cellulite");
let geojson =
geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
index
.cellulite
.add(wtxn, docid, &geojson)
.map_err(InternalError::CelluliteError)?;
}
}
}
TypedChunk::VectorPoints { .. } => {
let span = tracing::trace_span!(target: "indexing::write_db", "vector_points");
let _entered = span.enter();

View File

@@ -139,6 +139,11 @@ pub enum ReceiverAction {
LargeEntry(LargeEntry),
LargeVectors(LargeVectors),
LargeVector(LargeVector),
// TODO: I don't understand all the buffer stuff so I'm going to send all geojsons one by one, stored in RAM.
// The geojson for France, made of 63k points, takes 594KiB, which means that with a capacity of 1000,
// the absolute maximum amount of memory we could consume is about 580MiB, which is acceptable for this POC.
// If the geojson is None, it means that the document is being deleted.
GeoJson(DocumentId, Option<Vec<u8>>),
}
/// An entry that cannot fit in the BBQueue buffers has been
@@ -548,6 +553,10 @@ impl<'b> ExtractorBbqueueSender<'b> {
GeoSender(self)
}
pub fn geojson<'a>(&'a self) -> GeoJsonSender<'a, 'b> {
GeoJsonSender(self)
}
fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
@@ -1140,3 +1149,15 @@ impl GeoSender<'_, '_> {
)
}
}
#[derive(Clone, Copy)]
pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl GeoJsonSender<'_, '_> {
pub fn send_geojson(&self, docid: DocumentId, value: Vec<u8>) -> StdResult<(), SendError<()>> {
self.0.sender.send(ReceiverAction::GeoJson(docid, Some(value))).map_err(|_| SendError(()))
}
pub fn delete_geojson(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
self.0.sender.send(ReceiverAction::GeoJson(docid, None)).map_err(|_| SendError(()))
}
}
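The capacity estimate in the TODO above is easy to verify; a quick back-of-the-envelope check:

```rust
// Sanity check for the worst-case memory bound quoted in the comment above.
fn main() {
    let payload_kib = 594u64; // France geojson, ~63k points
    let channel_capacity = 1_000u64; // buffered ReceiverAction::GeoJson entries
    let worst_case_mib = payload_kib * channel_capacity / 1024;
    assert_eq!(worst_case_mib, 580); // ~580MiB, as stated
}
```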

View File

@@ -10,7 +10,9 @@ use serde_json::value::RawValue;
use super::vector_document::VectorDocument;
use super::{KvReaderFieldId, KvWriterFieldId};
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
use crate::constants::{
RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME,
};
use crate::documents::FieldIdMapper;
use crate::update::del_add::KvReaderDelAdd;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
@@ -26,6 +28,7 @@ use crate::{
///
/// The 'doc lifetime is meant to live sufficiently for the document to be handled by the extractors.
pub trait Document<'doc> {
fn geojson_field(&self) -> Result<Option<&'doc RawValue>>;
/// Iterate over all **top-level** fields of the document, returning their name and raw JSON value.
///
/// - The returned values *may* contain nested fields.
@@ -113,6 +116,10 @@ impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
self.field(RESERVED_GEO_FIELD_NAME)
}
fn geojson_field(&self) -> Result<Option<&'t RawValue>> {
self.field(RESERVED_GEOJSON_FIELD_NAME)
}
fn top_level_fields_count(&self) -> usize {
let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
let has_geo_field = self.geo_field().unwrap_or(None).is_some();
@@ -177,6 +184,10 @@ impl<'doc> Document<'doc> for DocumentFromVersions<'_, 'doc> {
Ok(self.versions.geo_field())
}
fn geojson_field(&self) -> Result<Option<&'doc RawValue>> {
Ok(self.versions.geojson_field())
}
fn top_level_fields_count(&self) -> usize {
let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
let has_geo_field = self.geo_field().unwrap_or(None).is_some();
@@ -265,6 +276,16 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
db.geo_field()
}
fn geojson_field(&self) -> Result<Option<&'d RawValue>> {
if let Some(geojson) = self.new_doc.geojson_field()? {
return Ok(Some(geojson));
}
let Some(db) = self.db else { return Ok(None) };
db.geojson_field()
}
fn top_level_fields_count(&self) -> usize {
self.iter_top_level_fields().count()
}
@@ -296,6 +317,10 @@ where
D::geo_field(self)
}
fn geojson_field(&self) -> Result<Option<&'doc RawValue>> {
D::geojson_field(self)
}
fn top_level_fields_count(&self) -> usize {
D::top_level_fields_count(self)
}
@@ -454,6 +479,10 @@ impl<'doc> Versions<'doc> {
self.data.get(RESERVED_GEO_FIELD_NAME)
}
pub fn geojson_field(&self) -> Option<&'doc RawValue> {
self.data.get(RESERVED_GEOJSON_FIELD_NAME)
}
pub fn len(&self) -> usize {
self.data.len()
}
@@ -572,6 +601,10 @@ impl<'a, Mapper: FieldIdMapper> Document<'a> for KvDelAddDocument<'a, Mapper> {
fn geo_field(&self) -> Result<Option<&'a RawValue>> {
self.get(RESERVED_GEO_FIELD_NAME)
}
fn geojson_field(&self) -> Result<Option<&'a RawValue>> {
self.get(RESERVED_GEOJSON_FIELD_NAME)
}
}
pub struct DocumentIdentifiers<'doc> {

View File

@@ -0,0 +1,276 @@
use std::cell::RefCell;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
use std::str::FromStr;
use std::{iter, mem};
use bumpalo::Bump;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use cellulite::zerometry::ZerometryCodec;
use geo_types::Geometry;
use geojson::GeoJson;
use heed::{BytesEncode, RoTxn};
use zerometry::Zerometry;
use crate::update::new::channel::GeoJsonSender;
use crate::update::new::document::{Document, DocumentContext};
use crate::update::new::indexer::document_changes::Extractor;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
use crate::{DocumentId, Index, InternalError, Result, UserError};
pub struct GeoJsonExtractor {
grenad_parameters: GrenadParameters,
}
impl GeoJsonExtractor {
pub fn new(
rtxn: &RoTxn,
index: &Index,
grenad_parameters: GrenadParameters,
) -> Result<Option<Self>> {
if index.is_geojson_enabled(rtxn)? {
Ok(Some(GeoJsonExtractor { grenad_parameters }))
} else {
Ok(None)
}
}
}
pub struct GeoJsonExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geojson
/// being updated, we first put it in the removed and then in the inserted set.
removed: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
inserted: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
/// Contains a packed list of the removed geojson payloads if we have
/// spilled to disk.
spilled_removed: Option<BufWriter<File>>,
/// Contains a packed list of the inserted geojson payloads if we have
/// spilled to disk.
spilled_inserted: Option<BufWriter<File>>,
}
impl<'extractor> GeoJsonExtractorData<'extractor> {
pub fn freeze(self) -> Result<FrozenGeoJsonExtractorData<'extractor>> {
let GeoJsonExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
Ok(FrozenGeoJsonExtractorData {
removed: removed.into_bump_slice(),
inserted: inserted.into_bump_slice(),
spilled_removed: spilled_removed
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
.transpose()?,
spilled_inserted: spilled_inserted
.map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
.transpose()?,
})
}
}
unsafe impl MostlySend for GeoJsonExtractorData<'_> {}
pub struct FrozenGeoJsonExtractorData<'extractor> {
pub removed: &'extractor [(DocumentId, &'extractor [u8])],
pub inserted: &'extractor [(DocumentId, &'extractor [u8])],
pub spilled_removed: Option<BufReader<File>>,
pub spilled_inserted: Option<BufReader<File>>,
}
impl FrozenGeoJsonExtractorData<'_> {
pub fn iter_and_clear_removed(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
for (docid, _buf) in mem::take(&mut self.removed) {
channel.delete_geojson(*docid).unwrap();
}
for ret in iterator_over_spilled_geojsons(&mut self.spilled_removed)? {
let (docid, _buf) = ret.map_err(InternalError::SerdeJson)?;
channel.delete_geojson(docid).unwrap();
}
Ok(())
}
pub fn iter_and_clear_inserted(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
for (docid, _buf) in mem::take(&mut self.inserted) {
channel.send_geojson(*docid, _buf.to_vec()).unwrap();
}
for ret in iterator_over_spilled_geojsons(&mut self.spilled_inserted)? {
let (docid, buf) = ret.map_err(InternalError::SerdeJson)?;
channel.send_geojson(docid, buf.to_vec()).unwrap();
}
Ok(())
}
}
fn iterator_over_spilled_geojsons(
spilled: &mut Option<BufReader<File>>,
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, Vec<u8>), serde_json::Error>> + '_> {
let mut spilled = spilled.take();
if let Some(spilled) = &mut spilled {
spilled.rewind()?;
}
Ok(iter::from_fn(move || match &mut spilled {
Some(file) => {
let docid = match file.read_u32::<BigEndian>() {
Ok(docid) => docid,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
let size = match file.read_u32::<BigEndian>() {
Ok(size) => size,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
let mut buf = vec![0; size as usize];
match file.read_exact(&mut buf) {
Ok(()) => Some(Ok((docid, buf))),
Err(e) => return Some(Err(serde_json::Error::io(e))),
}
}
None => None,
}))
}
impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
type Data = RefCell<GeoJsonExtractorData<'extractor>>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
Ok(RefCell::new(GeoJsonExtractorData {
removed: bumpalo::collections::Vec::new_in(extractor_alloc),
inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
spilled_inserted: None,
spilled_removed: None,
}))
}
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
context: &'doc DocumentContext<'doc, 'extractor, '_, '_, Self::Data>,
) -> Result<()> {
let rtxn = &context.rtxn;
let index = context.index;
let max_memory = self.grenad_parameters.max_memory_by_thread();
let db_fields_ids_map = context.db_fields_ids_map;
let mut data_ref = context.data.borrow_mut_or_yield();
for change in changes {
if data_ref.spilled_removed.is_none()
&& max_memory.is_some_and(|mm| context.extractor_alloc.allocated_bytes() >= mm)
{
// We must spill as we allocated too much memory
data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
}
match change? {
DocumentChange::Deletion(deletion) => {
let docid = deletion.docid();
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
if let Some(_geojson) = current.geojson_field()? {
// A deletion only needs the docid, so the payload stays empty.
let buf = Vec::new();
match &mut data_ref.spilled_removed {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.removed.push((docid, bvec.into_bump_slice()));
}
}
}
}
DocumentChange::Update(update) => {
let current = update.current(rtxn, index, db_fields_ids_map)?;
let docid = update.docid();
let current_geo = current.geojson_field()?;
let updated_geo =
update.merged(rtxn, index, db_fields_ids_map)?.geojson_field()?;
if current_geo.map(|c| c.get()) != updated_geo.map(|u| u.get()) {
// If the current and new geojson values are different it means that
// we need to replace the current one with the new value and therefore
// delete the current geojson from cellulite.
if let Some(_geojson) = current_geo {
let buf = Vec::new();
match &mut data_ref.spilled_removed {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.removed.push((docid, bvec.into_bump_slice()));
}
}
}
if let Some(geojson) = updated_geo {
let geojson =
GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
                            let mut buf = Vec::new();
                            Zerometry::write_from_geometry(&mut buf, &geometry)?;
match &mut data_ref.spilled_inserted {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.inserted.push((docid, bvec.into_bump_slice()));
}
}
}
}
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
let inserted_geo = insertion.inserted().geojson_field()?;
if let Some(geojson) = inserted_geo {
let geojson = GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
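                        // Encode the parsed geometry into the compact Zerometry
                        // representation that cellulite stores.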
let mut bytes = Vec::new();
Zerometry::write_from_geometry(&mut bytes, &geometry)?;
match &mut data_ref.spilled_inserted {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_u32::<BigEndian>(bytes.len() as u32)?;
file.write_all(&bytes)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&bytes);
data_ref.inserted.push((docid, bvec.into_bump_slice()));
}
}
}
}
}
}
Ok(())
}
}

View File

@@ -18,6 +18,8 @@ use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
pub mod cellulite;
pub struct GeoExtractor {
grenad_parameters: GrenadParameters,
}

View File

@@ -630,7 +630,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
settings_delta.try_for_each_fragment_diff(
session.embedder_name(),
|fragment_diff| {
|fragment_diff| -> Result<()> {
let extractor = RequestFragmentExtractor::new(fragment_diff.new, doc_alloc)
.ignore_errors();
let old = if full_reindex {

View File

@@ -24,7 +24,7 @@ pub trait Extractor<'extractor>: Sync {
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
context: &'doc DocumentContext<Self::Data>,
context: &'doc DocumentContext<'doc, 'extractor, '_, '_, Self::Data>,
) -> Result<()>;
}

View File

@@ -16,8 +16,10 @@ use super::settings_changes::settings_change_extract;
use crate::documents::{FieldIdMapper, PrimaryKey};
use crate::progress::{EmbedderStats, MergingWordCache};
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::cellulite::GeoJsonExtractor;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::indexer::settings_changes::DocumentsIndentifiers;
use crate::update::new::merger::merge_and_send_cellulite;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::SettingsDelta;
@@ -317,6 +319,37 @@ where
&indexing_context.must_stop_processing,
)?;
}
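        // GeoJsonExtractor::new returns None when there is no GeoJSON to index,
        // in which case the whole cellulite pass is skipped.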
'cellulite: {
let Some(extractor) =
GeoJsonExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)?
else {
break 'cellulite;
};
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::trace_span!(target: "indexing::documents::extract", "cellulite");
let _entered = span.enter();
extract(
document_changes,
&extractor,
indexing_context,
extractor_allocs,
&datastore,
IndexingStep::WritingGeoJson,
)?;
}
merge_and_send_cellulite(
datastore,
&rtxn,
index,
extractor_sender.geojson(),
&indexing_context.must_stop_processing,
)?;
}
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);

View File

@@ -184,6 +184,10 @@ where
facet_field_ids_delta,
)?;
indexing_context.progress.update_progress(IndexingStep::BuildingGeoJson);
index.cellulite.build(wtxn, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(congestion) as Result<_>
@@ -313,6 +317,9 @@ where
})
.unwrap()?;
indexing_context.progress.update_progress(IndexingStep::BuildingGeoJson);
index.cellulite.build(wtxn, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(congestion) as Result<_>

View File

@@ -72,6 +72,17 @@ pub fn write_to_db(
let embedding = large_vector.read_embedding(*dimensions);
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
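            // A Some payload carries a pre-encoded Zerometry blob to store;
            // None removes the document's entry from cellulite.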
ReceiverAction::GeoJson(docid, geojson) => match geojson {
Some(geojson) => {
index
.cellulite
.add_raw_zerometry(wtxn, docid, &geojson)
.map_err(InternalError::CelluliteError)?;
}
None => {
index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
}
},
}
// Every time there is a message in the channel we search

View File

@@ -13,6 +13,7 @@ use super::extract::{
FacetKind, GeoExtractorData,
};
use crate::update::facet::new_incremental::FacetFieldIdChange;
use crate::update::new::extract::cellulite::GeoJsonExtractorData;
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
@@ -62,6 +63,30 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_cellulite<'extractor, MSP>(
datastore: impl IntoIterator<Item = RefCell<GeoJsonExtractorData<'extractor>>>,
_rtxn: &RoTxn,
_index: &Index,
geojson_sender: GeoJsonSender<'_, '_>,
must_stop_processing: &MSP,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
{
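    // Freeze each thread-local datastore and stream its removals first,
    // then its insertions, to the writer channel.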
for data in datastore {
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into());
}
let mut frozen = data.into_inner().freeze()?;
frozen.iter_and_clear_removed(geojson_sender)?;
frozen.iter_and_clear_inserted(geojson_sender)?;
}
Ok(())
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_docids<'extractor, MSP, D>(
mut caches: Vec<BalancedCaches<'extractor>>,

View File

@@ -12,11 +12,13 @@ make_enum_progress! {
MergingWordCaches,
MergingWordProximity,
WritingGeoPoints,
WritingGeoJson,
WaitingForDatabaseWrites,
WaitingForExtractors,
WritingEmbeddingsToDatabase,
PostProcessingFacets,
PostProcessingWords,
BuildingGeoJson,
Finalizing,
}
}

View File

@@ -193,7 +193,7 @@ impl WordPrefixIntegerDocids {
        // We access this HashMap in parallel to compute the *union* of all
        // of them and *serialize* them into files. There is one file per CPU.
let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| -> Result<()> {
let refcell = local_entries.get_or(|| {
let file = BufWriter::new(spooled_tempfile(
self.max_memory_by_thread.unwrap_or(usize::MAX),

View File

@@ -15,7 +15,7 @@ use super::del_add::{DelAdd, DelAddOperation};
use super::index_documents::{IndexDocumentsConfig, Transform};
use super::{ChatSettings, IndexerConfig};
use crate::attribute_patterns::PatternMatch;
use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME};
use crate::criterion::Criterion;
use crate::disabled_typos_terms::DisabledTyposTerms;
use crate::error::UserError::{self, InvalidChatSettingsDocumentTemplateMaxBytes};
@@ -1767,7 +1767,10 @@ impl InnerIndexSettingsDiff {
}
pub fn any_reindexing_needed(&self) -> bool {
self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
self.reindex_searchable()
|| self.reindex_facets()
|| self.reindex_vectors()
|| self.reindex_geojson()
}
pub fn reindex_searchable(&self) -> bool {
@@ -1876,6 +1879,11 @@ impl InnerIndexSettingsDiff {
!self.embedding_config_updates.is_empty()
}
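    // A reindex is needed whenever GeoJSON filterability toggles between the
    // old and the new settings.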
pub fn reindex_geojson(&self) -> bool {
self.old.filterable_attributes_rules.iter().any(|rule| rule.has_geojson())
!= self.new.filterable_attributes_rules.iter().any(|rule| rule.has_geojson())
}
pub fn settings_update_only(&self) -> bool {
self.settings_update_only
}
@@ -1884,6 +1892,11 @@ impl InnerIndexSettingsDiff {
self.old.geo_fields_ids != self.new.geo_fields_ids
|| (!self.settings_update_only && self.new.geo_fields_ids.is_some())
}
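    // The GeoJSON pipeline must run when the reserved field id changed, or on
    // any document update while a GeoJSON field is present.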
pub fn run_geojson_indexing(&self) -> bool {
self.old.geojson_fid != self.new.geojson_fid
|| (!self.settings_update_only && self.new.geojson_fid.is_some())
}
}
#[derive(Clone)]
@@ -1904,6 +1917,7 @@ pub(crate) struct InnerIndexSettings {
pub runtime_embedders: RuntimeEmbedders,
pub embedder_category_id: HashMap<String, u8>,
pub geo_fields_ids: Option<(FieldId, FieldId)>,
pub geojson_fid: Option<FieldId>,
pub prefix_search: PrefixSearch,
pub facet_search: bool,
}
@@ -1943,6 +1957,7 @@ impl InnerIndexSettings {
}
_ => None,
};
let geojson_fid = fields_ids_map.id(RESERVED_GEOJSON_FIELD_NAME);
let localized_attributes_rules =
index.localized_attributes_rules(rtxn)?.unwrap_or_default();
let filterable_attributes_rules = index.filterable_attributes_rules(rtxn)?;
@@ -1971,6 +1986,7 @@ impl InnerIndexSettings {
runtime_embedders,
embedder_category_id,
geo_fields_ids,
geojson_fid,
prefix_search,
facet_search,
disabled_typos_terms,