add cellulite to the old pipeline, it probably doesn't work

This commit is contained in:
Tamo
2025-07-15 23:15:48 +02:00
parent 3dd4f0587d
commit 14a93d65a4
6 changed files with 148 additions and 3 deletions

View File

@ -47,6 +47,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
field_id_docid_facet_strings, field_id_docid_facet_strings,
vector_arroy, vector_arroy,
embedder_category_id: _, embedder_category_id: _,
cellulite,
documents, documents,
} = self.index; } = self.index;
@ -89,6 +90,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
field_id_docid_facet_strings.clear(self.wtxn)?; field_id_docid_facet_strings.clear(self.wtxn)?;
// vector // vector
vector_arroy.clear(self.wtxn)?; vector_arroy.clear(self.wtxn)?;
cellulite.clear(self.wtxn)?;
documents.clear(self.wtxn)?; documents.clear(self.wtxn)?;

View File

@ -2,6 +2,7 @@ use std::fs::File;
use std::io::{self, BufReader}; use std::io::{self, BufReader};
use concat_arrays::concat_arrays; use concat_arrays::concat_arrays;
use geojson::GeoJson;
use serde_json::Value; use serde_json::Value;
use super::helpers::{create_writer, writer_into_reader, GrenadParameters}; use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
@ -107,3 +108,77 @@ fn extract_lat_lng(
None => Ok(None), None => Ok(None),
} }
} }
/// Extracts the GeoJSON object contained in each document under the `_geojson` field.
///
/// Returns the generated grenad reader containing the docid as key, associated to a
/// DelAdd obkv: the deletion side is an empty marker (the old value is removed by docid
/// downstream) and the addition side holds the serialized GeoJSON.
#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
pub fn extract_geojson<R: io::Read + io::Seek>(
    obkv_documents: grenad::Reader<R>,
    indexer: GrenadParameters,
    primary_key_id: FieldId,
    settings_diff: &InnerIndexSettingsDiff,
) -> Result<grenad::Reader<BufReader<File>>> {
    let mut writer = create_writer(
        indexer.chunk_compression_type,
        indexer.chunk_compression_level,
        tempfile::tempfile()?,
    );

    let mut cursor = obkv_documents.into_cursor()?;
    while let Some((docid_bytes, value)) = cursor.move_on_next()? {
        let obkv = obkv::KvReader::from_slice(value);
        // Since we only need the primary key when we throw an error,
        // we create this getter to lazily fetch it when needed.
        let document_id = || -> Value {
            let reader = KvReaderDelAdd::from_slice(obkv.get(primary_key_id).unwrap());
            let document_id =
                reader.get(DelAdd::Deletion).or(reader.get(DelAdd::Addition)).unwrap();
            serde_json::from_slice(document_id).unwrap()
        };

        // Extract the old and the new version of the `_geojson` field.
        let del_geojson =
            extract_geojson_field(obkv, &settings_diff.old, DelAdd::Deletion, document_id)?;
        let add_geojson =
            extract_geojson_field(obkv, &settings_diff.new, DelAdd::Addition, document_id)?;

        if del_geojson != add_geojson {
            let mut obkv = KvWriterDelAdd::memory();
            if del_geojson.is_some() {
                #[allow(clippy::drop_non_drop)]
                // We don't need to store the old geojson: deletion happens by docid.
                obkv.insert(DelAdd::Deletion, [])?;
            }
            if let Some(geojson) = add_geojson {
                #[allow(clippy::drop_non_drop)]
                obkv.insert(DelAdd::Addition, geojson.to_string().as_bytes())?;
            }
            let bytes = obkv.into_inner()?;
            writer.insert(docid_bytes, bytes)?;
        }
    }

    writer_into_reader(writer)
}
/// Reads the `_geojson` field for the requested DelAdd side of a document.
///
/// Returns `Ok(None)` when no `_geojson` field id is configured in the settings,
/// or when the document carries no value for the requested side.
fn extract_geojson_field(
    obkv: &obkv::KvReader<FieldId>,
    settings: &InnerIndexSettings,
    deladd: DelAdd,
    document_id: impl Fn() -> Value,
) -> Result<Option<GeoJson>> {
    let Some(fid) = settings.geojson_fid else {
        return Ok(None);
    };
    let Some(raw) = obkv.get(fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd))
    else {
        return Ok(None);
    };
    // TODO: That's a user error, not an internal error
    let geojson = serde_json::from_slice(raw).map_err(InternalError::SerdeJson)?;
    Ok(Some(geojson))
}

