Replace the HashMap caches with BTreeMaps

Kerollmops, 2024-12-05 15:22:30 +01:00
parent 9a9383643f
commit 9762d02900

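For orientation before the hunks: the change swaps each per-bucket cache from a bump-allocated `hashbrown::HashMap` keyed with `FxBuildHasher` to a plain `std::collections::BTreeMap`. A minimal sketch of the new shape (the value type is simplified to `u32` here; the real maps hold `DelAddBbbul` values keyed by bump-allocated byte slices):

```rust
use std::collections::BTreeMap;

// Simplified stand-in for the new cache shape: no hasher parameter,
// no custom allocator, and iteration comes out sorted by key, which
// the merge step at the bottom of the diff takes advantage of.
fn main() {
    let mut cache: BTreeMap<&[u8], u32> = BTreeMap::new();
    cache.insert(b"world".as_slice(), 2);
    cache.insert(b"hello".as_slice(), 1);

    // Keys iterate in byte order: "hello" before "world".
    for (key, count) in &cache {
        println!("{key:?} => {count}");
    }
}
```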

@@ -60,9 +60,10 @@
 //! For now we can use a grenad sorter for spilling even though I think
 //! it's not the most efficient way (too many files open, sorting entries).
+use std::borrow::Borrow;
 use std::cmp::Ordering;
 use std::collections::binary_heap::PeekMut;
-use std::collections::BinaryHeap;
+use std::collections::{BTreeMap, BinaryHeap};
 use std::fs::File;
 use std::hash::BuildHasher;
 use std::io::BufReader;
@@ -70,10 +71,7 @@ use std::{io, iter, mem};
 use bumpalo::Bump;
 use grenad::ReaderCursor;
-use hashbrown::hash_map::RawEntryMut;
-use hashbrown::HashMap;
 use raw_collections::bbbul::{BitPacker, BitPacker4x};
-use raw_collections::map::FrozenMap;
 use raw_collections::{Bbbul, FrozenBbbul};
 use roaring::RoaringBitmap;
 use rustc_hash::FxBuildHasher;
@@ -105,9 +103,7 @@ impl<'extractor> BalancedCaches<'extractor> {
             hasher: FxBuildHasher,
             max_memory,
             caches: InnerCaches::Normal(NormalCaches {
-                caches: iter::repeat_with(|| HashMap::with_hasher_in(FxBuildHasher, alloc))
-                    .take(buckets)
-                    .collect(),
+                caches: iter::repeat_with(BTreeMap::new).take(buckets).collect(),
             }),
             alloc,
         }
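`BTreeMap::new` takes no arguments, so the closure that threaded `FxBuildHasher` and the bump allocator through `HashMap::with_hasher_in` disappears and the function item can be handed to `repeat_with` directly. A sketch of the same per-bucket initialisation pattern (bucket count and map types are placeholders):

```rust
use std::collections::BTreeMap;
use std::iter;

// One empty map per bucket, collected into a Vec.
fn make_buckets(buckets: usize) -> Vec<BTreeMap<Vec<u8>, u32>> {
    iter::repeat_with(BTreeMap::new).take(buckets).collect()
}

fn main() {
    let caches = make_buckets(4);
    assert!(caches.iter().all(|cache| cache.is_empty()));
}
```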
@@ -166,8 +162,8 @@ impl<'extractor> BalancedCaches<'extractor> {
             rayon::current_thread_index().unwrap_or(0)
         );

-        let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
-        tracing::trace!("The last allocated HashMap took {allocated} bytes");
+        // let allocated: usize = normal_caches.caches.iter().map(|m| m.allocation_size()).sum();
+        // tracing::trace!("The last allocated BTreeMap took {allocated} bytes");

        let dummy = NormalCaches { caches: Vec::new() };
        let NormalCaches { caches: cache_maps } = mem::replace(normal_caches, dummy);
@@ -187,21 +183,17 @@ impl<'extractor> BalancedCaches<'extractor> {
                     // that are the same size.
                     let map = unsafe {
                         std::mem::transmute::<
-                            &mut HashMap<
+                            &mut BTreeMap<
                                 &[u8],
                                 DelAddBbbul<BitPacker4x>, // from this
-                                FxBuildHasher,
-                                &Bump,
                             >,
-                            &mut HashMap<
+                            &mut BTreeMap<
                                 &[u8],
                                 FrozenDelAddBbbul<BitPacker4x>, // to that
-                                FxBuildHasher,
-                                &Bump,
                             >,
                         >(map)
                     };
-                    Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() })
+                    Ok(FrozenCache { bucket, cache: FrozenBTreeMap::new(map), spilled: Vec::new() })
                 })
                 .collect(),
             InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
@@ -220,21 +212,17 @@ impl<'extractor> BalancedCaches<'extractor> {
                     // that are the same size.
                     let map = unsafe {
                         std::mem::transmute::<
-                            &mut HashMap<
+                            &mut BTreeMap<
                                 &[u8],
                                 DelAddBbbul<BitPacker4x>, // from this
-                                FxBuildHasher,
-                                &Bump,
                             >,
-                            &mut HashMap<
+                            &mut BTreeMap<
                                 &[u8],
                                 FrozenDelAddBbbul<BitPacker4x>, // to that
-                                FxBuildHasher,
-                                &Bump,
                             >,
                         >(map)
                     };
-                    Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
+                    Ok(FrozenCache { bucket, cache: FrozenBTreeMap::new(map), spilled })
                 })
                 .collect(),
         }
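Both freeze paths keep the same trick as before the change: the map is reinterpreted in place so its value type flips from `DelAddBbbul` to `FrozenDelAddBbbul` without rebuilding the tree, which is only defensible because the two value types are the same size, as the comment above notes. A reduced sketch of the pattern with stand-in types, where the layout guarantee comes from `#[repr(transparent)]` instead:

```rust
use std::collections::BTreeMap;

struct Mutable(u32);

// `#[repr(transparent)]` guarantees `Frozen` has exactly the same
// layout as `Mutable`, which is what makes the transmute defensible.
#[repr(transparent)]
struct Frozen(Mutable);

fn freeze<'m, 'k>(
    map: &'m mut BTreeMap<&'k [u8], Mutable>,
) -> &'m mut BTreeMap<&'k [u8], Frozen> {
    // SAFETY: the two map types differ only in a value type with an
    // identical layout, mirroring the same-size invariant documented
    // in the hunks above.
    unsafe { std::mem::transmute(map) }
}

fn main() {
    let mut map = BTreeMap::new();
    map.insert(b"doc".as_slice(), Mutable(7));
    let frozen = freeze(&mut map);
    assert!(frozen.contains_key(b"doc".as_slice()));
}
```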
@@ -245,14 +233,7 @@ impl<'extractor> BalancedCaches<'extractor> {
 unsafe impl MostlySend for BalancedCaches<'_> {}

 struct NormalCaches<'extractor> {
-    caches: Vec<
-        HashMap<
-            &'extractor [u8],
-            DelAddBbbul<'extractor, BitPacker4x>,
-            FxBuildHasher,
-            &'extractor Bump,
-        >,
-    >,
+    caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
 }

 impl<'extractor> NormalCaches<'extractor> {
@@ -266,17 +247,13 @@ impl<'extractor> NormalCaches<'extractor> {
     ) {
         let hash = hasher.hash_one(key);
         let bucket = compute_bucket_from_hash(buckets, hash);
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
-            RawEntryMut::Occupied(mut entry) => {
-                entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
+        let cache = &mut self.caches[bucket];
+        match cache.get_mut(key) {
+            Some(deladd) => {
+                deladd.del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
             }
-            RawEntryMut::Vacant(entry) => {
-                entry.insert_hashed_nocheck(
-                    hash,
-                    alloc.alloc_slice_copy(key),
-                    DelAddBbbul::new_del_u32_in(n, alloc),
-                );
+            None => {
+                cache.insert(alloc.alloc_slice_copy(key), DelAddBbbul::new_del_u32_in(n, alloc));
             }
         }
     }
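The raw-entry dance (hash once, probe by hash, `insert_hashed_nocheck` on a miss) becomes a plain `get_mut` followed by `insert`. The miss path now walks the tree twice, and the `entry` API would not help: it demands an owned key up front, while this code only bump-allocates the key once it knows the key is missing. A self-contained sketch of the same upsert shape (`Vec<u32>` stands in for `DelAddBbbul`, and `key.to_vec()` stands in for the bump allocation):

```rust
use std::collections::BTreeMap;

// Mutate in place on a hit; allocate an owned key and insert on a
// miss. The second tree walk on the miss path is the price paid for
// never allocating a key on the hit path.
fn upsert(cache: &mut BTreeMap<Vec<u8>, Vec<u32>>, key: &[u8], n: u32) {
    match cache.get_mut(key) {
        Some(ids) => ids.push(n),
        None => {
            cache.insert(key.to_vec(), vec![n]);
        }
    }
}

fn main() {
    let mut cache = BTreeMap::new();
    upsert(&mut cache, b"word", 1);
    upsert(&mut cache, b"word", 2);
    assert_eq!(cache[b"word".as_slice()], [1, 2]);
}
```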
@@ -291,30 +268,20 @@ impl<'extractor> NormalCaches<'extractor> {
     ) {
         let hash = hasher.hash_one(key);
         let bucket = compute_bucket_from_hash(buckets, hash);
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
-            RawEntryMut::Occupied(mut entry) => {
-                entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
+        let cache = &mut self.caches[bucket];
+        match cache.get_mut(key) {
+            Some(deladd) => {
+                deladd.add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
             }
-            RawEntryMut::Vacant(entry) => {
-                entry.insert_hashed_nocheck(
-                    hash,
-                    alloc.alloc_slice_copy(key),
-                    DelAddBbbul::new_add_u32_in(n, alloc),
-                );
+            None => {
+                cache.insert(alloc.alloc_slice_copy(key), DelAddBbbul::new_add_u32_in(n, alloc));
             }
         }
     }
 }

 struct SpillingCaches<'extractor> {
-    caches: Vec<
-        HashMap<
-            &'extractor [u8],
-            DelAddBbbul<'extractor, BitPacker4x>,
-            FxBuildHasher,
-            &'extractor Bump,
-        >,
-    >,
+    caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
     spilled_entries: Vec<grenad::Sorter<MergeDeladdCboRoaringBitmaps>>,
     deladd_buffer: Vec<u8>,
     cbo_buffer: Vec<u8>,
@@ -322,14 +289,7 @@ struct SpillingCaches<'extractor> {

 impl<'extractor> SpillingCaches<'extractor> {
     fn from_cache_maps(
-        caches: Vec<
-            HashMap<
-                &'extractor [u8],
-                DelAddBbbul<'extractor, BitPacker4x>,
-                FxBuildHasher,
-                &'extractor Bump,
-            >,
-        >,
+        caches: Vec<BTreeMap<&'extractor [u8], DelAddBbbul<'extractor, BitPacker4x>>>,
     ) -> SpillingCaches<'extractor> {
         SpillingCaches {
             spilled_entries: iter::repeat_with(|| {
@@ -356,12 +316,12 @@ impl<'extractor> SpillingCaches<'extractor> {
     ) -> Result<()> {
         let hash = hasher.hash_one(key);
         let bucket = compute_bucket_from_hash(buckets, hash);
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
-            RawEntryMut::Occupied(mut entry) => {
-                entry.get_mut().del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
+        match self.caches[bucket].get_mut(key) {
+            Some(deladd) => {
+                deladd.del.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
                 Ok(())
             }
-            RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
+            None => spill_entry_to_sorter(
                 &mut self.spilled_entries[bucket],
                 &mut self.deladd_buffer,
                 &mut self.cbo_buffer,
@@ -381,12 +341,12 @@ impl<'extractor> SpillingCaches<'extractor> {
     ) -> Result<()> {
         let hash = hasher.hash_one(key);
         let bucket = compute_bucket_from_hash(buckets, hash);
-        match self.caches[bucket].raw_entry_mut().from_hash(hash, |&k| k == key) {
-            RawEntryMut::Occupied(mut entry) => {
-                entry.get_mut().add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
+        match self.caches[bucket].get_mut(key) {
+            Some(deladd) => {
+                deladd.add.get_or_insert_with(|| Bbbul::new_in(alloc)).insert(n);
                 Ok(())
             }
-            RawEntryMut::Vacant(_entry) => spill_entry_to_sorter(
+            None => spill_entry_to_sorter(
                 &mut self.spilled_entries[bucket],
                 &mut self.deladd_buffer,
                 &mut self.cbo_buffer,
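In spilling mode the behaviour on a miss changes meaningfully: instead of growing the in-memory map, the entry goes straight to the bucket's grenad sorter on disk. A sketch of that split (a plain `Vec` of key/value pairs stands in for the grenad sorter, and `Vec<u32>` for `DelAddBbbul`):

```rust
use std::collections::BTreeMap;

// Hits still mutate the in-memory map; misses are appended to an
// external, to-be-sorted log instead of allocating a new map entry.
fn upsert_or_spill(
    cache: &mut BTreeMap<Vec<u8>, Vec<u32>>,
    spilled: &mut Vec<(Vec<u8>, u32)>,
    key: &[u8],
    n: u32,
) {
    match cache.get_mut(key) {
        Some(ids) => ids.push(n),
        None => spilled.push((key.to_vec(), n)),
    }
}

fn main() {
    let mut cache = BTreeMap::from([(b"seen".to_vec(), vec![1])]);
    let mut spilled = Vec::new();
    upsert_or_spill(&mut cache, &mut spilled, b"seen", 2);
    upsert_or_spill(&mut cache, &mut spilled, b"new", 3);
    assert_eq!(cache[b"seen".as_slice()], [1, 2]);
    assert_eq!(spilled, [(b"new".to_vec(), 3)]);
}
```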
@@ -441,13 +401,7 @@ fn spill_entry_to_sorter(

 pub struct FrozenCache<'a, 'extractor> {
     bucket: usize,
-    cache: FrozenMap<
-        'a,
-        'extractor,
-        &'extractor [u8],
-        FrozenDelAddBbbul<'extractor, BitPacker4x>,
-        FxBuildHasher,
-    >,
+    cache: FrozenBTreeMap<'a, &'extractor [u8], FrozenDelAddBbbul<'extractor, BitPacker4x>>,
     spilled: Vec<grenad::Reader<BufReader<File>>>,
 }
@@ -466,6 +420,36 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>(
     Ok(bucket_caches)
 }

+pub struct FrozenBTreeMap<'a, K, V>(&'a mut BTreeMap<K, V>);
+
+unsafe impl<'a, K, V> Send for FrozenBTreeMap<'a, K, V>
+where
+    K: Send,
+    V: Send,
+{
+}
+
+impl<'a, K, V> FrozenBTreeMap<'a, K, V> {
+    #[inline]
+    pub fn new(map: &'a mut BTreeMap<K, V>) -> Self {
+        Self(map)
+    }
+
+    #[inline]
+    pub fn iter_mut(&mut self) -> std::collections::btree_map::IterMut<'_, K, V> {
+        self.0.iter_mut()
+    }
+
+    #[inline]
+    pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
+    where
+        K: Borrow<Q> + Ord,
+        Q: Ord + ?Sized,
+    {
+        self.0.get_mut(key)
+    }
+}
+
 /// Merges the caches that must be all associated to the same bucket
 /// but make sure to sort the different buckets before performing the merges.
 ///
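The `FrozenBTreeMap` introduced above is the `BTreeMap` counterpart of the `FrozenMap` it replaces: it wraps an exclusive borrow and deliberately exposes only `new`, `iter_mut`, and `get_mut`, so existing values can be mutated but no key can be inserted or removed through it, keeping the tree's structure fixed while buckets are handed across threads. A hypothetical caller, to show the intended use:

```rust
// Hypothetical usage of the wrapper defined in the hunk above:
// values can be read and rewritten through it, but nothing in its
// API can add or remove a key.
fn bump_counts(frozen: &mut FrozenBTreeMap<'_, &[u8], u32>) {
    if let Some(count) = frozen.get_mut(b"hello".as_slice()) {
        *count += 1;
    }
    for (_key, count) in frozen.iter_mut() {
        *count *= 2;
    }
}
```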
@@ -491,7 +475,7 @@ where
     for (source_index, source) in readers.into_iter().enumerate() {
         let mut cursor = source.into_cursor()?;
         if cursor.move_on_next()?.is_some() {
-            heap.push(Entry { cursor, source_index });
+            heap.push(CursorEntry { cursor, source_index });
         }
     }
@@ -544,12 +528,11 @@ where
     // Then manage the content on the HashMap entries that weren't taken (mem::take).
     while let Some(mut map) = maps.pop() {
         // Make sure we don't try to work with entries already managed by the spilled
-        let mut ordered_entries: Vec<_> =
-            map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect();
-        ordered_entries.sort_unstable_by_key(|(key, _)| *key);
-
-        for (key, bbbul) in ordered_entries {
+        for (key, bbbul) in map.iter_mut() {
+            if bbbul.is_empty() {
+                continue;
+            }
             let mut output = DelAddRoaringBitmap::empty();
             output.union_and_clear_bbbul(bbbul);
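This hunk is the payoff of the whole switch: `HashMap` yields entries in arbitrary order, so the old code had to collect and sort them before merging them behind the already-sorted spilled readers, whereas `BTreeMap::iter_mut` is ordered by key by construction. A short demonstration of the property being relied on:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut map = BTreeMap::new();
    map.insert(b"b".as_slice(), 2);
    map.insert(b"a".as_slice(), 1);
    map.insert(b"c".as_slice(), 3);

    // No explicit sort needed: a BTreeMap always iterates in key order.
    let keys: Vec<_> = map.keys().copied().collect();
    assert_eq!(keys, [b"a".as_slice(), b"b".as_slice(), b"c".as_slice()]);
}
```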
@@ -567,29 +550,29 @@ where
     Ok(())
 }

-struct Entry<R> {
+struct CursorEntry<R> {
     cursor: ReaderCursor<R>,
     source_index: usize,
 }

-impl<R> Ord for Entry<R> {
-    fn cmp(&self, other: &Entry<R>) -> Ordering {
+impl<R> Ord for CursorEntry<R> {
+    fn cmp(&self, other: &CursorEntry<R>) -> Ordering {
         let skey = self.cursor.current().map(|(k, _)| k);
         let okey = other.cursor.current().map(|(k, _)| k);
         skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse()
     }
 }

-impl<R> Eq for Entry<R> {}
+impl<R> Eq for CursorEntry<R> {}

-impl<R> PartialEq for Entry<R> {
-    fn eq(&self, other: &Entry<R>) -> bool {
+impl<R> PartialEq for CursorEntry<R> {
+    fn eq(&self, other: &CursorEntry<R>) -> bool {
         self.cmp(other) == Ordering::Equal
     }
 }

-impl<R> PartialOrd for Entry<R> {
-    fn partial_cmp(&self, other: &Entry<R>) -> Option<Ordering> {
+impl<R> PartialOrd for CursorEntry<R> {
+    fn partial_cmp(&self, other: &CursorEntry<R>) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
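The rename from `Entry` to `CursorEntry` avoids confusion with map-entry terminology now that `BTreeMap` is in scope; the ordering logic itself is untouched. The `.reverse()` at the end of `cmp` is what turns the max-heap `BinaryHeap` into a min-heap over each cursor's current key (ties broken by `source_index`), driving the k-way merge of the spilled readers. A reduced, runnable sketch of the same pattern over plain sorted vectors, using `PeekMut` the way the merge loop above does:

```rust
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;

// One heap entry per source, holding that source's current head value.
struct Head {
    value: u32,
    rest: std::vec::IntoIter<u32>,
}

impl Ord for Head {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed, so the max-heap pops the smallest head first,
        // exactly like CursorEntry above.
        self.value.cmp(&other.value).reverse()
    }
}
impl PartialOrd for Head {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl PartialEq for Head {
    fn eq(&self, other: &Self) -> bool {
        self.value == other.value
    }
}
impl Eq for Head {}

fn kway_merge(sources: Vec<Vec<u32>>) -> Vec<u32> {
    let mut heap = BinaryHeap::new();
    for source in sources {
        let mut rest = source.into_iter();
        if let Some(value) = rest.next() {
            heap.push(Head { value, rest });
        }
    }

    let mut out = Vec::new();
    while let Some(mut head) = heap.peek_mut() {
        out.push(head.value);
        match head.rest.next() {
            // Rewriting the head in place re-sifts the heap when the
            // PeekMut guard drops, saving a pop + push.
            Some(value) => head.value = value,
            // This source is exhausted: remove it from the heap.
            None => {
                PeekMut::pop(head);
            }
        }
    }
    out
}

fn main() {
    let merged = kway_merge(vec![vec![1, 4, 7], vec![2, 5], vec![0, 9]]);
    assert_eq!(merged, [0, 1, 2, 4, 5, 7, 9]);
}
```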