From 069d25dce636e42581aa1408b3b2222ff300ee75 Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 29 Jul 2025 14:41:52 +0200 Subject: [PATCH] Shard documents --- .../src/scheduler/process_index_operation.rs | 6 +++++ .../update/new/indexer/document_operation.rs | 25 ++++++++++++++++--- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 62d0e6545..56e2d4693 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -66,6 +66,11 @@ impl IndexScheduler { } IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => { progress.update_progress(DocumentOperationProgress::RetrievingConfig); + + let network = self.network(); + + let shards = network.shards(); + // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // this is made difficult by the fact we're doing private clones of the index scheduler and sending it // to a fresh thread. 
@@ -130,6 +135,7 @@ impl IndexScheduler { &mut new_fields_ids_map, &|| must_stop_processing.get(), progress.clone(), + shards.as_ref(), ) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?; diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 98faaf145..50364c07c 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -17,6 +17,7 @@ use super::guess_primary_key::retrieve_or_guess_primary_key; use crate::documents::PrimaryKey; use crate::progress::{AtomicPayloadStep, Progress}; use crate::update::new::document::{DocumentContext, Versions}; +use crate::update::new::indexer::sharding::Shards; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; use crate::update::new::{DocumentIdentifiers, Insertion, Update}; @@ -71,6 +72,7 @@ impl<'pl> DocumentOperation<'pl> { new_fields_ids_map: &mut FieldsIdsMap, must_stop_processing: &MSP, progress: Progress, + shards: Option<&Shards>, ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)> where MSP: Fn() -> bool, @@ -107,6 +109,7 @@ impl<'pl> DocumentOperation<'pl> { &mut bytes, &docids_version_offsets, IndexDocumentsMethod::ReplaceDocuments, + shards, payload, ), Payload::Update(payload) => extract_addition_payload_changes( @@ -120,6 +123,7 @@ impl<'pl> DocumentOperation<'pl> { &mut bytes, &docids_version_offsets, IndexDocumentsMethod::UpdateDocuments, + shards, payload, ), Payload::Deletion(to_delete) => extract_deletion_payload_changes( @@ -127,6 +131,7 @@ impl<'pl> DocumentOperation<'pl> { rtxn, &mut available_docids, &docids_version_offsets, + shards, to_delete, ), }; @@ -173,6 +178,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( bytes: &mut u64, main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, method: IndexDocumentsMethod, + shards: Option<&Shards>, payload: &'pl 
[u8], ) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> { use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments}; @@ -210,12 +216,20 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( primary_key.as_ref().unwrap() }; + let current_offset = iter.byte_offset(); + let content = &payload[previous_offset..current_offset]; + previous_offset = current_offset; + let external_id = retrieved_primary_key.extract_fields_and_docid(doc, new_fields_ids_map, indexer)?; let external_id = external_id.to_de(); - let current_offset = iter.byte_offset(); - let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] }; + + if shards.is_some_and(|shards| !shards.must_process(external_id)) { + continue; + } + + let document_offset = DocumentOffset { content }; match main_docids_version_offsets.get(external_id) { None => { @@ -299,8 +313,6 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( }, }, } - - previous_offset = iter.byte_offset(); } if payload.is_empty() { @@ -329,11 +341,16 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>( rtxn: &RoTxn, available_docids: &mut AvailableIds, main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, + shards: Option<&Shards>, to_delete: &'pl [&'pl str], ) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> { let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new(); for external_id in to_delete { + if shards.is_some_and(|shards| !shards.must_process(external_id)) { + continue; + } + match main_docids_version_offsets.get(external_id) { None => { match index.external_documents_ids().get(rtxn, external_id) {