fix the cellulite integration

This commit is contained in:
Tamo
2025-07-23 15:25:38 +02:00
parent 3a6505fcaa
commit 4af3659179
7 changed files with 839 additions and 4 deletions

View File

@ -0,0 +1,547 @@
{
"type": "Polygon",
"coordinates": [
[
[
3.11681,
50.63646
],
[
3.11945,
50.63488
],
[
3.12134,
50.63504
],
[
3.12064,
50.63127
],
[
3.12203,
50.62785
],
[
3.12389,
50.6262
],
[
3.12161,
50.62358
],
[
3.12547,
50.62114
],
[
3.12447,
50.61874
],
[
3.12288,
50.61988
],
[
3.12054,
50.61846
],
[
3.11846,
50.61754
],
[
3.11482,
50.6207
],
[
3.11232,
50.6188
],
[
3.10936,
50.61727
],
[
3.10822,
50.61765
],
[
3.10603,
50.61536
],
[
3.1041,
50.61596
],
[
3.10017,
50.6186
],
[
3.09688,
50.61714
],
[
3.09575,
50.61795
],
[
3.0891,
50.61532
],
[
3.08625,
50.61792
],
[
3.07948,
50.61428
],
[
3.07146,
50.6066
],
[
3.06819,
50.60918
],
[
3.06502,
50.61046
],
[
3.06223,
50.61223
],
[
3.05925,
50.60659
],
[
3.05463,
50.60077
],
[
3.04906,
50.6008
],
[
3.04726,
50.6035
],
[
3.04328,
50.60667
],
[
3.04155,
50.60417
],
[
3.03767,
50.60456
],
[
3.03528,
50.60538
],
[
3.03239,
50.60725
],
[
3.0254,
50.6111
],
[
3.02387,
50.6125
],
[
3.0248,
50.61344
],
[
3.02779,
50.61418
],
[
3.02414,
50.6169
],
[
3.02312,
50.61975
],
[
3.02172,
50.62082
],
[
3.01953,
50.62484
],
[
3.01811,
50.62529
],
[
3.01313,
50.62558
],
[
3.01385,
50.62695
],
[
3.00844,
50.62717
],
[
3.0056,
50.6267
],
[
3.00229,
50.62557
],
[
3.00119,
50.62723
],
[
2.99769,
50.62901
],
[
2.99391,
50.62732
],
[
2.98971,
50.63036
],
[
2.9862,
50.63328
],
[
2.98178,
50.63404
],
[
2.97917,
50.63499
],
[
2.97284,
50.63429
],
[
2.97174,
50.63365
],
[
2.97002,
50.63366
],
[
2.96956,
50.63506
],
[
2.97046,
50.6365
],
[
2.96878,
50.63833
],
[
2.97039,
50.6395
],
[
2.97275,
50.64183
],
[
2.97225,
50.64381
],
[
2.9745,
50.64442
],
[
2.97474,
50.64648
],
[
2.97091,
50.65108
],
[
2.96975,
50.65361
],
[
2.97061,
50.65513
],
[
2.96929,
50.65739
],
[
2.97072,
50.6581
],
[
2.97973,
50.66048
],
[
2.98369,
50.66123
],
[
2.9865,
50.65959
],
[
2.9896,
50.65845
],
[
2.9963,
50.65666
],
[
2.99903,
50.65552
],
[
3.00274,
50.65235
],
[
3.00714,
50.64887
],
[
3.01088,
50.64845
],
[
3.01318,
50.64541
],
[
3.01974,
50.63972
],
[
3.02317,
50.63813
],
[
3.02639,
50.63613
],
[
3.029,
50.63521
],
[
3.03414,
50.6382
],
[
3.03676,
50.63888
],
[
3.03686,
50.64147
],
[
3.03791,
50.64379
],
[
3.0409,
50.64577
],
[
3.04582,
50.64807
],
[
3.05132,
50.64866
],
[
3.05055,
50.64949
],
[
3.05244,
50.65055
],
[
3.05784,
50.64927
],
[
3.0596,
50.65105
],
[
3.06414,
50.65041
],
[
3.06705,
50.64936
],
[
3.07023,
50.64706
],
[
3.07203,
50.64355
],
[
3.07526,
50.64188
],
[
3.0758,
50.64453
],
[
3.07753,
50.64381
],
[
3.07861,
50.64542
],
[
3.08299,
50.64725
],
[
3.08046,
50.64912
],
[
3.08349,
50.65082
],
[
3.08354,
50.65155
],
[
3.08477,
50.65312
],
[
3.08542,
50.65654
],
[
3.08753,
50.65687
],
[
3.09032,
50.65602
],
[
3.09018,
50.65142
],
[
3.09278,
50.65086
],
[
3.09402,
50.64982
],
[
3.09908,
50.65146
],
[
3.10316,
50.65227
],
[
3.09726,
50.64723
],
[
3.09387,
50.64358
],
[
3.09357,
50.64095
],
[
3.09561,
50.64133
],
[
3.09675,
50.64018
],
[
3.09454,
50.63891
],
[
3.09627,
50.63693
],
[
3.09795,
50.63713
],
[
3.09919,
50.63576
],
[
3.10324,
50.6351
],
[
3.10613,
50.63532
],
[
3.10649,
50.63434
],
[
3.1109,
50.63525
],
[
3.11502,
50.63504
],
[
3.11681,
50.63646
]
]
]
}

View File

