diff --git a/Cargo.lock b/Cargo.lock index c04089877..775d4d023 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1065,7 +1065,6 @@ dependencies = [ [[package]] name = "cellulite" version = "0.1.0" -source = "git+https://github.com/irevoire/cellulite?branch=main#3654348942bdc67393e9a7b3c655f7e9a98dee50" dependencies = [ "geo", "geo-types", diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index e3f31f7ff..0e30f8fc7 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -18,7 +18,8 @@ bincode = "1.3.3" bstr = "1.12.0" bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] } byteorder = "1.5.0" -cellulite = { git = "https://github.com/irevoire/cellulite", branch = "main"} +# cellulite = { git = "https://github.com/irevoire/cellulite", branch = "main"} +cellulite = { path = "../../../cellulite" } charabia = { version = "0.9.6", default-features = false } concat-arrays = "0.1.2" convert_case = "0.8.0" diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index d85982679..f4087c337 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -140,7 +140,7 @@ pub enum ReceiverAction { LargeEntry(LargeEntry), LargeVectors(LargeVectors), LargeVector(LargeVector), - GeoJson(GeoJson), + GeoJson(DocumentId, GeoJson), } /// An entry that cannot fit in the BBQueue buffers has been @@ -1155,10 +1155,10 @@ impl GeoSender<'_, '_> { pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>); impl GeoJsonSender<'_, '_> { - pub fn send_geojson(&self, value: GeoJson) -> StdResult<(), SendError<()>> { + pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> { self.0 .sender - .send(ReceiverAction::GeoJson(value)) + .send(ReceiverAction::GeoJson(docid, value)) .map_err(|_| SendError(())) } } diff --git a/crates/milli/src/update/new/extract/geo/cellulite.rs b/crates/milli/src/update/new/extract/geo/cellulite.rs index 0fcde3c4e..6f1f8f232 100644 --- a/crates/milli/src/update/new/extract/geo/cellulite.rs +++ b/crates/milli/src/update/new/extract/geo/cellulite.rs @@ -5,7 +5,7 @@ use std::str::FromStr; use std::{iter, mem, result}; use bumpalo::Bump; -use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use geojson::GeoJson; use heed::RoTxn; use serde_json::value::RawValue; @@ -60,8 +60,8 @@ impl From for GeoPoint { pub struct GeoJsonExtractorData<'extractor> { /// The set of documents ids that were removed. If a document sees its geo /// point being updated, we first put it in the deleted and then in the inserted. - removed: bumpalo::collections::Vec<'extractor, GeoJson>, - inserted: bumpalo::collections::Vec<'extractor, GeoJson>, + removed: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>, + inserted: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>, /// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points /// data structures if we have spilled to disk. spilled_removed: Option>, @@ -90,8 +90,8 @@ impl<'extractor> GeoJsonExtractorData<'extractor> { unsafe impl MostlySend for GeoJsonExtractorData<'_> {} pub struct FrozenGeoJsonExtractorData<'extractor> { - pub removed: &'extractor [GeoJson], - pub inserted: &'extractor [GeoJson], + pub removed: &'extractor [(DocumentId, GeoJson)], + pub inserted: &'extractor [(DocumentId, GeoJson)], pub spilled_removed: Option>, pub spilled_inserted: Option>, } @@ -99,7 +99,7 @@ pub struct FrozenGeoJsonExtractorData<'extractor> { impl FrozenGeoJsonExtractorData<'_> { pub fn iter_and_clear_removed( &mut self, - ) -> io::Result> + '_> { + ) -> io::Result> + '_> { Ok(mem::take(&mut self.removed) .iter() .cloned() @@ -109,7 +109,7 @@ impl FrozenGeoJsonExtractorData<'_> { pub fn iter_and_clear_inserted( &mut self, - ) -> io::Result> + '_> { + ) -> io::Result> + '_> { Ok(mem::take(&mut self.inserted) .iter() .cloned() @@ -120,7 +120,7 @@ impl FrozenGeoJsonExtractorData<'_> { fn iterator_over_spilled_geojsons( spilled: &mut Option>, -) -> io::Result> + '_> { +) -> io::Result> + '_> { let mut spilled = spilled.take(); if let Some(spilled) = &mut spilled { spilled.rewind()?; @@ -128,8 +128,13 @@ fn iterator_over_spilled_geojsons( Ok(iter::from_fn(move || match &mut spilled { Some(file) => { + let docid = match file.read_u32::() { + Ok(docid) => docid, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None, + Err(e) => return Some(Err(serde_json::Error::io(e))), + }; match GeoJson::from_reader(file) { - Ok(geojson) => Some(Ok(geojson)), + Ok(geojson) => Some(Ok((docid, geojson))), Err(e) if e.is_eof() => None, Err(e) => Some(Err(e)), } @@ -178,10 +183,13 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor { if let Some(geojson) = current.geojson_field()? { match &mut data_ref.spilled_removed { - Some(file) => file.write_all(geojson.get().as_bytes())?, + Some(file) => { + file.write_u32::(docid)?; + file.write_all(geojson.get().as_bytes())?; + } None => data_ref.removed.push( // TODO: The error type is wrong here. It should be an internal error. - GeoJson::from_str(geojson.get()).map_err(UserError::from)?, + (docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?), ), } } @@ -202,20 +210,26 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor { // delete the current point from cellulite. if let Some(geojson) = current_geo { match &mut data_ref.spilled_removed { - Some(file) => file.write_all(geojson.get().as_bytes())?, + Some(file) => { + file.write_u32::(docid)?; + file.write_all(geojson.get().as_bytes())?; + } // TODO: Should be an internal error None => data_ref.removed.push( - GeoJson::from_str(geojson.get()).map_err(UserError::from)?, + (docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?), ), } } if let Some(geojson) = updated_geo { match &mut data_ref.spilled_inserted { - Some(file) => file.write_all(geojson.get().as_bytes())?, + Some(file) => { + file.write_u32::(docid)?; + file.write_all(geojson.get().as_bytes())?; + } // TODO: Is the error type correct here? Shouldn't it be an internal error? None => data_ref.inserted.push( - GeoJson::from_str(geojson.get()).map_err(UserError::from)?, + (docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?), ), } } @@ -229,10 +243,13 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor { if let Some(geojson) = inserted_geo { match &mut data_ref.spilled_inserted { - Some(file) => file.write_all(geojson.get().as_bytes())?, + Some(file) => { + file.write_u32::(docid)?; + file.write_all(geojson.get().as_bytes())?; + } // TODO: Is the error type correct here? Shouldn't it be an internal error? None => data_ref.inserted.push( - GeoJson::from_str(geojson.get()).map_err(UserError::from)?, + (docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?), ), } } diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index 974385bf9..24e32f0bc 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -72,9 +72,9 @@ pub fn write_to_db( let embedding = large_vector.read_embedding(*dimensions); writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?; } - ReceiverAction::GeoJson(geojson) => { + ReceiverAction::GeoJson(docid, geojson) => { let cellulite = cellulite::Writer::new(index.cellulite); - cellulite.add_item(wtxn, doc_id, &geojson)?; + cellulite.add_item(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?; } } diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs index 90b0b205f..4b558d5c2 100644 --- a/crates/milli/src/update/new/merger.rs +++ b/crates/milli/src/update/new/merger.rs @@ -75,8 +75,6 @@ pub fn merge_and_send_cellulite<'extractor, MSP>( where MSP: Fn() -> bool + Sync, { - let cellulite = cellulite::Writer::new(index.cellulite); - for data in datastore { if must_stop_processing() { return Err(InternalError::AbortedIndexation.into()); @@ -93,7 +91,8 @@ where } for result in frozen.iter_and_clear_inserted()? { - geojson_sender.send_geojson(result.map_err(InternalError::SerdeJson)?).unwrap(); + let (docid, geojson) = result.map_err(InternalError::SerdeJson)?; + geojson_sender.send_geojson(docid, geojson).unwrap(); } }