From 44dc64accbe156e797f7dd1d2bc55de994074819 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 17 Jul 2025 18:54:37 +0200 Subject: [PATCH] add the deletion in the new indexer --- crates/milli/src/update/new/channel.rs | 11 +++++++++-- crates/milli/src/update/new/indexer/write.rs | 11 +++++++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index e41e380e3..ba36faf1c 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -140,7 +140,11 @@ pub enum ReceiverAction { LargeEntry(LargeEntry), LargeVectors(LargeVectors), LargeVector(LargeVector), - GeoJson(DocumentId, GeoJson), + // TODO: I don't understand all the buffer stuff so I'm going to send all geojson one by one stored in RAM. + // The geojson for france made of 63k points takes 594KiB which means with a capacity of 1000, + // the absolute maximum amounts of memory we could consume is about 580MiB which is acceptable for this POC. + // If the geojson is None, it means that the document is being deleted. + GeoJson(DocumentId, Option), } /// An entry that cannot fit in the BBQueue buffers has been @@ -1155,6 +1159,9 @@ pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); impl GeoJsonSender<'_, '_> { pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> { - self.0.sender.send(ReceiverAction::GeoJson(docid, value)).map_err(|_| SendError(())) + self.0.sender.send(ReceiverAction::GeoJson(docid, Some(value))).map_err(|_| SendError(())) + } + pub fn delete_geojson(&self, docid: DocumentId) -> StdResult<(), SendError<()>> { + self.0.sender.send(ReceiverAction::GeoJson(docid, None)).map_err(|_| SendError(())) } } diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index 4a5f80688..6b5497f77 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -32,6 +32,7 @@ pub fn write_to_db( let _entered = span.enter(); let span = tracing::trace_span!(target: "indexing::write_db", "post_merge"); let mut _entered_post_merge = None; + let cellulite = cellulite::Cellulite::new(index.cellulite); while let Some(action) = writer_receiver.recv_action() { if _entered_post_merge.is_none() && finished_extraction.load(std::sync::atomic::Ordering::Relaxed) @@ -73,8 +74,14 @@ pub fn write_to_db( writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?; } ReceiverAction::GeoJson(docid, geojson) => { - let cellulite = cellulite::Cellulite::new(index.cellulite); - cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?; + match geojson { + Some(geojson) => { + cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?; + } + None => { + cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?; + } + } } }