From a921ee31ce4385c6e089f1cca96120591187d649 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 15 Jul 2025 23:48:14 +0200 Subject: [PATCH] Cellulite is almost in the new indexer. We must add the documentID to the geojson pipeline --- crates/milli/src/update/new/channel.rs | 22 +++++++++++ .../milli/src/update/new/extract/geo/mod.rs | 2 +- .../milli/src/update/new/indexer/extract.rs | 33 ++++++++++++++++ crates/milli/src/update/new/indexer/write.rs | 4 ++ crates/milli/src/update/new/merger.rs | 38 +++++++++++++++++++ crates/milli/src/update/new/steps.rs | 1 + 6 files changed, 99 insertions(+), 1 deletion(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index aec192ace..d85982679 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -13,6 +13,7 @@ use bbqueue::framed::{FrameGrantR, FrameProducer}; use bbqueue::BBBuffer; use bytemuck::{checked, CheckedBitPattern, NoUninit}; use flume::{RecvTimeoutError, SendError}; +use geojson::GeoJson; use heed::types::Bytes; use heed::{BytesDecode, MdbError}; use memmap2::{Mmap, MmapMut}; @@ -139,6 +140,7 @@ pub enum ReceiverAction { LargeEntry(LargeEntry), LargeVectors(LargeVectors), LargeVector(LargeVector), + GeoJson(GeoJson), } /// An entry that cannot fit in the BBQueue buffers has been @@ -463,6 +465,7 @@ pub enum Database { FieldIdDocidFacetStrings, FieldIdDocidFacetF64s, VectorEmbedderCategoryId, + Cellulite, } impl Database { @@ -485,6 +488,7 @@ impl Database { Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(), Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(), Database::VectorEmbedderCategoryId => index.embedder_category_id.remap_types(), + Database::Cellulite => index.cellulite.remap_types(), } } @@ -507,6 +511,7 @@ impl Database { Database::FieldIdDocidFacetStrings => db_name::FIELD_ID_DOCID_FACET_STRINGS, Database::FieldIdDocidFacetF64s => db_name::FIELD_ID_DOCID_FACET_F64S, Database::VectorEmbedderCategoryId => db_name::VECTOR_EMBEDDER_CATEGORY_ID, + Database::Cellulite => db_name::CELLULITE, } } } @@ -548,6 +553,10 @@ impl<'b> ExtractorBbqueueSender<'b> { GeoSender(self) } + pub fn geojson<'a>(&'a self) -> GeoJsonSender<'a, 'b> { + GeoJsonSender(self) + } + fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> { let max_grant = self.max_grant; let refcell = self.producers.get().unwrap(); @@ -1140,3 +1149,16 @@ impl GeoSender<'_, '_> { ) } } + + +#[derive(Clone, Copy)] +pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); + +impl GeoJsonSender<'_, '_> { + pub fn send_geojson(&self, value: GeoJson) -> StdResult<(), SendError<()>> { + self.0 + .sender + .send(ReceiverAction::GeoJson(value)) + .map_err(|_| SendError(())) + } +} diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index 9c0af1603..8600e6e4b 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -18,7 +18,7 @@ use crate::update::new::DocumentChange; use crate::update::GrenadParameters; use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result}; -mod cellulite; +pub mod cellulite; pub struct GeoExtractor { grenad_parameters: GrenadParameters, diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index abfb4d6da..e360bdc4c 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -16,8 +16,10 @@ use super::settings_changes::settings_change_extract; use crate::documents::{FieldIdMapper, PrimaryKey}; use crate::progress::{EmbedderStats, MergingWordCache}; use crate::proximity::ProximityPrecision; +use crate::update::new::extract::cellulite::GeoJsonExtractor; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::indexer::settings_changes::DocumentsIndentifiers; +use crate::update::new::merger::merge_and_send_cellulite; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::SettingsDelta; @@ -317,6 +319,37 @@ where &indexing_context.must_stop_processing, )?; } + + + 'cellulite: { + let Some(extractor) = GeoJsonExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)? + else { + break 'cellulite; + }; + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + + { + let span = tracing::trace_span!(target: "indexing::documents::extract", "cellulite"); + let _entered = span.enter(); + + extract( + document_changes, + &extractor, + indexing_context, + extractor_allocs, + &datastore, + IndexingStep::WritingGeoJson, + )?; + } + + merge_and_send_cellulite( + datastore, + &rtxn, + index, + extractor_sender.geojson(), + &indexing_context.must_stop_processing, + )?; + } indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index b8e3685f8..974385bf9 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -72,6 +72,10 @@ pub fn write_to_db( let embedding = large_vector.read_embedding(*dimensions); writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?; } + ReceiverAction::GeoJson(geojson) => { + let cellulite = cellulite::Writer::new(index.cellulite); + cellulite.add_item(wtxn, doc_id, &geojson)?; + } } // Every time the is a message in the channel we search diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 15f06c67d..90b0b205f 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -13,6 +13,7 @@ use super::extract::{ FacetKind, GeoExtractorData, }; use crate::update::facet::new_incremental::FacetFieldIdChange; +use crate::update::new::extract::cellulite::GeoJsonExtractorData; use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result}; #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] @@ -62,6 +63,43 @@ where Ok(()) } + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] +pub fn merge_and_send_cellulite<'extractor, MSP>( + datastore: impl IntoIterator>>, + rtxn: &RoTxn, + index: &Index, + geojson_sender: GeoJsonSender<'_, '_>, + must_stop_processing: &MSP, +) -> Result<()> +where + MSP: Fn() -> bool + Sync, +{ + let cellulite = cellulite::Writer::new(index.cellulite); + + for data in datastore { + if must_stop_processing() { + return Err(InternalError::AbortedIndexation.into()); + } + + let mut frozen = data.into_inner().freeze()?; + for result in frozen.iter_and_clear_removed()? { + let extracted_geo_point = result.map_err(InternalError::SerdeJson)?; + todo!("We must send the docid instead of the geojson"); + /* + let removed = cellulite.remove(&GeoJsonPoint::from(extracted_geo_point)); + debug_assert!(removed.is_some()); + */ + } + + for result in frozen.iter_and_clear_inserted()? { + geojson_sender.send_geojson(result.map_err(InternalError::SerdeJson)?).unwrap(); + } + } + + Ok(()) +} + #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")] pub fn merge_and_send_docids<'extractor, MSP, D>( mut caches: Vec>, diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs index eabf9104e..ecd6761be 100644 --- a/crates/milli/src/update/new/steps.rs +++ b/crates/milli/src/update/new/steps.rs @@ -12,6 +12,7 @@ make_enum_progress! { MergingWordCaches, MergingWordProximity, WritingGeoPoints, + WritingGeoJson, WaitingForDatabaseWrites, WaitingForExtractors, WritingEmbeddingsToDatabase,