Mirror of https://github.com/meilisearch/meilisearch.git

Fix a lot of primary key related tests

Committed by: Louis Dureuil
Parent: bd31ea2174
Commit: 677d7293f5
@@ -32,9 +32,7 @@ use meilisearch_types::error::Code;
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
 use meilisearch_types::milli::heed::CompactionOption;
-use meilisearch_types::milli::update::new::indexer::{
-    self, retrieve_or_guess_primary_key, UpdateByFunction,
-};
+use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
 use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSettings};
 use meilisearch_types::milli::vector::parsed_vectors::{
     ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
@@ -43,7 +41,6 @@ use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder};
 use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
 use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
-use raw_collections::RawMap;
 use roaring::RoaringBitmap;
 use time::macros::format_description;
 use time::OffsetDateTime;
@@ -1278,16 +1275,6 @@ impl IndexScheduler {
                 // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
                 // this is made difficult by the fact we're doing private clones of the index scheduler and sending it
                 // to a fresh thread.
-
-                /// TODO manage errors correctly
-                let first_addition_uuid = operations
-                    .iter()
-                    .find_map(|op| match op {
-                        DocumentOperation::Add(content_uuid) => Some(content_uuid),
-                        _ => None,
-                    })
-                    .unwrap();
-
                 let mut content_files = Vec::new();
                 for operation in &operations {
                     if let DocumentOperation::Add(content_uuid) = operation {
@@ -1303,28 +1290,6 @@ impl IndexScheduler {
                 let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
                 let mut new_fields_ids_map = db_fields_ids_map.clone();
 
-                let first_document = match content_files.first() {
-                    Some(mmap) => {
-                        let mut iter = serde_json::Deserializer::from_slice(mmap).into_iter();
-                        iter.next().transpose().map_err(|e| e.into()).map_err(Error::IoError)?
-                    }
-                    None => None,
-                };
-
-                let (primary_key, primary_key_has_been_set) = retrieve_or_guess_primary_key(
-                    &rtxn,
-                    index,
-                    &mut new_fields_ids_map,
-                    primary_key.as_deref(),
-                    first_document
-                        .map(|raw| RawMap::from_raw_value(raw, &indexer_alloc))
-                        .transpose()
-                        .map_err(|error| {
-                            milli::Error::UserError(milli::UserError::SerdeJson(error))
-                        })?,
-                )?
-                .map_err(milli::Error::from)?;
-
                 let indexer_config = self.index_mapper.indexer_config();
                 let mut content_files_iter = content_files.iter();
                 let mut indexer = indexer::DocumentOperation::new(method);
@@ -1356,11 +1321,11 @@ impl IndexScheduler {
                     }
                 };
 
-                let (document_changes, operation_stats) = indexer.into_changes(
+                let (document_changes, operation_stats, primary_key) = indexer.into_changes(
                     &indexer_alloc,
                     index,
                     &rtxn,
-                    &primary_key,
+                    primary_key.as_deref(),
                     &mut new_fields_ids_map,
                 )?;
 
@@ -1403,7 +1368,7 @@ impl IndexScheduler {
                             index,
                             &db_fields_ids_map,
                             new_fields_ids_map,
-                            primary_key_has_been_set.then_some(primary_key),
+                            primary_key,
                             &document_changes,
                             embedders,
                             &|| must_stop_processing.get(),

@@ -4296,11 +4296,11 @@ mod tests {
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
 
         // The second batch should fail.
-        handle.advance_one_failed_batch();
+        handle.advance_one_successful_batch();
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_task_fails");
 
         // The second batch should fail.
-        handle.advance_one_failed_batch();
+        handle.advance_one_successful_batch();
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "third_task_fails");
 
         // Is the primary key still what we expect?
@@ -4361,7 +4361,7 @@ mod tests {
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "only_first_task_succeed");
 
         // The second batch should fail and contains two tasks.
-        handle.advance_one_failed_batch();
+        handle.advance_one_successful_batch();
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_and_third_tasks_fails");
 
         // Is the primary key still what we expect?
@@ -4440,7 +4440,8 @@ mod tests {
         snapshot!(primary_key, @"id");
 
         // We're trying to `bork` again, but now there is already a primary key set for this index.
-        handle.advance_one_failed_batch();
+        // NOTE: it's marked as successful because the batch didn't fails, it's the individual tasks that failed.
+        handle.advance_one_successful_batch();
         snapshot!(snapshot_index_scheduler(&index_scheduler), name: "fourth_task_fails");
 
         // Finally the last task should succeed since its primary key is the same as the valid one.

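All of the test fixes above share one shape: steps that previously expected a failed batch (`advance_one_failed_batch`) now expect a successful one (`advance_one_successful_batch`). As the NOTE added in the last hunk spells out, the batch itself no longer fails; with primary-key errors now detected per payload, it is the individual tasks inside the batch that fail, and the snapshots (`second_task_fails`, `third_task_fails`, ...) still capture those task-level failures.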
@@ -3,12 +3,14 @@ use bumpalo::Bump;
 use hashbrown::hash_map::Entry;
 use heed::RoTxn;
 use memmap2::Mmap;
+use raw_collections::RawMap;
 use rayon::slice::ParallelSlice;
 use serde_json::value::RawValue;
 use serde_json::Deserializer;
 
 use super::super::document_change::DocumentChange;
 use super::document_changes::{DocumentChangeContext, DocumentChanges, MostlySend};
+use super::retrieve_or_guess_primary_key;
 use crate::documents::PrimaryKey;
 use crate::update::new::document::Versions;
 use crate::update::new::{Deletion, Insertion, Update};
@@ -41,16 +43,17 @@ impl<'pl> DocumentOperation<'pl> {
         self,
         indexer: &'pl Bump,
         index: &Index,
-        rtxn: &RoTxn,
-        primary_key: &PrimaryKey,
+        rtxn: &'pl RoTxn<'pl>,
+        primary_key_from_op: Option<&'pl str>,
         new_fields_ids_map: &mut FieldsIdsMap,
-    ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>)> {
+    ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)> {
         let Self { operations, method } = self;
 
         let documents_ids = index.documents_ids(rtxn)?;
         let mut operations_stats = Vec::new();
         let mut available_docids = AvailableIds::new(&documents_ids);
         let mut docids_version_offsets = hashbrown::HashMap::new();
+        let mut primary_key = None;
 
         for operation in operations {
             let (bytes, document_count, result) = match operation {
@@ -58,7 +61,8 @@ impl<'pl> DocumentOperation<'pl> {
                     indexer,
                     index,
                     rtxn,
-                    primary_key,
+                    primary_key_from_op,
+                    &mut primary_key,
                     new_fields_ids_map,
                     &mut available_docids,
                     &docids_version_offsets,
@@ -98,30 +102,30 @@ impl<'pl> DocumentOperation<'pl> {
         docids_version_offsets.sort_unstable_by_key(|(_, po)| method.sort_key(&po.operations));
 
         let docids_version_offsets = docids_version_offsets.into_bump_slice();
-        Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats))
+        Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key))
     }
 }
 
-fn extract_addition_payload_changes<'s, 'pl: 's>(
+fn extract_addition_payload_changes<'r, 'pl: 'r>(
     indexer: &'pl Bump,
     index: &Index,
-    rtxn: &RoTxn,
-    primary_key: &PrimaryKey,
-    fields_ids_map: &mut FieldsIdsMap,
+    rtxn: &'r RoTxn<'r>,
+    primary_key_from_op: Option<&'r str>,
+    primary_key: &mut Option<PrimaryKey<'r>>,
+    new_fields_ids_map: &mut FieldsIdsMap,
     available_docids: &mut AvailableIds,
-    main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
+    main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
     method: MergeMethod,
     payload: &'pl [u8],
-) -> (u64, u64, Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>>) {
+) -> (u64, u64, Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>>) {
     let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
 
     /// TODO manage the error
     let mut previous_offset = 0;
     let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
     loop {
-        let doc = match iter.next().transpose() {
-            Ok(Some(doc)) => doc,
-            Ok(None) => break,
+        let optdoc = match iter.next().transpose() {
+            Ok(optdoc) => optdoc,
             Err(e) => {
                 return (
                     payload.len() as u64,
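Worth noting in the signature above: `extract_addition_payload_changes` never short-circuits with `?`. It returns `(u64, u64, Result<...>)`, i.e. bytes read and documents seen alongside the outcome, so even a malformed payload reports how far it got before failing, which is what feeds `operations_stats`. A minimal, self-contained sketch of that return style; `parse_payload` and its newline-delimited format are hypothetical, not milli's:

/// Toy illustration (not milli code) of the "stats + Result" return style:
/// even on error, the caller learns how many bytes and documents were
/// processed before things went wrong.
fn parse_payload(payload: &[u8]) -> (u64, u64, Result<Vec<String>, String>) {
    let mut ids = Vec::new();
    for (i, line) in payload.split(|&b| b == b'\n').enumerate() {
        match std::str::from_utf8(line) {
            Ok(s) if !s.is_empty() => ids.push(s.to_owned()),
            Ok(_) => {} // skip blank lines
            // Report the progress made so far alongside the error.
            Err(e) => return (payload.len() as u64, ids.len() as u64, Err(format!("line {i}: {e}"))),
        }
    }
    (payload.len() as u64, ids.len() as u64, Ok(ids))
}

fn main() {
    let (bytes, docs, result) = parse_payload(b"doc-1\ndoc-2");
    println!("{bytes} bytes, {docs} documents, ok = {}", result.is_ok());
}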
@@ -131,7 +135,63 @@ fn extract_addition_payload_changes<'s, 'pl: 's>(
             }
         };
 
-        let external_id = match primary_key.extract_fields_and_docid(doc, fields_ids_map, indexer) {
+        // Only guess the primary key if it is the first document
+        let retrieved_primary_key = if previous_offset == 0 {
+            let optdoc = match optdoc {
+                Some(doc) => match RawMap::from_raw_value(doc, indexer) {
+                    Ok(docmap) => Some(docmap),
+                    Err(error) => {
+                        return (
+                            payload.len() as u64,
+                            new_docids_version_offsets.len() as u64,
+                            Err(Error::UserError(UserError::SerdeJson(error))),
+                        )
+                    }
+                },
+                None => None,
+            };
+
+            let result = retrieve_or_guess_primary_key(
+                rtxn,
+                index,
+                new_fields_ids_map,
+                primary_key_from_op,
+                optdoc,
+            );
+
+            let (pk, _has_been_changed) = match result {
+                Ok(Ok(pk)) => pk,
+                Ok(Err(user_error)) => {
+                    return (
+                        payload.len() as u64,
+                        new_docids_version_offsets.len() as u64,
+                        Err(Error::UserError(user_error)),
+                    )
+                }
+                Err(error) => {
+                    return (
+                        payload.len() as u64,
+                        new_docids_version_offsets.len() as u64,
+                        Err(error),
+                    )
+                }
+            };
+
+            primary_key.get_or_insert(pk)
+        } else {
+            primary_key.as_ref().unwrap()
+        };
+
+        let doc = match optdoc {
+            Some(doc) => doc,
+            None => break,
+        };
+
+        let external_id = match retrieved_primary_key.extract_fields_and_docid(
+            doc,
+            new_fields_ids_map,
+            indexer,
+        ) {
             Ok(edi) => edi,
             Err(e) => {
                 return (payload.len() as u64, new_docids_version_offsets.len() as u64, Err(e))
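The hunk above is the heart of the fix: the primary key is resolved at most once per payload, on the first document (`previous_offset == 0`), stored in the `&mut Option<PrimaryKey>` threaded in by the caller, and reused for every subsequent document. A self-contained sketch of that caching pattern, with a hypothetical `resolve_primary_key` standing in for `retrieve_or_guess_primary_key` and plain `serde_json` values instead of milli's types:

use serde_json::{json, Value};

/// Hypothetical stand-in for `retrieve_or_guess_primary_key`: prefer the key
/// provided with the operation, otherwise guess one from the first document.
fn resolve_primary_key(from_op: Option<&str>, first_doc: &Value) -> Option<String> {
    if let Some(pk) = from_op {
        return Some(pk.to_owned());
    }
    first_doc.as_object()?.keys().find(|k| k.ends_with("id")).cloned()
}

fn main() {
    let docs: Vec<Value> = vec![
        json!({ "id": 1, "name": "kefir" }),
        json!({ "id": 2, "name": "echo" }),
    ];

    // Mirrors the diff: `let mut primary_key = None;` lives outside the loop;
    // the first document resolves it, later documents reuse the cached value.
    let mut primary_key: Option<String> = None;
    for (offset, doc) in docs.iter().enumerate() {
        let pk: &String = if offset == 0 {
            let guessed = resolve_primary_key(None, doc).expect("no primary key found");
            primary_key.get_or_insert(guessed)
        } else {
            primary_key.as_ref().unwrap()
        };
        println!("document {offset} keyed by {pk:?}: {}", doc[pk.as_str()]);
    }
}

In the real code the same `Option` is threaded through every call to `extract_addition_payload_changes`, so later payloads in the same operation reuse the key resolved by the first one.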