update to the latest version of cellulite and steppe

This commit is contained in:
Tamo
2025-07-22 16:24:49 +02:00
parent 44dc64accb
commit e923154c90
11 changed files with 97 additions and 140 deletions

View File

@ -20,8 +20,8 @@ bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] }
byteorder = "1.5.0"
# cellulite = { git = "https://github.com/irevoire/cellulite", branch = "main"}
cellulite = { path = "../../../cellulite" }
# steppe = { path = "../../../steppe" }
steppe = "0.3.0"
steppe = { path = "../../../steppe" }
# steppe = "0.3.0"
charabia = { version = "0.9.6", default-features = false }
concat-arrays = "0.1.2"
convert_case = "0.8.0"

View File

@ -184,7 +184,7 @@ pub struct Index {
pub vector_arroy: arroy::Database<Unspecified>,
/// Geo store based on cellulite™.
pub cellulite: cellulite::Database,
pub cellulite: cellulite::Cellulite,
/// Maps the document id to the document as an obkv store.
pub(crate) documents: Database<BEU32, ObkvCodec>,
@ -242,7 +242,7 @@ impl Index {
let embedder_category_id =
env.create_database(&mut wtxn, Some(VECTOR_EMBEDDER_CATEGORY_ID))?;
let vector_arroy = env.create_database(&mut wtxn, Some(VECTOR_ARROY))?;
let cellulite = env.create_database(&mut wtxn, Some(CELLULITE))?;
let cellulite = cellulite::Cellulite::create_from_env(&env, &mut wtxn)?;
let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;

View File

@ -794,8 +794,7 @@ impl<'a> Filter<'a> {
),
Vec::new(),
);
let cellulite = cellulite::Cellulite::new(index.cellulite);
let result = cellulite
let result = index.cellulite
.in_shape(rtxn, &polygon.into(), &mut |_| ())
.map_err(InternalError::CelluliteError)?;
// TODO: Remove once we update roaring

View File

@ -540,8 +540,7 @@ where
}
tracing::warn!("Building cellulite");
let cellulite = cellulite::Cellulite::new(self.index.cellulite);
cellulite.build(self.wtxn, &Progress::default())?;
self.index.cellulite.build(self.wtxn, &Progress::default())?;
self.execute_prefix_databases(
word_docids.map(MergerBuilder::build),

View File

@ -629,8 +629,6 @@ pub(crate) fn write_typed_chunk_into_index(
}
let merger = builder.build();
let cellulite = cellulite::Cellulite::new(index.cellulite);
let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? {
// convert the key back to a u32 (4 bytes)
@ -639,14 +637,14 @@ pub(crate) fn write_typed_chunk_into_index(
let deladd_obkv = KvReaderDelAdd::from_slice(value);
if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
cellulite.delete(wtxn, docid)?;
index.cellulite.delete(wtxn, docid)?;
}
if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
tracing::warn!("Adding one geojson to cellulite");
let geojson =
geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
cellulite
index.cellulite
.add(wtxn, docid, &geojson)
.map_err(InternalError::CelluliteError)?;
}

View File

@ -469,7 +469,6 @@ pub enum Database {
FieldIdDocidFacetStrings,
FieldIdDocidFacetF64s,
VectorEmbedderCategoryId,
Cellulite,
}
impl Database {
@ -492,7 +491,6 @@ impl Database {
Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(),
Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(),
Database::VectorEmbedderCategoryId => index.embedder_category_id.remap_types(),
Database::Cellulite => index.cellulite.remap_types(),
}
}
@ -515,7 +513,6 @@ impl Database {
Database::FieldIdDocidFacetStrings => db_name::FIELD_ID_DOCID_FACET_STRINGS,
Database::FieldIdDocidFacetF64s => db_name::FIELD_ID_DOCID_FACET_F64S,
Database::VectorEmbedderCategoryId => db_name::VECTOR_EMBEDDER_CATEGORY_ID,
Database::Cellulite => db_name::CELLULITE,
}
}
}

View File