View File

@ -31,6 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk}; use super::{helpers, TypedChunk};
use crate::progress::EmbedderStats; use crate::progress::EmbedderStats;
use crate::update::index_documents::extract::extract_geo_points::extract_geojson;
use crate::update::index_documents::extract::extract_vector_points::extract_embeddings_from_fragments; use crate::update::index_documents::extract::extract_vector_points::extract_embeddings_from_fragments;
use crate::update::settings::InnerIndexSettingsDiff; use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::db::EmbedderInfo; use crate::vector::db::EmbedderInfo;
@ -62,6 +63,7 @@ pub(crate) fn data_from_obkv_documents(
original_documents_chunk, original_documents_chunk,
indexer, indexer,
lmdb_writer_sx.clone(), lmdb_writer_sx.clone(),
primary_key_id,
settings_diff.clone(), settings_diff.clone(),
embedder_info.clone(), embedder_info.clone(),
possible_embedding_mistakes.clone(), possible_embedding_mistakes.clone(),
@ -232,6 +234,7 @@ fn send_original_documents_data(
original_documents_chunk: Result<grenad::Reader<BufReader<File>>>, original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
indexer: GrenadParameters, indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>, lmdb_writer_sx: Sender<Result<TypedChunk>>,
primary_key_id: FieldId,
settings_diff: Arc<InnerIndexSettingsDiff>, settings_diff: Arc<InnerIndexSettingsDiff>,
embedder_info: Arc<Vec<(String, EmbedderInfo)>>, embedder_info: Arc<Vec<(String, EmbedderInfo)>>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>, possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
@ -240,6 +243,22 @@ fn send_original_documents_data(
let original_documents_chunk = let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
tracing::info!("Do we have a geojson");
if settings_diff.run_geojson_indexing() {
tracing::info!("Yes we do");
let documents_chunk_cloned = original_documents_chunk.clone();
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
let settings_diff = settings_diff.clone();
rayon::spawn(move || {
let result =
extract_geojson(documents_chunk_cloned, indexer, primary_key_id, &settings_diff);
let _ = match result {
Ok(geojson) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoJson(geojson))),
Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
};
});
}
let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only()) let index_vectors = (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
// no point in indexing vectors without embedders // no point in indexing vectors without embedders
&& (!settings_diff.new.runtime_embedders.inner_as_ref().is_empty()); && (!settings_diff.new.runtime_embedders.inner_as_ref().is_empty());

View File

@ -522,7 +522,7 @@ where
.is_some_and(|conf| conf.is_quantized); .is_some_and(|conf| conf.is_quantized);
let is_quantizing = embedder_config.is_some_and(|action| action.is_being_quantized); let is_quantizing = embedder_config.is_some_and(|action| action.is_being_quantized);
pool.install(|| { pool.install(|| -> Result<()> {
let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
writer.build_and_quantize( writer.build_and_quantize(
wtxn, wtxn,

View File

@ -30,7 +30,7 @@ use crate::vector::db::{EmbeddingStatusDelta, IndexEmbeddingConfig};
use crate::vector::ArroyWrapper; use crate::vector::ArroyWrapper;
use crate::{ use crate::{
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError, lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
Result, SerializationError, U8StrStrCodec, Result, SerializationError, U8StrStrCodec, UserError,
}; };
/// This struct accumulates and group the TypedChunks /// This struct accumulates and group the TypedChunks
@ -85,6 +85,7 @@ pub(crate) enum TypedChunk {
FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>), FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>), FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
GeoPoints(grenad::Reader<BufReader<File>>), GeoPoints(grenad::Reader<BufReader<File>>),
GeoJson(grenad::Reader<BufReader<File>>),
VectorPoints { VectorPoints {
remove_vectors: grenad::Reader<BufReader<File>>, remove_vectors: grenad::Reader<BufReader<File>>,
// docid -> vector // docid -> vector
@ -614,6 +615,44 @@ pub(crate) fn write_typed_chunk_into_index(
index.put_geo_rtree(wtxn, &rtree)?; index.put_geo_rtree(wtxn, &rtree)?;
index.put_geo_faceted_documents_ids(wtxn, &geo_faceted_docids)?; index.put_geo_faceted_documents_ids(wtxn, &geo_faceted_docids)?;
} }
TypedChunk::GeoJson(_) => {
let span = tracing::trace_span!(target: "indexing::write_db", "geo_json");
let _entered = span.enter();
let mut builder = MergerBuilder::new(KeepFirst);
for typed_chunk in typed_chunks {
let TypedChunk::GeoJson(chunk) = typed_chunk else {
unreachable!();
};
builder.push(chunk.into_cursor()?);
}
let merger = builder.build();
let cellulite = index.cellulite;
let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? {
// convert the key back to a u32 (4 bytes)
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
let deladd_obkv = KvReaderDelAdd::from_slice(value);
if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
todo!("handle deletion");
// cellulite.remove(&docid);
}
if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
tracing::info!("Adding one geojson to cellulite");
let geojson =
geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
let writer = cellulite::Writer::new(index.cellulite);
writer
.add_item(wtxn, docid, &geojson)
.map_err(InternalError::CelluliteError)?;
}
}
}
TypedChunk::VectorPoints { .. } => { TypedChunk::VectorPoints { .. } => {
let span = tracing::trace_span!(target: "indexing::write_db", "vector_points"); let span = tracing::trace_span!(target: "indexing::write_db", "vector_points");
let _entered = span.enter(); let _entered = span.enter();

View File

@ -15,7 +15,7 @@ use super::del_add::{DelAdd, DelAddOperation};
use super::index_documents::{IndexDocumentsConfig, Transform}; use super::index_documents::{IndexDocumentsConfig, Transform};
use super::{ChatSettings, IndexerConfig}; use super::{ChatSettings, IndexerConfig};
use crate::attribute_patterns::PatternMatch; use crate::attribute_patterns::PatternMatch;
use crate::constants::RESERVED_GEO_FIELD_NAME; use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME};
use crate::criterion::Criterion; use crate::criterion::Criterion;
use crate::disabled_typos_terms::DisabledTyposTerms; use crate::disabled_typos_terms::DisabledTyposTerms;
use crate::error::UserError::{self, InvalidChatSettingsDocumentTemplateMaxBytes}; use crate::error::UserError::{self, InvalidChatSettingsDocumentTemplateMaxBytes};
@ -1884,6 +1884,13 @@ impl InnerIndexSettingsDiff {
self.old.geo_fields_ids != self.new.geo_fields_ids self.old.geo_fields_ids != self.new.geo_fields_ids
|| (!self.settings_update_only && self.new.geo_fields_ids.is_some()) || (!self.settings_update_only && self.new.geo_fields_ids.is_some())
} }
pub fn run_geojson_indexing(&self) -> bool {
tracing::info!("old.geojson_fid: {:?}", self.old.geojson_fid);
tracing::info!("new.geojson_fid: {:?}", self.new.geojson_fid);
self.old.geojson_fid != self.new.geojson_fid
|| (!self.settings_update_only && self.new.geojson_fid.is_some())
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -1904,6 +1911,7 @@ pub(crate) struct InnerIndexSettings {
pub runtime_embedders: RuntimeEmbedders, pub runtime_embedders: RuntimeEmbedders,
pub embedder_category_id: HashMap<String, u8>, pub embedder_category_id: HashMap<String, u8>,
pub geo_fields_ids: Option<(FieldId, FieldId)>, pub geo_fields_ids: Option<(FieldId, FieldId)>,
pub geojson_fid: Option<FieldId>,
pub prefix_search: PrefixSearch, pub prefix_search: PrefixSearch,
pub facet_search: bool, pub facet_search: bool,
} }
@ -1943,6 +1951,7 @@ impl InnerIndexSettings {
} }
_ => None, _ => None,
}; };
let geo_json_fid = fields_ids_map.id(RESERVED_GEOJSON_FIELD_NAME);
let localized_attributes_rules = let localized_attributes_rules =
index.localized_attributes_rules(rtxn)?.unwrap_or_default(); index.localized_attributes_rules(rtxn)?.unwrap_or_default();
let filterable_attributes_rules = index.filterable_attributes_rules(rtxn)?; let filterable_attributes_rules = index.filterable_attributes_rules(rtxn)?;
@ -1971,6 +1980,7 @@ impl InnerIndexSettings {
runtime_embedders, runtime_embedders,
embedder_category_id, embedder_category_id,
geo_fields_ids, geo_fields_ids,
geojson_fid: geo_json_fid,
prefix_search, prefix_search,
facet_search, facet_search,
disabled_typos_terms, disabled_typos_terms,