WIP: I need multiple rtxns and will probably use a bit of unsafe

This commit is contained in:
Clément Renault
2025-08-14 17:34:52 +02:00
parent 2addedf98a
commit 7ddd8a2b66

View File

@ -1,4 +1,5 @@
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::OnceLock;
use bumpalo::collections::CollectIn; use bumpalo::collections::CollectIn;
use bumpalo::Bump; use bumpalo::Bump;
@ -21,7 +22,7 @@ use crate::update::new::document::{DocumentContext, Versions};
use crate::update::new::steps::IndexingStep; use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend; use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentIdentifiers, Insertion, Update}; use crate::update::new::{DocumentIdentifiers, Insertion, Update};
use crate::update::{AvailableIds, ConcurrentAvailableIds, IndexDocumentsMethod}; use crate::update::{ConcurrentAvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
#[derive(Default)] #[derive(Default)]
@ -82,7 +83,7 @@ impl<'pl> DocumentOperation<'pl> {
let documents_ids = index.documents_ids(rtxn)?; let documents_ids = index.documents_ids(rtxn)?;
let available_docids = ConcurrentAvailableIds::new(documents_ids); let available_docids = ConcurrentAvailableIds::new(documents_ids);
let mut docids_version_offsets = hashbrown::HashMap::new(); let mut docids_version_offsets = hashbrown::HashMap::new();
let mut primary_key = None; let primary_key = OnceLock::new();
let payload_count = operations.len(); let payload_count = operations.len();
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32); let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
@ -97,18 +98,14 @@ impl<'pl> DocumentOperation<'pl> {
} }
step.fetch_add(1, Ordering::Relaxed); step.fetch_add(1, Ordering::Relaxed);
// TODO do not create too many rtxn and rather keep a pool
let rtxn = index.read_txn()?;
let bump = Bump::new();
let mut bytes = 0; let mut bytes = 0;
let result = match operation { let result = match operation {
Payload::Replace(payload) => extract_addition_payload_changes( Payload::Replace(payload) => extract_addition_payload_changes(
&bump, indexer,
index, index,
&rtxn, &rtxn,
primary_key_from_op, primary_key_from_op,
&mut primary_key, &primary_key,
new_fields_ids_map, new_fields_ids_map,
&available_docids, &available_docids,
&mut bytes, &mut bytes,
@ -117,11 +114,11 @@ impl<'pl> DocumentOperation<'pl> {
payload, payload,
), ),
Payload::Update(payload) => extract_addition_payload_changes( Payload::Update(payload) => extract_addition_payload_changes(
&bump, indexer,
index, index,
&rtxn, &rtxn,
primary_key_from_op, primary_key_from_op,
&mut primary_key, &primary_key,
new_fields_ids_map, new_fields_ids_map,
&available_docids, &available_docids,
&mut bytes, &mut bytes,
@ -167,17 +164,21 @@ impl<'pl> DocumentOperation<'pl> {
.sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0)); .sort_unstable_by_key(|(_, po)| first_update_pointer(&po.operations).unwrap_or(0));
let docids_version_offsets = docids_version_offsets.into_bump_slice(); let docids_version_offsets = docids_version_offsets.into_bump_slice();
Ok((DocumentOperationChanges { docids_version_offsets }, operations_stats, primary_key)) Ok((
DocumentOperationChanges { docids_version_offsets },
operations_stats,
primary_key.into_inner(),
))
} }
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn extract_addition_payload_changes<'r, 'pl: 'r>( fn extract_addition_payload_changes<'r, 'pl: 'r>(
indexer: &'pl Bump, indexer: &Bump,
index: &Index, index: &Index,
rtxn: &'r RoTxn<'r>, rtxn: &'r RoTxn<'r>,
primary_key_from_op: Option<&'r str>, primary_key_from_op: Option<&'r str>,
primary_key: &mut Option<PrimaryKey<'r>>, primary_key: &OnceLock<PrimaryKey<'r>>,
new_fields_ids_map: &mut FieldsIdsMap, new_fields_ids_map: &mut FieldsIdsMap,
available_docids: &ConcurrentAvailableIds, available_docids: &ConcurrentAvailableIds,
bytes: &mut u64, bytes: &mut u64,
@ -214,10 +215,10 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
Err(error) => return Err(error), Err(error) => return Err(error),
}; };
primary_key.get_or_insert(pk) primary_key.get_or_init(|| pk)
} else { } else {
// primary key was retrieved in the first iteration or in a previous payload // primary key was retrieved in the first iteration or in a previous payload
primary_key.as_ref().unwrap() primary_key.get().unwrap()
}; };
let external_id = let external_id =
@ -323,7 +324,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
); );
match result { match result {
Ok(Ok((pk, _))) => { Ok(Ok((pk, _))) => {
primary_key.get_or_insert(pk); primary_key.get_or_init(|| pk);
} }
Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (), Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
Ok(Err(user_error)) => return Err(Error::UserError(user_error)), Ok(Err(user_error)) => return Err(Error::UserError(user_error)),