mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-18 18:56:25 +00:00
update to the latest version of cellulite and steppe
This commit is contained in:
@ -185,7 +185,7 @@ pub struct Index {
|
|||||||
pub vector_store: hannoy::Database<Unspecified>,
|
pub vector_store: hannoy::Database<Unspecified>,
|
||||||
|
|
||||||
/// Geo store based on cellulite™.
|
/// Geo store based on cellulite™.
|
||||||
pub cellulite: cellulite::Database,
|
pub cellulite: cellulite::Cellulite,
|
||||||
|
|
||||||
/// Maps the document id to the document as an obkv store.
|
/// Maps the document id to the document as an obkv store.
|
||||||
pub(crate) documents: Database<BEU32, ObkvCodec>,
|
pub(crate) documents: Database<BEU32, ObkvCodec>,
|
||||||
@ -243,7 +243,7 @@ impl Index {
|
|||||||
let embedder_category_id =
|
let embedder_category_id =
|
||||||
env.create_database(&mut wtxn, Some(VECTOR_EMBEDDER_CATEGORY_ID))?;
|
env.create_database(&mut wtxn, Some(VECTOR_EMBEDDER_CATEGORY_ID))?;
|
||||||
let vector_store = env.create_database(&mut wtxn, Some(VECTOR_STORE))?;
|
let vector_store = env.create_database(&mut wtxn, Some(VECTOR_STORE))?;
|
||||||
let cellulite = env.create_database(&mut wtxn, Some(CELLULITE))?;
|
let cellulite = cellulite::Cellulite::create_from_env(&env, &mut wtxn)?;
|
||||||
|
|
||||||
let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;
|
let documents = env.create_database(&mut wtxn, Some(DOCUMENTS))?;
|
||||||
|
|
||||||
|
@ -842,8 +842,7 @@ impl<'a> Filter<'a> {
|
|||||||
),
|
),
|
||||||
Vec::new(),
|
Vec::new(),
|
||||||
);
|
);
|
||||||
let cellulite = cellulite::Cellulite::new(index.cellulite);
|
let result = index.cellulite
|
||||||
let result = cellulite
|
|
||||||
.in_shape(rtxn, &polygon.into(), &mut |_| ())
|
.in_shape(rtxn, &polygon.into(), &mut |_| ())
|
||||||
.map_err(InternalError::CelluliteError)?;
|
.map_err(InternalError::CelluliteError)?;
|
||||||
// TODO: Remove once we update roaring
|
// TODO: Remove once we update roaring
|
||||||
|
@ -542,8 +542,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
tracing::warn!("Building cellulite");
|
tracing::warn!("Building cellulite");
|
||||||
let cellulite = cellulite::Cellulite::new(self.index.cellulite);
|
self.index.cellulite.build(self.wtxn, &Progress::default())?;
|
||||||
cellulite.build(self.wtxn, &Progress::default())?;
|
|
||||||
|
|
||||||
self.execute_prefix_databases(
|
self.execute_prefix_databases(
|
||||||
word_docids.map(MergerBuilder::build),
|
word_docids.map(MergerBuilder::build),
|
||||||
|
@ -629,8 +629,6 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
}
|
}
|
||||||
let merger = builder.build();
|
let merger = builder.build();
|
||||||
|
|
||||||
let cellulite = cellulite::Cellulite::new(index.cellulite);
|
|
||||||
|
|
||||||
let mut iter = merger.into_stream_merger_iter()?;
|
let mut iter = merger.into_stream_merger_iter()?;
|
||||||
while let Some((key, value)) = iter.next()? {
|
while let Some((key, value)) = iter.next()? {
|
||||||
// convert the key back to a u32 (4 bytes)
|
// convert the key back to a u32 (4 bytes)
|
||||||
@ -639,14 +637,14 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
|
|
||||||
let deladd_obkv = KvReaderDelAdd::from_slice(value);
|
let deladd_obkv = KvReaderDelAdd::from_slice(value);
|
||||||
if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
|
if let Some(_value) = deladd_obkv.get(DelAdd::Deletion) {
|
||||||
cellulite.delete(wtxn, docid)?;
|
index.cellulite.delete(wtxn, docid)?;
|
||||||
}
|
}
|
||||||
if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
|
if let Some(value) = deladd_obkv.get(DelAdd::Addition) {
|
||||||
tracing::warn!("Adding one geojson to cellulite");
|
tracing::warn!("Adding one geojson to cellulite");
|
||||||
|
|
||||||
let geojson =
|
let geojson =
|
||||||
geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
|
geojson::GeoJson::from_reader(value).map_err(UserError::SerdeJson)?;
|
||||||
cellulite
|
index.cellulite
|
||||||
.add(wtxn, docid, &geojson)
|
.add(wtxn, docid, &geojson)
|
||||||
.map_err(InternalError::CelluliteError)?;
|
.map_err(InternalError::CelluliteError)?;
|
||||||
}
|
}
|
||||||
|
@ -469,7 +469,6 @@ pub enum Database {
|
|||||||
FieldIdDocidFacetStrings,
|
FieldIdDocidFacetStrings,
|
||||||
FieldIdDocidFacetF64s,
|
FieldIdDocidFacetF64s,
|
||||||
VectorEmbedderCategoryId,
|
VectorEmbedderCategoryId,
|
||||||
Cellulite,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Database {
|
impl Database {
|
||||||
@ -492,7 +491,6 @@ impl Database {
|
|||||||
Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(),
|
Database::FieldIdDocidFacetStrings => index.field_id_docid_facet_strings.remap_types(),
|
||||||
Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(),
|
Database::FieldIdDocidFacetF64s => index.field_id_docid_facet_f64s.remap_types(),
|
||||||
Database::VectorEmbedderCategoryId => index.embedder_category_id.remap_types(),
|
Database::VectorEmbedderCategoryId => index.embedder_category_id.remap_types(),
|
||||||
Database::Cellulite => index.cellulite.remap_types(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -515,7 +513,6 @@ impl Database {
|
|||||||
Database::FieldIdDocidFacetStrings => db_name::FIELD_ID_DOCID_FACET_STRINGS,
|
Database::FieldIdDocidFacetStrings => db_name::FIELD_ID_DOCID_FACET_STRINGS,
|
||||||
Database::FieldIdDocidFacetF64s => db_name::FIELD_ID_DOCID_FACET_F64S,
|
Database::FieldIdDocidFacetF64s => db_name::FIELD_ID_DOCID_FACET_F64S,
|
||||||
Database::VectorEmbedderCategoryId => db_name::VECTOR_EMBEDDER_CATEGORY_ID,
|
Database::VectorEmbedderCategoryId => db_name::VECTOR_EMBEDDER_CATEGORY_ID,
|
||||||
Database::Cellulite => db_name::CELLULITE,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,24 +1,21 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
|
use std::io::{self, BufReader, BufWriter, ErrorKind, Seek as _, Write as _};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::{iter, mem, result};
|
use std::{iter, mem};
|
||||||
|
|
||||||
use bumpalo::Bump;
|
use bumpalo::Bump;
|
||||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use geojson::GeoJson;
|
use geojson::GeoJson;
|
||||||
use heed::RoTxn;
|
use heed::RoTxn;
|
||||||
use serde_json::value::RawValue;
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
use crate::error::GeoError;
|
|
||||||
use crate::update::new::document::{Document, DocumentContext};
|
use crate::update::new::document::{Document, DocumentContext};
|
||||||
use crate::update::new::indexer::document_changes::Extractor;
|
use crate::update::new::indexer::document_changes::Extractor;
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
use crate::update::new::thread_local::MostlySend;
|
use crate::update::new::thread_local::MostlySend;
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result, UserError};
|
use crate::{DocumentId, Index, Result, UserError};
|
||||||
|
|
||||||
pub struct GeoJsonExtractor {
|
pub struct GeoJsonExtractor {
|
||||||
grenad_parameters: GrenadParameters,
|
grenad_parameters: GrenadParameters,
|
||||||
@ -38,25 +35,6 @@ impl GeoJsonExtractor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
#[derive(Pod, Zeroable, Copy, Clone)]
|
|
||||||
#[repr(C, packed)]
|
|
||||||
pub struct ExtractedGeoPoint {
|
|
||||||
pub docid: DocumentId,
|
|
||||||
pub lat_lng: [f64; 2],
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<ExtractedGeoPoint> for GeoPoint {
|
|
||||||
/// Converts the latitude and longitude back to an xyz GeoPoint.
|
|
||||||
fn from(value: ExtractedGeoPoint) -> Self {
|
|
||||||
let [lat, lng] = value.lat_lng;
|
|
||||||
let point = [lat, lng];
|
|
||||||
let xyz_point = lat_lng_to_xyz(&point);
|
|
||||||
GeoPoint::new(xyz_point, (value.docid, point))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
pub struct GeoJsonExtractorData<'extractor> {
|
pub struct GeoJsonExtractorData<'extractor> {
|
||||||
/// The set of document ids that were removed. If a document sees its geo
|
/// The set of document ids that were removed. If a document sees its geo
|
||||||
/// point being updated, we first put it in the deleted and then in the inserted.
|
/// point being updated, we first put it in the deleted and then in the inserted.
|
||||||
@ -265,94 +243,3 @@ impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extracts and validates the latitude and longitude from a document geo field.
|
|
||||||
///
|
|
||||||
/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
|
|
||||||
pub fn extract_geo_coordinates(
|
|
||||||
external_id: &str,
|
|
||||||
raw_value: &RawValue,
|
|
||||||
) -> Result<Option<[f64; 2]>> {
|
|
||||||
let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
|
|
||||||
Value::Null => return Ok(None),
|
|
||||||
Value::Object(map) => map,
|
|
||||||
value => {
|
|
||||||
return Err(Box::new(GeoError::NotAnObject {
|
|
||||||
document_id: Value::from(external_id),
|
|
||||||
value,
|
|
||||||
})
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
|
|
||||||
(Some(lat), Some(lng)) => {
|
|
||||||
if geo.is_empty() {
|
|
||||||
[lat, lng]
|
|
||||||
} else {
|
|
||||||
return Err(Box::new(GeoError::UnexpectedExtraFields {
|
|
||||||
document_id: Value::from(external_id),
|
|
||||||
value: Value::from(geo),
|
|
||||||
})
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(Some(_), None) => {
|
|
||||||
return Err(Box::new(GeoError::MissingLongitude {
|
|
||||||
document_id: Value::from(external_id),
|
|
||||||
})
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
(None, Some(_)) => {
|
|
||||||
return Err(Box::new(GeoError::MissingLatitude {
|
|
||||||
document_id: Value::from(external_id),
|
|
||||||
})
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
(None, None) => {
|
|
||||||
return Err(Box::new(GeoError::MissingLatitudeAndLongitude {
|
|
||||||
document_id: Value::from(external_id),
|
|
||||||
})
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
|
|
||||||
(Ok(lat), Ok(lng)) => Ok(Some([lat, lng])),
|
|
||||||
(Ok(_), Err(value)) => {
|
|
||||||
Err(Box::new(GeoError::BadLongitude { document_id: Value::from(external_id), value })
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
(Err(value), Ok(_)) => {
|
|
||||||
Err(Box::new(GeoError::BadLatitude { document_id: Value::from(external_id), value })
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
(Err(lat), Err(lng)) => Err(Box::new(GeoError::BadLatitudeAndLongitude {
|
|
||||||
document_id: Value::from(external_id),
|
|
||||||
lat,
|
|
||||||
lng,
|
|
||||||
})
|
|
||||||
.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Extracts and validates that a serde JSON Value is actually a finite f64.
|
|
||||||
pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
|
|
||||||
let number = match value {
|
|
||||||
Value::Number(ref n) => match n.as_f64() {
|
|
||||||
Some(number) => number,
|
|
||||||
None => return Err(value),
|
|
||||||
},
|
|
||||||
Value::String(ref s) => match s.parse::<f64>() {
|
|
||||||
Ok(number) => number,
|
|
||||||
Err(_) => return Err(value),
|
|
||||||
},
|
|
||||||
value => return Err(value),
|
|
||||||
};
|
|
||||||
|
|
||||||
if number.is_finite() {
|
|
||||||
Ok(number)
|
|
||||||
} else {
|
|
||||||
Err(value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -166,9 +166,7 @@ where
|
|||||||
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
|
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
|
||||||
|
|
||||||
|
index.cellulite.build(wtxn, indexing_context.progress)?;
|
||||||
let cellulite = cellulite::Cellulite::new(index.cellulite);
|
|
||||||
cellulite.build(wtxn, indexing_context.progress)?;
|
|
||||||
|
|
||||||
pool.install(|| {
|
pool.install(|| {
|
||||||
build_vectors(
|
build_vectors(
|
||||||
|
@ -32,7 +32,6 @@ pub fn write_to_db(
|
|||||||
let _entered = span.enter();
|
let _entered = span.enter();
|
||||||
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
|
let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
|
||||||
let mut _entered_post_merge = None;
|
let mut _entered_post_merge = None;
|
||||||
let cellulite = cellulite::Cellulite::new(index.cellulite);
|
|
||||||
while let Some(action) = writer_receiver.recv_action() {
|
while let Some(action) = writer_receiver.recv_action() {
|
||||||
if _entered_post_merge.is_none()
|
if _entered_post_merge.is_none()
|
||||||
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
|
&& finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
@ -76,10 +75,10 @@ pub fn write_to_db(
|
|||||||
ReceiverAction::GeoJson(docid, geojson) => {
|
ReceiverAction::GeoJson(docid, geojson) => {
|
||||||
match geojson {
|
match geojson {
|
||||||
Some(geojson) => {
|
Some(geojson) => {
|
||||||
cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
|
index.cellulite.add(wtxn, docid, &geojson).map_err(InternalError::CelluliteError)?;
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
|
index.cellulite.delete(wtxn, docid).map_err(InternalError::CelluliteError)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,6 +82,7 @@ where
|
|||||||
let mut frozen = data.into_inner().freeze()?;
|
let mut frozen = data.into_inner().freeze()?;
|
||||||
for result in frozen.iter_and_clear_removed()? {
|
for result in frozen.iter_and_clear_removed()? {
|
||||||
let extracted_geo_point = result.map_err(InternalError::SerdeJson)?;
|
let extracted_geo_point = result.map_err(InternalError::SerdeJson)?;
|
||||||
|
// FIXME: this removal path must send the docid instead of the geojson.
|
||||||
todo!("We must send the docid instead of the geojson");
|
todo!("We must send the docid instead of the geojson");
|
||||||
/*
|
/*
|
||||||
let removed = cellulite.remove(&GeoJsonPoint::from(extracted_geo_point));
|
let removed = cellulite.remove(&GeoJsonPoint::from(extracted_geo_point));
|
||||||
|
Reference in New Issue
Block a user