fix the cellulite spilling bug

This commit is contained in:
Tamo
2025-07-24 18:11:16 +02:00
parent 41a04aa3ab
commit 8670793e6e
16 changed files with 183 additions and 113 deletions

View File

@ -1,21 +1,25 @@
use std::cell::RefCell;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, ErrorKind, Seek as _, Write as _};
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
use std::str::FromStr;
use std::{iter, mem};
use bumpalo::Bump;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use cellulite::zerometry::ZerometryCodec;
use geo_types::Geometry;
use geojson::GeoJson;
use heed::RoTxn;
use heed::{BytesEncode, RoTxn};
use zerometry::Zerometry;
use crate::update::new::channel::GeoJsonSender;
use crate::update::new::document::{Document, DocumentContext};
use crate::update::new::indexer::document_changes::Extractor;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
use crate::{DocumentId, Index, Result, UserError};
use crate::{DocumentId, Index, InternalError, Result, UserError};
pub struct GeoJsonExtractor {
grenad_parameters: GrenadParameters,
@ -38,8 +42,8 @@ impl GeoJsonExtractor {
pub struct GeoJsonExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geo
/// point being updated, we first put it in the deleted and then in the inserted.
removed: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
inserted: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
removed: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
inserted: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
/// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points
/// data structures if we have spilled to disk.
spilled_removed: Option<BufWriter<File>>,
@ -68,39 +72,43 @@ impl<'extractor> GeoJsonExtractorData<'extractor> {
unsafe impl MostlySend for GeoJsonExtractorData<'_> {}
pub struct FrozenGeoJsonExtractorData<'extractor> {
pub removed: &'extractor [(DocumentId, GeoJson)],
pub inserted: &'extractor [(DocumentId, GeoJson)],
pub removed: &'extractor [(DocumentId, &'extractor [u8])],
pub inserted: &'extractor [(DocumentId, &'extractor [u8])],
pub spilled_removed: Option<BufReader<File>>,
pub spilled_inserted: Option<BufReader<File>>,
}
impl FrozenGeoJsonExtractorData<'_> {
pub fn iter_and_clear_removed(
&mut self,
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_>
{
Ok(mem::take(&mut self.removed)
.iter()
.cloned()
.map(Ok)
.chain(iterator_over_spilled_geojsons(&mut self.spilled_removed)?))
pub fn iter_and_clear_removed(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
for (docid, _buf) in mem::take(&mut self.removed) {
channel.delete_geojson(*docid).unwrap();
}
for ret in iterator_over_spilled_geojsons(&mut self.spilled_removed)? {
let (docid, _buf) = ret.map_err(InternalError::SerdeJson)?;
channel.delete_geojson(docid).unwrap();
}
Ok(())
}
pub fn iter_and_clear_inserted(
&mut self,
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_>
{
Ok(mem::take(&mut self.inserted)
.iter()
.cloned()
.map(Ok)
.chain(iterator_over_spilled_geojsons(&mut self.spilled_inserted)?))
pub fn iter_and_clear_inserted(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
for (docid, _buf) in mem::take(&mut self.inserted) {
channel.send_geojson(*docid, _buf.to_vec()).unwrap();
}
for ret in iterator_over_spilled_geojsons(&mut self.spilled_inserted)? {
let (docid, buf) = ret.map_err(InternalError::SerdeJson)?;
channel.send_geojson(docid, buf.to_vec()).unwrap();
}
Ok(())
}
}
fn iterator_over_spilled_geojsons(
spilled: &mut Option<BufReader<File>>,
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_> {
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, Vec<u8>), serde_json::Error>> + '_> {
let mut spilled = spilled.take();
if let Some(spilled) = &mut spilled {
spilled.rewind()?;
@ -113,10 +121,14 @@ fn iterator_over_spilled_geojsons(
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
match GeoJson::from_reader(file) {
Ok(geojson) => Some(Ok((docid, geojson))),
Err(e) if e.is_eof() => None,
Err(e) => Some(Err(e)),
let size = match file.read_u32::<BigEndian>() {
Ok(size) => size,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
let mut buf = vec![0; size as usize];
match file.read_exact(&mut buf) {
Ok(()) => Some(Ok((docid, buf))),
Err(e) => return Some(Err(serde_json::Error::io(e))),
}
}
None => None,
@ -138,7 +150,7 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
context: &'doc DocumentContext<Self::Data>,
context: &'doc DocumentContext<'doc, 'extractor, '_, '_, Self::Data>,
) -> Result<()> {
let rtxn = &context.rtxn;
let index = context.index;
@ -161,15 +173,22 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
if let Some(geojson) = current.geojson_field()? {
let geojson = GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
match &mut data_ref.spilled_removed {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.removed.push((docid, bvec.into_bump_slice()));
}
None => data_ref.removed.push(
// TODO: The error type is wrong here. It should be an internal error.
(docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?),
),
}
}
}
@ -187,50 +206,70 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
// we need to replace the current by the new point and therefore
// delete the current point from cellulite.
if let Some(geojson) = current_geo {
let geojson =
GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
match &mut data_ref.spilled_removed {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.removed.push((docid, bvec.into_bump_slice()));
}
// TODO: Should be an internal error
None => data_ref.removed.push((
docid,
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
)),
}
}
if let Some(geojson) = updated_geo {
let geojson =
GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
match &mut data_ref.spilled_inserted {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
file.write_u32::<BigEndian>(buf.len() as u32)?;
file.write_all(&buf)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&buf);
data_ref.inserted.push((docid, bvec.into_bump_slice()));
}
// TODO: Is the error type correct here? Shouldn't it be an internal error?
None => data_ref.inserted.push((
docid,
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
)),
}
}
}
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
let inserted_geo = insertion.inserted().geojson_field()?;
if let Some(geojson) = inserted_geo {
let geojson = GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
let mut bytes = Vec::new();
Zerometry::write_from_geometry(&mut bytes, &geometry)?;
match &mut data_ref.spilled_inserted {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
file.write_u32::<BigEndian>(bytes.len() as u32)?;
file.write_all(&bytes)?;
}
None => {
let mut bvec =
bumpalo::collections::Vec::new_in(context.extractor_alloc);
bvec.extend_from_slice(&bytes);
data_ref.inserted.push((docid, bvec.into_bump_slice()));
}
// TODO: Is the error type correct here? Shouldn't it be an internal error?
None => data_ref.inserted.push((
docid,
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
)),
}
}
}