diff --git a/Cargo.lock b/Cargo.lock index a36c568b5..baeb7b4b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -611,6 +611,15 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -3800,6 +3809,7 @@ dependencies = [ "big_s", "bimap", "bincode", + "blake2", "bstr", "bumpalo", "bumparaw-collections", diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 093c6209d..6be4f4d31 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -115,6 +115,19 @@ impl IndexScheduler { let indexer_config = self.index_mapper.indexer_config(); let pool = &indexer_config.thread_pool; + let network = self.network(); + let shards: Vec<&str> = network + .local + .as_deref() + .into_iter() + .chain( + network + .remotes + .keys() + .map(|s| s.as_str()) + .filter(|s| Some(s) != network.local.as_deref().as_ref()), + ) + .collect(); progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges); let (document_changes, operation_stats, primary_key) = indexer @@ -126,6 +139,7 @@ impl IndexScheduler { &mut new_fields_ids_map, &|| must_stop_processing.get(), progress.clone(), + &shards, ) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; diff --git a/crates/meilisearch/tests/network/mod.rs b/crates/meilisearch/tests/network/mod.rs index 60f73ed40..da9536a2f 100644 --- a/crates/meilisearch/tests/network/mod.rs +++ b/crates/meilisearch/tests/network/mod.rs @@ -623,3 +623,88 @@ async fn get_and_set_network() { } "###); } + + +#[actix_rt::test] +async fn index_with_shards() { + let server = Server::new().await; + + let (response, code) =
server.set_features(json!({"network": true})).await; + meili_snap::snapshot!(code, @"200 OK"); + meili_snap::snapshot!(meili_snap::json_string!(response["network"]), @r#"true"#); + + // adding self + let (response, code) = server.set_network(json!({"self": "myself"})).await; + meili_snap::snapshot!(code, @"200 OK"); + meili_snap::snapshot!(meili_snap::json_string!(response), @r###" + { + "self": "myself", + "remotes": {} + } + "###); + + // adding remotes + let (response, code) = server + .set_network(json!({ + "self": "myself", + "remotes": { + "myself": { + "url": "http://localhost:7700" + }, + "thy": { + "url": "http://localhost:7701", + "searchApiKey": "foo" + } + }})) + .await; + + meili_snap::snapshot!(code, @"200 OK"); + meili_snap::snapshot!(meili_snap::json_string!(response), @r###" + { + "self": "myself", + "remotes": { + "myself": { + "url": "http://localhost:7700", + "searchApiKey": null + }, + "thy": { + "url": "http://localhost:7701", + "searchApiKey": "foo" + } + } + } + "###); + + // adding one remote: `self` and the remotes registered above (including the key of `thy`) must be left untouched + let (response, code) = server + .set_network(json!({"remotes": { + "them": { + "url": "http://localhost:7702", + "searchApiKey": "baz" + } + }})) + .await; + + meili_snap::snapshot!(code, @"200 OK"); + meili_snap::snapshot!(meili_snap::json_string!(response), @r###" + { + "self": "myself", + "remotes": { + "myself": { + "url": "http://localhost:7700", + "searchApiKey": null + }, + "them": { + "url": "http://localhost:7702", + "searchApiKey": "baz" + }, + "thy": { + "url": "http://localhost:7701", + "searchApiKey": "foo" + } + } + } + "###); + + +} diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 909a5f8f9..717c17183 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -111,6 +111,7 @@ utoipa = { version = "5.3.1", features = [ "openapi_extensions", ] } lru = "0.13.0" +blake2 = "0.10.6" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git
a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs index 4a6cc9b90..9551ef290 100644 --- a/crates/milli/src/search/new/tests/integration.rs +++ b/crates/milli/src/search/new/tests/integration.rs @@ -76,6 +76,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); diff --git a/crates/milli/src/test_index.rs b/crates/milli/src/test_index.rs index dfd570b96..d37e918e1 100644 --- a/crates/milli/src/test_index.rs +++ b/crates/milli/src/test_index.rs @@ -84,6 +84,7 @@ impl TempIndex { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], )?; if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) { @@ -166,6 +167,7 @@ impl TempIndex { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], )?; if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) { @@ -240,6 +242,7 @@ fn aborting_indexation() { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index 379b991e0..577ce9987 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -1955,6 +1955,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2007,6 +2008,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2094,6 +2096,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2282,6 +2285,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2344,6 +2348,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2397,6 +2402,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) 
.unwrap(); @@ -2449,6 +2455,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2503,6 +2510,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2562,6 +2570,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2614,6 +2623,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2666,6 +2676,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2864,6 +2875,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2923,6 +2935,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); @@ -2979,6 +2992,7 @@ mod tests { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index ca433c043..00d73895f 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -1,5 +1,6 @@ use std::sync::atomic::Ordering; +use blake2::{Blake2b512, Digest}; use bumpalo::collections::CollectIn; use bumpalo::Bump; use bumparaw_collections::RawMap; @@ -71,6 +72,7 @@ impl<'pl> DocumentOperation<'pl> { new_fields_ids_map: &mut FieldsIdsMap, must_stop_processing: &MSP, progress: Progress, + shards: &[&str], ) -> Result<(DocumentOperationChanges<'pl>, Vec, Option>)> where MSP: Fn() -> bool, @@ -108,6 +110,7 @@ impl<'pl> DocumentOperation<'pl> { &docids_version_offsets, IndexDocumentsMethod::ReplaceDocuments, payload, + shards, ), Payload::Update(payload) => extract_addition_payload_changes( indexer, @@ -121,6 +124,7 @@ impl<'pl> DocumentOperation<'pl> { &docids_version_offsets, IndexDocumentsMethod::UpdateDocuments, payload, + shards, ), Payload::Deletion(to_delete) => 
extract_deletion_payload_changes( index, @@ -128,6 +132,7 @@ impl<'pl> DocumentOperation<'pl> { &mut available_docids, &docids_version_offsets, to_delete, + shards, ), }; @@ -174,6 +179,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, method: IndexDocumentsMethod, payload: &'pl [u8], + shards: &[&str], ) -> Result>> { use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments}; @@ -184,7 +190,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? { *bytes = previous_offset as u64; - // Only guess the primary key if it is the first document + // Only guess the primary key if it is the first document and whatever the shard is let retrieved_primary_key = if previous_offset == 0 { let doc = RawMap::from_raw_value_and_hasher(doc, FxBuildHasher, indexer) .map(Some) @@ -213,6 +219,11 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( let external_id = retrieved_primary_key.extract_fields_and_docid(doc, new_fields_ids_map, indexer)?; + if must_be_skipped(external_id.to_de(), shards) { + previous_offset = iter.byte_offset(); + continue; + } + let external_id = external_id.to_de(); let current_offset = iter.byte_offset(); let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] }; @@ -330,10 +341,15 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>( available_docids: &mut AvailableIds, main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, to_delete: &'pl [&'pl str], + shards: &[&str], ) -> Result>> { let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); for external_id in to_delete { + if must_be_skipped(external_id, shards) { + continue; + } + match main_docids_version_offsets.get(external_id) { None => { match index.external_documents_ids().get(rtxn, external_id) { @@ -612,3 +628,25 @@ pub fn 
first_update_pointer(docops: &[InnerDocOp]) -> Option { InnerDocOp::Deletion => None, }) } + +/// Returns `true` when another shard owns the document `pk`, so we must not process it. +/// Each shard is scored by hashing its name followed by `pk`; the highest score wins. +/// `shards[0]` must be the local instance, an empty slice disables sharding. +fn must_be_skipped(pk: &str, shards: &[&str]) -> bool { + // Special case for no shard, it means we must index the document + let Some((&local, remotes)) = shards.split_first() else { + return false; + }; + + // Score of a shard for this document: hash of the shard name followed by the primary key + let score = |shard: &str| { + let mut hasher = Blake2b512::new(); + hasher.update(shard.as_bytes()); + hasher.update(pk.as_bytes()); + hasher.finalize() + }; + + // If there are multiple shards, the first shard is ourselves + let me = score(local); + remotes.iter().any(|&remote| me < score(remote)) +} diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index 8934cbea4..b3592b20a 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -59,6 +59,7 @@ fn test_facet_distribution_with_no_facet_values() { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs index 906956716..64245e9f1 100644 --- a/crates/milli/tests/search/mod.rs +++ b/crates/milli/tests/search/mod.rs @@ -95,6 +95,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 1acc89484..3dc9cb89f 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -329,6 +329,7 @@ fn criteria_ascdesc() { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap(); diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index 3c0717063..57877ae69 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++
b/crates/milli/tests/search/typo_tolerance.rs @@ -138,6 +138,7 @@ fn test_typo_disabled_on_word() { &mut new_fields_ids_map, &|| false, Progress::default(), + &[], ) .unwrap();