Move the spin looping for BBQueue frames into a dedicated function

This commit is contained in:
Clément Renault
2024-12-02 10:33:49 +01:00
parent be7d2fbe63
commit 263c5a348e
3 changed files with 49 additions and 44 deletions

View File

@@ -4,10 +4,10 @@ use std::marker::PhantomData;
use std::mem;
use std::num::NonZeroU16;
use bbqueue::framed::{FrameGrantR, FrameProducer};
use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer};
use bbqueue::BBBuffer;
use bytemuck::{checked, CheckedBitPattern, NoUninit};
use crossbeam_channel::SendError;
use flume::SendError;
use heed::types::Bytes;
use heed::BytesDecode;
use memmap2::{Mmap, MmapMut};
@@ -33,7 +33,7 @@ use crate::{CboRoaringBitmapCodec, DocumentId, Index};
///
/// The `channel_capacity` parameter defines the number of
/// too-large-to-fit-in-BBQueue entries that can be sent through
/// a crossbeam channel. This parameter must stay low to make
/// a flume channel. This parameter must stay low to make
/// sure we do not use too much memory.
///
/// Note that the channel is also used to wake-up the receiver
@@ -61,7 +61,7 @@ pub fn extractor_writer_bbqueue(
consumer
});
let (sender, receiver) = crossbeam_channel::bounded(channel_capacity);
let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, capacity };
let receiver = WriterBbqueueReceiver { receiver, consumers };
(sender, receiver)
@@ -70,7 +70,7 @@ pub fn extractor_writer_bbqueue(
pub struct ExtractorBbqueueSender<'a> {
/// This channel is used to wake-up the receiver and
/// send large entries that cannot fit in the BBQueue.
sender: crossbeam_channel::Sender<ReceiverAction>,
sender: flume::Sender<ReceiverAction>,
/// A memory buffer, one by thread, is used to serialize
/// the entries directly in this shared, lock-free space.
producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
@@ -87,7 +87,7 @@ pub struct WriterBbqueueReceiver<'a> {
/// Used to wake up when new entries are available either in
/// any BBQueue buffer or directly sent throught this channel
/// (still written to disk).
receiver: crossbeam_channel::Receiver<ReceiverAction>,
receiver: flume::Receiver<ReceiverAction>,
/// The BBQueue frames to read when waking-up.
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
}
@@ -437,19 +437,9 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = loop {
match producer.grant(total_length) {
Ok(grant) => break grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
}
};
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
payload_header.serialize_into(&mut grant);
// We could commit only the used memory.
grant.commit(total_length);
// We only send a wake up message when the channel is empty
// so that we don't fill the channel with too many WakeUps.
if self.sender.is_empty() {
@@ -494,13 +484,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = loop {
match producer.grant(total_length) {
Ok(grant) => break grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
}
};
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
@@ -571,13 +555,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = loop {
match producer.grant(total_length) {
Ok(grant) => break grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
}
};
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
@@ -585,9 +563,6 @@ impl<'b> ExtractorBbqueueSender<'b> {
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)?;
// We could commit only the used memory.
grant.commit(total_length);
// We only send a wake up message when the channel is empty
// so that we don't fill the channel with too many WakeUps.
if self.sender.is_empty() {
@@ -628,22 +603,13 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
let mut grant = loop {
match producer.grant(total_length) {
Ok(grant) => break grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
}
};
let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)?;
// We could commit only the used memory.
grant.commit(total_length);
// We only send a wake up message when the channel is empty
// so that we don't fill the channel with too many WakeUps.
if self.sender.is_empty() {
@@ -654,6 +620,31 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
}
/// Try to reserve a frame grant of `total_length` by spin looping
/// on the BBQueue buffer and panics if the receiver has been disconnected.
fn reserve_grant<'b>(
producer: &mut FrameProducer<'b>,
total_length: usize,
sender: &flume::Sender<ReceiverAction>,
) -> FrameGrantW<'b> {
loop {
for _ in 0..10_000 {
match producer.grant(total_length) {
Ok(mut grant) => {
// We could commit only the used memory.
grant.to_commit(total_length);
return grant;
}
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
}
}
if sender.is_disconnected() {
panic!("channel is disconnected");
}
}
}
pub enum ExactWordDocids {}
pub enum FidWordCountDocids {}
pub enum WordDocids {}