diff --git a/Cargo.lock b/Cargo.lock
index 293d17045..0bc879223 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3798,6 +3798,7 @@ dependencies = [
  "obkv",
  "once_cell",
  "ordered-float",
+ "papaya",
  "rand",
  "rayon",
  "rayon-par-bridge",
@@ -4189,6 +4190,16 @@ dependencies = [
  "winapi",
 ]

+[[package]]
+name = "papaya"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6827e3fc394523c21d4464d02c0bb1c19966ea4a58a9844ad6d746214179d2bc"
+dependencies = [
+ "equivalent",
+ "seize",
+]
+
 [[package]]
 name = "parking_lot"
 version = "0.12.3"
@@ -5104,6 +5115,16 @@ dependencies = [
  "time",
 ]

+[[package]]
+name = "seize"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "semver"
 version = "1.0.18"
diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml
index e3b9b077a..1b203e16f 100644
--- a/crates/milli/Cargo.toml
+++ b/crates/milli/Cargo.toml
@@ -111,6 +111,7 @@ utoipa = { version = "5.3.1", features = [
     "openapi_extensions",
 ] }
 lru = "0.13.0"
+papaya = "0.2.1"

 [dev-dependencies]
 mimalloc = { version = "0.1.43", default-features = false }
diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs
index 96a64cabe..5b57a5c8b 100644
--- a/crates/milli/src/update/new/indexer/document_operation.rs
+++ b/crates/milli/src/update/new/indexer/document_operation.rs
@@ -6,6 +6,7 @@ use bumparaw_collections::RawMap;
 use hashbrown::hash_map::Entry;
 use heed::RoTxn;
 use memmap2::Mmap;
+use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator as _};
 use rayon::slice::ParallelSlice;
 use rustc_hash::FxBuildHasher;
 use serde_json::value::RawValue;
@@ -88,64 +89,73 @@ impl<'pl> DocumentOperation<'pl> {
         let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
         progress.update_progress(progress_step);

-        for (payload_index, operation) in operations.into_iter().enumerate() {
-            if must_stop_processing() {
-                return Err(InternalError::AbortedIndexation.into());
-            }
-            step.store(payload_index as u32, Ordering::Relaxed);
+        let operations_stats = operations
+            .into_par_iter()
+            .enumerate()
+            .map(|(payload_index, operation)| {
+                // if must_stop_processing() {
+                //     return Err(InternalError::AbortedIndexation.into());
+                // }

-            let mut bytes = 0;
-            let result = match operation {
-                Payload::Replace(payload) => extract_addition_payload_changes(
-                    indexer,
-                    index,
-                    rtxn,
-                    primary_key_from_op,
-                    &mut primary_key,
-                    new_fields_ids_map,
-                    &mut available_docids,
-                    &mut bytes,
-                    &docids_version_offsets,
-                    IndexDocumentsMethod::ReplaceDocuments,
-                    payload,
-                ),
-                Payload::Update(payload) => extract_addition_payload_changes(
-                    indexer,
-                    index,
-                    rtxn,
-                    primary_key_from_op,
-                    &mut primary_key,
-                    new_fields_ids_map,
-                    &mut available_docids,
-                    &mut bytes,
-                    &docids_version_offsets,
-                    IndexDocumentsMethod::UpdateDocuments,
-                    payload,
-                ),
-                Payload::Deletion(to_delete) => extract_deletion_payload_changes(
-                    index,
-                    rtxn,
-                    &mut available_docids,
-                    &docids_version_offsets,
-                    to_delete,
-                ),
-            };
+                let mut bytes = 0;
+                let rtxn = index.read_txn().unwrap();
+                step.fetch_add(1, Ordering::Relaxed);

-            let mut document_count = 0;
-            let error = match result {
-                Ok(new_docids_version_offsets) => {
-                    document_count = new_docids_version_offsets.len() as u64;
-                    // If we don't have any error then we can merge the content of this payload
-                    // into to main payload. Else we just drop this payload extraction.
-                    merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets);
-                    None
+                let result = match operation {
+                    Payload::Replace(payload) => extract_addition_payload_changes(
+                        indexer,
+                        index,
+                        &rtxn,
+                        primary_key_from_op,
+                        &mut primary_key,
+                        new_fields_ids_map,
+                        &mut available_docids,
+                        &mut bytes,
+                        &docids_version_offsets,
+                        IndexDocumentsMethod::ReplaceDocuments,
+                        payload,
+                    ),
+                    Payload::Update(payload) => extract_addition_payload_changes(
+                        indexer,
+                        index,
+                        &rtxn,
+                        primary_key_from_op,
+                        &mut primary_key,
+                        new_fields_ids_map,
+                        &mut available_docids,
+                        &mut bytes,
+                        &docids_version_offsets,
+                        IndexDocumentsMethod::UpdateDocuments,
+                        payload,
+                    ),
+                    Payload::Deletion(to_delete) => extract_deletion_payload_changes(
+                        index,
+                        &rtxn,
+                        &mut available_docids,
+                        &docids_version_offsets,
+                        to_delete,
+                    ),
+                };
+
+                let mut document_count = 0;
+                match result {
+                    Ok(new_docids_version_offsets) => {
+                        document_count = new_docids_version_offsets.len() as u64;
+                        // If we don't have any error then we can merge the content of this payload
+                        // into to main payload. Else we just drop this payload extraction.
+                        merge_version_offsets(
+                            &mut docids_version_offsets,
+                            new_docids_version_offsets,
+                        );
+                        Ok(PayloadStats { document_count, bytes, error: None })
+                    }
+                    Err(Error::UserError(user_error)) => {
+                        Ok(PayloadStats { document_count, bytes, error: Some(user_error) })
+                    }
+                    Err(e) => Err(e),
                 }
-                Err(Error::UserError(user_error)) => Some(user_error),
-                Err(e) => return Err(e),
-            };
-            operations_stats.push(PayloadStats { document_count, bytes, error });
-        }
-        step.store(payload_count as u32, Ordering::Relaxed);
+            })
+            .collect::<Result<Vec<_>>>()?;

         // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
         let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
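
Note (not part of the patch): a minimal, self-contained sketch of the two primitives this change leans on, assuming the rayon and papaya dependencies declared in the Cargo.toml hunk above. Payload, PayloadStats, and extract below are hypothetical stand-ins, not milli's types. rayon's collect::<Result<Vec<_>, _>>() short-circuits on the first Err, which is how the rewritten loop can still propagate non-user errors out of the parallel map; papaya's HashMap can be written through a shared reference via pin(), which is presumably why it was added here, since a rayon closure cannot hold &mut docids_version_offsets across threads the way the old sequential loop did.

// Sketch only: fallible parallel collect + a lock-free shared map.
use papaya::HashMap;
use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator as _};

struct Payload(u64);

struct PayloadStats {
    bytes: u64,
}

// Hypothetical stand-in for extract_addition_payload_changes: fails on an empty payload.
fn extract(payload: &Payload) -> Result<u64, String> {
    if payload.0 == 0 {
        Err("empty payload".to_string())
    } else {
        Ok(payload.0)
    }
}

fn main() -> Result<(), String> {
    let payloads = vec![Payload(10), Payload(20), Payload(30)];

    // Concurrent map shared by all rayon workers; pin() returns a guard
    // giving lock-free access through &self, no &mut required.
    let version_offsets: HashMap<usize, u64> = HashMap::new();

    let stats = payloads
        .into_par_iter()
        .enumerate()
        .map(|(payload_index, payload)| {
            let bytes = extract(&payload)?; // an Err here cancels the whole collect
            version_offsets.pin().insert(payload_index, bytes);
            Ok(PayloadStats { bytes })
        })
        .collect::<Result<Vec<_>, String>>()?; // same shape as the patch's collect

    let total: u64 = stats.iter().map(|s| s.bytes).sum();
    assert_eq!(total, 60);
    assert_eq!(version_offsets.pin().get(&0), Some(&10));
    Ok(())
}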