Introduce papaya

Kerollmops 2025-03-18 15:47:06 +01:00
parent cf31a65a88
commit 4a741d3177
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 87 additions and 55 deletions
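For context: papaya is a lock-free concurrent hash map for Rust, built on the seize crate for memory reclamation, which is why both crates appear in the lockfile below. A minimal sketch of its pin-based API for readers unfamiliar with the crate (illustrative only, not part of this commit):

    use papaya::HashMap;

    fn main() {
        let map: HashMap<String, u32> = HashMap::new();

        // `pin` enters the reclamation scheme and returns a handle exposing
        // the map operations; readers proceed without blocking writers.
        let guard = map.pin();
        guard.insert("payload".to_string(), 1);
        assert_eq!(guard.get("payload"), Some(&1));
    }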

Cargo.lock (generated, 21 lines changed)

@@ -3798,6 +3798,7 @@ dependencies = [
  "obkv",
  "once_cell",
  "ordered-float",
+ "papaya",
  "rand",
  "rayon",
  "rayon-par-bridge",
@@ -4189,6 +4190,16 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "papaya"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6827e3fc394523c21d4464d02c0bb1c19966ea4a58a9844ad6d746214179d2bc"
+dependencies = [
+ "equivalent",
+ "seize",
+]
+
 [[package]]
 name = "parking_lot"
 version = "0.12.3"
@@ -5104,6 +5115,16 @@ dependencies = [
  "time",
 ]
 
+[[package]]
+name = "seize"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e4b8d813387d566f627f3ea1b914c068aac94c40ae27ec43f5f33bde65abefe7"
+dependencies = [
+ "libc",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "semver"
 version = "1.0.18"

Cargo.toml

@@ -111,6 +111,7 @@ utoipa = { version = "5.3.1", features = [
     "openapi_extensions",
 ] }
 lru = "0.13.0"
+papaya = "0.2.1"
 
 [dev-dependencies]
 mimalloc = { version = "0.1.43", default-features = false }

document_operation.rs

@@ -6,6 +6,7 @@ use bumparaw_collections::RawMap;
 use hashbrown::hash_map::Entry;
 use heed::RoTxn;
 use memmap2::Mmap;
+use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator as _};
 use rayon::slice::ParallelSlice;
 use rustc_hash::FxBuildHasher;
 use serde_json::value::RawValue;
@@ -88,64 +89,73 @@ impl<'pl> DocumentOperation<'pl> {
         let (step, progress_step) = AtomicPayloadStep::new(payload_count as u32);
         progress.update_progress(progress_step);
-        for (payload_index, operation) in operations.into_iter().enumerate() {
-            if must_stop_processing() {
-                return Err(InternalError::AbortedIndexation.into());
-            }
-            step.store(payload_index as u32, Ordering::Relaxed);
-
-            let mut bytes = 0;
-            let result = match operation {
-                Payload::Replace(payload) => extract_addition_payload_changes(
-                    indexer,
-                    index,
-                    rtxn,
-                    primary_key_from_op,
-                    &mut primary_key,
-                    new_fields_ids_map,
-                    &mut available_docids,
-                    &mut bytes,
-                    &docids_version_offsets,
-                    IndexDocumentsMethod::ReplaceDocuments,
-                    payload,
-                ),
-                Payload::Update(payload) => extract_addition_payload_changes(
-                    indexer,
-                    index,
-                    rtxn,
-                    primary_key_from_op,
-                    &mut primary_key,
-                    new_fields_ids_map,
-                    &mut available_docids,
-                    &mut bytes,
-                    &docids_version_offsets,
-                    IndexDocumentsMethod::UpdateDocuments,
-                    payload,
-                ),
-                Payload::Deletion(to_delete) => extract_deletion_payload_changes(
-                    index,
-                    rtxn,
-                    &mut available_docids,
-                    &docids_version_offsets,
-                    to_delete,
-                ),
-            };
-
-            let mut document_count = 0;
-            let error = match result {
-                Ok(new_docids_version_offsets) => {
-                    document_count = new_docids_version_offsets.len() as u64;
-                    // If we don't have any error then we can merge the content of this payload
-                    // into to main payload. Else we just drop this payload extraction.
-                    merge_version_offsets(&mut docids_version_offsets, new_docids_version_offsets);
-                    None
-                }
-                Err(Error::UserError(user_error)) => Some(user_error),
-                Err(e) => return Err(e),
-            };
-            operations_stats.push(PayloadStats { document_count, bytes, error });
-        }
-        step.store(payload_count as u32, Ordering::Relaxed);
+        let operations_stats = operations
+            .into_par_iter()
+            .enumerate()
+            .map(|(payload_index, operation)| {
+                // if must_stop_processing() {
+                //     return Err(InternalError::AbortedIndexation.into());
+                // }
+
+                let mut bytes = 0;
+                let rtxn = index.read_txn().unwrap();
+                step.fetch_add(1, Ordering::Relaxed);
+
+                let result = match operation {
+                    Payload::Replace(payload) => extract_addition_payload_changes(
+                        indexer,
+                        index,
+                        &rtxn,
+                        primary_key_from_op,
+                        &mut primary_key,
+                        new_fields_ids_map,
+                        &mut available_docids,
+                        &mut bytes,
+                        &docids_version_offsets,
+                        IndexDocumentsMethod::ReplaceDocuments,
+                        payload,
+                    ),
+                    Payload::Update(payload) => extract_addition_payload_changes(
+                        indexer,
+                        index,
+                        &rtxn,
+                        primary_key_from_op,
+                        &mut primary_key,
+                        new_fields_ids_map,
+                        &mut available_docids,
+                        &mut bytes,
+                        &docids_version_offsets,
+                        IndexDocumentsMethod::UpdateDocuments,
+                        payload,
+                    ),
+                    Payload::Deletion(to_delete) => extract_deletion_payload_changes(
+                        index,
+                        &rtxn,
+                        &mut available_docids,
+                        &docids_version_offsets,
+                        to_delete,
+                    ),
+                };
+
+                let mut document_count = 0;
+                match result {
+                    Ok(new_docids_version_offsets) => {
+                        document_count = new_docids_version_offsets.len() as u64;
+                        // If we don't have any error then we can merge the content of this payload
+                        // into to main payload. Else we just drop this payload extraction.
+                        merge_version_offsets(
+                            &mut docids_version_offsets,
+                            new_docids_version_offsets,
+                        );
+                        Ok(PayloadStats { document_count, bytes, error: None })
+                    }
+                    Err(Error::UserError(user_error)) => {
+                        Ok(PayloadStats { document_count, bytes, error: Some(user_error) })
+                    }
+                    Err(e) => Err(e),
+                }
+            })
+            .collect::<crate::Result<Vec<_>>>()?;
 
         // TODO We must drain the HashMap into a Vec because rayon::hash_map::IntoIter: !Clone
         let mut docids_version_offsets: bumpalo::collections::vec::Vec<_> =
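
Two details of the rewrite stand out. Each worker now opens its own read transaction and advances the progress counter with fetch_add, since payloads no longer complete in a fixed order (and the must_stop_processing abort check is commented out in this intermediate state). Per-payload results are gathered with rayon's Result collector, which preserves the old semantics: a user error is recorded in that payload's stats, while any other error aborts the whole batch. A standalone sketch of that collect pattern (illustrative values, not from this commit):

    use rayon::prelude::*;

    // Collecting parallel Results into Result<Vec<_>, _> short-circuits on
    // the first Err, mirroring the `Err(e) => Err(e)` arm in the diff above.
    fn process_all(items: Vec<u32>) -> Result<Vec<String>, String> {
        items
            .into_par_iter()
            .map(|n| {
                if n == 13 {
                    Err(format!("fatal error on payload {n}"))
                } else {
                    Ok(format!("processed payload {n}"))
                }
            })
            .collect()
    }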