@ -0,0 +1,278 @@
use crate::{common::Server, json};
use meili_snap::{json_string, snapshot};
const LILLE: &str = include_str!("assets/lille.geojson");
#[actix_rt::test]
async fn basic_add_settings_and_geojson_documents() {
let server = Server::new_shared();
let index = server.unique_index();
let (task, _status_code) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"hits": [],
"query": "",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 0
}
"#);
let lille: serde_json::Value = serde_json::from_str(LILLE).unwrap();
let documents = json!([
{
"id": "missing",
},
{
"id": "point",
"_geojson": { "type": "Point", "coordinates": [1, 1] },
},
{
"id": "lille",
"_geojson": lille,
},
]);
let (task, _status_code) = index.add_documents(documents, None).await;
let response = server.wait_task(task.uid()).await.succeeded();
snapshot!(json_string!(response, { ".uid" => "[uid]", ".batchUid" => "[batch_uid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 3,
"indexedDocuments": 3
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (response, code) = index.get_all_documents_raw("?ids=missing,point").await;
snapshot!(code, @"200 OK");
snapshot!(response,
@r#"
{
"results": [
{
"id": "missing"
},
{
"id": "point",
"_geojson": {
"type": "Point",
"coordinates": [
1,
1
]
}
}
],
"offset": 0,
"limit": 20,
"total": 2
}
"#);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"hits": [
{
"id": "point",
"_geojson": {
"type": "Point",
"coordinates": [
1,
1
]
}
}
],
"query": "",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1
}
"#);
}
#[actix_rt::test]
async fn basic_add_geojson_documents_and_settings() {
let server = Server::new_shared();
let index = server.unique_index();
let lille: serde_json::Value = serde_json::from_str(LILLE).unwrap();
let documents = json!([
{
"id": "missing",
},
{
"id": "point",
"_geojson": { "type": "Point", "coordinates": [1, 1] },
},
{
"id": "lille",
"_geojson": lille,
},
]);
let (task, _status_code) = index.add_documents(documents, None).await;
let response = server.wait_task(task.uid()).await.succeeded();
snapshot!(response,
@r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 3,
"indexedDocuments": 3
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"message": "Index `[uuid]`: Attribute `_geojson` is not filterable. This index does not have configured filterable attributes.\n14:15 _geoPolygon([0,0],[2,0],[2,2],[0,2])",
"code": "invalid_search_filter",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_search_filter"
}
"#);
let (task, _status_code) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@r#"
{
"hits": [
{
"id": "point",
"_geojson": {
"type": "Point",
"coordinates": [
1,
1
]
}
}
],
"query": "",
"processingTimeMs": "[duration]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 1
}
"#);
}
#[actix_rt::test]
async fn add_and_remove_geojson() {
let server = Server::new_shared();
let index = server.unique_index();
index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
let documents = json!([
{
"id": "missing",
},
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [1, 1] },
}
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (task, _) = index.delete_document(0).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
// add it back
let documents = json!([
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [1, 1] },
}
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
}
#[actix_rt::test]
async fn partial_update_geojson() {
let server = Server::new_shared();
let index = server.unique_index();
let (task, _) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let documents = json!([
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [1, 1] },
}
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let documents = json!([
{
"id": 0,
"_geojson": { "type": "Point", "coordinates": [0.5, 0.5] },
}
]);
let (task, _status_code) = index.update_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (response, _code) = index.search_get("?filter=_geoPolygon([0.9,0.9],[2,0.9],[2,2],[0.9,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
}

View File

@ -3,3 +3,4 @@ mod delete_documents;
mod errors;
mod get_documents;
mod update_documents;
mod geojson;

View File

@ -5,6 +5,7 @@ use std::fmt;
use std::fs::File;
use std::path::Path;
use cellulite::Cellulite;
use deserr::Deserr;
use heed::types::*;
use heed::{CompactionOption, Database, DatabaseStat, RoTxn, RwTxn, Unspecified, WithoutTls};
@ -117,7 +118,7 @@ pub mod db_name {
pub const CELLULITE: &str = "cellulite";
pub const DOCUMENTS: &str = "documents";
}
const NUMBER_OF_DBS: u32 = 25;
const NUMBER_OF_DBS: u32 = 25 + Cellulite::nb_dbs();
#[derive(Clone)]
pub struct Index {
@ -184,7 +185,7 @@ pub struct Index {
pub vector_arroy: arroy::Database<Unspecified>,
/// Geo store based on cellulite™.
pub cellulite: cellulite::Cellulite,
pub cellulite: Cellulite,
/// Maps the document id to the document as an obkv store.
pub(crate) documents: Database<BEU32, ObkvCodec>,

View File

@ -163,8 +163,6 @@ where
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
index.cellulite.build(wtxn, indexing_context.progress)?;
pool.install(|| {
build_vectors(
index,
@ -186,6 +184,10 @@ where
facet_field_ids_delta,
)?;
println!("Building geojson");
indexing_context.progress.update_progress(IndexingStep::BuildingGeoJson);
index.cellulite.build(wtxn, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(congestion) as Result<_>
@ -315,6 +317,9 @@ where
})
.unwrap()?;
indexing_context.progress.update_progress(IndexingStep::BuildingGeoJson);
index.cellulite.build(wtxn, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(congestion) as Result<_>

View File

@ -75,9 +75,11 @@ pub fn write_to_db(
ReceiverAction::GeoJson(docid, geojson) => {
match geojson {
Some(geojson) => {
println!("Adding geojson {docid}");
index.cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
}
None => {
println!("Deleting geojson {docid}");
index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
}
}

View File

@ -18,6 +18,7 @@ make_enum_progress! {
WritingEmbeddingsToDatabase,
PostProcessingFacets,
PostProcessingWords,
BuildingGeoJson,
Finalizing,
}
}