From d8de09f23da5648e5e47aad433f9f05efa0bdf3a Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Thu, 14 Aug 2025 13:47:23 +0200 Subject: [PATCH] Optimize points removed serialization --- .../src/update/new/extract/geo/cellulite.rs | 93 ++++++++----------- 1 file changed, 37 insertions(+), 56 deletions(-) diff --git a/crates/milli/src/update/new/extract/geo/cellulite.rs b/crates/milli/src/update/new/extract/geo/cellulite.rs index a09923590..57c883513 100644 --- a/crates/milli/src/update/new/extract/geo/cellulite.rs +++ b/crates/milli/src/update/new/extract/geo/cellulite.rs @@ -1,8 +1,8 @@ use std::cell::RefCell; use std::fs::File; -use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _}; +use std::io::{BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _}; +use std::mem; use std::str::FromStr; -use std::{iter, mem}; use bumpalo::Bump; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; @@ -42,7 +42,7 @@ impl GeoJsonExtractor { pub struct GeoJsonExtractorData<'extractor> { /// The set of documents ids that were removed. If a document sees its geo /// point being updated, we first put it in the deleted and then in the inserted. - removed: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>, + removed: bumpalo::collections::Vec<'extractor, DocumentId>, inserted: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>, /// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points /// data structures if we have spilled to disk. @@ -72,7 +72,7 @@ impl<'extractor> GeoJsonExtractorData<'extractor> { unsafe impl MostlySend for GeoJsonExtractorData<'_> {} pub struct FrozenGeoJsonExtractorData<'extractor> { - pub removed: &'extractor [(DocumentId, &'extractor [u8])], + pub removed: &'extractor [DocumentId], pub inserted: &'extractor [(DocumentId, &'extractor [u8])], pub spilled_removed: Option>, pub spilled_inserted: Option>, @@ -80,13 +80,21 @@ pub struct FrozenGeoJsonExtractorData<'extractor> { impl FrozenGeoJsonExtractorData<'_> { pub fn iter_and_clear_removed(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> { - for (docid, _buf) in mem::take(&mut self.removed) { + for docid in mem::take(&mut self.removed) { channel.delete_geojson(*docid).unwrap(); } - for ret in iterator_over_spilled_geojsons(&mut self.spilled_removed)? { - let (docid, _buf) = ret.map_err(InternalError::SerdeJson)?; - channel.delete_geojson(docid).unwrap(); + if let Some(mut spilled) = self.spilled_removed.take() { + spilled.rewind()?; + + loop { + let docid = match spilled.read_u32::() { + Ok(docid) => docid, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, + Err(e) => return Err(InternalError::SerdeJson(serde_json::Error::io(e)).into()), + }; + channel.delete_geojson(docid).unwrap(); + } } Ok(()) @@ -97,44 +105,31 @@ impl FrozenGeoJsonExtractorData<'_> { channel.send_geojson(*docid, _buf.to_vec()).unwrap(); } - for ret in iterator_over_spilled_geojsons(&mut self.spilled_inserted)? { - let (docid, buf) = ret.map_err(InternalError::SerdeJson)?; - channel.send_geojson(docid, buf.to_vec()).unwrap(); + if let Some(mut spilled) = self.spilled_inserted.take() { + spilled.rewind()?; + + loop { + let docid = match spilled.read_u32::() { + Ok(docid) => docid, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, + Err(e) => return Err(InternalError::SerdeJson(serde_json::Error::io(e)).into()), + }; + let size = match spilled.read_u32::() { + Ok(size) => size, + Err(e) => return Err(InternalError::SerdeJson(serde_json::Error::io(e)).into()), + }; + let mut buf = vec![0; size as usize]; + spilled + .read_exact(&mut buf) + .map_err(|e| InternalError::SerdeJson(serde_json::Error::io(e)))?; + channel.send_geojson(docid, buf).unwrap(); + } } Ok(()) } } -fn iterator_over_spilled_geojsons( - spilled: &mut Option>, -) -> io::Result), serde_json::Error>> + '_> { - let mut spilled = spilled.take(); - if let Some(spilled) = &mut spilled { - spilled.rewind()?; - } - - Ok(iter::from_fn(move || match &mut spilled { - Some(file) => { - let docid = match file.read_u32::() { - Ok(docid) => docid, - Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None, - Err(e) => return Some(Err(serde_json::Error::io(e))), - }; - let size = match file.read_u32::() { - Ok(size) => size, - Err(e) => return Some(Err(serde_json::Error::io(e))), - }; - let mut buf = vec![0; size as usize]; - match file.read_exact(&mut buf) { - Ok(()) => Some(Ok((docid, buf))), - Err(e) => return Some(Err(serde_json::Error::io(e))), - } - } - None => None, - })) -} - impl<'extractor> Extractor<'extractor> for GeoJsonExtractor { type Data = RefCell>; @@ -173,19 +168,12 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor { let current = deletion.current(rtxn, index, db_fields_ids_map)?; if let Some(_geojson) = current.geojson_field()? { - let buf = Vec::new(); - match &mut data_ref.spilled_removed { Some(file) => { file.write_u32::(docid)?; - file.write_u32::(buf.len() as u32)?; - file.write_all(&buf)?; } None => { - let mut bvec = - bumpalo::collections::Vec::new_in(context.extractor_alloc); - bvec.extend_from_slice(&buf); - data_ref.removed.push((docid, bvec.into_bump_slice())); + data_ref.removed.push(docid); } } } @@ -204,19 +192,12 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor { // we need to replace the current by the new point and therefore // delete the current point from cellulite. if let Some(_geojson) = current_geo { - let buf = Vec::new(); - match &mut data_ref.spilled_removed { Some(file) => { file.write_u32::(docid)?; - file.write_u32::(buf.len() as u32)?; - file.write_all(&buf)?; } None => { - let mut bvec = - bumpalo::collections::Vec::new_in(context.extractor_alloc); - bvec.extend_from_slice(&buf); - data_ref.removed.push((docid, bvec.into_bump_slice())); + data_ref.removed.push(docid); } } }