mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-11-04 09:56:28 +00:00 
			
		
		
		
	Introduce the validate_documents_batch function
This commit is contained in:
		@@ -7,6 +7,7 @@ use serde_json::Value;
 | 
			
		||||
 | 
			
		||||
use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
 | 
			
		||||
use crate::error::GeoError;
 | 
			
		||||
use crate::update::index_documents::extract_float_from_value;
 | 
			
		||||
use crate::{FieldId, InternalError, Result};
 | 
			
		||||
 | 
			
		||||
/// Extracts the geographical coordinates contained in each document under the `_geo` field.
 | 
			
		||||
@@ -61,11 +62,3 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
 | 
			
		||||
 | 
			
		||||
    Ok(writer_into_reader(writer)?)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn extract_float_from_value(value: Value) -> StdResult<f64, Value> {
 | 
			
		||||
    match value {
 | 
			
		||||
        Value::Number(ref n) => n.as_f64().ok_or(value),
 | 
			
		||||
        Value::String(ref s) => s.parse::<f64>().map_err(|_| value),
 | 
			
		||||
        value => Err(value),
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -2,11 +2,13 @@ mod extract;
 | 
			
		||||
mod helpers;
 | 
			
		||||
mod transform;
 | 
			
		||||
mod typed_chunk;
 | 
			
		||||
mod validate;
 | 
			
		||||
 | 
			
		||||
use std::collections::HashSet;
 | 
			
		||||
use std::io::{Cursor, Read, Seek};
 | 
			
		||||
use std::iter::FromIterator;
 | 
			
		||||
use std::num::{NonZeroU32, NonZeroUsize};
 | 
			
		||||
use std::result::Result as StdResult;
 | 
			
		||||
 | 
			
		||||
use crossbeam_channel::{Receiver, Sender};
 | 
			
		||||
use heed::types::Str;
 | 
			
		||||
@@ -25,13 +27,19 @@ pub use self::helpers::{
 | 
			
		||||
};
 | 
			
		||||
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
 | 
			
		||||
pub use self::transform::{Transform, TransformOutput};
 | 
			
		||||
use crate::documents::DocumentsBatchReader;
 | 
			
		||||
use self::validate::validate_documents_batch;
 | 
			
		||||
pub use self::validate::{
 | 
			
		||||
    extract_float_from_value, validate_document_id, validate_document_id_from_json,
 | 
			
		||||
    validate_geo_from_json,
 | 
			
		||||
};
 | 
			
		||||
use crate::documents::{obkv_to_object, DocumentsBatchReader};
 | 
			
		||||
use crate::error::UserError;
 | 
			
		||||
pub use crate::update::index_documents::helpers::CursorClonableMmap;
 | 
			
		||||
use crate::update::{
 | 
			
		||||
    self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids,
 | 
			
		||||
    WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst,
 | 
			
		||||
};
 | 
			
		||||
use crate::{Index, Result, RoaringBitmapCodec, UserError};
 | 
			
		||||
use crate::{Index, Result, RoaringBitmapCodec};
 | 
			
		||||
 | 
			
		||||
static MERGED_DATABASE_COUNT: usize = 7;
 | 
			
		||||
static PREFIX_DATABASE_COUNT: usize = 5;
 | 
			
		||||
@@ -117,19 +125,27 @@ where
 | 
			
		||||
 | 
			
		||||
    /// Adds a batch of documents to the current builder.
 | 
			
		||||
    ///
 | 
			
		||||
    /// Since the documents are progressively added to the writer, a failure will cause a stale
 | 
			
		||||
    /// builder, and the builder must be discarded.
 | 
			
		||||
    /// Since the documents are progressively added to the writer, a failure will only
 | 
			
		||||
    /// return an error and not the `IndexDocuments` struct as it is invalid to use it afterward.
 | 
			
		||||
    ///
 | 
			
		||||
    /// Returns the number of documents added to the builder.
 | 
			
		||||
    pub fn add_documents<R>(&mut self, reader: DocumentsBatchReader<R>) -> Result<u64>
 | 
			
		||||
    where
 | 
			
		||||
        R: Read + Seek,
 | 
			
		||||
    {
 | 
			
		||||
    pub fn add_documents<R: Read + Seek>(
 | 
			
		||||
        mut self,
 | 
			
		||||
        reader: DocumentsBatchReader<R>,
 | 
			
		||||
    ) -> Result<(Self, StdResult<u64, UserError>)> {
 | 
			
		||||
        // Early return when there is no document to add
 | 
			
		||||
        if reader.is_empty() {
 | 
			
		||||
            return Ok(0);
 | 
			
		||||
            return Ok((self, Ok(0)));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // We check for user errors in this validator and if there is one, we can return
 | 
			
		||||
        // the `IndexDocuments` struct as it is valid to send more documents into it.
 | 
			
		||||
        // However, if there is an internal error we throw it away!
 | 
			
		||||
        let reader = match validate_documents_batch(self.wtxn, self.index, reader)? {
 | 
			
		||||
            Ok(reader) => reader,
 | 
			
		||||
            Err(user_error) => return Ok((self, Err(user_error))),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        let indexed_documents = self
 | 
			
		||||
            .transform
 | 
			
		||||
            .as_mut()
 | 
			
		||||
@@ -139,7 +155,7 @@ where
 | 
			
		||||
 | 
			
		||||
        self.added_documents += indexed_documents;
 | 
			
		||||
 | 
			
		||||
        Ok(indexed_documents)
 | 
			
		||||
        Ok((self, Ok(indexed_documents)))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[logging_timer::time("IndexDocuments::{}")]
 | 
			
		||||
 
 | 
			
		||||
@@ -17,6 +17,7 @@ use super::{validate_document_id, IndexDocumentsMethod, IndexerConfig};
 | 
			
		||||
use crate::documents::{DocumentsBatchIndex, DocumentsBatchReader};
 | 
			
		||||
use crate::error::{Error, InternalError, UserError};
 | 
			
		||||
use crate::index::db_name;
 | 
			
		||||
use crate::update::index_documents::validate_document_id_from_json;
 | 
			
		||||
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
 | 
			
		||||
use crate::{
 | 
			
		||||
    ExternalDocumentsIds, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index,
 | 
			
		||||
@@ -782,14 +783,6 @@ fn compute_primary_key_pair(
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn validate_document_id(document_id: &str) -> Option<&str> {
 | 
			
		||||
    let document_id = document_id.trim();
 | 
			
		||||
    Some(document_id).filter(|id| {
 | 
			
		||||
        !id.is_empty()
 | 
			
		||||
            && id.chars().all(|c| matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_'))
 | 
			
		||||
    })
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Drops all the value of type `U` in vec, and reuses the allocation to create a `Vec<T>`.
 | 
			
		||||
///
 | 
			
		||||
/// The size and alignment of T and U must match.
 | 
			
		||||
@@ -813,22 +806,7 @@ fn update_primary_key<'a>(
 | 
			
		||||
) -> Result<Cow<'a, str>> {
 | 
			
		||||
    match field_buffer_cache.iter_mut().find(|(id, _)| *id == primary_key_id) {
 | 
			
		||||
        Some((_, bytes)) => {
 | 
			
		||||
            let value = match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
 | 
			
		||||
                Value::String(string) => match validate_document_id(&string) {
 | 
			
		||||
                    Some(s) if s.len() == string.len() => string,
 | 
			
		||||
                    Some(s) => s.to_string(),
 | 
			
		||||
                    None => {
 | 
			
		||||
                        return Err(UserError::InvalidDocumentId {
 | 
			
		||||
                            document_id: Value::String(string),
 | 
			
		||||
                        }
 | 
			
		||||
                        .into())
 | 
			
		||||
                    }
 | 
			
		||||
                },
 | 
			
		||||
                Value::Number(number) => number.to_string(),
 | 
			
		||||
                content => {
 | 
			
		||||
                    return Err(UserError::InvalidDocumentId { document_id: content.clone() }.into())
 | 
			
		||||
                }
 | 
			
		||||
            };
 | 
			
		||||
            let value = validate_document_id_from_json(bytes)??;
 | 
			
		||||
            serde_json::to_writer(external_id_buffer, &value).map_err(InternalError::SerdeJson)?;
 | 
			
		||||
            Ok(Cow::Owned(value))
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										140
									
								
								milli/src/update/index_documents/validate.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										140
									
								
								milli/src/update/index_documents/validate.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,140 @@
 | 
			
		||||
use std::io::{Read, Seek};
 | 
			
		||||
use std::result::Result as StdResult;
 | 
			
		||||
 | 
			
		||||
use serde_json::Value;
 | 
			
		||||
 | 
			
		||||
use crate::error::{GeoError, InternalError, UserError};
 | 
			
		||||
use crate::update::index_documents::{obkv_to_object, DocumentsBatchReader};
 | 
			
		||||
use crate::{Index, Result};
 | 
			
		||||
 | 
			
		||||
/// This function validates a documents by checking that:
 | 
			
		||||
///  - we can infer a primary key,
 | 
			
		||||
///  - all the documents id exist and,
 | 
			
		||||
///  - the validity of them but also,
 | 
			
		||||
///  - the validity of the `_geo` field depending on the settings.
 | 
			
		||||
pub fn validate_documents_batch<R: Read + Seek>(
 | 
			
		||||
    rtxn: &heed::RoTxn,
 | 
			
		||||
    index: &Index,
 | 
			
		||||
    reader: DocumentsBatchReader<R>,
 | 
			
		||||
) -> Result<StdResult<DocumentsBatchReader<R>, UserError>> {
 | 
			
		||||
    let mut cursor = reader.into_cursor();
 | 
			
		||||
    let documents_batch_index = cursor.documents_batch_index().clone();
 | 
			
		||||
 | 
			
		||||
    // The primary key *field id* that has already been set for this index or the one
 | 
			
		||||
    // we will guess by searching for the first key that contains "id" as a substring.
 | 
			
		||||
    let (primary_key, primary_key_id) = match index.primary_key(rtxn)? {
 | 
			
		||||
        Some(primary_key) => match documents_batch_index.id(primary_key) {
 | 
			
		||||
            Some(id) => (primary_key, id),
 | 
			
		||||
            None => {
 | 
			
		||||
                return match cursor.next_document()? {
 | 
			
		||||
                    Some(first_document) => Ok(Err(UserError::MissingDocumentId {
 | 
			
		||||
                        primary_key: primary_key.to_string(),
 | 
			
		||||
                        document: obkv_to_object(&first_document, &documents_batch_index)?,
 | 
			
		||||
                    })),
 | 
			
		||||
                    // If there is no document in this batch the best we can do is to return this error.
 | 
			
		||||
                    None => Ok(Err(UserError::MissingPrimaryKey)),
 | 
			
		||||
                };
 | 
			
		||||
            }
 | 
			
		||||
        },
 | 
			
		||||
        None => {
 | 
			
		||||
            let guessed = documents_batch_index
 | 
			
		||||
                .iter()
 | 
			
		||||
                .filter(|(_, name)| name.contains("id"))
 | 
			
		||||
                .min_by_key(|(fid, _)| *fid);
 | 
			
		||||
            match guessed {
 | 
			
		||||
                Some((id, name)) => (name.as_str(), *id),
 | 
			
		||||
                None => return Ok(Err(UserError::MissingPrimaryKey)),
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    // If the settings specifies that a _geo field must be used therefore we must check the
 | 
			
		||||
    // validity of it in all the documents of this batch and this is when we return `Some`.
 | 
			
		||||
    let geo_field_id = match documents_batch_index.id("_geo") {
 | 
			
		||||
        Some(geo_field_id) if index.sortable_fields(rtxn)?.contains("_geo") => Some(geo_field_id),
 | 
			
		||||
        _otherwise => None,
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    while let Some(document) = cursor.next_document()? {
 | 
			
		||||
        let document_id = match document.get(primary_key_id) {
 | 
			
		||||
            Some(document_id_bytes) => match validate_document_id_from_json(document_id_bytes)? {
 | 
			
		||||
                Ok(document_id) => document_id,
 | 
			
		||||
                Err(user_error) => return Ok(Err(user_error)),
 | 
			
		||||
            },
 | 
			
		||||
            None => {
 | 
			
		||||
                return Ok(Err(UserError::MissingDocumentId {
 | 
			
		||||
                    primary_key: primary_key.to_string(),
 | 
			
		||||
                    document: obkv_to_object(&document, &documents_batch_index)?,
 | 
			
		||||
                }))
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        if let Some(geo_value) = geo_field_id.and_then(|fid| document.get(fid)) {
 | 
			
		||||
            if let Err(user_error) = validate_geo_from_json(Value::from(document_id), geo_value)? {
 | 
			
		||||
                return Ok(Err(UserError::from(user_error)));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(Ok(cursor.into_reader()))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Trims the given document id and returns it when it is valid, `None` otherwise.
///
/// A valid id is not empty once trimmed and is only made of ASCII letters,
/// ASCII digits, `-` and `_`.
pub fn validate_document_id(document_id: &str) -> Option<&str> {
    let trimmed = document_id.trim();
    // Working on bytes is equivalent here: every byte of a non-ASCII character
    // is >= 0x80 and is therefore rejected.
    let only_allowed_bytes =
        trimmed.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_');

    (!trimmed.is_empty() && only_allowed_bytes).then(|| trimmed)
}
 | 
			
		||||
 | 
			
		||||
/// Parses a Json encoded document id and validate it, returning a user error when it is one.
 | 
			
		||||
pub fn validate_document_id_from_json(bytes: &[u8]) -> Result<StdResult<String, UserError>> {
 | 
			
		||||
    match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
 | 
			
		||||
        Value::String(string) => match validate_document_id(&string) {
 | 
			
		||||
            Some(s) if s.len() == string.len() => Ok(Ok(string)),
 | 
			
		||||
            Some(s) => Ok(Ok(s.to_string())),
 | 
			
		||||
            None => {
 | 
			
		||||
                return Ok(Err(UserError::InvalidDocumentId { document_id: Value::String(string) }))
 | 
			
		||||
            }
 | 
			
		||||
        },
 | 
			
		||||
        Value::Number(number) => Ok(Ok(number.to_string())),
 | 
			
		||||
        content => return Ok(Err(UserError::InvalidDocumentId { document_id: content.clone() })),
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Try to extract an `f64` from a JSON `Value` and return the `Value`
 | 
			
		||||
/// in the `Err` variant if it failed.
 | 
			
		||||
pub fn extract_float_from_value(value: Value) -> StdResult<f64, Value> {
 | 
			
		||||
    match value {
 | 
			
		||||
        Value::Number(ref n) => n.as_f64().ok_or(value),
 | 
			
		||||
        Value::String(ref s) => s.parse::<f64>().map_err(|_| value),
 | 
			
		||||
        value => Err(value),
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn validate_geo_from_json(document_id: Value, bytes: &[u8]) -> Result<StdResult<(), GeoError>> {
 | 
			
		||||
    let result = match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
 | 
			
		||||
        Value::Object(mut object) => match (object.remove("lat"), object.remove("lng")) {
 | 
			
		||||
            (Some(lat), Some(lng)) => {
 | 
			
		||||
                match (extract_float_from_value(lat), extract_float_from_value(lng)) {
 | 
			
		||||
                    (Ok(_), Ok(_)) => Ok(()),
 | 
			
		||||
                    (Err(value), Ok(_)) => Err(GeoError::BadLatitude { document_id, value }),
 | 
			
		||||
                    (Ok(_), Err(value)) => Err(GeoError::BadLongitude { document_id, value }),
 | 
			
		||||
                    (Err(lat), Err(lng)) => {
 | 
			
		||||
                        Err(GeoError::BadLatitudeAndLongitude { document_id, lat, lng })
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            (None, Some(_)) => Err(GeoError::MissingLatitude { document_id }),
 | 
			
		||||
            (Some(_), None) => Err(GeoError::MissingLongitude { document_id }),
 | 
			
		||||
            (None, None) => Err(GeoError::MissingLatitudeAndLongitude { document_id }),
 | 
			
		||||
        },
 | 
			
		||||
        value => Err(GeoError::NotAnObject { document_id, value }),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    Ok(result)
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user