From 2addedf98a48b0adc96e4b23e74e501be5104b7b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?=
Date: Thu, 14 Aug 2025 15:10:45 +0200
Subject: [PATCH] WIP: parallelizing

---
 Cargo.lock                                    |  28 ++++
 crates/milli/Cargo.toml                       |   2 +
 .../update/new/indexer/document_operation.rs  | 138 ++++++++++--------
 3 files changed, 104 insertions(+), 64 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 04a3cdb76..6573e77be 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -736,6 +736,12 @@ dependencies = [
  "syn 2.0.101",
 ]
 
+[[package]]
+name = "boxcar"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "36f64beae40a84da1b4b26ff2761a5b895c12adc41dc25aaee1c4f2bbfe97a6e"
+
 [[package]]
 name = "brotli"
 version = "8.0.1"
@@ -3928,6 +3934,7 @@ dependencies = [
  "big_s",
  "bimap",
  "bincode",
+ "boxcar",
  "bstr",
  "bumpalo",
  "bumparaw-collections",
@@ -3970,6 +3977,7 @@ dependencies = [
  "obkv",
  "once_cell",
  "ordered-float 5.0.0",
+ "papaya",
  "rand 0.8.5",
  "rayon",
  "rhai",
@@ -4411,6 +4419,16 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "papaya"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f"
+dependencies = [
+ "equivalent",
+ "seize",
+]
+
 [[package]]
 name = "parking_lot"
 version = "0.12.4"
@@ -5462,6 +5480,16 @@ dependencies = [
  "time",
 ]
 
+[[package]]
+name = "seize"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "semver"
 version = "1.0.26"
diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml
index d94a4d4e1..130d16f19 100644
--- a/crates/milli/Cargo.toml
+++ b/crates/milli/Cargo.toml
@@ -109,6 +109,8 @@ utoipa = { version = "5.4.0", features = [
     "openapi_extensions",
 ] }
 lru = "0.14.0"
+boxcar = "0.2.14"
+papaya = "0.2.3"
 
 [dev-dependencies]
 mimalloc = { version = "0.1.47", default-features = false }
diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs
index 98faaf145..99a52a510 100644
--- a/crates/milli/src/update/new/indexer/document_operation.rs
+++ b/crates/milli/src/update/new/indexer/document_operation.rs
@@ -6,6 +6,7 @@ use bumparaw_collections::RawMap;
 use hashbrown::hash_map::Entry;
 use heed::RoTxn;
 use memmap2::Mmap;
+use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
 use rayon::slice::ParallelSlice;
 use rustc_hash::FxBuildHasher;
 use serde_json::value::RawValue;
@@ -20,7 +21,7 @@ use crate::update::new::document::{DocumentContext, Versions};
 use crate::update::new::steps::IndexingStep;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::{DocumentIdentifiers, Insertion, Update};
-use crate::update::{AvailableIds, IndexDocumentsMethod};
+use crate::update::{AvailableIds, ConcurrentAvailableIds, IndexDocumentsMethod};
 use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
 
 #[derive(Default)]
@@ -73,14 +74,13 @@ impl<'pl> DocumentOperation<'pl> {
         progress: Progress,
     ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
     where
-        MSP: Fn() -> bool,
+        MSP: Fn() -> bool + Sync,
     {
         progress.update_progress(IndexingStep::PreparingPayloads);
         let Self { operations } = self;
 
         let documents_ids = index.documents_ids(rtxn)?;
-        let mut operations_stats = Vec::new();
-        let mut available_docids = AvailableIds::new(&documents_ids);
+        let available_docids = ConcurrentAvailableIds::new(documents_ids);
         let mut docids_version_offsets = hashbrown::HashMap::new();
         let mut primary_key = None;
 
@@ -88,64 +88,74 @@ impl<'pl> DocumentOperation<'pl> {
         let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
         progress.update_progress(progress_step);
 
-        for (payload_index, operation) in operations.into_iter().enumerate() {
-            if must_stop_processing() {
-                return Err(InternalError::AbortedIndexation.into());
-            }
-            step.store(payload_index as u32, Ordering::Relaxed);
-
-            let mut bytes = 0;
-            let result = match operation {
-                Payload::Replace(payload) => extract_addition_payload_changes(
-                    indexer,
-                    index,
-                    rtxn,
-                    primary_key_from_op,
-                    &mut primary_key,
-                    new_fields_ids_map,
-                    &mut available_docids,
-                    &mut bytes,
-                    &docids_version_offsets,
-                    IndexDocumentsMethod::ReplaceDocuments,
-                    payload,
-                ),
-                Payload::Update(payload) => extract_addition_payload_changes(
-                    indexer,
-                    index,
-                    rtxn,
-                    primary_key_from_op,
-                    &mut primary_key,
-                    new_fields_ids_map,
-                    &mut available_docids,
-                    &mut bytes,
-                    &docids_version_offsets,
-                    IndexDocumentsMethod::UpdateDocuments,
-                    payload,
-                ),
-                Payload::Deletion(to_delete) => extract_deletion_payload_changes(
-                    index,
-                    rtxn,
-                    &mut available_docids,
-                    &docids_version_offsets,
-                    to_delete,
-                ),
-            };
-
-            let mut document_count = 0;
-            let error = match result {
-                Ok(new_docids_version_offsets) => {
-                    document_count = new_docids_version_offsets.len() as u64;
-                    // If we don't have any error then we can merge the content of this payload
-                    // into to main payload. Else we just drop this payload extraction.
-                    merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets);
-                    None
+        let operations_stats = operations
+            .into_par_iter()
+            .enumerate()
+            .map(|(payload_index, operation)| {
+                if must_stop_processing() {
+                    return Err(InternalError::AbortedIndexation.into());
                 }
-                Err(Error::UserError(user_error)) => Some(user_error),
-                Err(e) => return Err(e),
-            };
-            operations_stats.push(PayloadStats { document_count, bytes, error });
-        }
-        step.store(payload_count as u32, Ordering::Relaxed);
+                step.fetch_add(1, Ordering::Relaxed);
+
+                // TODO do not create too many rtxn and rather keep a pool
+                let rtxn = index.read_txn()?;
+                let bump = Bump::new();
+
+                let mut bytes = 0;
+                let result = match operation {
+                    Payload::Replace(payload) => extract_addition_payload_changes(
+                        &bump,
+                        index,
+                        &rtxn,
+                        primary_key_from_op,
+                        &mut primary_key,
+                        new_fields_ids_map,
+                        &available_docids,
+                        &mut bytes,
+                        &docids_version_offsets,
+                        IndexDocumentsMethod::ReplaceDocuments,
+                        payload,
+                    ),
+                    Payload::Update(payload) => extract_addition_payload_changes(
+                        &bump,
+                        index,
+                        &rtxn,
+                        primary_key_from_op,
+                        &mut primary_key,
+                        new_fields_ids_map,
+                        &available_docids,
+                        &mut bytes,
+                        &docids_version_offsets,
+                        IndexDocumentsMethod::UpdateDocuments,
+                        payload,
+                    ),
+                    Payload::Deletion(to_delete) => extract_deletion_payload_changes(
+                        index,
+                        &rtxn,
+                        &available_docids,
+                        &docids_version_offsets,
+                        to_delete,
+                    ),
+                };
+
+                match result {
+                    Ok(new_docids_version_offsets) => {
+                        let document_count = new_docids_version_offsets.len() as u64;
+                        // If we don't have any error then we can merge the content of this payload
+                        // into the main payload. Else we just drop this payload extraction.
+                        merge_version_offsets(
+                            &mut docids_version_offsets,
+                            new_docids_version_offsets,
+                        );
+                        Ok(PayloadStats { document_count, bytes, error: None })
+                    }
+                    Err(Error::UserError(user_error)) => {
+                        Ok(PayloadStats { document_count: 0, bytes, error: Some(user_error) })
+                    }
+                    Err(e) => Err(e),
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
 
         // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
         let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
@@ -169,7 +179,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
     primary_key_from_op: Option<&'r str>,
     primary_key: &mut Option<PrimaryKey<'r>>,
     new_fields_ids_map: &mut FieldsIdsMap,
-    available_docids: &mut AvailableIds,
+    available_docids: &ConcurrentAvailableIds,
     bytes: &mut u64,
     main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
     method: IndexDocumentsMethod,
@@ -318,7 +328,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
             Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
             Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
             Err(error) => return Err(error),
-        };
+        }
     }
 
     Ok(new_docids_version_offsets)
@@ -327,7 +337,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
 fn extract_deletion_payload_changes<'s, 'pl: 's>(
     index: &Index,
     rtxn: &RoTxn,
-    available_docids: &mut AvailableIds,
+    available_docids: &ConcurrentAvailableIds,
     main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
     to_delete: &'pl [&'pl str],
 ) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
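
Note on ConcurrentAvailableIds: the diff swaps AvailableIds, which allocates ids
through &mut self, for a ConcurrentAvailableIds that all rayon workers share by
reference, but the type's definition is not part of this patch. The sketch below
shows what such an allocator could look like, assuming it first reuses the holes
of the existing documents_ids bitmap and then allocates past its maximum; the
field names and layout are illustrative assumptions, not the actual milli
implementation.

use std::sync::atomic::{AtomicU32, Ordering};

use roaring::RoaringBitmap;

/// Hypothetical sketch: hands out unused document ids through `&self`,
/// so rayon workers can share it without a Mutex.
pub struct ConcurrentAvailableIds {
    /// Unused ids below the frontier (holes left by deletions), reused first.
    available: RoaringBitmap,
    /// The nth hole to hand out next.
    nth_hole: AtomicU32,
    /// First id strictly greater than every id already in use.
    frontier: AtomicU32,
}

impl ConcurrentAvailableIds {
    pub fn new(used: RoaringBitmap) -> Self {
        let frontier = used.max().map_or(0, |max| max + 1);
        // Every id in 0..frontier that is not in `used` is a reusable hole.
        let available = RoaringBitmap::from_sorted_iter(0..frontier).unwrap() - used;
        Self { available, nth_hole: AtomicU32::new(0), frontier: AtomicU32::new(frontier) }
    }

    /// Returns a fresh id, or None once the u32 space is exhausted
    /// (wrap-around past u32::MAX is left unhandled in this sketch).
    pub fn next(&self) -> Option<u32> {
        let nth = self.nth_hole.fetch_add(1, Ordering::Relaxed);
        match self.available.select(nth) {
            // Reuse an id freed by a deleted document.
            Some(id) => Some(id),
            // Holes exhausted: allocate above the highest known id.
            None => {
                let id = self.frontier.fetch_add(1, Ordering::Relaxed);
                if id == u32::MAX { None } else { Some(id) }
            }
        }
    }
}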
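
Note on the new dependencies: boxcar and papaya are declared in Cargo.toml and
Cargo.lock but are not used anywhere in this diff yet. As committed, the map
closure still captures &mut primary_key, new_fields_ids_map, and
&mut docids_version_offsets, which rayon's Fn-bounded combinators will reject,
so lock-free containers look like the intended follow-up. Below is a
hypothetical use of boxcar::Vec, an append-only vector whose push takes &self;
this is a guess at the next step, not code from the commit.

use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    // An append-only vector: push() takes &self, so parallel workers can
    // record per-payload stats directly instead of collect()-ing Results.
    let stats = boxcar::Vec::new();

    (0..8u32).into_par_iter().for_each(|payload_index| {
        // One entry per payload, pushed from whichever thread handled it.
        stats.push((payload_index, 0u64 /* bytes, hypothetical */));
    });

    // Entries land in completion order, not payload order.
    assert_eq!(stats.count(), 8);
}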
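
In the same spirit, papaya::HashMap is a concurrent hash map that could
eventually let the workers merge their docids_version_offsets directly instead
of funneling everything through the serial merge_version_offsets call. A toy
example of its pin-based API; again an assumption about where the dependency is
headed, not part of this patch.

fn main() {
    // papaya::HashMap is lock-free; reads and writes go through a pinned
    // guard that keeps entries alive for as long as the guard is held.
    let map = papaya::HashMap::new();
    let guard = map.pin();
    guard.insert("doc-1", 42u32);
    assert_eq!(guard.get("doc-1"), Some(&42));
}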