Compare commits

...

8 Commits

Author SHA1 Message Date
YoEight
328718ee90 improve documentation 2025-12-11 16:30:04 -05:00
YoEight
a2878efafe time for some tests 2025-12-11 16:11:22 -05:00
YoEight
f392e0a0f8 threading down on_missing_document param 2025-12-11 15:54:11 -05:00
YoEight
e359325dbd rename to a clearer name 2025-12-11 14:31:16 -05:00
YoEight
d5f66c195d introduce DocumentCreationPolicy 2025-12-11 14:13:11 -05:00
YoEight
9f64b0de66 Allow strict document update without creating missing documents 2025-12-11 12:49:34 -05:00
Clément Renault
26e368b116 Merge pull request #6041 from meilisearch/fix-workflow-injection
Remove risk of command injection
2025-12-09 17:04:58 +00:00
curquiza
ba95ac0915 Remove risk of command injection 2025-12-09 17:06:41 +01:00
23 changed files with 220 additions and 90 deletions

View File

@@ -25,14 +25,18 @@ jobs:
- uses: actions/checkout@v5 - uses: actions/checkout@v5
- name: Define the Docker image we need to use - name: Define the Docker image we need to use
id: define-image id: define-image
env:
EVENT_NAME: ${{ github.event_name }}
DOCKER_IMAGE_INPUT: ${{ github.event.inputs.docker_image }}
run: | run: |
event=${{ github.event_name }}
echo "docker-image=nightly" >> $GITHUB_OUTPUT echo "docker-image=nightly" >> $GITHUB_OUTPUT
if [[ $event == 'workflow_dispatch' ]]; then if [[ "$EVENT_NAME" == 'workflow_dispatch' ]]; then
echo "docker-image=${{ github.event.inputs.docker_image }}" >> $GITHUB_OUTPUT echo "docker-image=$DOCKER_IMAGE_INPUT" >> $GITHUB_OUTPUT
fi fi
- name: Docker image is ${{ steps.define-image.outputs.docker-image }} - name: Docker image is ${{ steps.define-image.outputs.docker-image }}
run: echo "Docker image is ${{ steps.define-image.outputs.docker-image }}" env:
DOCKER_IMAGE: ${{ steps.define-image.outputs.docker-image }}
run: echo "Docker image is $DOCKER_IMAGE"
########## ##########
## SDKs ## ## SDKs ##

View File

@@ -113,9 +113,9 @@ fn main() {
for op in &operations { for op in &operations {
match op { match op {
Either::Left(documents) => { Either::Left(documents) => indexer
indexer.replace_documents(documents).unwrap() .replace_documents(documents, Default::default())
} .unwrap(),
Either::Right(ids) => indexer.delete_documents(ids), Either::Right(ids) => indexer.delete_documents(ids),
} }
} }

View File

@@ -164,6 +164,7 @@ impl<'a> Dump<'a> {
content_file: content_uuid.ok_or(Error::CorruptedDump)?, content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count, documents_count,
allow_index_creation, allow_index_creation,
on_missing_document: Default::default(),
}, },
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion { KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids, documents_ids,

View File

@@ -40,6 +40,7 @@ fn doc_imp(
content_file: Uuid::new_v4(), content_file: Uuid::new_v4(),
documents_count: 0, documents_count: 0,
allow_index_creation, allow_index_creation,
on_missing_document: Default::default(),
} }
} }

View File

