WIP: parallelizing

This commit is contained in:
Clément Renault
2025-08-14 15:10:45 +02:00
parent a608e57c3c
commit 2addedf98a
3 changed files with 104 additions and 64 deletions

28
Cargo.lock generated
View File

@ -736,6 +736,12 @@ dependencies = [
"syn 2.0.101", "syn 2.0.101",
] ]
[[package]]
name = "boxcar"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36f64beae40a84da1b4b26ff2761a5b895c12adc41dc25aaee1c4f2bbfe97a6e"
[[package]] [[package]]
name = "brotli" name = "brotli"
version = "8.0.1" version = "8.0.1"
@ -3928,6 +3934,7 @@ dependencies = [
"big_s", "big_s",
"bimap", "bimap",
"bincode", "bincode",
"boxcar",
"bstr", "bstr",
"bumpalo", "bumpalo",
"bumparaw-collections", "bumparaw-collections",
@ -3970,6 +3977,7 @@ dependencies = [
"obkv", "obkv",
"once_cell", "once_cell",
"ordered-float 5.0.0", "ordered-float 5.0.0",
"papaya",
"rand 0.8.5", "rand 0.8.5",
"rayon", "rayon",
"rhai", "rhai",
@ -4411,6 +4419,16 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "papaya"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f92dd0b07c53a0a0c764db2ace8c541dc47320dad97c2200c2a637ab9dd2328f"
dependencies = [
"equivalent",
"seize",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.4" version = "0.12.4"
@ -5462,6 +5480,16 @@ dependencies = [
"time", "time",
] ]
[[package]]
name = "seize"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.26" version = "1.0.26"

View File

@ -109,6 +109,8 @@ utoipa = { version = "5.4.0", features = [
"openapi_extensions", "openapi_extensions",
] } ] }
lru = "0.14.0" lru = "0.14.0"
boxcar = "0.2.14"
papaya = "0.2.3"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.47", default-features = false } mimalloc = { version = "0.1.47", default-features = false }

View File

