fix the cellulite spilling bug

Tamo
2025-07-24 18:11:16 +02:00
parent bad5406095
commit ebb4865b95
16 changed files with 157 additions and 116 deletions
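In short, as read from the diff below: the geojson spilling path used to write raw GeoJSON straight into the spill file and parse it back with GeoJson::from_reader, which cannot tell where one entry ends and the next begins. The fix encodes each geometry once, up front, as Zerometry bytes, and writes every spilled entry with an explicit length prefix so the reader can frame entries reliably; the extractor-to-writer channel correspondingly switches from Option<GeoJson> to Option<Vec<u8>>.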

Cargo.lock (generated)

@@ -1065,8 +1065,6 @@ dependencies = [
[[package]]
name = "cellulite"
version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "377e81073db1dd0b0f0f297da35717fd799a77117668f2a6c48a3cabe0ef717e"
dependencies = [
 "geo",
 "geo-types",
@@ -4214,6 +4212,7 @@ dependencies = [
 "url",
 "utoipa",
 "uuid",
+ "zerometry",
]

[[package]]
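The two dropped lines follow from the Cargo.toml change further down: path dependencies carry no registry source or checksum in Cargo.lock, and the new zerometry crate joins the dependents list.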


@@ -7,7 +7,8 @@ const LILLE: &str = include_str!("assets/lille.geojson");
async fn basic_add_settings_and_geojson_documents() {
let server = Server::new_shared();
let index = server.unique_index();
-let (task, _status_code) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
+let (task, _status_code) =
+    index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
@@ -113,7 +114,6 @@ async fn basic_add_settings_and_geojson_documents() {
"#);
}
#[actix_rt::test]
async fn basic_add_geojson_documents_and_settings() {
let server = Server::new_shared();
@@ -168,7 +168,8 @@ async fn basic_add_geojson_documents_and_settings() {
}
"#);
-let (task, _status_code) = index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
+let (task, _status_code) =
+    index.update_settings(json!({"filterableAttributes": ["_geojson"]})).await;
server.wait_task(task.uid()).await.succeeded();
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
snapshot!(response,
@@ -212,14 +213,16 @@ async fn add_and_remove_geojson() {
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
-let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
+let (response, _code) =
+    index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (task, _) = index.delete_document(0).await;
server.wait_task(task.uid()).await.succeeded();
-let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
+let (response, _code) =
+    index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
@@ -233,13 +236,13 @@ async fn add_and_remove_geojson() {
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
-let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
+let (response, _code) =
+    index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
}
#[actix_rt::test]
async fn partial_update_geojson() {
let server = Server::new_shared();
@@ -255,12 +258,12 @@ async fn partial_update_geojson() {
]);
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
-let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
+let (response, _code) =
+    index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let documents = json!([
{
"id": 0,
@@ -269,10 +272,12 @@ async fn partial_update_geojson() {
]);
let (task, _status_code) = index.update_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
-let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
+let (response, _code) =
+    index.search_get("?filter=_geoPolygon([0,0],[0.9,0],[0.9,0.9],[0,0.9])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
let (response, _code) = index.search_get("?filter=_geoPolygon([0,0],[2,0],[2,2],[0,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 1);
-let (response, _code) = index.search_get("?filter=_geoPolygon([0.9,0.9],[2,0.9],[2,2],[0.9,2])").await;
+let (response, _code) =
+    index.search_get("?filter=_geoPolygon([0.9,0.9],[2,0.9],[2,2],[0.9,2])").await;
assert_eq!(response.get("hits").unwrap().as_array().unwrap().len(), 0);
}
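Every hunk in this test file is a rustfmt line wrap; no assertion or behavior changes.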


@@ -1,6 +1,6 @@
mod add_documents;
mod delete_documents;
mod errors;
+mod geojson;
mod get_documents;
mod update_documents;
-mod geojson;
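The geojson module entry moves up so the module list stays alphabetized.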


@@ -18,8 +18,8 @@ bincode = "1.3.3"
bstr = "1.12.0"
bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] }
byteorder = "1.5.0"
-# cellulite = { path = "../../../cellulite" }
-cellulite = "0.1.0"
+cellulite = { path = "../../../cellulite" }
+# cellulite = "0.1.0"
steppe = "0.4.0"
charabia = { version = "0.9.6", default-features = false }
concat-arrays = "0.1.2"
@@ -114,6 +114,7 @@ utoipa = { version = "5.4.0", features = [
] }
lru = "0.14.0"
geo-types = "0.7.16"
+zerometry = "0.1.0"
[dev-dependencies]
mimalloc = { version = "0.1.47", default-features = false }
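The registry dependency and the commented-out path override swap places, the usual temporary pattern while developing against an unreleased local checkout of cellulite, and zerometry 0.1.0 is added for the new pre-encoded geometry format.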


@@ -6,7 +6,9 @@ use heed::RoTxn;
use super::FieldsIdsMap;
use crate::attribute_patterns::{match_field_legacy, PatternMatch};
-use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
+use crate::constants::{
+    RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME,
+};
use crate::{
is_faceted_by, FieldId, FilterableAttributesFeatures, FilterableAttributesRule, Index,
LocalizedAttributesRule, Result, Weight,


@@ -340,4 +340,4 @@ impl<T: steppe::Step> Step for Compat<T> {
fn total(&self) -> u32 {
self.0.total().try_into().unwrap_or(u32::MAX)
}
-}
\ No newline at end of file
+}
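Only the final brace changes here: the file gains a trailing newline.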


@@ -794,7 +794,8 @@ impl<'a> Filter<'a> {
),
Vec::new(),
);
-let result = index.cellulite
+let result = index
+    .cellulite
.in_shape(rtxn, &polygon.into(), &mut |_| ())
.map_err(InternalError::CelluliteError)?;
// TODO: Remove once we update roaring


@@ -179,8 +179,6 @@ fn extract_geojson_field(
.map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson))
.transpose()?)
}
-_ => {
-    Ok(None)
-}
+_ => Ok(None),
}
}


@@ -820,19 +820,20 @@ impl<'a, 'i> Transform<'a, 'i> {
let documents_count = documents_ids.len() as usize;
// We initialize the sorter with the user indexing settings.
-let mut original_sorter = if settings_diff.reindex_vectors() || settings_diff.reindex_geojson() {
-    Some(create_sorter(
-        grenad::SortAlgorithm::Stable,
-        KeepFirst,
-        self.indexer_settings.chunk_compression_type,
-        self.indexer_settings.chunk_compression_level,
-        self.indexer_settings.max_nb_chunks,
-        self.indexer_settings.max_memory.map(|mem| mem / 2),
-        true,
-    ))
-} else {
-    None
-};
+let mut original_sorter =
+    if settings_diff.reindex_vectors() || settings_diff.reindex_geojson() {
+        Some(create_sorter(
+            grenad::SortAlgorithm::Stable,
+            KeepFirst,
+            self.indexer_settings.chunk_compression_type,
+            self.indexer_settings.chunk_compression_level,
+            self.indexer_settings.max_nb_chunks,
+            self.indexer_settings.max_memory.map(|mem| mem / 2),
+            true,
+        ))
+    } else {
+        None
+    };
let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
.embedding_config_updates


@@ -644,7 +644,8 @@ pub(crate) fn write_typed_chunk_into_index(
let geojson =
    geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
-index.cellulite
+index
+    .cellulite
.add(wtxn, docid, &geojson)
.map_err(InternalError::CelluliteError)?;
}


@@ -13,7 +13,6 @@ use bbqueue::framed::{FrameGrantR, FrameProducer};
use bbqueue::BBBuffer;
use bytemuck::{checked, CheckedBitPattern, NoUninit};
use flume::{RecvTimeoutError, SendError};
-use geojson::GeoJson;
use heed::types::Bytes;
use heed::{BytesDecode, MdbError};
use memmap2::{Mmap, MmapMut};
@@ -144,7 +143,7 @@ pub enum ReceiverAction {
// The geojson for france made of 63k points takes 594KiB which means with a capacity of 1000,
// the absolute maximum amounts of memory we could consume is about 580MiB which is acceptable for this POC.
// If the geojson is None, it means that the document is being deleted.
-GeoJson(DocumentId, Option<GeoJson>),
+GeoJson(DocumentId, Option<Vec<u8>>),
}
/// An entry that cannot fit in the BBQueue buffers has been
@@ -1155,7 +1154,7 @@ impl GeoSender<'_, '_> {
pub struct GeoJsonSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);

impl GeoJsonSender<'_, '_> {
-pub fn send_geojson(&self, docid: DocumentId, value: GeoJson) -> StdResult<(), SendError<()>> {
+pub fn send_geojson(&self, docid: DocumentId, value: Vec<u8>) -> StdResult<(), SendError<()>> {
self.0.sender.send(ReceiverAction::GeoJson(docid, Some(value))).map_err(|_| SendError(()))
}
pub fn delete_geojson(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
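To see the shape of the new contract, here is a minimal, self-contained sketch; it is not from the repository: Action stands in for ReceiverAction, and flume is used as in the real channel. The extraction side sends pre-encoded bytes or a deletion marker, and the writer consumes them without re-parsing.

use flume::{Receiver, Sender};

type DocumentId = u32;

// Stand-in for ReceiverAction::GeoJson: Some(bytes) upserts a
// pre-encoded geometry, None deletes the document's geometry.
enum Action {
    GeoJson(DocumentId, Option<Vec<u8>>),
}

fn main() {
    let (tx, rx): (Sender<Action>, Receiver<Action>) = flume::unbounded();

    // Extractor side: geometries are encoded once, then sent as owned bytes.
    tx.send(Action::GeoJson(42, Some(vec![0xDE, 0xAD, 0xBE, 0xEF]))).unwrap();
    tx.send(Action::GeoJson(7, None)).unwrap();
    drop(tx); // close the channel so the loop below terminates

    // Writer side: bytes go straight to the store, no GeoJson parsing.
    while let Ok(Action::GeoJson(docid, payload)) = rx.recv() {
        match payload {
            Some(bytes) => println!("upsert doc {docid}: {} bytes", bytes.len()),
            None => println!("delete doc {docid}"),
        }
    }
}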


@@ -1,21 +1,25 @@
use std::cell::RefCell;
use std::fs::File;
-use std::io::{self, BufReader, BufWriter, ErrorKind, Seek as _, Write as _};
+use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
use std::str::FromStr;
use std::{iter, mem};

use bumpalo::Bump;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
+use cellulite::zerometry::ZerometryCodec;
+use geo_types::Geometry;
use geojson::GeoJson;
-use heed::RoTxn;
+use heed::{BytesEncode, RoTxn};
+use zerometry::Zerometry;

+use crate::update::new::channel::GeoJsonSender;
use crate::update::new::document::{Document, DocumentContext};
use crate::update::new::indexer::document_changes::Extractor;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
-use crate::{DocumentId, Index, Result, UserError};
+use crate::{DocumentId, Index, InternalError, Result, UserError};

pub struct GeoJsonExtractor {
grenad_parameters: GrenadParameters,
@@ -38,8 +42,8 @@ impl GeoJsonExtractor {
pub struct GeoJsonExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geo
/// point being updated, we first put it in the deleted and then in the inserted.
-removed: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
-inserted: bumpalo::collections::Vec<'extractor, (DocumentId, GeoJson)>,
+removed: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
+inserted: bumpalo::collections::Vec<'extractor, (DocumentId, &'extractor [u8])>,
/// Contains a packed list of `ExtractedGeoPoint` of the inserted geo points
/// data structures if we have spilled to disk.
spilled_removed: Option<BufWriter<File>>,
@@ -68,39 +72,43 @@ impl<'extractor> GeoJsonExtractorData<'extractor> {
unsafe impl MostlySend for GeoJsonExtractorData<'_> {}

pub struct FrozenGeoJsonExtractorData<'extractor> {
-pub removed: &'extractor [(DocumentId, GeoJson)],
-pub inserted: &'extractor [(DocumentId, GeoJson)],
+pub removed: &'extractor [(DocumentId, &'extractor [u8])],
+pub inserted: &'extractor [(DocumentId, &'extractor [u8])],
pub spilled_removed: Option<BufReader<File>>,
pub spilled_inserted: Option<BufReader<File>>,
}

impl FrozenGeoJsonExtractorData<'_> {
-pub fn iter_and_clear_removed(
-    &mut self,
-) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_>
-{
-    Ok(mem::take(&mut self.removed)
-        .iter()
-        .cloned()
-        .map(Ok)
-        .chain(iterator_over_spilled_geojsons(&mut self.spilled_removed)?))
+pub fn iter_and_clear_removed(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
+    for (docid, _buf) in mem::take(&mut self.removed) {
+        channel.delete_geojson(*docid).unwrap();
+    }
+    for ret in iterator_over_spilled_geojsons(&mut self.spilled_removed)? {
+        let (docid, _buf) = ret.map_err(InternalError::SerdeJson)?;
+        channel.delete_geojson(docid).unwrap();
+    }
+    Ok(())
}

-pub fn iter_and_clear_inserted(
-    &mut self,
-) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_>
-{
-    Ok(mem::take(&mut self.inserted)
-        .iter()
-        .cloned()
-        .map(Ok)
-        .chain(iterator_over_spilled_geojsons(&mut self.spilled_inserted)?))
+pub fn iter_and_clear_inserted(&mut self, channel: GeoJsonSender<'_, '_>) -> Result<()> {
+    for (docid, _buf) in mem::take(&mut self.inserted) {
+        channel.send_geojson(*docid, _buf.to_vec()).unwrap();
+    }
+    for ret in iterator_over_spilled_geojsons(&mut self.spilled_inserted)? {
+        let (docid, buf) = ret.map_err(InternalError::SerdeJson)?;
+        channel.send_geojson(docid, buf.to_vec()).unwrap();
+    }
+    Ok(())
}
}

fn iterator_over_spilled_geojsons(
spilled: &mut Option<BufReader<File>>,
-) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, GeoJson), serde_json::Error>> + '_> {
+) -> io::Result<impl IntoIterator<Item = Result<(DocumentId, Vec<u8>), serde_json::Error>> + '_> {
let mut spilled = spilled.take();
if let Some(spilled) = &mut spilled {
spilled.rewind()?;
@@ -113,10 +121,14 @@ fn iterator_over_spilled_geojsons(
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return None,
Err(e) => return Some(Err(serde_json::Error::io(e))),
};
-match GeoJson::from_reader(file) {
-    Ok(geojson) => Some(Ok((docid, geojson))),
-    Err(e) if e.is_eof() => None,
-    Err(e) => Some(Err(e)),
+let size = match file.read_u32::<BigEndian>() {
+    Ok(size) => size,
+    Err(e) => return Some(Err(serde_json::Error::io(e))),
+};
+let mut buf = vec![0; size as usize];
+match file.read_exact(&mut buf) {
+    Ok(()) => Some(Ok((docid, buf))),
+    Err(e) => return Some(Err(serde_json::Error::io(e))),
}
}
None => None,
@@ -138,7 +150,7 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-context: &'doc DocumentContext<Self::Data>,
+context: &'doc DocumentContext<'doc, 'extractor, '_, '_, Self::Data>,
) -> Result<()> {
let rtxn = &context.rtxn;
let index = context.index;
@@ -161,15 +173,22 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
let current = deletion.current(rtxn, index, db_fields_ids_map)?;
if let Some(geojson) = current.geojson_field()? {
+let geojson = GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
match &mut data_ref.spilled_removed {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
-file.write_all(geojson.get().as_bytes())?;
+file.write_u32::<BigEndian>(buf.len() as u32)?;
+file.write_all(&buf)?;
}
-None => data_ref.removed.push(
-    // TODO: The error type is wrong here. It should be an internal error.
-    (docid, GeoJson::from_str(geojson.get()).map_err(UserError::from)?),
-),
+None => {
+    let mut bvec =
+        bumpalo::collections::Vec::new_in(context.extractor_alloc);
+    bvec.extend_from_slice(&buf);
+    data_ref.removed.push((docid, bvec.into_bump_slice()));
+}
}
}
}
@@ -187,50 +206,70 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
// we need to replace the current by the new point and therefore
// delete the current point from cellulite.
if let Some(geojson) = current_geo {
+let geojson =
+    GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
match &mut data_ref.spilled_removed {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
-file.write_all(geojson.get().as_bytes())?;
+file.write_u32::<BigEndian>(buf.len() as u32)?;
+file.write_all(&buf)?;
}
-// TODO: Should be an internal error
-None => data_ref.removed.push((
-    docid,
-    GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
-)),
+None => {
+    let mut bvec =
+        bumpalo::collections::Vec::new_in(context.extractor_alloc);
+    bvec.extend_from_slice(&buf);
+    data_ref.removed.push((docid, bvec.into_bump_slice()));
+}
}
}
if let Some(geojson) = updated_geo {
+let geojson =
+    GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+let buf = ZerometryCodec::bytes_encode(&geometry).unwrap();
match &mut data_ref.spilled_inserted {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
-file.write_all(geojson.get().as_bytes())?;
+file.write_u32::<BigEndian>(buf.len() as u32)?;
+file.write_all(&buf)?;
}
-// TODO: Is the error type correct here? Shouldn't it be an internal error?
-None => data_ref.inserted.push((
-    docid,
-    GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
-)),
+None => {
+    let mut bvec =
+        bumpalo::collections::Vec::new_in(context.extractor_alloc);
+    bvec.extend_from_slice(&buf);
+    data_ref.inserted.push((docid, bvec.into_bump_slice()));
+}
}
}
}
}
DocumentChange::Insertion(insertion) => {
let docid = insertion.docid();
let inserted_geo = insertion.inserted().geojson_field()?;
if let Some(geojson) = inserted_geo {
+let geojson = GeoJson::from_str(geojson.get()).map_err(UserError::from)?;
+let geometry = Geometry::try_from(geojson).map_err(UserError::from)?;
+let mut bytes = Vec::new();
+Zerometry::write_from_geometry(&mut bytes, &geometry)?;
match &mut data_ref.spilled_inserted {
Some(file) => {
file.write_u32::<BigEndian>(docid)?;
-file.write_all(geojson.get().as_bytes())?;
+file.write_u32::<BigEndian>(bytes.len() as u32)?;
+file.write_all(&bytes)?;
}
-// TODO: Is the error type correct here? Shouldn't it be an internal error?
-None => data_ref.inserted.push((
-    docid,
-    GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
-)),
+None => {
+    let mut bvec =
+        bumpalo::collections::Vec::new_in(context.extractor_alloc);
+    bvec.extend_from_slice(&bytes);
+    data_ref.inserted.push((docid, bvec.into_bump_slice()));
+}
}
}
}
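The bug itself is easiest to see in iterator_over_spilled_geojsons: entries used to be written back-to-back as raw JSON and parsed with GeoJson::from_reader, so the reader had no reliable way to know where one entry ended. Below is a minimal sketch of the corrected length-prefixed framing, assuming only the byteorder crate; the function names are illustrative, not from the repository.

use std::io::{Cursor, ErrorKind, Read, Seek as _, Write};

use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};

// Write one spilled entry: docid, payload length, then the payload itself.
fn write_entry<W: Write>(w: &mut W, docid: u32, payload: &[u8]) -> std::io::Result<()> {
    w.write_u32::<BigEndian>(docid)?;
    w.write_u32::<BigEndian>(payload.len() as u32)?;
    w.write_all(payload)
}

// Read entries back until a clean EOF on the docid field.
fn read_entries<R: Read>(r: &mut R) -> std::io::Result<Vec<(u32, Vec<u8>)>> {
    let mut entries = Vec::new();
    loop {
        let docid = match r.read_u32::<BigEndian>() {
            Ok(docid) => docid,
            Err(e) if e.kind() == ErrorKind::UnexpectedEof => break, // end of spill file
            Err(e) => return Err(e),
        };
        let len = r.read_u32::<BigEndian>()? as usize;
        let mut payload = vec![0; len];
        r.read_exact(&mut payload)?; // the length prefix tells us exactly how much to read
        entries.push((docid, payload));
    }
    Ok(entries)
}

fn main() -> std::io::Result<()> {
    let mut spill = Cursor::new(Vec::new());
    write_entry(&mut spill, 0, b"first encoded geometry")?;
    write_entry(&mut spill, 1, b"second")?;
    spill.rewind()?; // like the extractor, rewind before reading back
    let entries = read_entries(&mut spill)?;
    assert_eq!(entries, vec![(0, b"first encoded geometry".to_vec()), (1, b"second".to_vec())]);
    Ok(())
}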


@@ -24,7 +24,7 @@ pub trait Extractor<'extractor>: Sync {
fn process<'doc>(
&'doc self,
changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-context: &'doc DocumentContext<Self::Data>,
+context: &'doc DocumentContext<'doc, 'extractor, '_, '_, Self::Data>,
) -> Result<()>;
}


@@ -72,18 +72,17 @@ pub fn write_to_db(
let embedding = large_vector.read_embedding(*dimensions);
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
-ReceiverAction::GeoJson(docid, geojson) => {
-    match geojson {
-        Some(geojson) => {
-            println!("Adding geojson {docid}");
-            index.cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
-        }
-        None => {
-            println!("Deleting geojson {docid}");
-            index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
-        }
-    }
-}
+ReceiverAction::GeoJson(docid, geojson) => match geojson {
+    Some(geojson) => {
+        index
+            .cellulite
+            .add_raw_zerometry(wtxn, docid, &geojson)
+            .map_err(InternalError::CelluliteError)?;
+    }
+    None => {
+        index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
+    }
+},
}
// Every time the is a message in the channel we search
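Besides switching to add_raw_zerometry, which writes the pre-encoded bytes instead of re-parsing a GeoJson, the rewrite also drops the two debugging println! calls from the write loop.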


@@ -80,15 +80,8 @@ where
}
let mut frozen = data.into_inner().freeze()?;
-for result in frozen.iter_and_clear_removed()? {
-    let (docid, _) = result.map_err(InternalError::SerdeJson)?;
-    geojson_sender.delete_geojson(docid).unwrap();
-}
-for result in frozen.iter_and_clear_inserted()? {
-    let (docid, geojson) = result.map_err(InternalError::SerdeJson)?;
-    geojson_sender.send_geojson(docid, geojson).unwrap();
-}
+frozen.iter_and_clear_removed(geojson_sender)?;
+frozen.iter_and_clear_inserted(geojson_sender)?;
}
Ok(())
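With the sending moved inside iter_and_clear_removed and iter_and_clear_inserted, spilled entries stream from the spill file straight to the channel instead of being collected through an iterator of parsed GeoJson values.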


@@ -1767,7 +1767,10 @@ impl InnerIndexSettingsDiff {
}

pub fn any_reindexing_needed(&self) -> bool {
-self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors() || self.reindex_geojson()
+self.reindex_searchable()
+    || self.reindex_facets()
+    || self.reindex_vectors()
+    || self.reindex_geojson()
}
pub fn reindex_searchable(&self) -> bool {