From ebb4865b95cacfc1d4094bb3225296424898e620 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 24 Jul 2025 18:11:16 +0200 Subject: [PATCH] fix the cellulite spilling bug --- Cargo.lock | 3 +- .../tests/documents/geojson/mod.rs | 27 ++-- crates/meilisearch/tests/documents/mod.rs | 2 +- crates/milli/Cargo.toml | 5 +- crates/milli/src/fields_ids_map/metadata.rs | 4 +- crates/milli/src/progress.rs | 2 +- crates/milli/src/search/facet/filter.rs | 3 +- .../extract/extract_geo_points.rs | 4 +- .../src/update/index_documents/transform.rs | 27 ++-- .../src/update/index_documents/typed_chunk.rs | 3 +- crates/milli/src/update/new/channel.rs | 5 +- .../src/update/new/extract/geo/cellulite.rs | 149 +++++++++++------- .../update/new/indexer/document_changes.rs | 2 +- crates/milli/src/update/new/indexer/write.rs | 21 ++- crates/milli/src/update/new/merger.rs | 11 +- crates/milli/src/update/settings.rs | 5 +- 16 files changed, 157 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 56f8f3b5f..de5968d88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1065,8 +1065,6 @@ dependencies = [ [[package]] name = "cellulite" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "377e81073db1dd0b0f0f297da35717fd799a77117668f2a6c48a3cabe0ef717e" dependencies = [ "geo", "geo-types", @@ -4214,6 +4212,7 @@ dependencies = [ "url", "utoipa", "uuid", + "zerometry", ] [[package]] diff --git a/crates/meilisearch/tests/documents/geojson/mod.rs b/crates/meilisearch/tests/documents/geojson/mod.rs index c898853f1..d00c78de8 100644 --- a/crates/meilisearch/tests/documents/geojson/mod.rs +++ b/crates/meilisearch/tests/documents/geojson/mod.rs @@ -7,7 +7,8 @@ const LILLE: &str = include_str!("assets/lille.geojson"); async fn basic_add_settings_and_geojson_documents() { let server = Server::new_shared(); let index = server.unique_index(); - let (task, _status_code) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await; + let (task, _status_code) = + index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await; server.wait_task(task.uid()).await.succeeded(); let (response, _) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await; @@ -113,7 +114,6 @@ async fn basic_add_settings_and_geojson_documents() { "#); } - #[actix_rt::test] async fn basic_add_geojson_documents_and_settings() { let server = Server::new_shared(); @@ -168,7 +168,8 @@ async fn basic_add_geojson_documents_and_settings() { } "#); - let (task, _status_code) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await; + let (task, _status_code) = + index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await; server.wait_task(task.uid()).await.succeeded(); let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await; snapshot!(response, @@ -212,14 +213,16 @@ async fn add_and_remove_geojson() { ]); let (task, _status_code) = index.add_documents(documents, None).await; server.wait_task(task.uid()).await.succeeded(); - let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; + let (response, _code) = + index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0); let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1); let (task, _) = 
index.delete_document(0).await; server.wait_task(task.uid()).await.succeeded(); - let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; + let (response, _code) = + index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0); let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0); @@ -233,13 +236,13 @@ async fn add_and_remove_geojson() { ]); let (task, _status_code) = index.add_documents(documents, None).await; server.wait_task(task.uid()).await.succeeded(); - let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; + let (response, _code) = + index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0); let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1); } - #[actix_rt::test] async fn partial_update_geojson() { let server = Server::new_shared(); @@ -255,12 +258,12 @@ async fn partial_update_geojson() { ]); let (task, _status_code) = index.add_documents(documents, None).await; server.wait_task(task.uid()).await.succeeded(); - let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; + let (response, _code) = + index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0); let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1); - let documents = json!([ { "id": 0, @@ -269,10 +272,12 @@ async fn partial_update_geojson() { ]); let (task, _status_code) = index.update_documents(documents, None).await; server.wait_task(task.uid()).await.succeeded(); - let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; + let (response, _code) = + index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1); let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1); - let (response, _code) = index.search_get("?filter=_geoPolygon([0.9,0.9],[2,0.9],[2,2],[0.9,2])").await; + let (response, _code) = + index.search_get("?filter=_geoPolygon([0.9,0.9],[2,0.9],[2,2],[0.9,2])").await; assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0); } diff --git a/crates/meilisearch/tests/documents/mod.rs b/crates/meilisearch/tests/documents/mod.rs index a0f974a37..7644d8a0e 100644 --- a/crates/meilisearch/tests/documents/mod.rs +++ b/crates/meilisearch/tests/documents/mod.rs @@ -1,6 +1,6 @@ mod add_documents; mod delete_documents; mod errors; +mod geojson; mod get_documents; mod update_documents; -mod geojson; \ No newline at end of file diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 0115af5f3..1593d57db 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -18,8 +18,8 @@ bincode = "1.3.3" bstr = "1.12.0" bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] } byteorder = "1.5.0" 
-# cellulite = { path = "../../../cellulite" } -cellulite = "0.1.0" +cellulite = { path = "../../../cellulite" } +# cellulite = "0.1.0" steppe = "0.4.0" charabia = { version = "0.9.6", default-features = false } concat-arrays = "0.1.2" @@ -114,6 +114,7 @@ utoipa = { version = "5.4.0", features = [ ] } lru = "0.14.0" geo-types = "0.7.16" +zerometry = "0.1.0" [dev-dependencies] mimalloc = { version = "0.1.47", default-features = false } diff --git a/crates/milli/src/fields_ids_map/metadata.rs b/crates/milli/src/fields_ids_map/metadata.rs index 266cf209d..de7739d7f 100644 --- a/crates/milli/src/fields_ids_map/metadata.rs +++ b/crates/milli/src/fields_ids_map/metadata.rs @@ -6,7 +6,9 @@ use heed::RoTxn; use super::FieldsIdsMap; use crate::attribute_patterns::{match_field_legacy, PatternMatch}; -use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME}; +use crate::constants::{ + RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME, +}; use crate::{ is_faceted_by, FieldId, FilterableAttributesFeatures, FilterableAttributesRule, Index, LocalizedAttributesRule, Result, Weight, diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index 91682e321..a6bc1ec3e 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -340,4 +340,4 @@ impl Step for Compat { fn total(&self) -> u32 { self.0.total().try_into().unwrap_or(u32::MAX) } -} \ No newline at end of file +} diff --git a/crates/milli/src/search/facet/filter.rs b/crates/milli/src/search/facet/filter.rs index 374db87a5..c321143de 100644 --- a/crates/milli/src/search/facet/filter.rs +++ b/crates/milli/src/search/facet/filter.rs @@ -794,7 +794,8 @@ impl<'a> Filter<'a> { ), Vec::new(), ); - let result = index.cellulite + let result = index + .cellulite .in_shape(rtxn, &polygon.into(), &mut |_| ()) .map_err(InternalError::CelluliteError)?; // TODO: Remove once we update roaring diff --git a/crates/milli/src/update/index_documents/extract/extract_geo_points.rs b/crates/milli/src/update/index_documents/extract/extract_geo_points.rs index 60b1223cd..2ad994af0 100644 --- a/crates/milli/src/update/index_documents/extract/extract_geo_points.rs +++ b/crates/milli/src/update/index_documents/extract/extract_geo_points.rs @@ -179,8 +179,6 @@ fn extract_geojson_field( .map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson)) .transpose()?) } - _ => { - Ok(None) - } + _ => Ok(None), } } diff --git a/crates/milli/src/update/index_documents/transform.rs b/crates/milli/src/update/index_documents/transform.rs index 70523a49d..aead24253 100644 --- a/crates/milli/src/update/index_documents/transform.rs +++ b/crates/milli/src/update/index_documents/transform.rs @@ -820,19 +820,20 @@ impl<'a, 'i> Transform<'a, 'i> { let documents_count = documents_ids.len() as usize; // We initialize the sorter with the user indexing settings. 
-        let mut original_sorter = if settings_diff.reindex_vectors() || settings_diff.reindex_geojson() {
-            Some(create_sorter(
-                grenad::SortAlgorithm::Stable,
-                KeepFirst,
-                self.indexer_settings.chunk_compression_type,
-                self.indexer_settings.chunk_compression_level,
-                self.indexer_settings.max_nb_chunks,
-                self.indexer_settings.max_memory.map(|mem| mem / 2),
-                true,
-            ))
-        } else {
-            None
-        };
+        let mut original_sorter =
+            if settings_diff.reindex_vectors() || settings_diff.reindex_geojson() {
+                Some(create_sorter(
+                    grenad::SortAlgorithm::Stable,
+                    KeepFirst,
+                    self.indexer_settings.chunk_compression_type,
+                    self.indexer_settings.chunk_compression_level,
+                    self.indexer_settings.max_nb_chunks,
+                    self.indexer_settings.max_memory.map(|mem| mem / 2),
+                    true,
+                ))
+            } else {
+                None
+            };
 
         let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
             .embedding_config_updates
diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs
index caa2de23a..338610abd 100644
--- a/crates/milli/src/update/index_documents/typed_chunk.rs
+++ b/crates/milli/src/update/index_documents/typed_chunk.rs
@@ -644,7 +644,8 @@ pub(crate) fn write_typed_chunk_into_index(
                 let geojson =
                     geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
-                index.cellulite
+                index
+                    .cellulite
                     .add(wtxn, docid, &geojson)
                     .map_err(InternalError::CelluliteError)?;
             }
diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index e1707fd93..09cdf8ee9 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -13,7 +13,6 @@ use bbqueue::framed::{FrameGrantR, FrameProducer};
 use bbqueue::BBBuffer;
 use bytemuck::{checked, CheckedBitPattern, NoUninit};
 use flume::{RecvTimeoutError, SendError};
-use geojson::GeoJson;
 use heed::types::Bytes;
 use heed::{BytesDecode, MdbError};
 use memmap2::{Mmap, MmapMut};
@@ -144,7 +143,7 @@ pub enum ReceiverAction {
     // The geojson for France, made of 63k points, takes 594KiB, which means that with a capacity of 1000,
     // the absolute maximum amount of memory we could consume is about 580MiB, which is acceptable for this POC.
     // If the geojson is None, it means that the document is being deleted.
-    GeoJson(DocumentId, Option<GeoJson>),
+    GeoJson(DocumentId, Option<Vec<u8>>),
 }
 
 /// An entry that cannot fit in the BBQueue buffers has been
@@ -1155,7 +1154,7 @@ impl GeoSender<'_, '_> {
 pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
 
 impl GeoJsonSender<'_, '_> {
-    pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> {
+    pub fn send_geojson(&self, docid: DocumentId, value: Vec<u8>) -> StdResult<(), SendError<()>> {
         self.0.sender.send(ReceiverAction::GeoJson(docid, Some(value))).map_err(|_| SendError(()))
     }
     pub fn delete_geojson(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
diff --git a/crates/milli/src/update/new/extract/geo/cellulite.rs b/crates/milli/src/update/new/extract/geo/cellulite.rs
index 08b320bae..0294a5c0c 100644
--- a/crates/milli/src/update/new/extract/geo/cellulite.rs
+++ b/crates/milli/src/update/new/extract/geo/cellulite.rs
@@ -1,21 +1,25 @@
 use std::cell::RefCell;
 use std::fs::File;
-use std::io::{self, BufReader, BufWriter, ErrorKind, Seek as _, Write as _};
+use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
 use std::str::FromStr;
 use std::{iter, mem};
 
 use bumpalo::Bump;
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
+use cellulite::zerometry::ZerometryCodec;
+use geo_types::Geometry;
 use geojson::GeoJson;
-use heed::RoTxn;
+use heed::{BytesEncode, RoTxn};
+use zerometry::Zerometry;
 
+use crate::update::new::channel::GeoJsonSender;
 use crate::update::new::document::{Document, DocumentContext};
 use crate::update::new::indexer::document_changes::Extractor;
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::DocumentChange;
 use crate::update::GrenadParameters;
-use crate::{DocumentId, Index, Result, UserError};
+use crate::{DocumentId, Index, InternalError, Result, UserError};
 
 pub struct GeoJsonExtractor {
     grenad_parameters: GrenadParameters,
@@ -38,8 +42,8 @@ impl GeoJsonExtractor {
 pub struct GeoJsonExtractorData<'extractor> {
     /// The set of documents ids that were removed. If a document sees its geo
     /// point being updated, we first put it in the deleted and then in the inserted.
-    removed: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
-    inserted: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
+    removed: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
+    inserted: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
     /// Contains a packed list of `ExtractedGeoPoint` data structures for the inserted geo
     /// points if we have spilled to disk.
    spilled_removed: Option<BufWriter<File>>,
@@ -68,39 +72,43 @@ impl<'extractor> GeoJsonExtractorData<'extractor> {
 unsafe impl MostlySend for GeoJsonExtractorData<'_> {}
 
 pub struct FrozenGeoJsonExtractorData<'extractor> {
-    pub removed: &'extractor [(DocumentId, GeoJson)],
-    pub inserted: &'extractor [(DocumentId, GeoJson)],
+    pub removed: &'extractor [(DocumentId, &'extractor [u8])],
+    pub inserted: &'extractor [(DocumentId, &'extractor [u8])],
     pub spilled_removed: Option<BufReader<File>>,
     pub spilled_inserted: Option<BufReader<File>>,
 }
 
 impl FrozenGeoJsonExtractorData<'_> {
-    pub fn iter_and_clear_removed(
-        &mut self,
-    ) -> io::Result<impl Iterator<Item = serde_json::Result<(DocumentId, GeoJson)>> + '_>
-    {
-        Ok(mem::take(&mut self.removed)
-            .iter()
-            .cloned()
-            .map(Ok)
-            .chain(iterator_over_spilled_geojsons(&mut self.spilled_removed)?))
+    pub fn iter_and_clear_removed(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
+        for (docid, _buf) in mem::take(&mut self.removed) {
+            channel.delete_geojson(*docid).unwrap();
+        }
+
+        for ret in iterator_over_spilled_geojsons(&mut self.spilled_removed)? {
+            let (docid, _buf) = ret.map_err(InternalError::SerdeJson)?;
+            channel.delete_geojson(docid).unwrap();
+        }
+
+        Ok(())
     }
 
-    pub fn iter_and_clear_inserted(
-        &mut self,
-    ) -> io::Result<impl Iterator<Item = serde_json::Result<(DocumentId, GeoJson)>> + '_>
-    {
-        Ok(mem::take(&mut self.inserted)
-            .iter()
-            .cloned()
-            .map(Ok)
-            .chain(iterator_over_spilled_geojsons(&mut self.spilled_inserted)?))
+    pub fn iter_and_clear_inserted(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
+        for (docid, buf) in mem::take(&mut self.inserted) {
+            channel.send_geojson(*docid, buf.to_vec()).unwrap();
+        }
+
+        for ret in iterator_over_spilled_geojsons(&mut self.spilled_inserted)? {
+            let (docid, buf) = ret.map_err(InternalError::SerdeJson)?;
+            channel.send_geojson(docid, buf.to_vec()).unwrap();
+        }
+
+        Ok(())
     }
 }
 
 fn iterator_over_spilled_geojsons(
     spilled: &mut Option<BufReader<File>>,
-) -> io::Result<impl Iterator<Item = serde_json::Result<(DocumentId, GeoJson)>> + '_> {
+) -> io::Result<impl Iterator<Item = Result<(DocumentId, Vec<u8>), serde_json::Error>> + '_> {
     let mut spilled = spilled.take();
     if let Some(spilled) = &mut spilled {
         spilled.rewind()?;
     }
@@ -113,10 +121,14 @@ fn iterator_over_spilled_geojsons(
             Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
             Err(e) => return Some(Err(serde_json::Error::io(e))),
         };
-        match GeoJson::from_reader(file) {
-            Ok(geojson) => Some(Ok((docid, geojson))),
-            Err(e) if e.is_eof() => None,
-            Err(e) => Some(Err(e)),
+        let size = match file.read_u32::<BigEndian>() {
+            Ok(size) => size,
+            Err(e) => return Some(Err(serde_json::Error::io(e))),
+        };
+        let mut buf = vec![0; size as usize];
+        match file.read_exact(&mut buf) {
+            Ok(()) => Some(Ok((docid, buf))),
+            Err(e) => return Some(Err(serde_json::Error::io(e))),
         }
     }
     None => None,
@@ -138,7 +150,7 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = Result<Option<DocumentChange<'doc>>>>,
-        context: &'doc DocumentContext<Self::Data>,
+        context: &'doc DocumentContext<'doc, 'extractor, '_, '_, Self::Data>,
     ) -> Result<()> {
         let rtxn = &context.rtxn;
         let index = context.index;
@@ -161,15 +173,22 @@
                     let current = deletion.current(rtxn, index, db_fields_ids_map)?;
                     if let Some(geojson) = current.geojson_field()?
                    {
+                        let geojson = GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+                        let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+                        let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
+
                         match &mut data_ref.spilled_removed {
                             Some(file) => {
                                 file.write_u32::<BigEndian>(docid)?;
-                                file.write_all(geojson.get().as_bytes())?;
+                                file.write_u32::<BigEndian>(buf.len() as u32)?;
+                                file.write_all(&buf)?;
+                            }
+                            None => {
+                                let mut bvec =
+                                    bumpalo::collections::Vec::new_in(context.extractor_alloc);
+                                bvec.extend_from_slice(&buf);
+                                data_ref.removed.push((docid, bvec.into_bump_slice()));
                             }
-                            None => data_ref.removed.push(
-                                // TODO: The error type is wrong here. It should be an internal error.
-                                (docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?),
-                            ),
                         }
                     }
                 }
@@ -187,50 +206,70 @@
                         // we need to replace the current by the new point and therefore
                         // delete the current point from cellulite.
                         if let Some(geojson) = current_geo {
+                            let geojson =
+                                GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+                            let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+                            let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
+
                             match &mut data_ref.spilled_removed {
                                 Some(file) => {
                                     file.write_u32::<BigEndian>(docid)?;
-                                    file.write_all(geojson.get().as_bytes())?;
+                                    file.write_u32::<BigEndian>(buf.len() as u32)?;
+                                    file.write_all(&buf)?;
+                                }
+                                None => {
+                                    let mut bvec =
+                                        bumpalo::collections::Vec::new_in(context.extractor_alloc);
+                                    bvec.extend_from_slice(&buf);
+                                    data_ref.removed.push((docid, bvec.into_bump_slice()));
                                 }
-                                // TODO: Should be an internal error
-                                None => data_ref.removed.push((
-                                    docid,
-                                    GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
-                                )),
                             }
                         }
 
                         if let Some(geojson) = updated_geo {
+                            let geojson =
+                                GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+                            let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+                            let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
+
                             match &mut data_ref.spilled_inserted {
                                 Some(file) => {
                                     file.write_u32::<BigEndian>(docid)?;
-                                    file.write_all(geojson.get().as_bytes())?;
+                                    file.write_u32::<BigEndian>(buf.len() as u32)?;
+                                    file.write_all(&buf)?;
+                                }
+                                None => {
+                                    let mut bvec =
+                                        bumpalo::collections::Vec::new_in(context.extractor_alloc);
+                                    bvec.extend_from_slice(&buf);
+                                    data_ref.inserted.push((docid, bvec.into_bump_slice()));
                                 }
-                                // TODO: Is the error type correct here? Shouldn't it be an internal error?
-                                None => data_ref.inserted.push((
-                                    docid,
-                                    GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
-                                )),
                             }
                         }
                     }
                 }
                 DocumentChange::Insertion(insertion) => {
                     let docid = insertion.docid();
-
                     let inserted_geo = insertion.inserted().geojson_field()?;
                     if let Some(geojson) = inserted_geo {
+                        let geojson = GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+                        let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+                        let mut bytes = Vec::new();
+                        Zerometry::write_from_geometry(&mut bytes, &geometry)?;
+
                         match &mut data_ref.spilled_inserted {
                             Some(file) => {
                                 file.write_u32::<BigEndian>(docid)?;
-                                file.write_all(geojson.get().as_bytes())?;
+                                file.write_u32::<BigEndian>(bytes.len() as u32)?;
+                                file.write_all(&bytes)?;
+                            }
+                            None => {
+                                let mut bvec =
+                                    bumpalo::collections::Vec::new_in(context.extractor_alloc);
+                                bvec.extend_from_slice(&bytes);
+                                data_ref.inserted.push((docid, bvec.into_bump_slice()));
                             }
-                            // TODO: Is the error type correct here? Shouldn't it be an internal error?
-                            None => data_ref.inserted.push((
-                                docid,
-                                GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
-                            )),
                         }
                     }
                 }
diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs
index c88751ee3..c2ecd292e 100644
--- a/crates/milli/src/update/new/indexer/document_changes.rs
+++ b/crates/milli/src/update/new/indexer/document_changes.rs
@@ -24,7 +24,7 @@ pub trait Extractor<'extractor>: Sync {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = Result<Option<DocumentChange<'doc>>>>,
-        context: &'doc DocumentContext<Self::Data>,
+        context: &'doc DocumentContext<'doc, 'extractor, '_, '_, Self::Data>,
     ) -> Result<()>;
 }
diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs
index f055f2318..fdd6f007c 100644
--- a/crates/milli/src/update/new/indexer/write.rs
+++ b/crates/milli/src/update/new/indexer/write.rs
@@ -72,18 +72,17 @@ pub fn write_to_db(
                 let embedding = large_vector.read_embedding(*dimensions);
                 writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
             }
-            ReceiverAction::GeoJson(docid, geojson) => {
-                match geojson {
-                    Some(geojson) => {
-                        println!("Adding geojson {docid}");
-                        index.cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
-                    }
-                    None => {
-                        println!("Deleting geojson {docid}");
-                        index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
-                    }
+            ReceiverAction::GeoJson(docid, geojson) => match geojson {
+                Some(geojson) => {
+                    index
+                        .cellulite
+                        .add_raw_zerometry(wtxn, docid, &geojson)
+                        .map_err(InternalError::CelluliteError)?;
                 }
-            }
+                None => {
+                    index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
+                }
+            },
 
             // Every time there is a message in the channel we search
diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs
index 08b9a3111..c5bf75056 100644
--- a/crates/milli/src/update/new/merger.rs
+++ b/crates/milli/src/update/new/merger.rs
@@ -80,15 +80,8 @@ where
         }
 
         let mut frozen = data.into_inner().freeze()?;
-        for result in frozen.iter_and_clear_removed()? {
-            let (docid, _) = result.map_err(InternalError::SerdeJson)?;
-            geojson_sender.delete_geojson(docid).unwrap();
-        }
-
-        for result in frozen.iter_and_clear_inserted()? {
-            let (docid, geojson) = result.map_err(InternalError::SerdeJson)?;
-            geojson_sender.send_geojson(docid, geojson).unwrap();
-        }
+        frozen.iter_and_clear_removed(geojson_sender)?;
+        frozen.iter_and_clear_inserted(geojson_sender)?;
     }
 
     Ok(())
diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs
index 0b7be364e..a4d8b7203 100644
--- a/crates/milli/src/update/settings.rs
+++ b/crates/milli/src/update/settings.rs
@@ -1767,7 +1767,10 @@ impl InnerIndexSettingsDiff {
     }
 
     pub fn any_reindexing_needed(&self) -> bool {
-        self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors() || self.reindex_geojson()
+        self.reindex_searchable()
+            || self.reindex_facets()
+            || self.reindex_vectors()
+            || self.reindex_geojson()
     }
 
     pub fn reindex_searchable(&self) -> bool {
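
A note on the spill format this patch introduces. The old spill files concatenated raw GeoJSON values back to back and re-parsed them with `GeoJson::from_reader`, which gives the parser no explicit record boundaries; that appears to be the root of the spilling bug. The patch pre-encodes each geometry as Zerometry bytes and frames every spilled record as `[docid: u32 BE][payload length: u32 BE][payload]`, so the reader always knows exactly how many bytes to consume. The following is a minimal, self-contained sketch of that framing; `encode_record` and `decode_record` are hypothetical helper names for illustration, not functions from the patch.

use std::io::{self, Cursor, ErrorKind, Read, Write};

use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};

/// Writes one spill record: docid, payload length, then the payload bytes.
fn encode_record<W: Write>(w: &mut W, docid: u32, payload: &[u8]) -> io::Result<()> {
    w.write_u32::<BigEndian>(docid)?;
    w.write_u32::<BigEndian>(payload.len() as u32)?;
    w.write_all(payload)
}

/// Reads one spill record back; a clean EOF on the docid field means
/// every record has been consumed.
fn decode_record<R: Read>(r: &mut R) -> io::Result<Option<(u32, Vec<u8>)>> {
    let docid = match r.read_u32::<BigEndian>() {
        Ok(docid) => docid,
        Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    };
    let len = r.read_u32::<BigEndian>()? as usize;
    let mut payload = vec![0; len];
    r.read_exact(&mut payload)?;
    Ok(Some((docid, payload)))
}

fn main() -> io::Result<()> {
    // Round-trip two records through an in-memory stand-in for the spill file.
    let mut spill = Vec::new();
    encode_record(&mut spill, 0, b"first zerometry blob")?;
    encode_record(&mut spill, 1, b"second zerometry blob")?;

    let mut reader = Cursor::new(spill);
    while let Some((docid, payload)) = decode_record(&mut reader)? {
        println!("docid {docid}: {} bytes", payload.len());
    }
    Ok(())
}

The `u32` length prefix caps a single spilled geometry at 4 GiB, and the big-endian byte order matches the `byteorder` calls used throughout the extractor; framing also lets the write side store the already-encoded payload, which is why the channel now carries `Vec<u8>` instead of parsed `GeoJson` values.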