fix the old indexer

Author: Tamo
Date:   2025-07-17 18:21:48 +02:00
parent 0319a46a7a
commit eefd005599

15 changed files with 604 additions and 413 deletions

Cargo.lock (generated): 906 lines changed
File diff suppressed because it is too large.

View File

@@ -527,6 +527,7 @@ impl ErrorCode for milli::Error {
             UserError::DocumentEditionCompilationError(_) => {
                 Code::EditDocumentsByFunctionError
             }
+            UserError::CelluliteError(_) => Code::Internal,
         }
     }
 }

View File

@@ -99,7 +99,7 @@ url = "2.5.4"
 hashbrown = "0.15.4"
 bumpalo = "3.18.1"
 bumparaw-collections = "0.1.4"
-steppe = { version = "0.4.0", default-features = false }
+steppe = { version = "0.4", default-features = false }
 thread_local = "1.1.9"
 allocator-api2 = "0.3.0"
 rustc-hash = "2.1.1"

View File

@@ -101,6 +101,12 @@ pub enum SerializationError {
     InvalidNumberSerialization,
 }
 
+impl From<cellulite::Error> for Error {
+    fn from(error: cellulite::Error) -> Self {
+        Self::UserError(UserError::CelluliteError(error))
+    }
+}
+
 #[derive(Error, Debug)]
 pub enum FieldIdMapMissingEntry {
     #[error("unknown field id {field_id} coming from the {process} process")]
@@ -111,6 +117,8 @@ pub enum FieldIdMapMissingEntry {
 
 #[derive(Error, Debug)]
 pub enum UserError {
+    #[error(transparent)]
+    CelluliteError(#[from] cellulite::Error),
     #[error("A document cannot contain more than 65,535 fields.")]
     AttributeLimitReached,
     #[error(transparent)]
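
Note: the two hunks above wire the error conversion in both directions. `#[error(transparent)]` with `#[from]` makes thiserror derive `From<cellulite::Error> for UserError`, while the manual impl lifts the same error straight into the top-level `Error`, so `?` works on cellulite results anywhere a milli `Result` is expected. A minimal self-contained sketch of the same pattern, using a hypothetical stand-in `EngineError` instead of the real `cellulite::Error`:

    use thiserror::Error;

    // Hypothetical stand-in for `cellulite::Error`.
    #[derive(Error, Debug)]
    #[error("engine failure: {0}")]
    struct EngineError(String);

    #[derive(Error, Debug)]
    enum UserError {
        // `transparent` forwards Display and source(); `#[from]` derives
        // `From<EngineError> for UserError`.
        #[error(transparent)]
        EngineError(#[from] EngineError),
    }

    #[derive(Error, Debug)]
    enum TopError {
        #[error(transparent)]
        UserError(#[from] UserError),
    }

    // Counterpart of the manual impl in the diff: lets `?` convert an
    // `EngineError` into the top-level error in a single hop.
    impl From<EngineError> for TopError {
        fn from(error: EngineError) -> Self {
            Self::UserError(UserError::EngineError(error))
        }
    }

    fn caller() -> Result<(), TopError> {
        let result: Result<(), EngineError> = Err(EngineError("disk full".into()));
        result?; // converted via the From chain
        Ok(())
    }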

View File

@@ -6,7 +6,7 @@ use heed::RoTxn;
 
 use super::FieldsIdsMap;
 use crate::attribute_patterns::{match_field_legacy, PatternMatch};
-use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
+use crate::constants::{RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
 use crate::{
     is_faceted_by, FieldId, FilterableAttributesFeatures, FilterableAttributesRule, Index,
     LocalizedAttributesRule, Result, Weight,
@@ -24,6 +24,8 @@ pub struct Metadata {
     pub asc_desc: bool,
     /// The field is a geo field (`_geo`, `_geo.lat`, `_geo.lng`).
     pub geo: bool,
+    /// The field is a geo json field (`_geojson`).
+    pub geo_json: bool,
     /// The id of the localized attributes rule if the field is localized.
     pub localized_attributes_rule_id: Option<NonZeroU16>,
     /// The id of the filterable attributes rule if the field is filterable.
@@ -269,6 +271,7 @@ impl MetadataBuilder {
             distinct: false,
             asc_desc: false,
             geo: false,
+            geo_json: false,
             localized_attributes_rule_id: None,
             filterable_attributes_rule_id: None,
         };
@@ -295,6 +298,20 @@ impl MetadataBuilder {
                 distinct: false,
                 asc_desc: false,
                 geo: true,
+                geo_json: false,
+                localized_attributes_rule_id: None,
+                filterable_attributes_rule_id,
+            };
+        }
+        if match_field_legacy(RESERVED_GEOJSON_FIELD_NAME, field) == PatternMatch::Match {
+            debug_assert!(!sortable, "geojson fields should not be sortable");
+            return Metadata {
+                searchable: None,
+                sortable,
+                distinct: false,
+                asc_desc: false,
+                geo: false,
+                geo_json: true,
                 localized_attributes_rule_id: None,
                 filterable_attributes_rule_id,
             };
@@ -328,6 +345,7 @@ impl MetadataBuilder {
             distinct,
             asc_desc,
             geo: false,
+            geo_json: false,
             localized_attributes_rule_id,
             filterable_attributes_rule_id,
         }
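
Note: with these hunks, `_geojson` joins `_geo` as a reserved field name that `MetadataBuilder` special-cases before the generic path, and every other construction site now initializes `geo_json: false`. A reduced, self-contained sketch of that dispatch (hypothetical stand-ins; `match_field_legacy` is collapsed to an exact string comparison):

    const RESERVED_GEO_FIELD_NAME: &str = "_geo";
    const RESERVED_GEOJSON_FIELD_NAME: &str = "_geojson";

    #[derive(Debug, Default, PartialEq)]
    struct Metadata {
        geo: bool,
        geo_json: bool,
    }

    // Reserved names short-circuit before the generic metadata path.
    fn metadata_for(field: &str) -> Metadata {
        if field == RESERVED_GEO_FIELD_NAME {
            return Metadata { geo: true, geo_json: false };
        }
        if field == RESERVED_GEOJSON_FIELD_NAME {
            return Metadata { geo: false, geo_json: true };
        }
        Metadata::default()
    }

    fn main() {
        assert_eq!(metadata_for("_geojson"), Metadata { geo: false, geo_json: true });
        assert_eq!(metadata_for("title"), Metadata::default());
    }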

View File

@@ -343,3 +343,27 @@ impl Step for arroy::SubStep {
         self.max
     }
 }
+
+// Integration with steppe
+
+impl steppe::Progress for Progress {
+    fn update(&self, sub_progress: impl steppe::Step) {
+        self.update_progress(Compat(sub_progress));
+    }
+}
+
+struct Compat<T: steppe::Step>(T);
+
+impl<T: steppe::Step> Step for Compat<T> {
+    fn name(&self) -> Cow<'static, str> {
+        self.0.name().into()
+    }
+
+    fn current(&self) -> u32 {
+        self.0.current().try_into().unwrap_or(u32::MAX)
+    }
+
+    fn total(&self) -> u32 {
+        self.0.total().try_into().unwrap_or(u32::MAX)
+    }
+}
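
Note: this is the glue that lets cellulite report progress through milli. Cellulite's `build` speaks `steppe::Progress`/`steppe::Step`, while milli's `Progress` consumes its own `Step` with `u32` counters; `Compat` wraps one trait in the other and saturates wider counters via `try_into().unwrap_or(u32::MAX)`. A self-contained sketch of the same adapter shape, with both traits reduced to hypothetical minimal forms:

    use std::borrow::Cow;

    // Hypothetical minimal form of the external trait (stands in for `steppe::Step`).
    trait ExternalStep {
        fn name(&self) -> String;
        fn current(&self) -> u64;
        fn total(&self) -> u64;
    }

    // Hypothetical minimal form of the local trait (stands in for milli's `Step`).
    trait LocalStep {
        fn name(&self) -> Cow<'static, str>;
        fn current(&self) -> u32;
        fn total(&self) -> u32;
    }

    // The adapter: wrap an external step and saturate its counters into u32.
    struct Compat<T: ExternalStep>(T);

    impl<T: ExternalStep> LocalStep for Compat<T> {
        fn name(&self) -> Cow<'static, str> {
            self.0.name().into()
        }
        fn current(&self) -> u32 {
            self.0.current().try_into().unwrap_or(u32::MAX)
        }
        fn total(&self) -> u32 {
            self.0.total().try_into().unwrap_or(u32::MAX)
        }
    }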

View File

@@ -842,10 +842,12 @@ impl<'a> Filter<'a> {
                 ),
                 Vec::new(),
             );
-            let cellulite = cellulite::Writer::new(index.cellulite);
+            let cellulite = cellulite::Cellulite::new(index.cellulite);
             let result = cellulite
                 .in_shape(rtxn, &polygon.into(), &mut |_| ())
                 .map_err(InternalError::CelluliteError)?;
+            // TODO: Remove once we update roaring
+            let result = roaring::RoaringBitmap::from_iter(result.into_iter());
             Ok(result)
         } else {
             Err(points[0][0].as_external_error(FilterError::AttributeNotFilterable {
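
Note: the `from_iter` line is a version bridge. `in_shape` returns a `RoaringBitmap` from the `roaring` version cellulite depends on, which is a distinct type from the `roaring` version milli links against even though both hold `u32` ids, so the bitmap is rebuilt through an iterator (hence the TODO to drop this once the versions align). The idea, as a sketch with hypothetical names:

    // `foreign_ids` stands in for the bitmap returned by the other
    // `roaring` version; anything yielding u32 ids works.
    fn bridge(foreign_ids: impl IntoIterator<Item = u32>) -> roaring::RoaringBitmap {
        roaring::RoaringBitmap::from_iter(foreign_ids)
    }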

View File

@@ -10,7 +10,7 @@ use crate::error::GeoError;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::index_documents::extract_finite_float_from_value;
 use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
-use crate::{FieldId, InternalError, Result};
+use crate::{DocumentId, FieldId, InternalError, Result};
 
 /// Extracts the geographical coordinates contained in each document under the `_geo` field.
 ///
@@ -158,7 +158,7 @@ pub fn extract_geojson<R: io::Read + io::Seek>(
             obkv.insert(DelAdd::Addition, geojson.to_string().as_bytes())?;
         }
         let bytes = obkv.into_inner()?;
-        writer.insert(docid_bytes, bytes)?;
+        writer.insert(&docid_bytes[0..std::mem::size_of::<DocumentId>()], bytes)?;
     }
 }
@@ -172,13 +172,15 @@ fn extract_geojson_field(
     document_id: impl Fn() -> Value,
 ) -> Result<Option<GeoJson>> {
     match settings.geojson_fid {
-        Some(fid) => {
+        Some(fid) if settings.filterable_attributes_rules.iter().any(|rule| rule.has_geojson()) => {
             let value = obkv.get(fid).map(KvReaderDelAdd::from_slice).and_then(|r| r.get(deladd));
             // TODO: That's a user error, not an internal error
             Ok(value
                 .map(|v| serde_json::from_slice(v).map_err(InternalError::SerdeJson))
                 .transpose()?)
         }
-        None => Ok(None),
+        _ => {
+            Ok(None)
+        }
     }
 }
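
Note: the new match guard means a stored `_geojson` field id is no longer enough to extract; some filterable-attributes rule must actually enable geojson, and everything else now falls through to `Ok(None)`. A reduced sketch of that control flow with hypothetical stand-ins:

    struct Rule {
        geojson: bool,
    }

    impl Rule {
        fn has_geojson(&self) -> bool {
            self.geojson
        }
    }

    fn should_extract(geojson_fid: Option<u16>, rules: &[Rule]) -> bool {
        match geojson_fid {
            // Extract only when the field id exists AND a rule enables geojson.
            Some(_fid) if rules.iter().any(|rule| rule.has_geojson()) => true,
            // A known field id without an enabling rule is skipped.
            _ => false,
        }
    }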

View File

@@ -243,9 +243,9 @@ fn send_original_documents_data(
     let original_documents_chunk =
         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
 
-    tracing::info!("Do we have a geojson");
-    if settings_diff.run_geojson_indexing() {
-        tracing::info!("Yes we do");
+    tracing::warn!("Do we have a geojson");
+    if settings_diff.reindex_geojson() {
+        tracing::warn!("Yes we do");
         let documents_chunk_cloned = original_documents_chunk.clone();
         let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
         let settings_diff = settings_diff.clone();

View File

@@ -541,6 +541,10 @@
             .map_err(InternalError::from)??;
     }
 
+    tracing::warn!("Building cellulite");
+    let cellulite = cellulite::Cellulite::new(self.index.cellulite);
+    cellulite.build(self.wtxn, &Progress::default())?;
+
     self.execute_prefix_databases(
         word_docids.map(MergerBuilder::build),
         exact_word_docids.map(MergerBuilder::build),

View File

@@ -820,7 +820,7 @@ impl<'a, 'i> Transform<'a, 'i> {
         let documents_count = documents_ids.len() as usize;
 
         // We initialize the sorter with the user indexing settings.
-        let mut original_sorter = if settings_diff.reindex_vectors() {
+        let mut original_sorter = if settings_diff.reindex_vectors() || settings_diff.reindex_geojson() {
             Some(create_sorter(
                 grenad::SortAlgorithm::Stable,
                 KeepFirst,

View File

@@ -629,26 +629,25 @@ pub(crate) fn write_typed_chunk_into_index(
             }
             let merger = builder.build();
 
-            let cellulite = index.cellulite;
+            let cellulite = cellulite::Cellulite::new(index.cellulite);
 
             let mut iter = merger.into_stream_merger_iter()?;
             while let Some((key, value)) = iter.next()? {
                 // convert the key back to a u32 (4 bytes)
+                tracing::warn!("Key: {:?}, length: {}", key, key.len());
                 let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
                 let deladd_obkv = KvReaderDelAdd::from_slice(value);
 
                 if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
-                    todo!("handle deletion");
-                    // cellulite.remove(&docid);
+                    cellulite.delete(wtxn, docid)?;
                 }
                 if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
-                    tracing::info!("Adding one geojson to cellulite");
+                    tracing::warn!("Adding one geojson to cellulite");
                     let geojson =
                         geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
-                    let writer = cellulite::Writer::new(index.cellulite);
-                    writer
-                        .add_item(wtxn, docid, &geojson)
+                    cellulite
+                        .add(wtxn, docid, &geojson)
                         .map_err(InternalError::CelluliteError)?;
                 }
             }
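
Note: this loop follows the old indexer's del/add convention: each merged entry is keyed by the big-endian `u32` document id and carries an obkv with optional `Deletion` and `Addition` sides, and the deletion side is applied before the addition so an updated document is replaced rather than duplicated. A standalone sketch of the same dispatch, with a hypothetical in-memory store standing in for cellulite:

    use std::collections::HashMap;

    // Hypothetical stand-in for the deladd obkv sides.
    struct DelAddEntry<'a> {
        deletion: Option<&'a [u8]>,
        addition: Option<&'a [u8]>,
    }

    // Hypothetical stand-in for the cellulite store.
    #[derive(Default)]
    struct Store(HashMap<u32, Vec<u8>>);

    fn apply(store: &mut Store, key: &[u8], entry: &DelAddEntry) {
        // Keys are big-endian u32 document ids, as in the diff.
        let docid = u32::from_be_bytes(key.try_into().unwrap());
        if entry.deletion.is_some() {
            store.0.remove(&docid); // delete before add
        }
        if let Some(payload) = entry.addition {
            store.0.insert(docid, payload.to_vec());
        }
    }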

View File

@@ -166,6 +166,10 @@
     indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
 
+    let cellulite = cellulite::Cellulite::new(index.cellulite);
+    cellulite.build(wtxn, indexing_context.progress)?;
+
     pool.install(|| {
         build_vectors(
             index,

View File

@@ -73,8 +73,8 @@ pub fn write_to_db(
                     writer.add_item_in_store(wtxn, docid, extractor_id, embedding)?;
                 }
                 ReceiverAction::GeoJson(docid, geojson) => {
-                    let cellulite = cellulite::Writer::new(index.cellulite);
-                    cellulite.add_item(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
+                    let cellulite = cellulite::Cellulite::new(index.cellulite);
+                    cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
                 }
             }
} }

View File

@@ -1862,7 +1862,7 @@ impl InnerIndexSettingsDiff {
     }
 
     pub fn any_reindexing_needed(&self) -> bool {
-        self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
+        self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors() || self.reindex_geojson()
     }
 
     pub fn reindex_searchable(&self) -> bool {
@@ -1971,6 +1971,11 @@ impl InnerIndexSettingsDiff {
         !self.embedding_config_updates.is_empty()
     }
 
+    pub fn reindex_geojson(&self) -> bool {
+        self.old.filterable_attributes_rules.iter().any(|rule| rule.has_geojson())
+            != self.new.filterable_attributes_rules.iter().any(|rule| rule.has_geojson())
+    }
+
     pub fn settings_update_only(&self) -> bool {
         self.settings_update_only
     }
@@ -1981,8 +1986,6 @@ impl InnerIndexSettingsDiff {
     }
 
     pub fn run_geojson_indexing(&self) -> bool {
-        tracing::info!("old.geojson_fid: {:?}", self.old.geojson_fid);
-        tracing::info!("new.geojson_fid: {:?}", self.new.geojson_fid);
         self.old.geojson_fid != self.new.geojson_fid
             || (!self.settings_update_only && self.new.geojson_fid.is_some())
     }
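
Note: `reindex_geojson` is effectively a boolean XOR: a rebuild is needed exactly when the geojson filterable feature flips on or off between the old and new settings, while `run_geojson_indexing` also fires when the `_geojson` field id moves or when documents are being indexed with a geojson field present. A sketch of the two predicates over hypothetical minimal settings types:

    struct Settings {
        has_geojson_rule: bool,
        geojson_fid: Option<u16>,
    }

    struct SettingsDiff {
        old: Settings,
        new: Settings,
        settings_update_only: bool,
    }

    impl SettingsDiff {
        // Rebuild when the geojson feature flips in either direction (XOR).
        fn reindex_geojson(&self) -> bool {
            self.old.has_geojson_rule != self.new.has_geojson_rule
        }

        // Extract when the field id changed, or when documents arrive
        // while a geojson field id exists.
        fn run_geojson_indexing(&self) -> bool {
            self.old.geojson_fid != self.new.geojson_fid
                || (!self.settings_update_only && self.new.geojson_fid.is_some())
        }
    }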