@ -1,24 +1,21 @@
use std::cell::RefCell;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
use std::io::{self, BufReader, BufWriter, ErrorKind, Seek as _, Write as _};
use std::str::FromStr;
use std::{iter, mem, result};
use std::{iter, mem};
use bumpalo::Bump;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use geojson::GeoJson;
use heed::RoTxn;
use serde_json::value::RawValue;
use serde_json::Value;
use crate::error::GeoError;
use crate::update::new::document::{Document, DocumentContext};
use crate::update::new::indexer::document_changes::Extractor;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::DocumentChange;
use crate::update::GrenadParameters;
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result, UserError};
use crate::{DocumentId, Index, Result, UserError};
pub struct GeoJsonExtractor {
grenad_parameters: GrenadParameters,
@ -38,25 +35,6 @@ impl GeoJsonExtractor {
}
}
/*
#[derive(Pod, Zeroable, Copy, Clone)]
#[repr(C, packed)]
pub struct ExtractedGeoPoint {
pub docid: DocumentId,
pub lat_lng: [f64; 2],
}
impl From<ExtractedGeoPoint> for GeoPoint {
/// Converts the latitude and longitude back to an xyz GeoPoint.
fn from(value: ExtractedGeoPoint) -> Self {
let [lat, lng] = value.lat_lng;
let point = [lat, lng];
let xyz_point = lat_lng_to_xyz(&point);
GeoPoint::new(xyz_point, (value.docid, point))
}
}
*/
pub struct GeoJsonExtractorData<'extractor> {
/// The set of documents ids that were removed. If a document sees its geo
/// point being updated, we first put it in the deleted and then in the inserted.
@ -265,94 +243,3 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
Ok(())
}
}
/// Extracts and validates the latitude and latitude from a document geo field.
///
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
pub fn extract_geo_coordinates(
external_id: &str,
raw_value: &RawValue,
) -> Result<Option<[f64; 2]>> {
let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
Value::Null => return Ok(None),
Value::Object(map) => map,
value => {
return Err(Box::new(GeoError::NotAnObject {
document_id: Value::from(external_id),
value,
})
.into())
}
};
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
(Some(lat), Some(lng)) => {
if geo.is_empty() {
[lat, lng]
} else {
return Err(Box::new(GeoError::UnexpectedExtraFields {
document_id: Value::from(external_id),
value: Value::from(geo),
})
.into());
}
}
(Some(_), None) => {
return Err(Box::new(GeoError::MissingLongitude {
document_id: Value::from(external_id),
})
.into())
}
(None, Some(_)) => {
return Err(Box::new(GeoError::MissingLatitude {
document_id: Value::from(external_id),
})
.into())
}
(None, None) => {
return Err(Box::new(GeoError::MissingLatitudeAndLongitude {
document_id: Value::from(external_id),
})
.into())
}
};
match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
(Ok(lat), Ok(lng)) => Ok(Some([lat, lng])),
(Ok(_), Err(value)) => {
Err(Box::new(GeoError::BadLongitude { document_id: Value::from(external_id), value })
.into())
}
(Err(value), Ok(_)) => {
Err(Box::new(GeoError::BadLatitude { document_id: Value::from(external_id), value })
.into())
}
(Err(lat), Err(lng)) => Err(Box::new(GeoError::BadLatitudeAndLongitude {
document_id: Value::from(external_id),
lat,
lng,
})
.into()),
}
}
/// Extracts a finite `f64` out of a serde JSON value and validates it.
///
/// A JSON number is converted with `as_f64`, and a JSON string is parsed as an
/// `f64`. Any other kind of value, a failed conversion, or a non-finite result
/// yields `Err` carrying the original value back to the caller untouched.
pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
    // Only borrow the value here so it can be handed back as-is on failure.
    let candidate = match &value {
        Value::Number(number) => number.as_f64(),
        Value::String(text) => text.parse::<f64>().ok(),
        _ => None,
    };

    match candidate {
        Some(number) if number.is_finite() => Ok(number),
        _ => Err(value),
    }
}

View File

@ -163,9 +163,7 @@ where
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
let cellulite = cellulite::Cellulite::new(index.cellulite);
cellulite.build(wtxn, indexing_context.progress)?;
index.cellulite.build(wtxn, indexing_context.progress)?;
pool.install(|| {
build_vectors(

View File

@ -32,7 +32,6 @@ pub fn write_to_db(
let _entered = span.enter();
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
let mut _entered_post_merge = None;
let cellulite = cellulite::Cellulite::new(index.cellulite);
while let Some(action) = writer_receiver.recv_action() {
if _entered_post_merge.is_none()
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
@ -76,10 +75,10 @@ pub fn write_to_db(
ReceiverAction::GeoJson(docid, geojson) => {
match geojson {
Some(geojson) => {
cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
index.cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
}
None => {
cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
}
}
}

View File

@ -82,6 +82,7 @@ where
let mut frozen = data.into_inner().freeze()?;
for result in frozen.iter_and_clear_removed()? {
let extracted_geo_point = result.map_err(InternalError::SerdeJson)?;
// FIXME: the docid must be sent here instead of the geojson.
todo!("We must send the docid instead of the geojson");
/*
let removed = cellulite.remove(&GeoJsonPoint::from(extracted_geo_point));