mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 13:36:27 +00:00 
			
		
		
		
	Change the reserve and grant function to accept a closure
This commit is contained in:
		| @@ -7,7 +7,7 @@ use std::num::NonZeroU16; | |||||||
| use std::ops::Range; | use std::ops::Range; | ||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
|  |  | ||||||
| use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer}; | use bbqueue::framed::{FrameGrantR, FrameProducer}; | ||||||
| use bbqueue::BBBuffer; | use bbqueue::BBBuffer; | ||||||
| use bytemuck::{checked, CheckedBitPattern, NoUninit}; | use bytemuck::{checked, CheckedBitPattern, NoUninit}; | ||||||
| use flume::{RecvTimeoutError, SendError}; | use flume::{RecvTimeoutError, SendError}; | ||||||
| @@ -454,8 +454,10 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Spin loop to have a frame the size we requested. |         // Spin loop to have a frame the size we requested. | ||||||
|         let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; |         reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { | ||||||
|         payload_header.serialize_into(&mut grant); |             payload_header.serialize_into(grant); | ||||||
|  |             Ok(()) | ||||||
|  |         })?; | ||||||
|  |  | ||||||
|         // We only send a wake up message when the channel is empty |         // We only send a wake up message when the channel is empty | ||||||
|         // so that we don't fill the channel with too many WakeUps. |         // so that we don't fill the channel with too many WakeUps. | ||||||
| @@ -500,8 +502,7 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Spin loop to have a frame the size we requested. |         // Spin loop to have a frame the size we requested. | ||||||
|         let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; |         reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { | ||||||
|  |  | ||||||
|             let header_size = payload_header.header_size(); |             let header_size = payload_header.header_size(); | ||||||
|             let (header_bytes, remaining) = grant.split_at_mut(header_size); |             let (header_bytes, remaining) = grant.split_at_mut(header_size); | ||||||
|             payload_header.serialize_into(header_bytes); |             payload_header.serialize_into(header_bytes); | ||||||
| @@ -513,6 +514,9 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|  |  | ||||||
|  |             Ok(()) | ||||||
|  |         })?; | ||||||
|  |  | ||||||
|         // We only send a wake up message when the channel is empty |         // We only send a wake up message when the channel is empty | ||||||
|         // so that we don't fill the channel with too many WakeUps. |         // so that we don't fill the channel with too many WakeUps. | ||||||
|         if self.sender.is_empty() { |         if self.sender.is_empty() { | ||||||
| @@ -575,13 +579,13 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Spin loop to have a frame the size we requested. |         // Spin loop to have a frame the size we requested. | ||||||
|         let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; |         reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { | ||||||
|  |  | ||||||
|             let header_size = payload_header.header_size(); |             let header_size = payload_header.header_size(); | ||||||
|             let (header_bytes, remaining) = grant.split_at_mut(header_size); |             let (header_bytes, remaining) = grant.split_at_mut(header_size); | ||||||
|             payload_header.serialize_into(header_bytes); |             payload_header.serialize_into(header_bytes); | ||||||
|             let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); |             let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); | ||||||
|         key_value_writer(key_buffer, value_buffer)?; |             key_value_writer(key_buffer, value_buffer) | ||||||
|  |         })?; | ||||||
|  |  | ||||||
|         // We only send a wake up message when the channel is empty |         // We only send a wake up message when the channel is empty | ||||||
|         // so that we don't fill the channel with too many WakeUps. |         // so that we don't fill the channel with too many WakeUps. | ||||||
| @@ -629,12 +633,12 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Spin loop to have a frame the size we requested. |         // Spin loop to have a frame the size we requested. | ||||||
|         let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; |         reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { | ||||||
|  |  | ||||||
|             let header_size = payload_header.header_size(); |             let header_size = payload_header.header_size(); | ||||||
|             let (header_bytes, remaining) = grant.split_at_mut(header_size); |             let (header_bytes, remaining) = grant.split_at_mut(header_size); | ||||||
|             payload_header.serialize_into(header_bytes); |             payload_header.serialize_into(header_bytes); | ||||||
|         key_writer(remaining)?; |             key_writer(remaining) | ||||||
|  |         })?; | ||||||
|  |  | ||||||
|         // We only send a wake up message when the channel is empty |         // We only send a wake up message when the channel is empty | ||||||
|         // so that we don't fill the channel with too many WakeUps. |         // so that we don't fill the channel with too many WakeUps. | ||||||
| @@ -648,18 +652,23 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|  |  | ||||||
| /// Try to reserve a frame grant of `total_length` by spin looping | /// Try to reserve a frame grant of `total_length` by spin looping | ||||||
| /// on the BBQueue buffer and panics if the receiver has been disconnected. | /// on the BBQueue buffer and panics if the receiver has been disconnected. | ||||||
| fn reserve_grant<'b>( | fn reserve_and_write_grant<F>( | ||||||
|     producer: &mut FrameProducer<'b>, |     producer: &mut FrameProducer, | ||||||
|     total_length: usize, |     total_length: usize, | ||||||
|     sender: &flume::Sender<ReceiverAction>, |     sender: &flume::Sender<ReceiverAction>, | ||||||
| ) -> crate::Result<FrameGrantW<'b>> { |     f: F, | ||||||
|  | ) -> crate::Result<()> | ||||||
|  | where | ||||||
|  |     F: FnOnce(&mut [u8]) -> crate::Result<()>, | ||||||
|  | { | ||||||
|     loop { |     loop { | ||||||
|         for _ in 0..10_000 { |         for _ in 0..10_000 { | ||||||
|             match producer.grant(total_length) { |             match producer.grant(total_length) { | ||||||
|                 Ok(mut grant) => { |                 Ok(mut grant) => { | ||||||
|                     // We could commit only the used memory. |                     // We could commit only the used memory. | ||||||
|                     grant.to_commit(total_length); |                     f(&mut grant)?; | ||||||
|                     return Ok(grant); |                     grant.commit(total_length); | ||||||
|  |                     return Ok(()); | ||||||
|                 } |                 } | ||||||
|                 Err(bbqueue::Error::InsufficientSize) => continue, |                 Err(bbqueue::Error::InsufficientSize) => continue, | ||||||
|                 Err(e) => unreachable!("{e:?}"), |                 Err(e) => unreachable!("{e:?}"), | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user