@@ -2,7 +2,7 @@ use std::fmt;
use std::io::ErrorKind; use std::io::ErrorKind;
use meilisearch_types::heed::RoTxn; use meilisearch_types::heed::RoTxn;
use meilisearch_types::milli::update::IndexDocumentsMethod; use meilisearch_types::milli::update::{IndexDocumentsMethod, MissingDocumentPolicy};
use meilisearch_types::settings::{Settings, Unchecked}; use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::tasks::{BatchStopReason, Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{BatchStopReason, Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
@@ -63,8 +63,8 @@ pub(crate) enum Batch {
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum DocumentOperation { pub(crate) enum DocumentOperation {
Replace(Uuid), Replace { content_file: Uuid, on_missing_document: MissingDocumentPolicy },
Update(Uuid), Update { content_file: Uuid, on_missing_document: MissingDocumentPolicy },
Delete(Vec<String>), Delete(Vec<String>),
} }
@@ -293,13 +293,22 @@ impl IndexScheduler {
for task in tasks.iter() { for task in tasks.iter() {
match task.kind { match task.kind {
KindWithContent::DocumentAdditionOrUpdate { KindWithContent::DocumentAdditionOrUpdate {
content_file, method, .. content_file,
method,
on_missing_document,
..
} => match method { } => match method {
IndexDocumentsMethod::ReplaceDocuments => { IndexDocumentsMethod::ReplaceDocuments => {
operations.push(DocumentOperation::Replace(content_file)) operations.push(DocumentOperation::Replace {
content_file,
on_missing_document,
})
} }
IndexDocumentsMethod::UpdateDocuments => { IndexDocumentsMethod::UpdateDocuments => {
operations.push(DocumentOperation::Update(content_file)) operations.push(DocumentOperation::Update {
content_file,
on_missing_document,
})
} }
_ => unreachable!("Unknown document merging method"), _ => unreachable!("Unknown document merging method"),
}, },

View File

@@ -77,8 +77,8 @@ impl IndexScheduler {
let mut content_files = Vec::new(); let mut content_files = Vec::new();
for operation in &operations { for operation in &operations {
match operation { match operation {
DocumentOperation::Replace(content_uuid) DocumentOperation::Replace { content_file: content_uuid, .. }
| DocumentOperation::Update(content_uuid) => { | DocumentOperation::Update { content_file: content_uuid, .. } => {
let content_file = self.queue.file_store.get_update(*content_uuid)?; let content_file = self.queue.file_store.get_update(*content_uuid)?;
let mmap = unsafe { memmap2::Mmap::map(&content_file)? }; let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
content_files.push(mmap); content_files.push(mmap);
@@ -100,16 +100,16 @@ impl IndexScheduler {
let embedders = self.embedders(index_uid.clone(), embedders)?; let embedders = self.embedders(index_uid.clone(), embedders)?;
for operation in operations { for operation in operations {
match operation { match operation {
DocumentOperation::Replace(_content_uuid) => { DocumentOperation::Replace { content_file: _, on_missing_document } => {
let mmap = content_files_iter.next().unwrap(); let mmap = content_files_iter.next().unwrap();
indexer indexer
.replace_documents(mmap) .replace_documents(mmap, on_missing_document)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
} }
DocumentOperation::Update(_content_uuid) => { DocumentOperation::Update { content_file: _, on_missing_document } => {
let mmap = content_files_iter.next().unwrap(); let mmap = content_files_iter.next().unwrap();
indexer indexer
.update_documents(mmap) .update_documents(mmap, on_missing_document)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
} }
DocumentOperation::Delete(document_ids) => { DocumentOperation::Delete(document_ids) => {

View File

@@ -294,6 +294,7 @@ fn document_addition_and_index_deletion() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -482,6 +483,7 @@ fn document_addition_and_index_deletion_on_unexisting_index() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,

View File

@@ -31,6 +31,7 @@ fn document_addition() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -67,6 +68,7 @@ fn document_addition_and_document_deletion() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -133,6 +135,7 @@ fn document_deletion_and_document_addition() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -185,6 +188,7 @@ fn test_document_replace() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -236,6 +240,7 @@ fn test_document_update() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -289,6 +294,7 @@ fn test_mixed_document_addition() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -340,6 +346,7 @@ fn test_document_replace_without_autobatching() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -395,6 +402,7 @@ fn test_document_update_without_autobatching() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -454,6 +462,7 @@ fn test_document_addition_cant_create_index_without_index() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: false, allow_index_creation: false,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -506,6 +515,7 @@ fn test_document_addition_cant_create_index_without_index_without_autobatching()
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: false, allow_index_creation: false,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -568,6 +578,7 @@ fn test_document_addition_cant_create_index_with_index() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: false, allow_index_creation: false,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -635,6 +646,7 @@ fn test_document_addition_cant_create_index_with_index_without_autobatching() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: false, allow_index_creation: false,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -707,6 +719,7 @@ fn test_document_addition_mixed_rights_with_index() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation, allow_index_creation,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -764,6 +777,7 @@ fn test_document_addition_mixed_right_without_index_starts_with_cant_create() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation, allow_index_creation,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -820,6 +834,7 @@ fn test_document_addition_with_multiple_primary_key() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -883,6 +898,7 @@ fn test_document_addition_with_multiple_primary_key_batch_wrong_key() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -943,6 +959,7 @@ fn test_document_addition_with_bad_primary_key() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -1029,6 +1046,7 @@ fn test_document_addition_with_set_and_null_primary_key() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -1104,6 +1122,7 @@ fn test_document_addition_with_set_and_null_primary_key_inference_works() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,

View File

@@ -173,6 +173,7 @@ fn import_vectors() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -263,6 +264,7 @@ fn import_vectors() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -399,6 +401,7 @@ fn import_vectors_first_and_embedder_later() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -539,6 +542,7 @@ fn import_vectors_first_and_embedder_later() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -640,6 +644,7 @@ fn delete_document_containing_vector() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: false, allow_index_creation: false,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -818,6 +823,7 @@ fn delete_embedder_with_user_provided_vectors() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: false, allow_index_creation: false,
on_missing_document: Default::default(),
}, },
None, None,
false, false,

View File

@@ -52,6 +52,7 @@ fn fail_in_process_batch_for_document_addition() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -94,6 +95,7 @@ fn fail_in_update_task_after_process_batch_success_for_document_addition() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,
@@ -160,6 +162,7 @@ fn fail_in_process_batch_for_document_deletion() {
content_file: uuid, content_file: uuid,
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
}, },
None, None,
false, false,

View File

@@ -197,6 +197,7 @@ pub(crate) fn replace_document_import_task(
content_file: Uuid::from_u128(content_file_uuid), content_file: Uuid::from_u128(content_file_uuid),
documents_count, documents_count,
allow_index_creation: true, allow_index_creation: true,
on_missing_document: Default::default(),
} }
} }

View File

@@ -255,6 +255,7 @@ InvalidIndexLimit , InvalidRequest , BAD_REQU
InvalidIndexOffset , InvalidRequest , BAD_REQUEST ; InvalidIndexOffset , InvalidRequest , BAD_REQUEST ;
InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ; InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ;
InvalidIndexCustomMetadata , InvalidRequest , BAD_REQUEST ; InvalidIndexCustomMetadata , InvalidRequest , BAD_REQUEST ;
InvalidSkipCreation , InvalidRequest , BAD_REQUEST ;
InvalidIndexUid , InvalidRequest , BAD_REQUEST ; InvalidIndexUid , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchFacets , InvalidRequest , BAD_REQUEST ; InvalidMultiSearchFacets , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchFacetsByIndex , InvalidRequest , BAD_REQUEST ; InvalidMultiSearchFacetsByIndex , InvalidRequest , BAD_REQUEST ;

View File

@@ -5,7 +5,7 @@ use std::str::FromStr;
use byte_unit::Byte; use byte_unit::Byte;
use enum_iterator::Sequence; use enum_iterator::Sequence;
use milli::update::IndexDocumentsMethod; use milli::update::{IndexDocumentsMethod, MissingDocumentPolicy};
use milli::Object; use milli::Object;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
@@ -114,6 +114,7 @@ pub enum KindWithContent {
content_file: Uuid, content_file: Uuid,
documents_count: u64, documents_count: u64,
allow_index_creation: bool, allow_index_creation: bool,
on_missing_document: MissingDocumentPolicy,
}, },
DocumentDeletion { DocumentDeletion {
index_uid: String, index_uid: String,

View File

@@ -629,7 +629,7 @@ fn import_dump(
let mmap = unsafe { memmap2::Mmap::map(index_reader.documents_file())? }; let mmap = unsafe { memmap2::Mmap::map(index_reader.documents_file())? };
indexer.replace_documents(&mmap)?; indexer.replace_documents(&mmap, Default::default())?;
let indexer_config = index_scheduler.indexer_config(); let indexer_config = index_scheduler.indexer_config();
let pool = &indexer_config.thread_pool; let pool = &indexer_config.thread_pool;

View File

@@ -20,7 +20,7 @@ use meilisearch_types::heed::RoTxn;
use meilisearch_types::index_uid::IndexUid; use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::milli::documents::sort::recursive_sort; use meilisearch_types::milli::documents::sort::recursive_sort;
use meilisearch_types::milli::index::EmbeddingsWithMetadata; use meilisearch_types::milli::index::EmbeddingsWithMetadata;
use meilisearch_types::milli::update::IndexDocumentsMethod; use meilisearch_types::milli::update::{IndexDocumentsMethod, MissingDocumentPolicy};
use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors; use meilisearch_types::milli::vector::parsed_vectors::ExplicitVectors;
use meilisearch_types::milli::{AscDesc, DocumentId}; use meilisearch_types::milli::{AscDesc, DocumentId};
use meilisearch_types::serde_cs::vec::CS; use meilisearch_types::serde_cs::vec::CS;
@@ -687,6 +687,11 @@ pub struct UpdateDocumentsQuery {
#[param(example = "custom")] #[param(example = "custom")]
#[deserr(default, error = DeserrQueryParamError<InvalidIndexCustomMetadata>)] #[deserr(default, error = DeserrQueryParamError<InvalidIndexCustomMetadata>)]
pub custom_metadata: Option<String>, pub custom_metadata: Option<String>,
#[param(example = "true")]
#[deserr(default, try_from(&String) = from_string_skip_creation -> DeserrQueryParamError<InvalidSkipCreation>, error = DeserrQueryParamError<InvalidSkipCreation>)]
/// Only update documents if they already exist.
pub skip_creation: Option<bool>,
} }
#[derive(Deserialize, Debug, Deserr, IntoParams)] #[derive(Deserialize, Debug, Deserr, IntoParams)]
@@ -711,6 +716,23 @@ fn from_char_csv_delimiter(
} }
} }
fn from_string_skip_creation(
s: &String,
) -> Result<Option<bool>, DeserrQueryParamError<InvalidSkipCreation>> {
if s.eq_ignore_ascii_case("true") {
return Ok(Some(true));
}
if s.eq_ignore_ascii_case("false") {
return Ok(Some(false));
}
Err(DeserrQueryParamError::new(
format!("skipCreation must be either `true` or `false`. Found: `{}`", s),
Code::InvalidSkipCreation,
))
}
aggregate_methods!( aggregate_methods!(
Replaced => "Documents Added", Replaced => "Documents Added",
Updated => "Documents Updated", Updated => "Documents Updated",
@@ -840,6 +862,7 @@ pub async fn replace_documents(
params.custom_metadata, params.custom_metadata,
dry_run, dry_run,
allow_index_creation, allow_index_creation,
params.skip_creation,
&req, &req,
) )
.await?; .await?;
@@ -943,6 +966,7 @@ pub async fn update_documents(
params.custom_metadata, params.custom_metadata,
dry_run, dry_run,
allow_index_creation, allow_index_creation,
params.skip_creation,
&req, &req,
) )
.await?; .await?;
@@ -963,6 +987,7 @@ async fn document_addition(
custom_metadata: Option<String>, custom_metadata: Option<String>,
dry_run: bool, dry_run: bool,
allow_index_creation: bool, allow_index_creation: bool,
skip_creation: Option<bool>,
req: &HttpRequest, req: &HttpRequest,
) -> Result<SummarizedTaskView, MeilisearchHttpError> { ) -> Result<SummarizedTaskView, MeilisearchHttpError> {
let mime_type = extract_mime_type(req)?; let mime_type = extract_mime_type(req)?;
@@ -1083,6 +1108,11 @@ async fn document_addition(
primary_key, primary_key,
allow_index_creation, allow_index_creation,
index_uid: index_uid.to_string(), index_uid: index_uid.to_string(),
on_missing_document: if matches!(skip_creation, Some(true)) {
MissingDocumentPolicy::Skip
} else {
MissingDocumentPolicy::Create
},
}; };
let scheduler = index_scheduler.clone(); let scheduler = index_scheduler.clone();

View File

@@ -64,7 +64,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
let payload = unsafe { memmap2::Mmap::map(&file).unwrap() }; let payload = unsafe { memmap2::Mmap::map(&file).unwrap() };
// index documents // index documents
indexer.replace_documents(&payload).unwrap(); indexer.replace_documents(&payload, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (document_changes, operation_stats, primary_key) = indexer let (document_changes, operation_stats, primary_key) = indexer

View File

@@ -70,9 +70,11 @@ impl TempIndex {
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
match self.index_documents_config.update_method { match self.index_documents_config.update_method {
IndexDocumentsMethod::ReplaceDocuments => { IndexDocumentsMethod::ReplaceDocuments => {
indexer.replace_documents(&documents).unwrap() indexer.replace_documents(&documents, Default::default()).unwrap()
}
IndexDocumentsMethod::UpdateDocuments => {
indexer.update_documents(&documents, Default::default()).unwrap()
} }
IndexDocumentsMethod::UpdateDocuments => indexer.update_documents(&documents).unwrap(),
} }
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
@@ -232,7 +234,7 @@ fn aborting_indexation() {
{ "id": 2, "name": "bob", "age": 20 }, { "id": 2, "name": "bob", "age": 20 },
{ "id": 2, "name": "bob", "age": 20 }, { "id": 2, "name": "bob", "age": 20 },
]); ]);
indexer.replace_documents(&payload).unwrap(); indexer.replace_documents(&payload, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer

View File

@@ -67,6 +67,21 @@ pub enum IndexDocumentsMethod {
UpdateDocuments, UpdateDocuments,
} }
/// Controls whether new documents should be created when they don't already exist.
///
/// This policy is checked when processing a document whose ID is not found in the index.
/// It applies to both update and replace operations.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MissingDocumentPolicy {
/// Create the document if it doesn't exist. This is the default behavior.
#[default]
Create,
/// Skip the document silently if it doesn't exist. No error is returned, the document is simply
/// not indexed.
Skip,
}
pub struct IndexDocuments<'t, 'i, 'a, FP, FA> { pub struct IndexDocuments<'t, 'i, 'a, FP, FA> {
wtxn: &'t mut heed::RwTxn<'i>, wtxn: &'t mut heed::RwTxn<'i>,
index: &'i Index, index: &'i Index,
@@ -1971,10 +1986,10 @@ mod tests {
let mut new_fields_ids_map = db_fields_ids_map.clone(); let mut new_fields_ids_map = db_fields_ids_map.clone();
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.replace_documents(&doc1).unwrap(); indexer.replace_documents(&doc1, Default::default()).unwrap();
indexer.replace_documents(&doc2).unwrap(); indexer.replace_documents(&doc2, Default::default()).unwrap();
indexer.replace_documents(&doc3).unwrap(); indexer.replace_documents(&doc3, Default::default()).unwrap();
indexer.replace_documents(&doc4).unwrap(); indexer.replace_documents(&doc4, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (_document_changes, operation_stats, _primary_key) = indexer let (_document_changes, operation_stats, _primary_key) = indexer
@@ -2024,10 +2039,10 @@ mod tests {
let mut new_fields_ids_map = db_fields_ids_map.clone(); let mut new_fields_ids_map = db_fields_ids_map.clone();
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.replace_documents(&doc1).unwrap(); indexer.replace_documents(&doc1, Default::default()).unwrap();
indexer.update_documents(&doc2).unwrap(); indexer.update_documents(&doc2, Default::default()).unwrap();
indexer.update_documents(&doc3).unwrap(); indexer.update_documents(&doc3, Default::default()).unwrap();
indexer.update_documents(&doc4).unwrap(); indexer.update_documents(&doc4, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (document_changes, operation_stats, primary_key) = indexer let (document_changes, operation_stats, primary_key) = indexer
@@ -2112,11 +2127,11 @@ mod tests {
let mut new_fields_ids_map = db_fields_ids_map.clone(); let mut new_fields_ids_map = db_fields_ids_map.clone();
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.replace_documents(&doc1).unwrap(); indexer.replace_documents(&doc1, Default::default()).unwrap();
indexer.update_documents(&doc2).unwrap(); indexer.update_documents(&doc2, Default::default()).unwrap();
indexer.update_documents(&doc3).unwrap(); indexer.update_documents(&doc3, Default::default()).unwrap();
indexer.replace_documents(&doc4).unwrap(); indexer.replace_documents(&doc4, Default::default()).unwrap();
indexer.update_documents(&doc5).unwrap(); indexer.update_documents(&doc5, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (document_changes, operation_stats, primary_key) = indexer let (document_changes, operation_stats, primary_key) = indexer
@@ -2307,7 +2322,7 @@ mod tests {
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let embedders = RuntimeEmbedders::default(); let embedders = RuntimeEmbedders::default();
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.replace_documents(&documents).unwrap(); indexer.replace_documents(&documents, Default::default()).unwrap();
indexer.delete_documents(&["2"]); indexer.delete_documents(&["2"]);
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(
@@ -2362,13 +2377,13 @@ mod tests {
{ "id": 3, "name": "jean", "age": 25 }, { "id": 3, "name": "jean", "age": 25 },
]); ]);
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.update_documents(&documents).unwrap(); indexer.update_documents(&documents, Default::default()).unwrap();
let documents = documents!([ let documents = documents!([
{ "id": 2, "catto": "jorts" }, { "id": 2, "catto": "jorts" },
{ "id": 3, "legs": 4 }, { "id": 3, "legs": 4 },
]); ]);
indexer.update_documents(&documents).unwrap(); indexer.update_documents(&documents, Default::default()).unwrap();
indexer.delete_documents(&["1", "2"]); indexer.delete_documents(&["1", "2"]);
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
@@ -2426,7 +2441,7 @@ mod tests {
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let embedders = RuntimeEmbedders::default(); let embedders = RuntimeEmbedders::default();
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.update_documents(&documents).unwrap(); indexer.update_documents(&documents, Default::default()).unwrap();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(
@@ -2479,7 +2494,7 @@ mod tests {
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let embedders = RuntimeEmbedders::default(); let embedders = RuntimeEmbedders::default();
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.update_documents(&documents).unwrap(); indexer.update_documents(&documents, Default::default()).unwrap();
indexer.delete_documents(&["1", "2"]); indexer.delete_documents(&["1", "2"]);
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
@@ -2536,7 +2551,7 @@ mod tests {
{ "id": 2, "doggo": { "name": "jean", "age": 20 } }, { "id": 2, "doggo": { "name": "jean", "age": 20 } },
{ "id": 3, "name": "bob", "age": 25 }, { "id": 3, "name": "bob", "age": 25 },
]); ]);
indexer.update_documents(&documents).unwrap(); indexer.update_documents(&documents, Default::default()).unwrap();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(
@@ -2595,7 +2610,7 @@ mod tests {
{ "id": 2, "doggo": { "name": "jean", "age": 20 } }, { "id": 2, "doggo": { "name": "jean", "age": 20 } },
{ "id": 3, "name": "bob", "age": 25 }, { "id": 3, "name": "bob", "age": 25 },
]); ]);
indexer.update_documents(&documents).unwrap(); indexer.update_documents(&documents, Default::default()).unwrap();
indexer.delete_documents(&["1", "2", "1", "2"]); indexer.delete_documents(&["1", "2", "1", "2"]);
@@ -2651,7 +2666,7 @@ mod tests {
let documents = documents!([ let documents = documents!([
{ "id": 1, "doggo": "kevin" }, { "id": 1, "doggo": "kevin" },
]); ]);
indexer.update_documents(&documents).unwrap(); indexer.update_documents(&documents, Default::default()).unwrap();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(
@@ -2705,7 +2720,7 @@ mod tests {
{ "id": 1, "catto": "jorts" }, { "id": 1, "catto": "jorts" },
]); ]);
indexer.replace_documents(&documents).unwrap(); indexer.replace_documents(&documents, Default::default()).unwrap();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(
@@ -2916,7 +2931,7 @@ mod tests {
let documents = documents!([ let documents = documents!([
{ "id": 1, "doggo": "bernese" }, { "id": 1, "doggo": "bernese" },
]); ]);
indexer.replace_documents(&documents).unwrap(); indexer.replace_documents(&documents, Default::default()).unwrap();
// FINISHING // FINISHING
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
@@ -2978,7 +2993,7 @@ mod tests {
let documents = documents!([ let documents = documents!([
{ "id": 0, "catto": "jorts" }, { "id": 0, "catto": "jorts" },
]); ]);
indexer.replace_documents(&documents).unwrap(); indexer.replace_documents(&documents, Default::default()).unwrap();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(
@@ -3036,7 +3051,7 @@ mod tests {
let documents = documents!([ let documents = documents!([
{ "id": 1, "catto": "jorts" }, { "id": 1, "catto": "jorts" },
]); ]);
indexer.replace_documents(&documents).unwrap(); indexer.replace_documents(&documents, Default::default()).unwrap();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(

View File

@@ -21,7 +21,7 @@ use crate::update::new::indexer::current_edition::sharding::Shards;
use crate::update::new::steps::IndexingStep; use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend; use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentIdentifiers, Insertion, Update}; use crate::update::new::{DocumentIdentifiers, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::update::{AvailableIds, IndexDocumentsMethod, MissingDocumentPolicy};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
#[derive(Default)] #[derive(Default)]
@@ -37,20 +37,28 @@ impl<'pl> DocumentOperation<'pl> {
/// Append a replacement of documents. /// Append a replacement of documents.
/// ///
/// The payload is expected to be in the NDJSON format /// The payload is expected to be in the NDJSON format
pub fn replace_documents(&mut self, payload: &'pl Mmap) -> Result<()> { pub fn replace_documents(
&mut self,
payload: &'pl Mmap,
on_missing_document: MissingDocumentPolicy,
) -> Result<()> {
#[cfg(unix)] #[cfg(unix)]
payload.advise(memmap2::Advice::Sequential)?; payload.advise(memmap2::Advice::Sequential)?;
self.operations.push(Payload::Replace(&payload[..])); self.operations.push(Payload::Replace { payload: &payload[..], on_missing_document });
Ok(()) Ok(())
} }
/// Append an update of documents. /// Append an update of documents.
/// ///
/// The payload is expected to be in the NDJSON format /// The payload is expected to be in the NDJSON format
pub fn update_documents(&mut self, payload: &'pl Mmap) -> Result<()> { pub fn update_documents(
&mut self,
payload: &'pl Mmap,
on_missing_document: MissingDocumentPolicy,
) -> Result<()> {
#[cfg(unix)] #[cfg(unix)]
payload.advise(memmap2::Advice::Sequential)?; payload.advise(memmap2::Advice::Sequential)?;
self.operations.push(Payload::Update(&payload[..])); self.operations.push(Payload::Update { payload: &payload[..], on_missing_document });
Ok(()) Ok(())
} }
@@ -98,34 +106,40 @@ impl<'pl> DocumentOperation<'pl> {
let mut bytes = 0; let mut bytes = 0;
let result = match operation { let result = match operation {
Payload::Replace(payload) => extract_addition_payload_changes( Payload::Replace { payload, on_missing_document } => {
indexer, extract_addition_payload_changes(
index, indexer,
rtxn, index,
primary_key_from_op, rtxn,
&mut primary_key, primary_key_from_op,
new_fields_ids_map, &mut primary_key,
&mut available_docids, new_fields_ids_map,
&mut bytes, &mut available_docids,
&docids_version_offsets, &mut bytes,
IndexDocumentsMethod::ReplaceDocuments, &docids_version_offsets,
shards, IndexDocumentsMethod::ReplaceDocuments,
payload, shards,
), payload,
Payload::Update(payload) => extract_addition_payload_changes( on_missing_document,
indexer, )
index, }
rtxn, Payload::Update { payload, on_missing_document } => {
primary_key_from_op, extract_addition_payload_changes(
&mut primary_key, indexer,
new_fields_ids_map, index,
&mut available_docids, rtxn,
&mut bytes, primary_key_from_op,
&docids_version_offsets, &mut primary_key,
IndexDocumentsMethod::UpdateDocuments, new_fields_ids_map,
shards, &mut available_docids,
payload, &mut bytes,
), &docids_version_offsets,
IndexDocumentsMethod::UpdateDocuments,
shards,
payload,
on_missing_document,
)
}
Payload::Deletion(to_delete) => extract_deletion_payload_changes( Payload::Deletion(to_delete) => extract_deletion_payload_changes(
index, index,
rtxn, rtxn,
@@ -180,6 +194,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
shards: Option<&Shards>, shards: Option<&Shards>,
payload: &'pl [u8], payload: &'pl [u8],
on_missing_document: MissingDocumentPolicy,
) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> { ) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments}; use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
@@ -271,6 +286,10 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
match method { match method {
ReplaceDocuments => { ReplaceDocuments => {
if matches!(on_missing_document, MissingDocumentPolicy::Skip) {
continue;
}
entry.insert(PayloadOperations::new_replacement( entry.insert(PayloadOperations::new_replacement(
docid, docid,
true, // is new true, // is new
@@ -278,6 +297,10 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
)); ));
} }
UpdateDocuments => { UpdateDocuments => {
if matches!(on_missing_document, MissingDocumentPolicy::Skip) {
continue;
}
entry.insert(PayloadOperations::new_update( entry.insert(PayloadOperations::new_update(
docid, docid,
true, // is new true, // is new
@@ -297,6 +320,12 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
}, },
Entry::Vacant(entry) => match method { Entry::Vacant(entry) => match method {
ReplaceDocuments => { ReplaceDocuments => {
if payload_operations.is_new
&& matches!(on_missing_document, MissingDocumentPolicy::Skip)
{
continue;
}
entry.insert(PayloadOperations::new_replacement( entry.insert(PayloadOperations::new_replacement(
payload_operations.docid, payload_operations.docid,
payload_operations.is_new, payload_operations.is_new,
@@ -304,6 +333,12 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
)); ));
} }
UpdateDocuments => { UpdateDocuments => {
if payload_operations.is_new
&& matches!(on_missing_document, MissingDocumentPolicy::Skip)
{
continue;
}
entry.insert(PayloadOperations::new_update( entry.insert(PayloadOperations::new_update(
payload_operations.docid, payload_operations.docid,
payload_operations.is_new, payload_operations.is_new,
@@ -448,8 +483,8 @@ pub struct DocumentOperationChanges<'pl> {
} }
pub enum Payload<'pl> { pub enum Payload<'pl> {
Replace(&'pl [u8]), Replace { payload: &'pl [u8], on_missing_document: MissingDocumentPolicy },
Update(&'pl [u8]), Update { payload: &'pl [u8], on_missing_document: MissingDocumentPolicy },
Deletion(&'pl [&'pl str]), Deletion(&'pl [&'pl str]),
} }

View File

@@ -47,7 +47,7 @@ fn test_facet_distribution_with_no_facet_values() {
let documents = mmap_from_objects(vec![doc1, doc2]); let documents = mmap_from_objects(vec![doc1, doc2]);
// index documents // index documents
indexer.replace_documents(&documents).unwrap(); indexer.replace_documents(&documents, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer

View File

@@ -85,7 +85,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
let payload = unsafe { memmap2::Mmap::map(&file).unwrap() }; let payload = unsafe { memmap2::Mmap::map(&file).unwrap() };
// index documents // index documents
indexer.replace_documents(&payload).unwrap(); indexer.replace_documents(&payload, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (document_changes, operation_stats, primary_key) = indexer let (document_changes, operation_stats, primary_key) = indexer

View File

@@ -319,7 +319,7 @@ fn criteria_ascdesc() {
file.sync_all().unwrap(); file.sync_all().unwrap();
let payload = unsafe { memmap2::Mmap::map(&file).unwrap() }; let payload = unsafe { memmap2::Mmap::map(&file).unwrap() };
indexer.replace_documents(&payload).unwrap(); indexer.replace_documents(&payload, Default::default()).unwrap();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer
.into_changes( .into_changes(
&indexer_alloc, &indexer_alloc,

View File

@@ -126,7 +126,7 @@ fn test_typo_disabled_on_word() {
let embedders = RuntimeEmbedders::default(); let embedders = RuntimeEmbedders::default();
let mut indexer = indexer::DocumentOperation::new(); let mut indexer = indexer::DocumentOperation::new();
indexer.replace_documents(&documents).unwrap(); indexer.replace_documents(&documents, Default::default()).unwrap();
let indexer_alloc = Bump::new(); let indexer_alloc = Bump::new();
let (document_changes, _operation_stats, primary_key) = indexer let (document_changes, _operation_stats, primary_key) = indexer