mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-06 04:36:32 +00:00
add the deletion in the new indexer
This commit is contained in:
@ -140,7 +140,11 @@ pub enum ReceiverAction {
|
|||||||
LargeEntry(LargeEntry),
|
LargeEntry(LargeEntry),
|
||||||
LargeVectors(LargeVectors),
|
LargeVectors(LargeVectors),
|
||||||
LargeVector(LargeVector),
|
LargeVector(LargeVector),
|
||||||
GeoJson(DocumentId, GeoJson),
|
// TODO: I don't understand all the buffer stuff so I'm going to send all geojson one by one stored in RAM.
|
||||||
|
// The geojson for france made of 63k points takes 594KiB which means with a capacity of 1000,
|
||||||
|
// the absolute maximum amounts of memory we could consume is about 580MiB which is acceptable for this POC.
|
||||||
|
// If the geojson is None, it means that the document is being deleted.
|
||||||
|
GeoJson(DocumentId, Option<GeoJson>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An entry that cannot fit in the BBQueue buffers has been
|
/// An entry that cannot fit in the BBQueue buffers has been
|
||||||
@ -1155,6 +1159,9 @@ pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
|
|||||||
|
|
||||||
impl GeoJsonSender<'_, '_> {
|
impl GeoJsonSender<'_, '_> {
|
||||||
pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> {
|
pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> {
|
||||||
self.0.sender.send(ReceiverAction::GeoJson(docid, value)).map_err(|_| SendError(()))
|
self.0.sender.send(ReceiverAction::GeoJson(docid, Some(value))).map_err(|_| SendError(()))
|
||||||
|
}
|
||||||
|
pub fn delete_geojson(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
|
||||||
|
self.0.sender.send(ReceiverAction::GeoJson(docid, None)).map_err(|_| SendError(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,7 @@ pub fn write_to_db(
|
|||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
|
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
|
||||||
let mut _entered_post_merge = None;
|
let mut _entered_post_merge = None;
|
||||||
|
let cellulite = cellulite::Cellulite::new(index.cellulite);
|
||||||
while let Some(action) = writer_receiver.recv_action() {
|
while let Some(action) = writer_receiver.recv_action() {
|
||||||
if _entered_post_merge.is_none()
|
if _entered_post_merge.is_none()
|
||||||
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
|
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
@ -73,9 +74,15 @@ pub fn write_to_db(
|
|||||||
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
|
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
|
||||||
}
|
}
|
||||||
ReceiverAction::GeoJson(docid, geojson) => {
|
ReceiverAction::GeoJson(docid, geojson) => {
|
||||||
let cellulite = cellulite::Cellulite::new(index.cellulite);
|
match geojson {
|
||||||
|
Some(geojson) => {
|
||||||
cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
|
cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
|
||||||
}
|
}
|
||||||
|
None => {
|
||||||
|
cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Every time the is a message in the channel we search
|
// Every time the is a message in the channel we search
|
||||||
|
Reference in New Issue
Block a user