mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-31 07:56:28 +00:00 
			
		
		
		
	Reduce the maximum grant possible we can store in the BBQueue
This commit is contained in:
		
				
					committed by
					
						 Louis Dureuil
						Louis Dureuil
					
				
			
			
				
	
			
			
			
						parent
						
							1f54f07f72
						
					
				
				
					commit
					5ab4cdb1f3
				
			| @@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId; | |||||||
| use crate::vector::Embedding; | use crate::vector::Embedding; | ||||||
| use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError}; | use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError}; | ||||||
|  |  | ||||||
|  | /// Note that the FrameProducer requires up to 9 bytes to | ||||||
|  | /// encode the length, the max grant has been computed accordingly. | ||||||
|  | /// | ||||||
|  | /// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header> | ||||||
|  | const MAX_FRAME_HEADER_SIZE: usize = 9; | ||||||
|  |  | ||||||
| /// Creates a tuple of senders/receiver to be used by | /// Creates a tuple of senders/receiver to be used by | ||||||
| /// the extractors and the writer loop. | /// the extractors and the writer loop. | ||||||
| /// | /// | ||||||
| @@ -53,8 +59,8 @@ pub fn extractor_writer_bbqueue( | |||||||
|     bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity)); |     bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity)); | ||||||
|  |  | ||||||
|     let capacity = bbbuffers.first().unwrap().capacity(); |     let capacity = bbbuffers.first().unwrap().capacity(); | ||||||
|     // Read the field description to understand this |     // Read the const description to understand this | ||||||
|     let capacity = capacity.checked_sub(9).unwrap(); |     let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap(); | ||||||
|  |  | ||||||
|     let producers = ThreadLocal::with_capacity(bbbuffers.len()); |     let producers = ThreadLocal::with_capacity(bbbuffers.len()); | ||||||
|     let consumers = rayon::broadcast(|bi| { |     let consumers = rayon::broadcast(|bi| { | ||||||
| @@ -65,7 +71,7 @@ pub fn extractor_writer_bbqueue( | |||||||
|     }); |     }); | ||||||
|  |  | ||||||
|     let (sender, receiver) = flume::bounded(channel_capacity); |     let (sender, receiver) = flume::bounded(channel_capacity); | ||||||
|     let sender = ExtractorBbqueueSender { sender, producers, capacity }; |     let sender = ExtractorBbqueueSender { sender, producers, max_grant }; | ||||||
|     let receiver = WriterBbqueueReceiver { |     let receiver = WriterBbqueueReceiver { | ||||||
|         receiver, |         receiver, | ||||||
|         look_at_consumer: (0..consumers.len()).cycle(), |         look_at_consumer: (0..consumers.len()).cycle(), | ||||||
| @@ -81,13 +87,10 @@ pub struct ExtractorBbqueueSender<'a> { | |||||||
|     /// A memory buffer, one by thread, is used to serialize |     /// A memory buffer, one by thread, is used to serialize | ||||||
|     /// the entries directly in this shared, lock-free space. |     /// the entries directly in this shared, lock-free space. | ||||||
|     producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>, |     producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>, | ||||||
|     /// The capacity of this frame producer, will never be able to store more than that. |     /// The maximum frame grant that a producer can reserve. | ||||||
|     /// |     /// It will never be able to store more than that as the | ||||||
|     /// Note that the FrameProducer requires up to 9 bytes to encode the length, |     /// buffer cannot split data into two parts. | ||||||
|     /// the capacity has been shrunk accordingly. |     max_grant: usize, | ||||||
|     /// |  | ||||||
|     /// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header> |  | ||||||
|     capacity: usize, |  | ||||||
| } | } | ||||||
|  |  | ||||||
| pub struct WriterBbqueueReceiver<'a> { | pub struct WriterBbqueueReceiver<'a> { | ||||||
| @@ -443,14 +446,14 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> { |     fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> { | ||||||
|         let capacity = self.capacity; |         let max_grant = self.max_grant; | ||||||
|         let refcell = self.producers.get().unwrap(); |         let refcell = self.producers.get().unwrap(); | ||||||
|         let mut producer = refcell.0.borrow_mut_or_yield(); |         let mut producer = refcell.0.borrow_mut_or_yield(); | ||||||
|  |  | ||||||
|         let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }); |         let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }); | ||||||
|         let total_length = EntryHeader::total_delete_vector_size(); |         let total_length = EntryHeader::total_delete_vector_size(); | ||||||
|         if total_length > capacity { |         if total_length > max_grant { | ||||||
|             panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)"); |             panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)"); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Spin loop to have a frame the size we requested. |         // Spin loop to have a frame the size we requested. | ||||||
| @@ -468,7 +471,7 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|         embedder_id: u8, |         embedder_id: u8, | ||||||
|         embeddings: &[Vec<f32>], |         embeddings: &[Vec<f32>], | ||||||
|     ) -> crate::Result<()> { |     ) -> crate::Result<()> { | ||||||
|         let capacity = self.capacity; |         let max_grant = self.max_grant; | ||||||
|         let refcell = self.producers.get().unwrap(); |         let refcell = self.producers.get().unwrap(); | ||||||
|         let mut producer = refcell.0.borrow_mut_or_yield(); |         let mut producer = refcell.0.borrow_mut_or_yield(); | ||||||
|  |  | ||||||
| @@ -479,7 +482,7 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|         let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] }; |         let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] }; | ||||||
|         let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector); |         let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector); | ||||||
|         let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions); |         let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions); | ||||||
|         if total_length > capacity { |         if total_length > max_grant { | ||||||
|             let mut value_file = tempfile::tempfile().map(BufWriter::new)?; |             let mut value_file = tempfile::tempfile().map(BufWriter::new)?; | ||||||
|             for embedding in embeddings { |             for embedding in embeddings { | ||||||
|                 let mut embedding_bytes = bytemuck::cast_slice(embedding); |                 let mut embedding_bytes = bytemuck::cast_slice(embedding); | ||||||
| @@ -540,14 +543,14 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|     where |     where | ||||||
|         F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>, |         F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>, | ||||||
|     { |     { | ||||||
|         let capacity = self.capacity; |         let max_grant = self.max_grant; | ||||||
|         let refcell = self.producers.get().unwrap(); |         let refcell = self.producers.get().unwrap(); | ||||||
|         let mut producer = refcell.0.borrow_mut_or_yield(); |         let mut producer = refcell.0.borrow_mut_or_yield(); | ||||||
|  |  | ||||||
|         let operation = DbOperation { database, key_length: Some(key_length) }; |         let operation = DbOperation { database, key_length: Some(key_length) }; | ||||||
|         let payload_header = EntryHeader::DbOperation(operation); |         let payload_header = EntryHeader::DbOperation(operation); | ||||||
|         let total_length = EntryHeader::total_key_value_size(key_length, value_length); |         let total_length = EntryHeader::total_key_value_size(key_length, value_length); | ||||||
|         if total_length > capacity { |         if total_length > max_grant { | ||||||
|             let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice(); |             let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice(); | ||||||
|             let value_file = tempfile::tempfile()?; |             let value_file = tempfile::tempfile()?; | ||||||
|             value_file.set_len(value_length.try_into().unwrap())?; |             value_file.set_len(value_length.try_into().unwrap())?; | ||||||
| @@ -601,7 +604,7 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|     where |     where | ||||||
|         F: FnOnce(&mut [u8]) -> crate::Result<()>, |         F: FnOnce(&mut [u8]) -> crate::Result<()>, | ||||||
|     { |     { | ||||||
|         let capacity = self.capacity; |         let max_grant = self.max_grant; | ||||||
|         let refcell = self.producers.get().unwrap(); |         let refcell = self.producers.get().unwrap(); | ||||||
|         let mut producer = refcell.0.borrow_mut_or_yield(); |         let mut producer = refcell.0.borrow_mut_or_yield(); | ||||||
|  |  | ||||||
| @@ -610,8 +613,8 @@ impl<'b> ExtractorBbqueueSender<'b> { | |||||||
|         let operation = DbOperation { database, key_length: None }; |         let operation = DbOperation { database, key_length: None }; | ||||||
|         let payload_header = EntryHeader::DbOperation(operation); |         let payload_header = EntryHeader::DbOperation(operation); | ||||||
|         let total_length = EntryHeader::total_key_size(key_length); |         let total_length = EntryHeader::total_key_size(key_length); | ||||||
|         if total_length > capacity { |         if total_length > max_grant { | ||||||
|             panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)"); |             panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)"); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Spin loop to have a frame the size we requested. |         // Spin loop to have a frame the size we requested. | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user