Cellulite is almost in the new indexer. We must add the documentID to the geojson pipeline

Tamo
2025-07-15 23:48:14 +02:00
parent b00a1dcc00
commit a921ee31ce
6 changed files with 99 additions and 1 deletion
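The missing piece the commit message refers to: the diff below adds a ReceiverAction::GeoJson(GeoJson) channel action, but the payload carries no DocumentId, so the write side cannot know which document a shape belongs to. A minimal sketch of the follow-up, assuming the variant simply gains the id (illustrative, not part of this commit):

// Hypothetical follow-up: thread the document id through the channel action.
GeoJson(DocumentId, GeoJson),

// ...and have the sender take it alongside the payload:
pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> {
    self.0.sender.send(ReceiverAction::GeoJson(docid, value)).map_err(|_| SendError(()))
}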

View File

@@ -13,6 +13,7 @@ use bbqueue::framed::{FrameGrantR, FrameProducer};
 use bbqueue::BBBuffer;
 use bytemuck::{checked, CheckedBitPattern, NoUninit};
 use flume::{RecvTimeoutError, SendError};
+use geojson::GeoJson;
 use heed::types::Bytes;
 use heed::{BytesDecode, MdbError};
 use memmap2::{Mmap, MmapMut};
@@ -139,6 +140,7 @@ pub enum ReceiverAction {
     LargeEntry(LargeEntry),
     LargeVectors(LargeVectors),
     LargeVector(LargeVector),
+    GeoJson(GeoJson),
 }

 /// An entry that cannot fit in the BBQueue buffers has been
@@ -463,6 +465,7 @@ pub enum Database {
     FieldIdDocidFacetStrings,
     FieldIdDocidFacetF64s,
     VectorEmbedderCategoryId,
+    Cellulite,
 }

 impl Database {
@@ -485,6 +488,7 @@ impl Database {
             Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(),
             Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(),
             Database::VectorEmbedderCategoryId => index.embedder_category_id.remap_types(),
+            Database::Cellulite => index.cellulite.remap_types(),
         }
     }
@@ -507,6 +511,7 @@ impl Database {
             Database::FieldIdDocidFacetStrings => db_name::FIELD_ID_DOCID_FACET_STRINGS,
             Database::FieldIdDocidFacetF64s => db_name::FIELD_ID_DOCID_FACET_F64S,
             Database::VectorEmbedderCategoryId => db_name::VECTOR_EMBEDDER_CATEGORY_ID,
+            Database::Cellulite => db_name::CELLULITE,
         }
     }
 }
@@ -548,6 +553,10 @@ impl<'b> ExtractorBbqueueSender<'b> {
         GeoSender(self)
     }

+    pub fn geojson<'a>(&'a self) -> GeoJsonSender<'a, 'b> {
+        GeoJsonSender(self)
+    }
+
     fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
         let max_grant = self.max_grant;
         let refcell = self.producers.get().unwrap();
@@ -1140,3 +1149,16 @@ impl GeoSender<'_, '_> {
         )
     }
 }
+
+#[derive(Clone, Copy)]
+pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
+
+impl GeoJsonSender<'_, '_> {
+    pub fn send_geojson(&self, value: GeoJson) -> StdResult<(), SendError<()>> {
+        self.0
+            .sender
+            .send(ReceiverAction::GeoJson(value))
+            .map_err(|_| SendError(()))
+    }
+}

View File

@@ -18,7 +18,7 @@ use crate::update::new::DocumentChange;
 use crate::update::GrenadParameters;
 use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};

-mod cellulite;
+pub mod cellulite;

 pub struct GeoExtractor {
     grenad_parameters: GrenadParameters,

View File

@@ -16,8 +16,10 @@ use super::settings_changes::settings_change_extract;
 use crate::documents::{FieldIdMapper, PrimaryKey};
 use crate::progress::{EmbedderStats, MergingWordCache};
 use crate::proximity::ProximityPrecision;
+use crate::update::new::extract::cellulite::GeoJsonExtractor;
 use crate::update::new::extract::EmbeddingExtractor;
 use crate::update::new::indexer::settings_changes::DocumentsIndentifiers;
+use crate::update::new::merger::merge_and_send_cellulite;
 use crate::update::new::merger::merge_and_send_rtree;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::SettingsDelta;
@@ -317,6 +319,37 @@ where
                 &indexing_context.must_stop_processing,
             )?;
         }

+        'cellulite: {
+            let Some(extractor) = GeoJsonExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)?
+            else {
+                break 'cellulite;
+            };
+            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+
+            {
+                let span = tracing::trace_span!(target: "indexing::documents::extract", "cellulite");
+                let _entered = span.enter();
+
+                extract(
+                    document_changes,
+                    &extractor,
+                    indexing_context,
+                    extractor_allocs,
+                    &datastore,
+                    IndexingStep::WritingGeoJson,
+                )?;
+            }
+
+            merge_and_send_cellulite(
+                datastore,
+                &rtxn,
+                index,
+                extractor_sender.geojson(),
+                &indexing_context.must_stop_processing,
+            )?;
+        }
+
         indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);

         finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
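The 'cellulite: { ... break 'cellulite; ... } wrapper above is Rust's labeled-block early exit (stable since 1.65): when GeoJsonExtractor::new returns None, presumably because the index has no GeoJSON to handle, the whole step is skipped without an extra function or nested if. The idiom in isolation, with hypothetical names:

'skip: {
    let Some(value) = optional_input else {
        break 'skip; // nothing to do; control resumes after the block
    };
    process(value);
}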

View File

@@ -72,6 +72,10 @@ pub fn write_to_db(
                 let embedding = large_vector.read_embedding(*dimensions);
                 writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
             }
+            ReceiverAction::GeoJson(geojson) => {
+                let cellulite = cellulite::Writer::new(index.cellulite);
+                cellulite.add_item(wtxn, doc_id, &geojson)?;
+            }
         }

         // Every time there is a message in the channel we search
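One caveat in the new arm above: ReceiverAction::GeoJson(geojson) binds only the payload, so the doc_id it passes to add_item has no binding in this match; that is exactly the gap the commit message flags. Once the action carries the id, the arm would presumably mirror the LargeVector arm and destructure both, along these lines (a sketch, not shipped code):

ReceiverAction::GeoJson(docid, geojson) => {
    let cellulite = cellulite::Writer::new(index.cellulite);
    cellulite.add_item(wtxn, docid, &geojson)?;
}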

View File

@@ -13,6 +13,7 @@ use super::extract::{
     FacetKind, GeoExtractorData,
 };
 use crate::update::facet::new_incremental::FacetFieldIdChange;
+use crate::update::new::extract::cellulite::GeoJsonExtractorData;
 use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};

 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
@@ -62,6 +63,43 @@ where
     Ok(())
 }

+#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
+pub fn merge_and_send_cellulite<'extractor, MSP>(
+    datastore: impl IntoIterator<Item = RefCell<GeoJsonExtractorData<'extractor>>>,
+    rtxn: &RoTxn,
+    index: &Index,
+    geojson_sender: GeoJsonSender<'_, '_>,
+    must_stop_processing: &MSP,
+) -> Result<()>
+where
+    MSP: Fn() -> bool + Sync,
+{
+    let cellulite = cellulite::Writer::new(index.cellulite);
+    for data in datastore {
+        if must_stop_processing() {
+            return Err(InternalError::AbortedIndexation.into());
+        }
+        let mut frozen = data.into_inner().freeze()?;
+        for result in frozen.iter_and_clear_removed()? {
+            let extracted_geo_point = result.map_err(InternalError::SerdeJson)?;
+            todo!("We must send the docid instead of the geojson");
+            /*
+            let removed = cellulite.remove(&GeoJsonPoint::from(extracted_geo_point));
+            debug_assert!(removed.is_some());
+            */
+        }
+        for result in frozen.iter_and_clear_inserted()? {
+            geojson_sender.send_geojson(result.map_err(InternalError::SerdeJson)?).unwrap();
+        }
+    }
+    Ok(())
+}
+
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
 pub fn merge_and_send_docids<'extractor, MSP, D>(
     mut caches: Vec<BalancedCaches<'extractor>>,

View File

@@ -12,6 +12,7 @@ make_enum_progress! {
         MergingWordCaches,
         MergingWordProximity,
         WritingGeoPoints,
+        WritingGeoJson,
         WaitingForDatabaseWrites,
         WaitingForExtractors,
         WritingEmbeddingsToDatabase,