Optimize serialization of removed points

This commit is contained in:
Mubelotix
2025-08-14 13:47:23 +02:00
parent e7be4ca103
commit d8de09f23d

View File

@ -1,8 +1,8 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _}; use std::io::{BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
use std::mem;
use std::str::FromStr; use std::str::FromStr;
use std::{iter, mem};
use bumpalo::Bump; use bumpalo::Bump;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
@ -42,7 +42,7 @@ impl GeoJsonExtractor {
pub struct GeoJsonExtractorData<'extractor> { pub struct GeoJsonExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geo /// The set of documents ids that were removed. If a document sees its geo
/// point being updated, we first put it in the deleted and then in the inserted. /// point being updated, we first put it in the deleted and then in the inserted.
removed: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>, removed: bumpalo::collections::Vec<'extractor, DocumentId>,
inserted: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>, inserted: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
/// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points /// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points
/// data structures if we have spilled to disk. /// data structures if we have spilled to disk.
@ -72,7 +72,7 @@ impl<'extractor> GeoJsonExtractorData<'extractor> {
unsafe impl MostlySend for GeoJsonExtractorData<'_> {} unsafe impl MostlySend for GeoJsonExtractorData<'_> {}
pub struct FrozenGeoJsonExtractorData<'extractor> { pub struct FrozenGeoJsonExtractorData<'extractor> {
pub removed: &'extractor [(DocumentId, &'extractor [u8])], pub removed: &'extractor [DocumentId],
pub inserted: &'extractor [(DocumentId, &'extractor [u8])], pub inserted: &'extractor [(DocumentId, &'extractor [u8])],
pub spilled_removed: Option<BufReader<File>>, pub spilled_removed: Option<BufReader<File>>,
pub spilled_inserted: Option<BufReader<File>>, pub spilled_inserted: Option<BufReader<File>>,
@ -80,13 +80,21 @@ pub struct FrozenGeoJsonExtractorData<'extractor> {
impl FrozenGeoJsonExtractorData<'_> { impl FrozenGeoJsonExtractorData<'_> {
pub fn iter_and_clear_removed(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> { pub fn iter_and_clear_removed(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
for (docid, _buf) in mem::take(&mut self.removed) { for docid in mem::take(&mut self.removed) {
channel.delete_geojson(*docid).unwrap(); channel.delete_geojson(*docid).unwrap();
} }
for ret in iterator_over_spilled_geojsons(&mut self.spilled_removed)? { if let Some(mut spilled) = self.spilled_removed.take() {
let (docid, _buf) = ret.map_err(InternalError::SerdeJson)?; spilled.rewind()?;
channel.delete_geojson(docid).unwrap();
loop {
let docid = match spilled.read_u32::<BigEndian>() {
Ok(docid) => docid,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
Err(e) => return Err(InternalError::SerdeJson(serde_json::Error::io(e)).into()),
};
channel.delete_geojson(docid).unwrap();
}
} }
Ok(()) Ok(())
@ -97,44 +105,31 @@ impl FrozenGeoJsonExtractorData<'_> {
channel.send_geojson(*docid, _buf.to_vec()).unwrap(); channel.send_geojson(*docid, _buf.to_vec()).unwrap();
} }
for ret in iterator_over_spilled_geojsons(&mut self.spilled_inserted)? { if let Some(mut spilled) = self.spilled_inserted.take() {
let (docid, buf) = ret.map_err(InternalError::SerdeJson)?; spilled.rewind()?;
channel.send_geojson(docid, buf.to_vec()).unwrap();
loop {
let docid = match spilled.read_u32::<BigEndian>() {
Ok(docid) => docid,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
Err(e) => return Err(InternalError::SerdeJson(serde_json::Error::io(e)).into()),
};
let size = match spilled.read_u32::<BigEndian>() {
Ok(size) => size,
Err(e) => return Err(InternalError::SerdeJson(serde_json::Error::io(e)).into()),
};
let mut buf = vec![0; size as usize];
spilled
.read_exact(&mut buf)
.map_err(|e| InternalError::SerdeJson(serde_json::Error::io(e)))?;
channel.send_geojson(docid, buf).unwrap();
}
} }
Ok(()) Ok(())
} }
} }
fn iterator_over_spilled_geojsons(
spilled: &mut Option<BufReader<File>>,
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, Vec<u8>), serde_json::Error>> + '_> {
let mut spilled = spilled.take();
if let Some(spilled) = &mut spilled {
spilled.rewind()?;
}
Ok(iter::from_fn(move || match &mut spilled {
Some(file) => {
let docid = match file.read_u32::<BigEndian>() {
Ok(docid) => docid,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
let size = match file.read_u32::<BigEndian>() {
Ok(size) => size,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
let mut buf = vec![0; size as usize];
match file.read_exact(&mut buf) {
Ok(()) => Some(Ok((docid, buf))),
Err(e) => return Some(Err(serde_json::Error::io(e))),
}
}
None => None,
}))
}
impl<'extractor> Extractor<'extractor> for GeoJsonExtractor { impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
type Data = RefCell<GeoJsonExtractorData<'extractor>>; type Data = RefCell<GeoJsonExtractorData<'extractor>>;
@ -173,19 +168,12 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
let current = deletion.current(rtxn, index, db_fields_ids_map)?; let current = deletion.current(rtxn, index, db_fields_ids_map)?;
if let Some(_geojson) = current.geojson_field()? { if let Some(_geojson) = current.geojson_field()? {
let buf = Vec::new();
match &mut data_ref.spilled_removed { match &mut data_ref.spilled_removed {
Some(file) => { Some(file) => {
file.write_u32::<BigEndian>(docid)?; file.write_u32::<BigEndian>(docid)?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
} }
None => { None => {
let mut bvec = data_ref.removed.push(docid);
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.removed.push((docid, bvec.into_bump_slice()));
} }
} }
} }
@ -204,19 +192,12 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
// we need to replace the current by the new point and therefore // we need to replace the current by the new point and therefore
// delete the current point from cellulite. // delete the current point from cellulite.
if let Some(_geojson) = current_geo { if let Some(_geojson) = current_geo {
let buf = Vec::new();
match &mut data_ref.spilled_removed { match &mut data_ref.spilled_removed {
Some(file) => { Some(file) => {
file.write_u32::<BigEndian>(docid)?; file.write_u32::<BigEndian>(docid)?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
} }
None => { None => {
let mut bvec = data_ref.removed.push(docid);
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.removed.push((docid, bvec.into_bump_slice()));
} }
} }
} }