mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-24 12:36:28 +00:00
Support guessing primary key again
This commit is contained in:
@@ -29,14 +29,17 @@ use meilisearch_types::error::Code;
|
|||||||
use meilisearch_types::heed::{RoTxn, RwTxn};
|
use meilisearch_types::heed::{RoTxn, RwTxn};
|
||||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
|
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
|
||||||
use meilisearch_types::milli::heed::CompactionOption;
|
use meilisearch_types::milli::heed::CompactionOption;
|
||||||
use meilisearch_types::milli::update::new::indexer::{self, guess_primary_key, DocumentChanges};
|
use meilisearch_types::milli::update::new::indexer::{
|
||||||
|
self, retrieve_or_guess_primary_key, DocumentChanges,
|
||||||
|
};
|
||||||
|
use meilisearch_types::milli::update::new::TopLevelMap;
|
||||||
use meilisearch_types::milli::update::{
|
use meilisearch_types::milli::update::{
|
||||||
IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
|
IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
|
||||||
};
|
};
|
||||||
use meilisearch_types::milli::vector::parsed_vectors::{
|
use meilisearch_types::milli::vector::parsed_vectors::{
|
||||||
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
|
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
|
||||||
};
|
};
|
||||||
use meilisearch_types::milli::{self, Filter, Object};
|
use meilisearch_types::milli::{self, Filter, InternalError, Object};
|
||||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
||||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
||||||
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
||||||
@@ -1296,22 +1299,34 @@ impl IndexScheduler {
|
|||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// let content_file = self.file_store.get_update(*first_addition_uuid)?;
|
|
||||||
// let reader =
|
|
||||||
// DocumentsBatchReader::from_reader(content_file).map_err(milli::Error::from)?;
|
|
||||||
// let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
|
|
||||||
// let primary_key =
|
|
||||||
// guess_primary_key(&rtxn, index, cursor, &documents_batch_index)?.unwrap();
|
|
||||||
|
|
||||||
let mut content_files = Vec::new();
|
let mut content_files = Vec::new();
|
||||||
for operation in &operations {
|
for operation in &operations {
|
||||||
if let DocumentOperation::Add(content_uuid) = operation {
|
if let DocumentOperation::Add(content_uuid) = operation {
|
||||||
let content_file = self.file_store.get_update(*content_uuid)?;
|
let content_file = self.file_store.get_update(*content_uuid)?;
|
||||||
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
|
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
|
||||||
content_files.push(mmap);
|
if !mmap.is_empty() {
|
||||||
|
content_files.push(mmap);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut fields_ids_map = index.fields_ids_map(&rtxn)?;
|
||||||
|
let first_document = match content_files.first() {
|
||||||
|
Some(mmap) => {
|
||||||
|
let mut iter = serde_json::Deserializer::from_slice(mmap).into_iter();
|
||||||
|
iter.next().transpose().map_err(|e| e.into()).map_err(Error::IoError)?
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let primary_key = retrieve_or_guess_primary_key(
|
||||||
|
&rtxn,
|
||||||
|
index,
|
||||||
|
&mut fields_ids_map,
|
||||||
|
first_document.as_ref(),
|
||||||
|
)?
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
let mut content_files_iter = content_files.iter();
|
let mut content_files_iter = content_files.iter();
|
||||||
let mut indexer = indexer::DocumentOperation::new(method);
|
let mut indexer = indexer::DocumentOperation::new(method);
|
||||||
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
|
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
|
||||||
@@ -1364,21 +1379,9 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !tasks.iter().all(|res| res.error.is_some()) {
|
if !tasks.iter().all(|res| res.error.is_some()) {
|
||||||
let mut fields_ids_map = index.fields_ids_map(&rtxn)?;
|
|
||||||
/// TODO create a pool if needed
|
/// TODO create a pool if needed
|
||||||
// let pool = indexer_config.thread_pool.unwrap();
|
// let pool = indexer_config.thread_pool.unwrap();
|
||||||
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
|
let pool = rayon::ThreadPoolBuilder::new().build().unwrap();
|
||||||
// let fields_ids_map = RwLock::new(fields_ids_map);
|
|
||||||
|
|
||||||
/// TODO correctly guess the primary key in a NDJSON
|
|
||||||
let pk = match std::env::var("MEILI_PRIMARY_KEY") {
|
|
||||||
Ok(pk) => pk,
|
|
||||||
Err(VarError::NotPresent) => "id".to_string(),
|
|
||||||
Err(e) => panic!("primary key error: {e}"),
|
|
||||||
};
|
|
||||||
|
|
||||||
fields_ids_map.insert(&pk);
|
|
||||||
let primary_key = PrimaryKey::new(&pk, &fields_ids_map).unwrap();
|
|
||||||
|
|
||||||
let param = (index, &rtxn, &primary_key);
|
let param = (index, &rtxn, &primary_key);
|
||||||
let document_changes = indexer.document_changes(&mut fields_ids_map, param)?;
|
let document_changes = indexer.document_changes(&mut fields_ids_map, param)?;
|
||||||
|
@@ -22,7 +22,7 @@ use crate::documents::{
|
|||||||
};
|
};
|
||||||
use crate::update::new::channel::{DatabaseType, ExtractorSender};
|
use crate::update::new::channel::{DatabaseType, ExtractorSender};
|
||||||
use crate::update::GrenadParameters;
|
use crate::update::GrenadParameters;
|
||||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
|
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
|
||||||
|
|
||||||
mod document_deletion;
|
mod document_deletion;
|
||||||
mod document_operation;
|
mod document_operation;
|
||||||
@@ -242,53 +242,46 @@ fn extract_and_send_docids<E: SearchableExtractor, D: DatabaseType>(
|
|||||||
Ok(sender.send_searchable::<D>(merger).unwrap())
|
Ok(sender.send_searchable::<D>(merger).unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the primary key *field id* that has already been set for this index or the
|
||||||
|
/// one we will guess by searching for the first key that contains "id" as a substring.
|
||||||
/// TODO move this elsewhere
|
/// TODO move this elsewhere
|
||||||
pub fn guess_primary_key<'a>(
|
pub fn retrieve_or_guess_primary_key<'a>(
|
||||||
rtxn: &'a RoTxn<'a>,
|
rtxn: &'a RoTxn<'a>,
|
||||||
index: &Index,
|
index: &Index,
|
||||||
mut cursor: DocumentsBatchCursor<File>,
|
fields_ids_map: &mut FieldsIdsMap,
|
||||||
documents_batch_index: &'a DocumentsBatchIndex,
|
first_document: Option<&'a TopLevelMap<'_>>,
|
||||||
) -> Result<StdResult<PrimaryKey<'a>, UserError>> {
|
) -> Result<StdResult<PrimaryKey<'a>, UserError>> {
|
||||||
// The primary key *field id* that has already been set for this index or the one
|
|
||||||
// we will guess by searching for the first key that contains "id" as a substring.
|
|
||||||
match index.primary_key(rtxn)? {
|
match index.primary_key(rtxn)? {
|
||||||
Some(primary_key) => match PrimaryKey::new(primary_key, documents_batch_index) {
|
Some(primary_key) => match PrimaryKey::new(primary_key, fields_ids_map) {
|
||||||
Some(primary_key) => Ok(Ok(primary_key)),
|
Some(primary_key) => Ok(Ok(primary_key)),
|
||||||
None => match cursor.next_document()? {
|
None => unreachable!("Why is the primary key not in the fidmap?"),
|
||||||
Some(first_document) => Ok(Err(UserError::MissingDocumentId {
|
|
||||||
primary_key: primary_key.to_string(),
|
|
||||||
document: obkv_to_object(first_document, documents_batch_index)?,
|
|
||||||
})),
|
|
||||||
None => unreachable!("Called with reader.is_empty()"),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
let mut guesses: Vec<(u16, &str)> = documents_batch_index
|
let first_document = match first_document {
|
||||||
.iter()
|
Some(document) => document,
|
||||||
.filter(|(_, name)| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
|
None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
||||||
.map(|(field_id, name)| (*field_id, name.as_str()))
|
};
|
||||||
|
|
||||||
|
let mut guesses: Vec<&str> = first_document
|
||||||
|
.keys()
|
||||||
|
.map(AsRef::as_ref)
|
||||||
|
.filter(|name| name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// sort the keys in a deterministic, obvious way, so that fields are always in the same order.
|
// sort the keys in lexicographical order, so that fields are always in the same order.
|
||||||
guesses.sort_by(|(_, left_name), (_, right_name)| {
|
guesses.sort_unstable();
|
||||||
// shortest name first
|
|
||||||
left_name.len().cmp(&right_name.len()).then_with(
|
|
||||||
// then alphabetical order
|
|
||||||
|| left_name.cmp(right_name),
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
match guesses.as_slice() {
|
match guesses.as_slice() {
|
||||||
[] => Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
[] => Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
|
||||||
[(field_id, name)] => {
|
[name] => {
|
||||||
tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
|
tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
|
||||||
Ok(Ok(PrimaryKey::Flat { name, field_id: *field_id }))
|
match fields_ids_map.insert(name) {
|
||||||
|
Some(field_id) => Ok(Ok(PrimaryKey::Flat { name, field_id })),
|
||||||
|
None => Ok(Err(UserError::AttributeLimitReached)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
multiple => Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
|
multiple => Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
|
||||||
candidates: multiple
|
candidates: multiple.iter().map(|candidate| candidate.to_string()).collect(),
|
||||||
.iter()
|
|
||||||
.map(|(_, candidate)| candidate.to_string())
|
|
||||||
.collect(),
|
|
||||||
})),
|
})),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,13 +1,41 @@
|
|||||||
use std::borrow::{Borrow, Cow};
|
use std::borrow::{Borrow, Cow};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt;
|
use std::{fmt, ops};
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
|
use serde_json::{Map, Value};
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
#[derive(Deserialize, Serialize)]
|
||||||
pub struct TopLevelMap<'p>(#[serde(borrow)] pub BTreeMap<CowStr<'p>, &'p RawValue>);
|
pub struct TopLevelMap<'p>(#[serde(borrow)] pub BTreeMap<CowStr<'p>, &'p RawValue>);
|
||||||
|
|
||||||
|
impl TryFrom<&'_ TopLevelMap<'_>> for Map<String, Value> {
|
||||||
|
type Error = serde_json::Error;
|
||||||
|
|
||||||
|
fn try_from(tlmap: &TopLevelMap<'_>) -> Result<Self, Self::Error> {
|
||||||
|
let mut object = Map::new();
|
||||||
|
for (k, v) in &tlmap.0 {
|
||||||
|
let value = serde_json::from_str(v.get())?;
|
||||||
|
object.insert(k.to_string(), value);
|
||||||
|
}
|
||||||
|
Ok(object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'p> ops::Deref for TopLevelMap<'p> {
|
||||||
|
type Target = BTreeMap<CowStr<'p>, &'p RawValue>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ops::DerefMut for TopLevelMap<'_> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
|
#[derive(Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
|
||||||
pub struct CowStr<'p>(#[serde(borrow)] pub Cow<'p, str>);
|
pub struct CowStr<'p>(#[serde(borrow)] pub Cow<'p, str>);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user