Fix the old indexer

This commit is contained in:
Tamo
2025-07-17 18:21:48 +02:00
parent e510d4a8a3
commit d80edead01
15 changed files with 123 additions and 33 deletions

View File

@ -20,6 +20,8 @@ bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] }
byteorder = "1.5.0"
# cellulite = { git = "https://github.com/irevoire/cellulite", branch = "main"}
cellulite = { path = "../../../cellulite" }
# steppe = { path = "../../../steppe" }
steppe = "0.3.0"
charabia = { version = "0.9.6", default-features = false }
concat-arrays = "0.1.2"
convert_case = "0.8.0"

View File

@ -99,6 +99,12 @@ pub enum SerializationError {
InvalidNumberSerialization,
}
/// Surfaces cellulite errors as user errors: a `cellulite::Error` converted into
/// the top-level `Error` is wrapped in `UserError::CelluliteError` first.
/// NOTE(review): `UserError::CelluliteError` also derives `#[from] cellulite::Error`,
/// so this impl decides that the *top-level* conversion classifies it as a user
/// error rather than an internal one — confirm that is the intended default.
impl From<cellulite::Error> for Error {
fn from(error: cellulite::Error) -> Self {
Self::UserError(UserError::CelluliteError(error))
}
}
#[derive(Error, Debug)]
pub enum FieldIdMapMissingEntry {
#[error("unknown field id {field_id} coming from the {process} process")]
@ -109,6 +115,8 @@ pub enum FieldIdMapMissingEntry {
#[derive(Error, Debug)]
pub enum UserError {
#[error(transparent)]
CelluliteError(#[from] cellulite::Error),
#[error("A document cannot contain more than 65,535 fields.")]
AttributeLimitReached,
#[error(transparent)]

View File

@ -6,7 +6,7 @@ use heed::RoTxn;
use super::FieldsIdsMap;
use crate::attribute_patterns::{match_field_legacy, PatternMatch};
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
use crate::{
is_faceted_by, FieldId, FilterableAttributesFeatures, FilterableAttributesRule, Index,
LocalizedAttributesRule, Result, Weight,
@ -24,6 +24,8 @@ pub struct Metadata {
pub asc_desc: bool,
/// The field is a geo field (`_geo`, `_geo.lat`, `_geo.lng`).
pub geo: bool,
/// The field is a geo json field (`_geojson`).
pub geo_json: bool,
/// The id of the localized attributes rule if the field is localized.
pub localized_attributes_rule_id: Option<NonZeroU16>,
/// The id of the filterable attributes rule if the field is filterable.
@ -269,6 +271,7 @@ impl MetadataBuilder {
distinct: false,
asc_desc: false,
geo: false,
geo_json: false,
localized_attributes_rule_id: None,
filterable_attributes_rule_id: None,
};
@ -295,6 +298,20 @@ impl MetadataBuilder {
distinct: false,
asc_desc: false,
geo: true,
geo_json: false,
localized_attributes_rule_id: None,
filterable_attributes_rule_id,
};
}
if match_field_legacy(RESERVED_GEOJSON_FIELD_NAME, field) == PatternMatch::Match {
debug_assert!(!sortable, "geojson fields should not be sortable");
return Metadata {
searchable: None,
sortable,
distinct: false,
asc_desc: false,
geo: false,
geo_json: true,
localized_attributes_rule_id: None,
filterable_attributes_rule_id,
};
@ -328,6 +345,7 @@ impl MetadataBuilder {
distinct,
asc_desc,
geo: false,
geo_json: false,
localized_attributes_rule_id,
filterable_attributes_rule_id,
}

View File

@ -317,3 +317,27 @@ impl Step for arroy::SubStep {
self.max
}
}
// Integration with steppe: lets a `steppe`-driven build (e.g. cellulite's
// `build`) report its sub-steps through this crate's own progress machinery.
impl steppe::Progress for Progress {
fn update(&self, sub_progress: impl steppe::Step) {
// Wrap the foreign step in `Compat` so it satisfies the local `Step` trait.
self.update_progress(Compat(sub_progress));
}
}
/// Adapter wrapping a [`steppe::Step`] so it can be consumed as this crate's
/// [`Step`]: the name and progress counters are forwarded to the inner step.
struct Compat<T: steppe::Step>(T);
impl<T: steppe::Step> Step for Compat<T> {
fn name(&self) -> Cow<'static, str> {
self.0.name().into()
}
fn current(&self) -> u32 {
// Saturating narrow to u32: if the inner counter is wider than u32
// (presumably steppe reports u64 — TODO confirm), clamp to u32::MAX
// instead of panicking.
self.0.current().try_into().unwrap_or(u32::MAX)
}
fn total(&self) -> u32 {
// Same saturating conversion as `current` so the pair stays consistent.
self.0.total().try_into().unwrap_or(u32::MAX)
}
}

View File

@ -794,10 +794,12 @@ impl<'a> Filter<'a> {
),
Vec::new(),
);
let cellulite = cellulite::Writer::new(index.cellulite);
let cellulite = cellulite::Cellulite::new(index.cellulite);
let result = cellulite
.in_shape(rtxn, &polygon.into(), &mut |_| ())
.map_err(InternalError::CelluliteError)?;
// TODO: Remove once we update roaring
let result = roaring::RoaringBitmap::from_iter(result.into_iter());
Ok(result)
} else {
Err(points[0][0].as_external_error(FilterError::AttributeNotFilterable {

View File

@ -10,7 +10,7 @@ use crate::error::GeoError;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::index_documents::extract_finite_float_from_value;
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
use crate::{FieldId, InternalError, Result};
use crate::{DocumentId, FieldId, InternalError, Result};
/// Extracts the geographical coordinates contained in each document under the `_geo` field.
///
@ -158,7 +158,7 @@ pub fn extract_geojson<R: io::Read + io::Seek>(
obkv.insert(DelAdd::Addition, geojson.to_string().as_bytes())?;
}
let bytes = obkv.into_inner()?;
writer.insert(docid_bytes, bytes)?;
writer.insert(&docid_bytes[0..std::mem::size_of::<DocumentId>()], bytes)?;
}
}
@ -172,13 +172,15 @@ fn extract_geojson_field(
document_id: impl Fn() -> Value,
) -> Result<Option<GeoJson>> {
match settings.geojson_fid {
Some(fid) => {
Some(fid) if settings.filterable_attributes_rules.iter().any(|rule| rule.has_geojson()) => {
let value = obkv.get(fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd));
// TODO: That's a user error, not an internal error
Ok(value
.map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson))
.transpose()?)
}
None => Ok(None),
_ => {
Ok(None)
}
}
}

View File

@ -243,9 +243,9 @@ fn send_original_documents_data(
let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
tracing::info!("Do we have a geojson");
if settings_diff.run_geojson_indexing() {
tracing::info!("Yes we do");
tracing::warn!("Do we have a geojson");
if settings_diff.reindex_geojson() {
tracing::warn!("Yes we do");
let documents_chunk_cloned = original_documents_chunk.clone();
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
let settings_diff = settings_diff.clone();

View File

@ -539,6 +539,10 @@ where
.map_err(InternalError::from)??;
}
tracing::warn!("Building cellulite");
let cellulite = cellulite::Cellulite::new(self.index.cellulite);
cellulite.build(self.wtxn, &Progress::default())?;
self.execute_prefix_databases(
word_docids.map(MergerBuilder::build),
exact_word_docids.map(MergerBuilder::build),

View File

@ -820,7 +820,7 @@ impl<'a, 'i> Transform<'a, 'i> {
let documents_count = documents_ids.len() as usize;
// We initialize the sorter with the user indexing settings.
let mut original_sorter = if settings_diff.reindex_vectors() {
let mut original_sorter = if settings_diff.reindex_vectors() || settings_diff.reindex_geojson() {
Some(create_sorter(
grenad::SortAlgorithm::Stable,
KeepFirst,

View File

@ -629,26 +629,25 @@ pub(crate) fn write_typed_chunk_into_index(
}
let merger = builder.build();
let cellulite = index.cellulite;
let cellulite = cellulite::Cellulite::new(index.cellulite);
let mut iter = merger.into_stream_merger_iter()?;
while let Some((key, value)) = iter.next()? {
// convert the key back to a u32 (4 bytes)
tracing::warn!("Key: {:?}, length: {}", key, key.len());
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
let deladd_obkv = KvReaderDelAdd::from_slice(value);
if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
todo!("handle deletion");
// cellulite.remove(&docid);
cellulite.delete(wtxn, docid)?;
}
if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
tracing::info!("Adding one geojson to cellulite");
tracing::warn!("Adding one geojson to cellulite");
let geojson =
geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
let writer = cellulite::Writer::new(index.cellulite);
writer
.add_item(wtxn, docid, &geojson)
cellulite
.add(wtxn, docid, &geojson)
.map_err(InternalError::CelluliteError)?;
}
}

View File

@ -163,6 +163,10 @@ where
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
let cellulite = cellulite::Cellulite::new(index.cellulite);
cellulite.build(wtxn, indexing_context.progress)?;
pool.install(|| {
build_vectors(
index,

View File

@ -73,8 +73,8 @@ pub fn write_to_db(
writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
}
ReceiverAction::GeoJson(docid, geojson) => {
let cellulite = cellulite::Writer::new(index.cellulite);
cellulite.add_item(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
let cellulite = cellulite::Cellulite::new(index.cellulite);
cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
}
}

View File

@ -1767,7 +1767,7 @@ impl InnerIndexSettingsDiff {
}
pub fn any_reindexing_needed(&self) -> bool {
self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors() || self.reindex_geojson()
}
pub fn reindex_searchable(&self) -> bool {
@ -1876,6 +1876,11 @@ impl InnerIndexSettingsDiff {
!self.embedding_config_updates.is_empty()
}
/// Returns `true` when the geojson data must be reindexed, i.e. when the
/// answer to "is any attribute geojson-filterable?" differs between the old
/// and the new settings.
pub fn reindex_geojson(&self) -> bool {
    let was_geojson_filterable =
        self.old.filterable_attributes_rules.iter().any(|rule| rule.has_geojson());
    let is_geojson_filterable =
        self.new.filterable_attributes_rules.iter().any(|rule| rule.has_geojson());
    was_geojson_filterable != is_geojson_filterable
}
pub fn settings_update_only(&self) -> bool {
self.settings_update_only
}
@ -1886,8 +1891,6 @@ impl InnerIndexSettingsDiff {
}
pub fn run_geojson_indexing(&self) -> bool {
tracing::info!("old.geojson_fid: {:?}", self.old.geojson_fid);
tracing::info!("new.geojson_fid: {:?}", self.new.geojson_fid);
self.old.geojson_fid != self.new.geojson_fid
|| (!self.settings_update_only && self.new.geojson_fid.is_some())
}