Optimize geojson channels

This commit is contained in:
Mubelotix
2025-08-18 17:09:38 +02:00
parent 5322aee018
commit 47f1e707d5
2 changed files with 119 additions and 22 deletions

View File

@ -139,11 +139,7 @@ pub enum ReceiverAction {
LargeEntry(LargeEntry),
LargeVectors(LargeVectors),
LargeVector(LargeVector),
// TODO: I don't understand all the buffer stuff so I'm going to send all geojson one by one stored in RAM.
// The geojson for france made of 63k points takes 594KiB which means with a capacity of 1000,
// the absolute maximum amount of memory we could consume is about 580MiB, which is acceptable for this POC.
// If the geojson is None, it means that the document is being deleted.
GeoJson(DocumentId, Option<Vec<u8>>),
LargeGeoJson(LargeGeoJson),
}
/// An entry that cannot fit in the BBQueue buffers has been
@ -198,6 +194,14 @@ impl LargeVector {
}
}
#[derive(Debug)]
pub struct LargeGeoJson {
/// The document id associated to the large geojson.
pub docid: DocumentId,
/// The large geojson that must be written.
pub geojson: Mmap,
}
impl<'a> WriterBbqueueReceiver<'a> {
/// Tries to receive an action to do until the timeout occurs
/// and if it does, consider it as a spurious wake up.
@ -263,10 +267,12 @@ pub enum EntryHeader {
ArroyDeleteVector(ArroyDeleteVector),
ArroySetVectors(ArroySetVectors),
ArroySetVector(ArroySetVector),
CelluliteItem(DocumentId),
CelluliteRemove(DocumentId),
}
impl EntryHeader {
const fn variant_size() -> usize {
pub const fn variant_size() -> usize {
mem::size_of::<u8>()
}
@ -276,6 +282,8 @@ impl EntryHeader {
EntryHeader::ArroyDeleteVector(_) => 1,
EntryHeader::ArroySetVectors(_) => 2,
EntryHeader::ArroySetVector(_) => 3,
EntryHeader::CelluliteItem(_) => 4,
EntryHeader::CelluliteRemove(_) => 5,
}
}
@ -294,6 +302,14 @@ impl EntryHeader {
Self::variant_size() + mem::size_of::<ArroyDeleteVector>()
}
const fn total_cellulite_item_size(value_length: usize) -> usize {
Self::variant_size() + mem::size_of::<DocumentId>() + value_length
}
const fn total_cellulite_remove_size() -> usize {
Self::variant_size() + mem::size_of::<DocumentId>()
}
/// The `dimensions` corresponds to the number of `f32` in the embedding.
fn total_set_vectors_size(count: usize, dimensions: usize) -> usize {
let embedding_size = dimensions * mem::size_of::<f32>();
@ -311,6 +327,8 @@ impl EntryHeader {
EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs),
EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv),
EntryHeader::CelluliteItem(docid) => mem::size_of_val(docid),
EntryHeader::CelluliteRemove(docid) => mem::size_of_val(docid),
};
Self::variant_size() + payload_size
}
@ -338,6 +356,16 @@ impl EntryHeader {
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::ArroySetVector(header)
}
4 => {
let header_bytes = &remaining[..mem::size_of::<DocumentId>()];
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::CelluliteItem(header)
}
5 => {
let header_bytes = &remaining[..mem::size_of::<DocumentId>()];
let header = checked::pod_read_unaligned(header_bytes);
EntryHeader::CelluliteRemove(header)
}
id => panic!("invalid variant id: {id}"),
}
}
@ -349,6 +377,8 @@ impl EntryHeader {
EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs),
EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv),
EntryHeader::CelluliteItem(docid) => bytemuck::bytes_of(docid),
EntryHeader::CelluliteRemove(docid) => bytemuck::bytes_of(docid),
};
*first = self.variant_id();
remaining.copy_from_slice(payload_bytes);
@ -1154,10 +1184,67 @@ impl GeoSender<'_, '_> {
pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl GeoJsonSender<'_, '_> {
pub fn send_geojson(&self, docid: DocumentId, value: Vec<u8>) -> StdResult<(), SendError<()>> {
self.0.sender.send(ReceiverAction::GeoJson(docid, Some(value))).map_err(|_| SendError(()))
pub fn send_geojson(&self, docid: DocumentId, value: Vec<u8>) -> crate::Result<()> {
let max_grant = self.0.max_grant;
let refcell = self.0.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let payload_header = EntryHeader::CelluliteItem(docid);
let value_length = value.len();
let total_length = EntryHeader::total_cellulite_item_size(value_length);
if total_length > max_grant {
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
let mut embedding_bytes = bytemuck::cast_slice(&value);
io::copy(&mut embedding_bytes, &mut value_file)?;
let value_file = value_file.into_inner().map_err(|ie| ie.into_error())?;
let geojson = unsafe { Mmap::map(&value_file)? }; // Safe because the file is never modified
let large_geojson = LargeGeoJson { docid, geojson };
self.0.sender.send(ReceiverAction::LargeGeoJson(large_geojson)).unwrap();
return Ok(());
}
pub fn delete_geojson(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
self.0.sender.send(ReceiverAction::GeoJson(docid, None)).map_err(|_| SendError(()))
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(
&mut producer,
total_length,
&self.0.sender,
&self.0.sent_messages_attempts,
&self.0.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
remaining.copy_from_slice(&value);
Ok(())
},
)?;
Ok(())
}
pub fn delete_geojson(&self, docid: DocumentId) -> crate::Result<()> {
let refcell = self.0.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let payload_header = EntryHeader::CelluliteRemove(docid);
let total_length = EntryHeader::total_cellulite_remove_size();
reserve_and_write_grant(
&mut producer,
total_length,
&self.0.sender,
&self.0.sent_messages_attempts,
&self.0.blocking_sent_messages_attempts,
|grant| {
payload_header.serialize_into(grant);
Ok(())
},
)?;
Ok(())
}
}

View File

@ -16,7 +16,7 @@ use crate::update::settings::InnerIndexSettings;
use crate::vector::db::IndexEmbeddingConfig;
use crate::vector::settings::EmbedderAction;
use crate::vector::{ArroyWrapper, Embedder, Embeddings, RuntimeEmbedders};
use crate::{Error, Index, InternalError, Result, UserError};
use crate::{DocumentId, Error, Index, InternalError, Result, UserError};
pub fn write_to_db(
mut writer_receiver: WriterBbqueueReceiver<'_>,
@ -72,17 +72,14 @@ pub fn write_to_db(
let embedding = large_vector.read_embedding(*dimensions);
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
ReceiverAction::GeoJson(docid, geojson) => match geojson {
Some(geojson) => {
ReceiverAction::LargeGeoJson(LargeGeoJson { docid, geojson }) => {
// It cannot be a deletion because it's large. Deletions are always small
let geojson: &[u8] = &geojson;
index
.cellulite
.add_raw_zerometry(wtxn, docid, &geojson)
.add_raw_zerometry(wtxn, docid, geojson)
.map_err(InternalError::CelluliteError)?;
}
None => {
index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
}
},
}
// Every time there is a message in the channel we search
@ -276,6 +273,19 @@ pub fn write_from_bbqueue(
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
}
EntryHeader::CelluliteItem(docid) => {
let frame = frame_with_header.frame();
let skip = EntryHeader::variant_size() + std::mem::size_of::<DocumentId>();
let geojson = &frame[skip..];
index
.cellulite
.add_raw_zerometry(wtxn, docid, geojson)
.map_err(InternalError::CelluliteError)?;
}
EntryHeader::CelluliteRemove(docid) => {
index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
}
}
}