finish plugin cellulite to the new indexer

This commit is contained in:
Tamo
2025-07-16 00:10:40 +02:00
parent a921ee31ce
commit 3f00f56f9f
6 changed files with 43 additions and 27 deletions

View File

@ -140,7 +140,7 @@ pub enum ReceiverAction {
LargeEntry(LargeEntry),
LargeVectors(LargeVectors),
LargeVector(LargeVector),
GeoJson(GeoJson),
GeoJson(DocumentId, GeoJson),
}
/// An entry that cannot fit in the BBQueue buffers has been
@ -1155,10 +1155,10 @@ impl GeoSender<'_, '_> {
pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl GeoJsonSender<'_, '_> {
pub fn send_geojson(&self, value: GeoJson) -> StdResult<(), SendError<()>> {
pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> {
self.0
.sender
.send(ReceiverAction::GeoJson(value))
.send(ReceiverAction::GeoJson(docid, value))
.map_err(|_| SendError(()))
}
}

View File

@ -5,7 +5,7 @@ use std::str::FromStr;
use std::{iter, mem, result};
use bumpalo::Bump;
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use geojson::GeoJson;
use heed::RoTxn;
use serde_json::value::RawValue;
@ -60,8 +60,8 @@ impl From<ExtractedGeoPoint> for GeoPoint {
pub struct GeoJsonExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geo
/// point being updated, we first put it in the deleted and then in the inserted.
removed: bumpalo::collections::Vec<'extractor, GeoJson>,
inserted: bumpalo::collections::Vec<'extractor, GeoJson>,
removed: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
inserted: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
/// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points
/// data structures if we have spilled to disk.
spilled_removed: Option<BufWriter<File>>,
@ -90,8 +90,8 @@ impl<'extractor> GeoJsonExtractorData<'extractor> {
unsafe impl MostlySend for GeoJsonExtractorData<'_> {}
pub struct FrozenGeoJsonExtractorData<'extractor> {
pub removed: &'extractor [GeoJson],
pub inserted: &'extractor [GeoJson],
pub removed: &'extractor [(DocumentId, GeoJson)],
pub inserted: &'extractor [(DocumentId, GeoJson)],
pub spilled_removed: Option<BufReader<File>>,
pub spilled_inserted: Option<BufReader<File>>,
}
@ -99,7 +99,7 @@ pub struct FrozenGeoJsonExtractorData<'extractor> {
impl FrozenGeoJsonExtractorData<'_> {
pub fn iter_and_clear_removed(
&mut self,
) -> io::Result<impl IntoIterator<Item = Result<GeoJson, serde_json::Error>> + '_> {
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_> {
Ok(mem::take(&mut self.removed)
.iter()
.cloned()
@ -109,7 +109,7 @@ impl FrozenGeoJsonExtractorData<'_> {
pub fn iter_and_clear_inserted(
&mut self,
) -> io::Result<impl IntoIterator<Item = Result<GeoJson, serde_json::Error>> + '_> {
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_> {
Ok(mem::take(&mut self.inserted)
.iter()
.cloned()
@ -120,7 +120,7 @@ impl FrozenGeoJsonExtractorData<'_> {
fn iterator_over_spilled_geojsons(
spilled: &mut Option<BufReader<File>>,
) -> io::Result<impl IntoIterator<Item = Result<GeoJson, serde_json::Error>> + '_> {
) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_> {
let mut spilled = spilled.take();
if let Some(spilled) = &mut spilled {
spilled.rewind()?;
@ -128,8 +128,13 @@ fn iterator_over_spilled_geojsons(
Ok(iter::from_fn(move || match &mut spilled {
Some(file) => {
let docid = match file.read_u32::<BigEndian>() {
Ok(docid) => docid,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
match GeoJson::from_reader(file) {
Ok(geojson) => Some(Ok(geojson)),
Ok(geojson) => Some(Ok((docid, geojson))),
Err(e) if e.is_eof() => None,
Err(e) => Some(Err(e)),
}
@ -178,10 +183,13 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
if let Some(geojson) = current.geojson_field()? {
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(geojson.get().as_bytes())?,
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
}
None => data_ref.removed.push(
// TODO: The error type is wrong here. It should be an internal error.
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
(docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?),
),
}
}
@ -202,20 +210,26 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
// delete the current point from cellulite.
if let Some(geojson) = current_geo {
match &mut data_ref.spilled_removed {
Some(file) => file.write_all(geojson.get().as_bytes())?,
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
}
// TODO: Should be an internal error
None => data_ref.removed.push(
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
(docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?),
),
}
}
if let Some(geojson) = updated_geo {
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(geojson.get().as_bytes())?,
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
}
// TODO: Is the error type correct here? Shouldn't it be an internal error?
None => data_ref.inserted.push(
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
(docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?),
),
}
}
@ -229,10 +243,13 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
if let Some(geojson) = inserted_geo {
match &mut data_ref.spilled_inserted {
Some(file) => file.write_all(geojson.get().as_bytes())?,
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
file.write_all(geojson.get().as_bytes())?;
}
// TODO: Is the error type correct here? Shouldn't it be an internal error?
None => data_ref.inserted.push(
GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
(docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?),
),
}
}

View File

@ -72,9 +72,9 @@ pub fn write_to_db(
let embedding = large_vector.read_embedding(*dimensions);
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
ReceiverAction::GeoJson(geojson) => {
ReceiverAction::GeoJson(docid, geojson) => {
let cellulite = cellulite::Writer::new(index.cellulite);
cellulite.add_item(wtxn, doc_id, &geojson)?;
cellulite.add_item(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
}
}

View File

@ -75,8 +75,6 @@ pub fn merge_and_send_cellulite<'extractor, MSP>(
where
MSP: Fn() -> bool + Sync,
{
let cellulite = cellulite::Writer::new(index.cellulite);
for data in datastore {
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into());
@ -93,7 +91,8 @@ where
}
for result in frozen.iter_and_clear_inserted()? {
geojson_sender.send_geojson(result.map_err(InternalError::SerdeJson)?).unwrap();
let (docid, geojson) = result.map_err(InternalError::SerdeJson)?;
geojson_sender.send_geojson(docid, geojson).unwrap();
}
}