WIP: Ooops, I did some unsafe

This commit is contained in:
Clément Renault
2025-08-19 11:49:30 +02:00
parent 7ddd8a2b66
commit 0b40892eca
2 changed files with 49 additions and 15 deletions

View File

@ -1,11 +1,12 @@
use std::cell::RefCell;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::OnceLock; use std::sync::{OnceLock, RwLock};
use bumpalo::collections::CollectIn; use bumpalo::collections::CollectIn;
use bumpalo::Bump; use bumpalo::Bump;
use bumparaw_collections::RawMap; use bumparaw_collections::RawMap;
use hashbrown::hash_map::Entry; use hashbrown::hash_map::Entry;
use heed::RoTxn; use heed::{RoTxn, WithoutTls};
use memmap2::Mmap; use memmap2::Mmap;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rayon::slice::ParallelSlice; use rayon::slice::ParallelSlice;
@ -23,7 +24,10 @@ use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend; use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentIdentifiers, Insertion, Update}; use crate::update::new::{DocumentIdentifiers, Insertion, Update};
use crate::update::{ConcurrentAvailableIds, IndexDocumentsMethod}; use crate::update::{ConcurrentAvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; use crate::{
DocumentId, Error, FieldIdMapWithMetadata, FieldsIdsMap, GlobalFieldsIdsMap, Index,
InternalError, MetadataBuilder, Result, UserError,
};
#[derive(Default)] #[derive(Default)]
pub struct DocumentOperation<'pl> { pub struct DocumentOperation<'pl> {
@ -64,13 +68,13 @@ impl<'pl> DocumentOperation<'pl> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument(level = "trace", skip_all, target = "indexing::document_operation")] #[tracing::instrument(level = "trace", skip_all, target = "indexing::document_operation")]
pub fn into_changes<MSP>( pub fn into_changes<'e: 'pl, MSP>(
self, self,
indexer: &'pl Bump, indexer: &'pl Bump,
index: &Index, index: &'e Index,
rtxn: &'pl RoTxn<'pl>, rtxn: &'pl RoTxn<'pl, WithoutTls>,
primary_key_from_op: Option<&'pl str>, primary_key_from_op: Option<&'pl str>,
new_fields_ids_map: &mut FieldsIdsMap, new_fid_map: &mut FieldsIdsMap,
must_stop_processing: &MSP, must_stop_processing: &MSP,
progress: Progress, progress: Progress,
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)> ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
@ -80,6 +84,11 @@ impl<'pl> DocumentOperation<'pl> {
progress.update_progress(IndexingStep::PreparingPayloads); progress.update_progress(IndexingStep::PreparingPayloads);
let Self { operations } = self; let Self { operations } = self;
let metadata_builder = MetadataBuilder::from_index(index, rtxn)?;
let fid_map_with_meta = FieldIdMapWithMetadata::new(new_fid_map.clone(), metadata_builder);
let global = RwLock::new(fid_map_with_meta);
let gfid_map = GlobalFieldsIdsMap::new(&global);
let documents_ids = index.documents_ids(rtxn)?; let documents_ids = index.documents_ids(rtxn)?;
let available_docids = ConcurrentAvailableIds::new(documents_ids); let available_docids = ConcurrentAvailableIds::new(documents_ids);
let mut docids_version_offsets = hashbrown::HashMap::new(); let mut docids_version_offsets = hashbrown::HashMap::new();
@ -89,6 +98,7 @@ impl<'pl> DocumentOperation<'pl> {
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32); let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
progress.update_progress(progress_step); progress.update_progress(progress_step);
let long_txn_id = rtxn.id();
let operations_stats = operations let operations_stats = operations
.into_par_iter() .into_par_iter()
.enumerate() .enumerate()
@ -98,15 +108,21 @@ impl<'pl> DocumentOperation<'pl> {
} }
step.fetch_add(1, Ordering::Relaxed); step.fetch_add(1, Ordering::Relaxed);
let short = index.read_txn()?;
// SAFETY: The long_txn_id comes from the main rtxn and the long lifetime is the one of the long rtxn.
let rtxn: &'pl RoTxn<'e, _> = unsafe { extend_rtxn_lifetime(long_txn_id, &short) };
let indexer = bumpalo::Bump::new();
let mut gfid_map = gfid_map.clone();
let mut bytes = 0; let mut bytes = 0;
let result = match operation { let result = match operation {
Payload::Replace(payload) => extract_addition_payload_changes( Payload::Replace(payload) => extract_addition_payload_changes(
indexer, &indexer,
index, index,
&rtxn, &rtxn,
primary_key_from_op, primary_key_from_op,
&primary_key, &primary_key,
new_fields_ids_map, &mut gfid_map,
&available_docids, &available_docids,
&mut bytes, &mut bytes,
&docids_version_offsets, &docids_version_offsets,
@ -114,12 +130,12 @@ impl<'pl> DocumentOperation<'pl> {
payload, payload,
), ),
Payload::Update(payload) => extract_addition_payload_changes( Payload::Update(payload) => extract_addition_payload_changes(
indexer, &indexer,
index, index,
&rtxn, &rtxn,
primary_key_from_op, primary_key_from_op,
&primary_key, &primary_key,
new_fields_ids_map, &mut gfid_map,
&available_docids, &available_docids,
&mut bytes, &mut bytes,
&docids_version_offsets, &docids_version_offsets,
@ -179,7 +195,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
rtxn: &'r RoTxn<'r>, rtxn: &'r RoTxn<'r>,
primary_key_from_op: Option<&'r str>, primary_key_from_op: Option<&'r str>,
primary_key: &OnceLock<PrimaryKey<'r>>, primary_key: &OnceLock<PrimaryKey<'r>>,
new_fields_ids_map: &mut FieldsIdsMap, new_fields_ids_map: &mut GlobalFieldsIdsMap,
available_docids: &ConcurrentAvailableIds, available_docids: &ConcurrentAvailableIds,
bytes: &mut u64, bytes: &mut u64,
main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
@ -623,3 +639,21 @@ pub fn first_update_pointer(docops: &[InnerDocOp]) -> Option<usize> {
InnerDocOp::Deletion => None, InnerDocOp::Deletion => None,
}) })
} }
/// Extends the lifetime of a read-only transaction.
/// The `long_txn_id` transaction ID must come from a call to the `RoTxn::id` method.
///
/// ## Panics
///
/// Panics if the long transaction ID does not match the transaction ID of the short transaction.
///
/// ## Safety
///
/// This function transmutes the `'short` borrow of the transaction into an arbitrary
/// caller-chosen `'long` lifetime; the compiler can no longer verify it. The caller must
/// guarantee that the transaction referenced by `short` (and the environment `'e` it was
/// opened on) remains alive and open for the whole of `'long` — i.e. `long_txn_id` must
/// identify a long-lived transaction that the returned reference will not outlive.
/// The `assert_eq!` on the transaction IDs is a sanity check only: matching IDs do NOT by
/// themselves make the extension sound — TODO(review): confirm whether heed guarantees
/// that equal `RoTxn::id` values imply the same underlying LMDB transaction.
pub unsafe fn extend_rtxn_lifetime<'e, 'long, 'short>(
    long_txn_id: usize,
    short: &'short RoTxn<'e, WithoutTls>,
) -> &'long RoTxn<'e, WithoutTls> {
    // Guard against mixing up unrelated transactions: the ID of the reference we are
    // about to extend must match the ID the caller took from the long-lived rtxn.
    assert_eq!(
        long_txn_id,
        short.id(),
        "Lifetimes can only be extended if they have the same transaction ID"
    );
    // SAFETY: only the lifetime parameter of the reference changes; the pointee type is
    // identical (`RoTxn<'e, WithoutTls>`). Validity for `'long` is the caller's contract,
    // documented under `# Safety` above.
    unsafe { std::mem::transmute(short) }
}

View File

@ -4,7 +4,7 @@ use rustc_hash::FxBuildHasher;
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::update::new::StdResult; use crate::update::new::StdResult;
use crate::{FieldsIdsMap, Index, Result, UserError}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError};
/// Returns the primary key that has already been set for this index or the /// Returns the primary key that has already been set for this index or the
/// one we will guess by searching for the first key that contains "id" as a substring, /// one we will guess by searching for the first key that contains "id" as a substring,
@ -12,7 +12,7 @@ use crate::{FieldsIdsMap, Index, Result, UserError};
pub fn retrieve_or_guess_primary_key<'a>( pub fn retrieve_or_guess_primary_key<'a>(
rtxn: &'a RoTxn<'a>, rtxn: &'a RoTxn<'a>,
index: &Index, index: &Index,
new_fields_ids_map: &mut FieldsIdsMap, new_fields_ids_map: &mut GlobalFieldsIdsMap, // Use a &mut Mapper: MutFieldIdMapper
primary_key_from_op: Option<&'a str>, primary_key_from_op: Option<&'a str>,
first_document: Option<RawMap<'a, FxBuildHasher>>, first_document: Option<RawMap<'a, FxBuildHasher>>,
) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> { ) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
@ -47,7 +47,7 @@ pub fn retrieve_or_guess_primary_key<'a>(
let guesses: Result<Vec<&str>> = first_document let guesses: Result<Vec<&str>> = first_document
.keys() .keys()
.filter_map(|name| { .filter_map(|name| {
let Some(_) = new_fields_ids_map.insert(name) else { let Some(_) = new_fields_ids_map.id_or_insert(name) else {
return Some(Err(UserError::AttributeLimitReached.into())); return Some(Err(UserError::AttributeLimitReached.into()));
}; };
name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name)) name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name))