From 0b40892eca253a6bedaba312d38d446a7fc5f294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 19 Aug 2025 11:49:30 +0200 Subject: [PATCH] WIP: Ooops, I did some unsafe --- .../update/new/indexer/document_operation.rs | 58 +++++++++++++++---- .../update/new/indexer/guess_primary_key.rs | 6 +- 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index aa393e49e..1b5253004 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -1,11 +1,12 @@ +use std::cell::RefCell; use std::sync::atomic::Ordering; -use std::sync::OnceLock; +use std::sync::{OnceLock, RwLock}; use bumpalo::collections::CollectIn; use bumpalo::Bump; use bumparaw_collections::RawMap; use hashbrown::hash_map::Entry; -use heed::RoTxn; +use heed::{RoTxn, WithoutTls}; use memmap2::Mmap; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; use rayon::slice::ParallelSlice; @@ -23,7 +24,10 @@ use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; use crate::update::new::{DocumentIdentifiers, Insertion, Update}; use crate::update::{ConcurrentAvailableIds, IndexDocumentsMethod}; -use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; +use crate::{ + DocumentId, Error, FieldIdMapWithMetadata, FieldsIdsMap, GlobalFieldsIdsMap, Index, + InternalError, MetadataBuilder, Result, UserError, +}; #[derive(Default)] pub struct DocumentOperation<'pl> { @@ -64,13 +68,13 @@ impl<'pl> DocumentOperation<'pl> { #[allow(clippy::too_many_arguments)] #[tracing::instrument(level = "trace", skip_all, target = "indexing::document_operation")] - pub fn into_changes( + pub fn into_changes<'e: 'pl, MSP>( self, indexer: &'pl Bump, - index: &Index, - rtxn: &'pl RoTxn<'pl>, + index: &'e Index, + rtxn: &'pl RoTxn<'pl, WithoutTls>, primary_key_from_op: Option<&'pl str>, - new_fields_ids_map: &mut FieldsIdsMap, + new_fid_map: &mut FieldsIdsMap, must_stop_processing: &MSP, progress: Progress, ) -> Result<(DocumentOperationChanges<'pl>, Vec, Option>)> @@ -80,6 +84,11 @@ impl<'pl> DocumentOperation<'pl> { progress.update_progress(IndexingStep::PreparingPayloads); let Self { operations } = self; + let metadata_builder = MetadataBuilder::from_index(index, rtxn)?; + let fid_map_with_meta = FieldIdMapWithMetadata::new(new_fid_map.clone(), metadata_builder); + let global = RwLock::new(fid_map_with_meta); + let gfid_map = GlobalFieldsIdsMap::new(&global); + let documents_ids = index.documents_ids(rtxn)?; let available_docids = ConcurrentAvailableIds::new(documents_ids); let mut docids_version_offsets = hashbrown::HashMap::new(); @@ -89,6 +98,7 @@ impl<'pl> DocumentOperation<'pl> { let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32); progress.update_progress(progress_step); + let long_txn_id = rtxn.id(); let operations_stats = operations .into_par_iter() .enumerate() @@ -98,15 +108,21 @@ impl<'pl> DocumentOperation<'pl> { } step.fetch_add(1, Ordering::Relaxed); + let short = index.read_txn()?; + // SAFETY: The long_txn_id comes from the main rtxn and the long lifetime is the one of the long rtxn. + let rtxn: &'pl RoTxn<'e, _> = unsafe { extend_rtxn_lifetime(long_txn_id, &short) }; + let indexer = bumpalo::Bump::new(); + let mut gfid_map = gfid_map.clone(); + let mut bytes = 0; let result = match operation { Payload::Replace(payload) => extract_addition_payload_changes( - indexer, + &indexer, index, &rtxn, primary_key_from_op, &primary_key, - new_fields_ids_map, + &mut gfid_map, &available_docids, &mut bytes, &docids_version_offsets, @@ -114,12 +130,12 @@ impl<'pl> DocumentOperation<'pl> { payload, ), Payload::Update(payload) => extract_addition_payload_changes( - indexer, + &indexer, index, &rtxn, primary_key_from_op, &primary_key, - new_fields_ids_map, + &mut gfid_map, &available_docids, &mut bytes, &docids_version_offsets, @@ -179,7 +195,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>( rtxn: &'r RoTxn<'r>, primary_key_from_op: Option<&'r str>, primary_key: &OnceLock>, - new_fields_ids_map: &mut FieldsIdsMap, + new_fields_ids_map: &mut GlobalFieldsIdsMap, available_docids: &ConcurrentAvailableIds, bytes: &mut u64, main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, @@ -623,3 +639,21 @@ pub fn first_update_pointer(docops: &[InnerDocOp]) -> Option { InnerDocOp::Deletion => None, }) } + +/// Extends the lifetime of a read-only transaction. +/// The `long_txn_id` transaction ID must come from a call to the `RoTxn::id` method. +/// +/// ## Panics +/// +/// Panics if the long transaction ID does not match the transaction ID of the short transaction. +pub unsafe fn extend_rtxn_lifetime<'e, 'long, 'short>( + long_txn_id: usize, + short: &'short RoTxn<'e, WithoutTls>, +) -> &'long RoTxn<'e, WithoutTls> { + assert_eq!( + long_txn_id, + short.id(), + "Lifetimes can only be extended if they have the same transaction ID" + ); + unsafe { std::mem::transmute(short) } +} diff --git a/crates/milli/src/update/new/indexer/guess_primary_key.rs b/crates/milli/src/update/new/indexer/guess_primary_key.rs index f0eb82b8d..b641d20ca 100644 --- a/crates/milli/src/update/new/indexer/guess_primary_key.rs +++ b/crates/milli/src/update/new/indexer/guess_primary_key.rs @@ -4,7 +4,7 @@ use rustc_hash::FxBuildHasher; use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; use crate::update::new::StdResult; -use crate::{FieldsIdsMap, Index, Result, UserError}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; /// Returns the primary key that has already been set for this index or the /// one we will guess by searching for the first key that contains "id" as a substring, @@ -12,7 +12,7 @@ use crate::{FieldsIdsMap, Index, Result, UserError}; pub fn retrieve_or_guess_primary_key<'a>( rtxn: &'a RoTxn<'a>, index: &Index, - new_fields_ids_map: &mut FieldsIdsMap, + new_fields_ids_map: &mut GlobalFieldsIdsMap, // Use a &mut Mapper: MutFieldIdMapper primary_key_from_op: Option<&'a str>, first_document: Option>, ) -> Result, bool), UserError>> { @@ -47,7 +47,7 @@ pub fn retrieve_or_guess_primary_key<'a>( let guesses: Result> = first_document .keys() .filter_map(|name| { - let Some(_) = new_fields_ids_map.insert(name) else { + let Some(_) = new_fields_ids_map.id_or_insert(name) else { return Some(Err(UserError::AttributeLimitReached.into())); }; name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name))