diff --git a/crates/milli/src/update/clear_documents.rs b/crates/milli/src/update/clear_documents.rs
index 01631e9a3..af733d25f 100644
--- a/crates/milli/src/update/clear_documents.rs
+++ b/crates/milli/src/update/clear_documents.rs
@@ -47,6 +47,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
             field_id_docid_facet_strings,
             vector_arroy,
             embedder_category_id: _,
+            cellulite,
             documents,
         } = self.index;
 
@@ -89,6 +90,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
         field_id_docid_facet_strings.clear(self.wtxn)?;
         // vector
         vector_arroy.clear(self.wtxn)?;
+        cellulite.clear(self.wtxn)?;
 
         documents.clear(self.wtxn)?;
diff --git a/crates/milli/src/update/index_documents/extract/extract_geo_points.rs b/crates/milli/src/update/index_documents/extract/extract_geo_points.rs
index fb2ea9d77..44df98a39 100644
--- a/crates/milli/src/update/index_documents/extract/extract_geo_points.rs
+++ b/crates/milli/src/update/index_documents/extract/extract_geo_points.rs
@@ -2,6 +2,7 @@ use std::fs::File;
 use std::io::{self, BufReader};
 
 use concat_arrays::concat_arrays;
+use geojson::GeoJson;
 use serde_json::Value;
 
 use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
@@ -107,3 +108,77 @@ fn extract_lat_lng(
         None => Ok(None),
     }
 }
+
+/// Extracts the GeoJSON value contained in each document under the `_geojson` field.
+///
+/// Returns the generated grenad reader containing the docid as key associated to the serialized GeoJSON value
+#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
+pub fn extract_geojson<R: io::Read + io::Seek>(
+    obkv_documents: grenad::Reader<R>,
+    indexer: GrenadParameters,
+    primary_key_id: FieldId,
+    settings_diff: &InnerIndexSettingsDiff,
+) -> Result<grenad::Reader<BufReader<File>>> {
+    let mut writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
+
+    tracing::info!("Extracting one geojson");
+
+    let mut cursor = obkv_documents.into_cursor()?;
+    while let Some((docid_bytes, value)) = cursor.move_on_next()? {
+        let obkv = obkv::KvReader::from_slice(value);
+        // since we only need the primary key when we throw an error
+        // we create this getter to lazily get it when needed
+        let document_id = || -> Value {
+            let reader = KvReaderDelAdd::from_slice(obkv.get(primary_key_id).unwrap());
+            let document_id =
+                reader.get(DelAdd::Deletion).or(reader.get(DelAdd::Addition)).unwrap();
+            serde_json::from_slice(document_id).unwrap()
+        };
+
+        // extract old version
+        let del_geojson =
+            extract_geojson_field(obkv, &settings_diff.old, DelAdd::Deletion, document_id)?;
+        // extract new version
+        let add_geojson =
+            extract_geojson_field(obkv, &settings_diff.new, DelAdd::Addition, document_id)?;
+
+        if del_geojson != add_geojson {
+            let mut obkv = KvWriterDelAdd::memory();
+            if del_geojson.is_some() {
+                #[allow(clippy::drop_non_drop)]
+                // We don't need to store the geojson, we'll just delete it by id
+                obkv.insert(DelAdd::Deletion, [])?;
+            }
+            if let Some(geojson) = add_geojson {
+                #[allow(clippy::drop_non_drop)]
+                obkv.insert(DelAdd::Addition, geojson.to_string().as_bytes())?;
+            }
+            let bytes = obkv.into_inner()?;
+            writer.insert(docid_bytes, bytes)?;
+        }
+    }
+
+    writer_into_reader(writer)
+}
+
+fn extract_geojson_field(
+    obkv: &obkv::KvReader<FieldId>,
+    settings: &InnerIndexSettings,
+    deladd: DelAdd,
+    document_id: impl Fn() -> Value,
+) -> Result<Option<GeoJson>> {
+    match settings.geojson_fid {
+        Some(fid) => {
+            let value = obkv.get(fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd));
+            // TODO: That's a user error, not an internal error
+            Ok(value
+                .map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson))
+                .transpose()?)
+        }
+        None => Ok(None),
+    }
+}
diff --git a/crates/milli/src/update/index_documents/extract/mod.rs b/crates/milli/src/update/index_documents/extract/mod.rs
index b41fd59e1..e1c2f254c 100644
--- a/crates/milli/src/update/index_documents/extract/mod.rs
+++ b/crates/milli/src/update/index_documents/extract/mod.rs
@@ -31,6 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
 use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
 use super::{helpers, TypedChunk};
 use crate::progress::EmbedderStats;
+use crate::update::index_documents::extract::extract_geo_points::extract_geojson;
 use crate::update::index_documents::extract::extract_vector_points::extract_embeddings_from_fragments;
 use crate::update::settings::InnerIndexSettingsDiff;
 use crate::vector::db::EmbedderInfo;
@@ -62,6 +63,7 @@ pub(crate) fn data_from_obkv_documents(
         original_documents_chunk,
         indexer,
         lmdb_writer_sx.clone(),
+        primary_key_id,
         settings_diff.clone(),
         embedder_info.clone(),
         possible_embedding_mistakes.clone(),
@@ -232,6 +234,7 @@ fn send_original_documents_data(
     original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
+    primary_key_id: FieldId,
     settings_diff: Arc<InnerIndexSettingsDiff>,
     embedder_info: Arc<Vec<(String, EmbedderInfo)>>,
     possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
@@ -240,6 +243,22 @@ fn send_original_documents_data(
     let original_documents_chunk =
         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
 
+    tracing::info!("Do we have a geojson");
+    if settings_diff.run_geojson_indexing() {
+        tracing::info!("Yes we do");
+        let documents_chunk_cloned = original_documents_chunk.clone();
+        let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
+        let settings_diff = settings_diff.clone();
+        rayon::spawn(move || {
+            let result =
+                extract_geojson(documents_chunk_cloned, indexer, primary_key_id, &settings_diff);
+            let _ = match result {
+                Ok(geojson) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoJson(geojson))),
+                Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
+            };
+        });
+    }
+
     let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
         // no point in indexing vectors without embedders
         && (!settings_diff.new.runtime_embedders.inner_as_ref().is_empty());
diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs
index 658ff1923..1e91dfe5a 100644
--- a/crates/milli/src/update/index_documents/mod.rs
+++ b/crates/milli/src/update/index_documents/mod.rs
@@ -522,7 +522,7 @@ where
             .is_some_and(|conf| conf.is_quantized);
         let is_quantizing = embedder_config.is_some_and(|action| action.is_being_quantized);
 
-        pool.install(|| {
+        pool.install(|| -> Result<()> {
             let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
             writer.build_and_quantize(
                 wtxn,
diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs
index c93e3e0f7..b3015fa94 100644
--- a/crates/milli/src/update/index_documents/typed_chunk.rs
+++ b/crates/milli/src/update/index_documents/typed_chunk.rs
@@ -30,7 +30,7 @@ use crate::vector::db::{EmbeddingStatusDelta, IndexEmbeddingConfig};
 use crate::vector::ArroyWrapper;
 use crate::{
     lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
-    Result, SerializationError, U8StrStrCodec,
+    Result, SerializationError, U8StrStrCodec, UserError,
 };
 
 /// This struct accumulates and group the TypedChunks
@@ -85,6 +85,7 @@ pub(crate) enum TypedChunk {
     FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
     FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
     GeoPoints(grenad::Reader<BufReader<File>>),
+    GeoJson(grenad::Reader<BufReader<File>>),
     VectorPoints {
         remove_vectors: grenad::Reader<BufReader<File>>,
         // docid -> vector
@@ -614,6 +615,44 @@ pub(crate) fn write_typed_chunk_into_index(
             index.put_geo_rtree(wtxn, &rtree)?;
             index.put_geo_faceted_documents_ids(wtxn, &geo_faceted_docids)?;
         }
+        TypedChunk::GeoJson(_) => {
+            let span = tracing::trace_span!(target: "indexing::write_db", "geo_json");
+            let _entered = span.enter();
+
+            let mut builder = MergerBuilder::new(KeepFirst);
+            for typed_chunk in typed_chunks {
+                let TypedChunk::GeoJson(chunk) = typed_chunk else {
+                    unreachable!();
+                };
+
+                builder.push(chunk.into_cursor()?);
+            }
+            let merger = builder.build();
+
+            let cellulite = index.cellulite;
+
+            let mut iter = merger.into_stream_merger_iter()?;
+            while let Some((key, value)) = iter.next()? {
+                // convert the key back to a u32 (4 bytes)
+                let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
+
+                let deladd_obkv = KvReaderDelAdd::from_slice(value);
+                if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
+                    todo!("handle deletion");
+                    // cellulite.remove(&docid);
+                }
+                if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
+                    tracing::info!("Adding one geojson to cellulite");
+
+                    let geojson =
+                        geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
+                    let writer = cellulite::Writer::new(index.cellulite);
+                    writer
+                        .add_item(wtxn, docid, &geojson)
+                        .map_err(InternalError::CelluliteError)?;
+                }
+            }
+        }
         TypedChunk::VectorPoints { .. } => {
             let span = tracing::trace_span!(target: "indexing::write_db", "vector_points");
             let _entered = span.enter();
diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs
index 911f51865..9866f7147 100644
--- a/crates/milli/src/update/settings.rs
+++ b/crates/milli/src/update/settings.rs
@@ -15,7 +15,7 @@ use super::del_add::{DelAdd, DelAddOperation};
 use super::index_documents::{IndexDocumentsConfig, Transform};
 use super::{ChatSettings, IndexerConfig};
 use crate::attribute_patterns::PatternMatch;
-use crate::constants::RESERVED_GEO_FIELD_NAME;
+use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME};
 use crate::criterion::Criterion;
 use crate::disabled_typos_terms::DisabledTyposTerms;
 use crate::error::UserError::{self, InvalidChatSettingsDocumentTemplateMaxBytes};
@@ -1884,6 +1884,13 @@ impl InnerIndexSettingsDiff {
         self.old.geo_fields_ids != self.new.geo_fields_ids
             || (!self.settings_update_only && self.new.geo_fields_ids.is_some())
     }
+
+    pub fn run_geojson_indexing(&self) -> bool {
+        tracing::info!("old.geojson_fid: {:?}", self.old.geojson_fid);
+        tracing::info!("new.geojson_fid: {:?}", self.new.geojson_fid);
+        self.old.geojson_fid != self.new.geojson_fid
+            || (!self.settings_update_only && self.new.geojson_fid.is_some())
+    }
 }
 
 #[derive(Clone)]
@@ -1904,6 +1911,7 @@ pub(crate) struct InnerIndexSettings {
     pub runtime_embedders: RuntimeEmbedders,
     pub embedder_category_id: HashMap<String, u8>,
     pub geo_fields_ids: Option<(FieldId, FieldId)>,
+    pub geojson_fid: Option<FieldId>,
     pub prefix_search: PrefixSearch,
     pub facet_search: bool,
 }
@@ -1943,6 +1951,7 @@ impl InnerIndexSettings {
             }
             _ => None,
         };
+        let geo_json_fid = fields_ids_map.id(RESERVED_GEOJSON_FIELD_NAME);
         let localized_attributes_rules =
             index.localized_attributes_rules(rtxn)?.unwrap_or_default();
         let filterable_attributes_rules = index.filterable_attributes_rules(rtxn)?;
@@ -1971,6 +1980,7 @@ impl InnerIndexSettings {
             runtime_embedders,
             embedder_category_id,
             geo_fields_ids,
+            geojson_fid: geo_json_fid,
             prefix_search,
             facet_search,
             disabled_typos_terms,
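
Note (editor's sketch, not part of the patch): the data flow the diff sets up is `_geojson` field -> `geojson::GeoJson` during extraction -> serialized string stored in the DelAdd obkv -> re-parsed in `typed_chunk.rs` and handed to `cellulite::Writer::add_item`. The minimal, standalone Rust sketch below illustrates that round-trip using only the `geojson` and `serde_json` crates; the example document and the `main` wrapper are illustrative assumptions, while the `GeoJson` calls mirror the ones used in the diff.

use geojson::GeoJson;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A document as Meilisearch would receive it, carrying the reserved `_geojson` field
    // (hypothetical example document).
    let document = serde_json::json!({
        "id": 42,
        "name": "Eiffel Tower",
        "_geojson": { "type": "Point", "coordinates": [2.2945, 48.8584] }
    });

    // What `extract_geojson_field` does with the raw field bytes: deserialize them into a
    // `GeoJson` value (the diff maps failures to `InternalError::SerdeJson`, with a TODO to
    // turn that into a user error).
    let raw = serde_json::to_vec(&document["_geojson"])?;
    let geojson: GeoJson = serde_json::from_slice(&raw)?;

    // What the addition path stores in the DelAdd obkv: the value re-serialized as a string,
    // which `write_typed_chunk_into_index` later parses again with `GeoJson::from_reader`
    // before calling `cellulite::Writer::add_item`.
    let stored = geojson.to_string();
    let _reparsed = GeoJson::from_reader(stored.as_bytes())?;
    Ok(())
}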