diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index 09cdf8ee9..1fdeda840 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -139,11 +139,7 @@ pub enum ReceiverAction {
     LargeEntry(LargeEntry),
     LargeVectors(LargeVectors),
     LargeVector(LargeVector),
-    // TODO: I don't understand all the buffer stuff so I'm going to send all geojson one by one stored in RAM.
-    // The geojson for france made of 63k points takes 594KiB which means with a capacity of 1000,
-    // the absolute maximum amounts of memory we could consume is about 580MiB which is acceptable for this POC.
-    // If the geojson is None, it means that the document is being deleted.
-    GeoJson(DocumentId, Option<Vec<u8>>),
+    LargeGeoJson(LargeGeoJson),
 }
 
 /// An entry that cannot fit in the BBQueue buffers has been
@@ -198,6 +194,14 @@ impl LargeVector {
     }
 }
 
+#[derive(Debug)]
+pub struct LargeGeoJson {
+    /// The document id associated to the large geojson.
+    pub docid: DocumentId,
+    /// The large geojson that must be written.
+    pub geojson: Mmap,
+}
+
 impl<'a> WriterBbqueueReceiver<'a> {
     /// Tries to receive an action to do until the timeout occurs
     /// and if it does, consider it as a spurious wake up.
@@ -263,10 +267,12 @@ pub enum EntryHeader {
     ArroyDeleteVector(ArroyDeleteVector),
     ArroySetVectors(ArroySetVectors),
     ArroySetVector(ArroySetVector),
+    CelluliteItem(DocumentId),
+    CelluliteRemove(DocumentId),
 }
 
 impl EntryHeader {
-    const fn variant_size() -> usize {
+    pub const fn variant_size() -> usize {
         mem::size_of::<u8>()
     }
 
@@ -276,6 +282,8 @@ impl EntryHeader {
             EntryHeader::ArroyDeleteVector(_) => 1,
             EntryHeader::ArroySetVectors(_) => 2,
             EntryHeader::ArroySetVector(_) => 3,
+            EntryHeader::CelluliteItem(_) => 4,
+            EntryHeader::CelluliteRemove(_) => 5,
         }
     }
 
@@ -294,6 +302,14 @@ impl EntryHeader {
         Self::variant_size() + mem::size_of::<ArroyDeleteVector>()
     }
 
+    const fn total_cellulite_item_size(value_length: usize) -> usize {
+        Self::variant_size() + mem::size_of::<DocumentId>() + value_length
+    }
+
+    const fn total_cellulite_remove_size() -> usize {
+        Self::variant_size() + mem::size_of::<DocumentId>()
+    }
+
     /// The `dimensions` corresponds to the number of `f32` in the embedding.
     fn total_set_vectors_size(count: usize, dimensions: usize) -> usize {
         let embedding_size = dimensions * mem::size_of::<f32>();
@@ -311,6 +327,8 @@ impl EntryHeader {
             EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
             EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs),
             EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv),
+            EntryHeader::CelluliteItem(docid) => mem::size_of_val(docid),
+            EntryHeader::CelluliteRemove(docid) => mem::size_of_val(docid),
         };
         Self::variant_size() + payload_size
     }
@@ -338,6 +356,16 @@ impl EntryHeader {
                 let header = checked::pod_read_unaligned(header_bytes);
                 EntryHeader::ArroySetVector(header)
             }
+            4 => {
+                let header_bytes = &remaining[..mem::size_of::<DocumentId>()];
+                let header = checked::pod_read_unaligned(header_bytes);
+                EntryHeader::CelluliteItem(header)
+            }
+            5 => {
+                let header_bytes = &remaining[..mem::size_of::<DocumentId>()];
+                let header = checked::pod_read_unaligned(header_bytes);
+                EntryHeader::CelluliteRemove(header)
+            }
             id => panic!("invalid variant id: {id}"),
         }
     }
@@ -349,6 +377,8 @@ impl EntryHeader {
             EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
             EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs),
             EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv),
+            EntryHeader::CelluliteItem(docid) => bytemuck::bytes_of(docid),
+            EntryHeader::CelluliteRemove(docid) => bytemuck::bytes_of(docid),
         };
         *first = self.variant_id();
         remaining.copy_from_slice(payload_bytes);
@@ -1154,10 +1184,67 @@ impl GeoSender<'_, '_> {
 pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
 
 impl GeoJsonSender<'_, '_> {
-    pub fn send_geojson(&self, docid: DocumentId, value: Vec<u8>) -> StdResult<(), SendError<()>> {
-        self.0.sender.send(ReceiverAction::GeoJson(docid, Some(value))).map_err(|_| SendError(()))
+    pub fn send_geojson(&self, docid: DocumentId, value: Vec<u8>) -> crate::Result<()> {
+        let max_grant = self.0.max_grant;
+        let refcell = self.0.producers.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let payload_header = EntryHeader::CelluliteItem(docid);
+        let value_length = value.len();
+        let total_length = EntryHeader::total_cellulite_item_size(value_length);
+        if total_length > max_grant {
+            let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
+
+            let mut embedding_bytes = bytemuck::cast_slice(&value);
+            io::copy(&mut embedding_bytes, &mut value_file)?;
+
+            let value_file = value_file.into_inner().map_err(|ie| ie.into_error())?;
+            let geojson = unsafe { Mmap::map(&value_file)? }; // Safe because the file is never modified
+
+            let large_geojson = LargeGeoJson { docid, geojson };
+            self.0.sender.send(ReceiverAction::LargeGeoJson(large_geojson)).unwrap();
+
+            return Ok(());
+        }
+
+        // Spin loop to have a frame the size we requested.
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.0.sender,
+            &self.0.sent_messages_attempts,
+            &self.0.blocking_sent_messages_attempts,
+            |grant| {
+                let header_size = payload_header.header_size();
+                let (header_bytes, remaining) = grant.split_at_mut(header_size);
+                payload_header.serialize_into(header_bytes);
+                remaining.copy_from_slice(&value);
+                Ok(())
+            },
+        )?;
+
+        Ok(())
     }
-    pub fn delete_geojson(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
-        self.0.sender.send(ReceiverAction::GeoJson(docid, None)).map_err(|_| SendError(()))
+
+    pub fn delete_geojson(&self, docid: DocumentId) -> crate::Result<()> {
+        let refcell = self.0.producers.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let payload_header = EntryHeader::CelluliteRemove(docid);
+        let total_length = EntryHeader::total_cellulite_remove_size();
+
+        reserve_and_write_grant(
+            &mut producer,
+            total_length,
+            &self.0.sender,
+            &self.0.sent_messages_attempts,
+            &self.0.blocking_sent_messages_attempts,
+            |grant| {
+                payload_header.serialize_into(grant);
+                Ok(())
+            },
+        )?;
+
+        Ok(())
     }
 }
diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs
index fdd6f007c..ca11ce6f4 100644
--- a/crates/milli/src/update/new/indexer/write.rs
+++ b/crates/milli/src/update/new/indexer/write.rs
@@ -16,7 +16,7 @@ use crate::update::settings::InnerIndexSettings;
 use crate::vector::db::IndexEmbeddingConfig;
 use crate::vector::settings::EmbedderAction;
 use crate::vector::{ArroyWrapper, Embedder, Embeddings, RuntimeEmbedders};
-use crate::{Error, Index, InternalError, Result, UserError};
+use crate::{DocumentId, Error, Index, InternalError, Result, UserError};
 
 pub fn write_to_db(
     mut writer_receiver: WriterBbqueueReceiver<'_>,
@@ -72,17 +72,14 @@ pub fn write_to_db(
                 let embedding = large_vector.read_embedding(*dimensions);
                 writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
             }
-            ReceiverAction::GeoJson(docid, geojson) => match geojson {
-                Some(geojson) => {
-                    index
-                        .cellulite
-                        .add_raw_zerometry(wtxn, docid, &geojson)
-                        .map_err(InternalError::CelluliteError)?;
-                }
-                None => {
-                    index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
-                }
-            },
+            ReceiverAction::LargeGeoJson(LargeGeoJson { docid, geojson }) => {
+                // It cannot be a deletion because it's large; deletions are always small.
+                let geojson: &[u8] = &geojson;
+                index
+                    .cellulite
+                    .add_raw_zerometry(wtxn, docid, geojson)
+                    .map_err(InternalError::CelluliteError)?;
+            }
         }
 
         // Every time the is a message in the channel we search
@@ -276,6 +273,19 @@ pub fn write_from_bbqueue(
                     writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
                 }
             }
+            EntryHeader::CelluliteItem(docid) => {
+                let frame = frame_with_header.frame();
+                let skip = EntryHeader::variant_size() + std::mem::size_of::<DocumentId>();
+                let geojson = &frame[skip..];
+
+                index
+                    .cellulite
+                    .add_raw_zerometry(wtxn, docid, geojson)
+                    .map_err(InternalError::CelluliteError)?;
+            }
+            EntryHeader::CelluliteRemove(docid) => {
+                index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
+            }
         }
     }
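
For reference, below is a minimal, self-contained sketch (not part of the patch) of the frame layout the new CelluliteItem entry relies on, assuming DocumentId is a u32 as in milli: one variant byte, then an unaligned DocumentId, then the raw GeoJSON bytes. The encode_cellulite_item/decode_cellulite_item helpers and the constant name are hypothetical, illustration-only; the real code goes through EntryHeader, reserve_and_write_grant, and the BBQueue frames, but the arithmetic mirrors total_cellulite_item_size() and the `skip` computed in write_from_bbqueue.

// Layout sketch: [variant id: u8][docid: DocumentId, unaligned][geojson payload...]
type DocumentId = u32; // assumption: matches milli's DocumentId

const CELLULITE_ITEM_VARIANT: u8 = 4; // same id as EntryHeader::CelluliteItem in the patch

fn encode_cellulite_item(docid: DocumentId, geojson: &[u8]) -> Vec<u8> {
    // Same size as EntryHeader::total_cellulite_item_size(geojson.len()).
    let mut frame = Vec::with_capacity(1 + std::mem::size_of::<DocumentId>() + geojson.len());
    frame.push(CELLULITE_ITEM_VARIANT);
    // The docid is written unaligned, which is why the reader uses pod_read_unaligned.
    frame.extend_from_slice(&docid.to_ne_bytes());
    frame.extend_from_slice(geojson);
    frame
}

fn decode_cellulite_item(frame: &[u8]) -> (DocumentId, &[u8]) {
    assert_eq!(frame[0], CELLULITE_ITEM_VARIANT);
    // Same arithmetic as `skip` in write_from_bbqueue: variant_size() + size_of::<DocumentId>().
    let skip = 1 + std::mem::size_of::<DocumentId>();
    let docid = DocumentId::from_ne_bytes(frame[1..skip].try_into().unwrap());
    (docid, &frame[skip..])
}

fn main() {
    let geojson = br#"{"type":"Point","coordinates":[2.35,48.85]}"#;
    let frame = encode_cellulite_item(42, geojson);
    let (docid, payload) = decode_cellulite_item(&frame);
    assert_eq!(docid, 42);
    assert_eq!(payload, &geojson[..]);
}

Because the docid travels in the fixed-size header and only the GeoJSON payload varies, a deletion (CelluliteRemove) is always a few bytes and can always go through the queue, which is why the LargeGeoJson receiver action only needs to handle additions.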