Shard documents

Louis Dureuil
2025-07-29 14:41:52 +02:00
parent 9929f798d3
commit 069d25dce6
2 changed files with 27 additions and 4 deletions


@@ -66,6 +66,11 @@ impl IndexScheduler {
}
IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
let network = self.network();
let shards = network.shards();
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it
// to a fresh thread.
@@ -130,6 +135,7 @@ impl IndexScheduler {
&mut new_fields_ids_map,
&|| must_stop_processing.get(),
progress.clone(),
shards.as_ref(),
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
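The `sharding` module itself is not part of this diff, so the shape of `Shards` is only visible through its uses: `network.shards()` evidently returns an `Option`, and the indexer consumes it as `Option<&Shards>`. A minimal sketch of what the type could look like, assuming documents are routed by hashing their external id onto a fixed number of shards (a real implementation would more likely use rendezvous or consistent hashing, so that adding a node relocates fewer documents):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for `crate::update::new::indexer::sharding::Shards`,
/// which is imported below but not defined in this diff.
pub struct Shards {
    /// The shard index owned by this node.
    pub own: usize,
    /// The total number of shards in the network.
    pub count: usize,
}

impl Shards {
    /// Returns true when this node is responsible for the document,
    /// assuming documents map to shards by the hash of their external id.
    pub fn must_process(&self, external_id: &str) -> bool {
        let mut hasher = DefaultHasher::new();
        external_id.hash(&mut hasher);
        (hasher.finish() as usize) % self.count == self.own
    }
}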


@@ -17,6 +17,7 @@ use super::guess_primary_key::retrieve_or_guess_primary_key;
use crate::documents::PrimaryKey;
use crate::progress::{AtomicPayloadStep, Progress};
use crate::update::new::document::{DocumentContext, Versions};
use crate::update::new::indexer::sharding::Shards;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentIdentifiers, Insertion, Update};
@@ -71,6 +72,7 @@ impl<'pl> DocumentOperation<'pl> {
new_fields_ids_map: &mut FieldsIdsMap,
must_stop_processing: &MSP,
progress: Progress,
shards: Option<&Shards>,
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
where
MSP: Fn() -> bool,
@@ -107,6 +109,7 @@ impl<'pl> DocumentOperation<'pl> {
&mut bytes,
&docids_version_offsets,
IndexDocumentsMethod::ReplaceDocuments,
shards,
payload,
),
Payload::Update(payload) => extract_addition_payload_changes(
@@ -120,6 +123,7 @@ impl<'pl> DocumentOperation<'pl> {
&mut bytes,
&docids_version_offsets,
IndexDocumentsMethod::UpdateDocuments,
shards,
payload,
),
Payload::Deletion(to_delete) => extract_deletion_payload_changes(
@@ -127,6 +131,7 @@ impl<'pl> DocumentOperation<'pl> {
rtxn,
&mut available_docids,
&docids_version_offsets,
shards,
to_delete,
),
};
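Note that `shards` reaches all three payload arms as `Option<&Shards>`: `None` means the network has no sharding configured, and since `Option::is_some_and` returns `false` for `None`, the skip branches below are never taken and every document is processed locally. The gate both paths share, written out as a hypothetical helper over the `Shards` sketch above:

/// Mirrors the check used by the addition and deletion paths alike:
/// skip a document only when a shard set exists and disowns it.
fn should_skip(shards: Option<&Shards>, external_id: &str) -> bool {
    shards.is_some_and(|shards| !shards.must_process(external_id))
}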
@@ -173,6 +178,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
bytes: &mut u64,
main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
method: IndexDocumentsMethod,
shards: Option<&Shards>,
payload: &'pl [u8],
) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
@@ -210,12 +216,20 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
primary_key.as_ref().unwrap()
};
let current_offset = iter.byte_offset();
let content = &payload[previous_offset..current_offset];
previous_offset = current_offset;
let external_id =
retrieved_primary_key.extract_fields_and_docid(doc, new_fields_ids_map, indexer)?;
let external_id = external_id.to_de();
let current_offset = iter.byte_offset();
let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] };
if shards.is_some_and(|shards| !shards.must_process(external_id)) {
continue;
}
let document_offset = DocumentOffset { content };
match main_docids_version_offsets.get(external_id) {
None => {
@@ -299,8 +313,6 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
},
},
}
previous_offset = iter.byte_offset();
}
if payload.is_empty() {
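The reshuffling in the two hunks above is offset bookkeeping: the current document's payload slice is now taken, and `previous_offset` advanced, before the shard check, replacing the old lines that built the `DocumentOffset` after docid extraction and only moved `previous_offset` at the bottom of the loop. Without that reordering, a document skipped by the `continue` would leave `previous_offset` behind, and the next document's slice would start at the wrong byte. A minimal sketch of the pattern, with a hypothetical `keep` predicate standing in for the shard check:

/// Slices a concatenated payload into per-document byte ranges,
/// dropping filtered-out documents without corrupting later offsets.
/// `boundaries` is a hypothetical (end_offset, external_id) list.
fn slice_documents<'p>(
    payload: &'p [u8],
    boundaries: &[(usize, &str)],
    keep: impl Fn(&str) -> bool,
) -> Vec<&'p [u8]> {
    let mut previous_offset = 0;
    let mut kept = Vec::new();
    for &(current_offset, external_id) in boundaries {
        // Take the slice and advance the cursor *before* deciding to skip,
        // so a skipped document still consumes its bytes.
        let content = &payload[previous_offset..current_offset];
        previous_offset = current_offset;
        if !keep(external_id) {
            continue;
        }
        kept.push(content);
    }
    kept
}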
@@ -329,11 +341,16 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
rtxn: &RoTxn,
available_docids: &mut AvailableIds,
main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
shards: Option<&Shards>,
to_delete: &'pl [&'pl str],
) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
for external_id in to_delete {
if shards.is_some_and(|shards| !shards.must_process(external_id)) {
continue;
}
match main_docids_version_offsets.get(external_id) {
None => {
match index.external_documents_ids().get(rtxn, external_id) {
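Additions, updates, and deletions all end up behind the same per-document gate; a usage sketch with the hypothetical `Shards` from above:

fn main() {
    // Assume a three-shard network where this node owns shard 0.
    let shards = Some(Shards { own: 0, count: 3 });
    for external_id in ["doc-1", "doc-2", "doc-3", "doc-4"] {
        let skip = shards.as_ref().is_some_and(|s| !s.must_process(external_id));
        println!("{external_id}: {}", if skip { "skipped" } else { "processed" });
    }
}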