implement a first POC for sharding where we only index the documents that concern our shard

This commit is contained in:
Tamo 2025-06-04 11:05:18 +02:00
parent 5c14a25d5a
commit 1ef5b1a2a8
12 changed files with 171 additions and 1 deletion
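In short: every node derives from the network settings a list of shards in which it is always the first entry, and a document is indexed only by the shard whose name, hashed together with the document's external id, yields the highest digest. A minimal standalone sketch of that rule (the names owns_document, me and all_shards are illustrative; the actual check is must_be_skipped in the document_operation indexer below):

    use blake2::{Blake2b512, Digest};

    // Rendezvous-style ownership check: hash every shard name together with the
    // document id and keep the document only if our own shard yields the highest digest.
    fn owns_document(me: &str, all_shards: &[&str], external_id: &str) -> bool {
        let score = |shard: &str| {
            let mut hasher = Blake2b512::new();
            hasher.update(shard.as_bytes());
            hasher.update(external_id.as_bytes());
            hasher.finalize()
        };
        let mine = score(me);
        all_shards.iter().copied().all(|shard| shard == me || score(shard) <= mine)
    }

Since every node hashes the same (shard, document id) pairs, they all agree on the owner without any coordination.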

Cargo.lock (generated)
View File

@ -611,6 +611,15 @@ dependencies = [
"wyz",
]
[[package]]
name = "blake2"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe"
dependencies = [
"digest",
]
[[package]]
name = "block-buffer"
version = "0.10.4"
@ -3800,6 +3809,7 @@ dependencies = [
"big_s",
"bimap",
"bincode",
"blake2",
"bstr",
"bumpalo",
"bumparaw-collections",

View File

@ -115,6 +115,19 @@ impl IndexScheduler {
let indexer_config = self.index_mapper.indexer_config();
let pool = &indexer_config.thread_pool;
let network = self.network();
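// Build the list of shards: the local node first, followed by every remote except ourselves.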
let shards: Vec<&str> = network
.local
.as_deref()
.into_iter()
.chain(
network
.remotes
.keys()
.map(|s| s.as_str())
.filter(|s| Some(s) != network.local.as_deref().as_ref()),
)
.collect();
progress.update_progress(DocumentOperationProgress::ComputingDocumentChanges);
let (document_changes, operation_stats, primary_key) = indexer
@ -126,6 +139,7 @@ impl IndexScheduler {
&mut new_fields_ids_map,
&|| must_stop_processing.get(),
progress.clone(),
&shards,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;

View File

@ -623,3 +623,88 @@ async fn get_and_set_network() {
}
"###);
}
#[actix_rt::test]
async fn index_with_shards() {
let server = Server::new().await;
let (response, code) = server.set_features(json!({"network": true})).await;
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response["network"]), @r#"true"#);
// adding self
let (response, code) = server.set_network(json!({"self": "myself"})).await;
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "myself",
"remotes": {}
}
"###);
// adding remotes
let (response, code) = server
.set_network(json!({
"self": "myself",
"remotes": {
"myself": {
"url": "http://localhost:7700"
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "foo"
}
}}))
.await;
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "myself",
"remotes": {
"myself": {
"url": "http://localhost:7700",
"searchApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "foo"
}
}
}
"###);
// adding one remote
let (response, code) = server
.set_network(json!({"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
}
}}))
.await;
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "myself",
"remotes": {
"myself": {
"url": "http://localhost:7700",
"searchApiKey": null
},
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
}
}
}
"###);
}

View File

@ -111,6 +111,7 @@ utoipa = { version = "5.3.1", features = [
"openapi_extensions",
] }
lru = "0.13.0"
blake2 = "0.10.6"
[dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false }

View File

@ -76,6 +76,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();

View File

@ -84,6 +84,7 @@ impl TempIndex {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)?;
if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) {
@ -166,6 +167,7 @@ impl TempIndex {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)?;
if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) {
@ -240,6 +242,7 @@ fn aborting_indexation() {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();

View File

@ -1955,6 +1955,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2007,6 +2008,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2094,6 +2096,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2282,6 +2285,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2344,6 +2348,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2397,6 +2402,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2449,6 +2455,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2503,6 +2510,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2562,6 +2570,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2614,6 +2623,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2666,6 +2676,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2864,6 +2875,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2923,6 +2935,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();
@ -2979,6 +2992,7 @@ mod tests {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();

View File

@ -1,5 +1,6 @@
use std::sync::atomic::Ordering;
use blake2::{Blake2b512, Digest};
use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use bumparaw_collections::RawMap;
@ -71,6 +72,7 @@ impl<'pl> DocumentOperation<'pl> {
new_fields_ids_map: &mut FieldsIdsMap,
must_stop_processing: &MSP,
progress: Progress,
shards: &[&str],
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
where
MSP: Fn() -> bool,
@ -108,6 +110,7 @@ impl<'pl> DocumentOperation<'pl> {
&docids_version_offsets,
IndexDocumentsMethod::ReplaceDocuments,
payload,
shards,
),
Payload::Update(payload) => extract_addition_payload_changes(
indexer,
@ -121,6 +124,7 @@ impl<'pl> DocumentOperation<'pl> {
&docids_version_offsets,
IndexDocumentsMethod::UpdateDocuments,
payload,
shards,
),
Payload::Deletion(to_delete) => extract_deletion_payload_changes(
index,
@ -128,6 +132,7 @@ impl<'pl> DocumentOperation<'pl> {
&mut available_docids,
&docids_version_offsets,
to_delete,
shards,
),
};
@ -174,6 +179,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
method: IndexDocumentsMethod,
payload: &'pl [u8],
shards: &[&str],
) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
@ -184,7 +190,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? {
*bytes = previous_offset as u64;
// Only guess the primary key if it is the first document
// Only guess the primary key if it is the first document, whichever shard it belongs to
let retrieved_primary_key = if previous_offset == 0 {
let doc = RawMap::from_raw_value_and_hasher(doc, FxBuildHasher, indexer)
.map(Some)
@ -213,6 +219,11 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
let external_id =
retrieved_primary_key.extract_fields_and_docid(doc, new_fields_ids_map, indexer)?;
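// Skip documents whose rendezvous hash designates another shard as their owner.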
if must_be_skipped(external_id.to_de(), shards) {
previous_offset = iter.byte_offset();
continue;
}
let external_id = external_id.to_de();
let current_offset = iter.byte_offset();
let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] };
@ -330,10 +341,15 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
available_docids: &mut AvailableIds,
main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
to_delete: &'pl [&'pl str],
shards: &[&str],
) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
for external_id in to_delete {
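// As with additions, only the shard that owns the document processes its deletion.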
if must_be_skipped(external_id, shards) {
continue;
}
match main_docids_version_offsets.get(external_id) {
None => {
match index.external_documents_ids().get(rtxn, external_id) {
@ -612,3 +628,25 @@ pub fn first_update_pointer(docops: &[InnerDocOp]) -> Option<usize> {
InnerDocOp::Deletion => None,
})
}
fn must_be_skipped(pk: &str, shards: &[&str]) -> bool {
// Special case: when there are no shards, we must index the document
if shards.is_empty() {
return false;
}
// If there are multiple shards, the first shard is ourselves
let mut hasher = Blake2b512::new();
hasher.update(shards[0].as_bytes());
hasher.update(pk.as_bytes());
let me = hasher.finalize();
shards.iter().skip(1).any(|shard| {
let mut hasher = Blake2b512::new();
hasher.update(shard.as_bytes());
hasher.update(pk.as_bytes());
let them = hasher.finalize();
me < them
})
}
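
A quick way to sanity-check the rule: whichever node evaluates it, a given external id must end up owned by exactly one shard (ignoring hash ties, which are practically impossible with a 512-bit digest). A minimal property-style test sketch, assuming it lives in the same module as must_be_skipped:

    #[test]
    fn each_document_is_owned_by_exactly_one_shard() {
        let nodes = ["myself", "thy", "them"];
        for pk in ["doc-1", "doc-2", "doc-42"] {
            let owners = nodes
                .iter()
                .copied()
                .filter(|&node| {
                    // Rebuild the shard list as seen from `node`: itself first, then the others.
                    let shards: Vec<&str> = std::iter::once(node)
                        .chain(nodes.iter().copied().filter(|&n| n != node))
                        .collect();
                    !must_be_skipped(pk, &shards)
                })
                .count();
            assert_eq!(owners, 1, "{pk} must be indexed by exactly one shard");
        }
    }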

View File

@ -59,6 +59,7 @@ fn test_facet_distribution_with_no_facet_values() {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();

View File

@ -95,6 +95,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();

View File

@ -329,6 +329,7 @@ fn criteria_ascdesc() {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();

View File

@ -138,6 +138,7 @@ fn test_typo_disabled_on_word() {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
&[],
)
.unwrap();