From 7ddd8a2b667285fc55b3e841e148ac0893a1aa17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 14 Aug 2025 17:34:52 +0200 Subject: [PATCH] WIP: I need multiple rtxns and will probably use a bit of unsafe --- .../update/new/indexer/document_operation.rs | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 99a52a510..aa393e49e 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -1,4 +1,5 @@ use std::sync::atomic::Ordering; +use std::sync::OnceLock; use bumpalo::collections::CollectIn; use bumpalo::Bump; @@ -21,7 +22,7 @@ use crate::update::new::document::{DocumentContext, Versions}; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; use crate::update::new::{DocumentIdentifiers, Insertion, Update}; -use crate::update::{AvailableIds, ConcurrentAvailableIds, IndexDocumentsMethod}; +use crate::update::{ConcurrentAvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; #[derive(Default)] @@ -82,7 +83,7 @@ impl<'pl> DocumentOperation<'pl> { let documents_ids = index.documents_ids(rtxn)?; let available_docids = ConcurrentAvailableIds::new(documents_ids); let mut docids_version_offsets = hashbrown::HashMap::new(); - let mut primary_key = None; + let primary_key = OnceLock::new(); let payload_count = operations.len(); let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32); @@ -97,18 +98,14 @@ impl<'pl> DocumentOperation<'pl> { } step.fetch_add(1, Ordering::Relaxed); - // TODO do not create too many rtxn and rather keep a pool - let rtxn = index.read_txn()?; - let bump = Bump::new(); - let mut bytes = 0; let result = match operation { Payload::Replace(payload) => extract_addition_payload_changes( - &bump, + indexer, index, &rtxn, primary_key_from_op, - &mut primary_key, + &primary_key, new_fields_ids_map, &available_docids, &mut bytes, @@ -117,11 +114,11 @@ impl<'pl> DocumentOperation<'pl> { payload, ), Payload::Update(payload) => extract_addition_payload_changes( - &bump, + indexer, index, &rtxn, primary_key_from_op, - &mut primary_key, + &primary_key, new_fields_ids_map, &available_docids, &mut bytes, @@ -167,17 +164,21 @@ impl<'pl> DocumentOperation<'pl> { .sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0)); let docids_version_offsets = docids_version_offsets.into_bump_slice(); - Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key)) + Ok(( + DocumentOperationChanges { docids_version_offsets }, + operations_stats, + primary_key.into_inner(), + )) } } #[allow(clippy::too_many_arguments)] fn extract_addition_payload_changes<'r, 'pl: 'r>( - indexer: &'pl Bump, + indexer: &Bump, index: &Index, rtxn: &'r RoTxn<'r>, primary_key_from_op: Option<&'r str>, - primary_key: &mut Option>, + primary_key: &OnceLock>, new_fields_ids_map: &mut FieldsIdsMap, available_docids: &ConcurrentAvailableIds, bytes: &mut u64, @@ -214,10 +215,10 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( Err(error) => return Err(error), }; - primary_key.get_or_insert(pk) + primary_key.get_or_init(|| pk) } else { // primary key was retrieved in the first iteration or in a previous payload - primary_key.as_ref().unwrap() + primary_key.get().unwrap() }; let external_id = @@ -323,7 +324,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( ); match result { Ok(Ok((pk, _))) => { - primary_key.get_or_insert(pk); + primary_key.get_or_init(|| pk); } Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (), Ok(Err(user_error)) => return Err(Error::UserError(user_error)),