@ -6,6 +6,7 @@ use bumparaw_collections::RawMap;
use hashbrown::hash_map::Entry; use hashbrown::hash_map::Entry;
use heed::RoTxn; use heed::RoTxn;
use memmap2::Mmap; use memmap2::Mmap;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use rayon::slice::ParallelSlice; use rayon::slice::ParallelSlice;
use rustc_hash::FxBuildHasher; use rustc_hash::FxBuildHasher;
use serde_json::value::RawValue; use serde_json::value::RawValue;
@ -20,7 +21,7 @@ use crate::update::new::document::{DocumentContext, Versions};
use crate::update::new::steps::IndexingStep; use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend; use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentIdentifiers, Insertion, Update}; use crate::update::new::{DocumentIdentifiers, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::update::{AvailableIds, ConcurrentAvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
#[derive(Default)] #[derive(Default)]
@ -73,14 +74,13 @@ impl<'pl> DocumentOperation<'pl> {
progress: Progress, progress: Progress,
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)> ) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
where where
MSP: Fn() -> bool, MSP: Fn() -> bool + Sync,
{ {
progress.update_progress(IndexingStep::PreparingPayloads); progress.update_progress(IndexingStep::PreparingPayloads);
let Self { operations } = self; let Self { operations } = self;
let documents_ids = index.documents_ids(rtxn)?; let documents_ids = index.documents_ids(rtxn)?;
let mut operations_stats = Vec::new(); let available_docids = ConcurrentAvailableIds::new(documents_ids);
let mut available_docids = AvailableIds::new(&documents_ids);
let mut docids_version_offsets = hashbrown::HashMap::new(); let mut docids_version_offsets = hashbrown::HashMap::new();
let mut primary_key = None; let mut primary_key = None;
@ -88,35 +88,42 @@ impl<'pl> DocumentOperation<'pl> {
let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32); let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
progress.update_progress(progress_step); progress.update_progress(progress_step);
for (payload_index, operation) in operations.into_iter().enumerate() { let operations_stats = operations
.into_par_iter()
.enumerate()
.map(|(payload_index, operation)| {
if must_stop_processing() { if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into()); return Err(InternalError::AbortedIndexation.into());
} }
step.store(payload_index as u32, Ordering::Relaxed); step.fetch_add(1, Ordering::Relaxed);
// TODO do not create too many rtxn and rather keep a pool
let rtxn = index.read_txn()?;
let bump = Bump::new();
let mut bytes = 0; let mut bytes = 0;
let result = match operation { let result = match operation {
Payload::Replace(payload) => extract_addition_payload_changes( Payload::Replace(payload) => extract_addition_payload_changes(
indexer, &bump,
index, index,
rtxn, &rtxn,
primary_key_from_op, primary_key_from_op,
&mut primary_key, &mut primary_key,
new_fields_ids_map, new_fields_ids_map,
&mut available_docids, &available_docids,
&mut bytes, &mut bytes,
&docids_version_offsets, &docids_version_offsets,
IndexDocumentsMethod::ReplaceDocuments, IndexDocumentsMethod::ReplaceDocuments,
payload, payload,
), ),
Payload::Update(payload) => extract_addition_payload_changes( Payload::Update(payload) => extract_addition_payload_changes(
indexer, &bump,
index, index,
rtxn, &rtxn,
primary_key_from_op, primary_key_from_op,
&mut primary_key, &mut primary_key,
new_fields_ids_map, new_fields_ids_map,
&mut available_docids, &available_docids,
&mut bytes, &mut bytes,
&docids_version_offsets, &docids_version_offsets,
IndexDocumentsMethod::UpdateDocuments, IndexDocumentsMethod::UpdateDocuments,
@ -124,28 +131,31 @@ impl<'pl> DocumentOperation<'pl> {
), ),
Payload::Deletion(to_delete) => extract_deletion_payload_changes( Payload::Deletion(to_delete) => extract_deletion_payload_changes(
index, index,
rtxn, &rtxn,
&mut available_docids, &available_docids,
&docids_version_offsets, &docids_version_offsets,
to_delete, to_delete,
), ),
}; };
let mut document_count = 0; match result {
let error = match result {
Ok(new_docids_version_offsets) => { Ok(new_docids_version_offsets) => {
document_count = new_docids_version_offsets.len() as u64; let document_count = new_docids_version_offsets.len() as u64;
// If we don't have any error then we can merge the content of this payload // If we don't have any error then we can merge the content of this payload
// into to main payload. Else we just drop this payload extraction. // into to main payload. Else we just drop this payload extraction.
merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets); merge_version_offsets(
None &mut docids_version_offsets,
new_docids_version_offsets,
);
Ok(PayloadStats { document_count, bytes, error: None })
} }
Err(Error::UserError(user_error)) => Some(user_error), Err(Error::UserError(user_error)) => {
Err(e) => return Err(e), Ok(PayloadStats { document_count: 0, bytes, error: Some(user_error) })
};
operations_stats.push(PayloadStats { document_count, bytes, error });
} }
step.store(payload_count as u32, Ordering::Relaxed); Err(e) => Err(e),
}
})
.collect::<Result<Vec<_>>>()?;
// TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> = let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
@ -169,7 +179,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
primary_key_from_op: Option<&'r str>, primary_key_from_op: Option<&'r str>,
primary_key: &mut Option<PrimaryKey<'r>>, primary_key: &mut Option<PrimaryKey<'r>>,
new_fields_ids_map: &mut FieldsIdsMap, new_fields_ids_map: &mut FieldsIdsMap,
available_docids: &mut AvailableIds, available_docids: &ConcurrentAvailableIds,
bytes: &mut u64, bytes: &mut u64,
main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>, main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
@ -318,7 +328,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (), Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
Ok(Err(user_error)) => return Err(Error::UserError(user_error)), Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
Err(error) => return Err(error), Err(error) => return Err(error),
}; }
} }
Ok(new_docids_version_offsets) Ok(new_docids_version_offsets)
@ -327,7 +337,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
fn extract_deletion_payload_changes<'s, 'pl: 's>( fn extract_deletion_payload_changes<'s, 'pl: 's>(
index: &Index, index: &Index,
rtxn: &RoTxn, rtxn: &RoTxn,
available_docids: &mut AvailableIds, available_docids: &ConcurrentAvailableIds,
main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>, main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
to_delete: &'pl [&'pl str], to_delete: &'pl [&'pl str],
) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> { ) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {