From cec88cfc29abd06aec880a4eca7973d22faa3954 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 19 Dec 2024 15:50:30 +0100 Subject: [PATCH 1/6] Measure the bbqueue congestion --- crates/milli/src/update/new/channel.rs | 132 +++++++++++++++++++------ 1 file changed, 103 insertions(+), 29 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 7e2229950..f74dfb7c5 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -5,6 +5,8 @@ use std::marker::PhantomData; use std::mem; use std::num::NonZeroU16; use std::ops::Range; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::Arc; use std::time::Duration; use bbqueue::framed::{FrameGrantR, FrameProducer}; @@ -71,12 +73,23 @@ pub fn extractor_writer_bbqueue( consumer }); + let sent_messages_attempts = Arc::new(AtomicUsize::new(0)); + let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0)); + let (sender, receiver) = flume::bounded(channel_capacity); - let sender = ExtractorBbqueueSender { sender, producers, max_grant }; + let sender = ExtractorBbqueueSender { + sender, + producers, + max_grant, + sent_messages_attempts: sent_messages_attempts.clone(), + blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(), + }; let receiver = WriterBbqueueReceiver { receiver, look_at_consumer: (0..consumers.len()).cycle(), consumers, + sent_messages_attempts, + blocking_sent_messages_attempts, }; (sender, receiver) } @@ -92,6 +105,12 @@ pub struct ExtractorBbqueueSender<'a> { /// It will never be able to store more than that as the /// buffer cannot split data into two parts. max_grant: usize, + /// The total number of attempts to send messages + /// over the bbqueue channel. + sent_messages_attempts: Arc, + /// The number of times an attempt to send a + /// messages failed and we had to pause for a bit. + blocking_sent_messages_attempts: Arc, } pub struct WriterBbqueueReceiver<'a> { @@ -104,6 +123,12 @@ pub struct WriterBbqueueReceiver<'a> { look_at_consumer: Cycle>, /// The BBQueue frames to read when waking-up. consumers: Vec>, + /// The total number of attempts to send messages + /// over the bbqueue channel. + sent_messages_attempts: Arc, + /// The number of times an attempt to send a + /// messages failed and we had to pause for a bit. + blocking_sent_messages_attempts: Arc, } /// The action to perform on the receiver/writer side. @@ -169,6 +194,16 @@ impl<'a> WriterBbqueueReceiver<'a> { } None } + + /// Returns the total count of attempts to send messages through the BBQueue channel. + pub fn sent_messages_attempts(&self) -> usize { + self.sent_messages_attempts.load(atomic::Ordering::Relaxed) + } + + /// Returns the count of attempts to send messages that had to be paused due to BBQueue being full. + pub fn blocking_sent_messages_attempts(&self) -> usize { + self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed) + } } pub struct FrameWithHeader<'a> { @@ -458,10 +493,17 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - payload_header.serialize_into(grant); - Ok(()) - })?; + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + payload_header.serialize_into(grant); + Ok(()) + }, + )?; Ok(()) } @@ -500,20 +542,28 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); - if dimensions != 0 { - let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::()); - for (embedding, output) in embeddings.iter().zip(output_iter) { - output.copy_from_slice(bytemuck::cast_slice(embedding)); + if dimensions != 0 { + let output_iter = + remaining.chunks_exact_mut(dimensions * mem::size_of::()); + for (embedding, output) in embeddings.iter().zip(output_iter) { + output.copy_from_slice(bytemuck::cast_slice(embedding)); + } } - } - Ok(()) - })?; + Ok(()) + }, + )?; Ok(()) } @@ -571,13 +621,20 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); - key_value_writer(key_buffer, value_buffer) - })?; + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); + let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); + key_value_writer(key_buffer, value_buffer) + }, + )?; Ok(()) } @@ -619,12 +676,19 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - key_writer(remaining) - })?; + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); + key_writer(remaining) + }, + )?; Ok(()) } @@ -637,12 +701,18 @@ fn reserve_and_write_grant( producer: &mut FrameProducer, total_length: usize, sender: &flume::Sender, + sent_messages_attempts: &AtomicUsize, + blocking_sent_messages_attempts: &AtomicUsize, f: F, ) -> crate::Result<()> where F: FnOnce(&mut [u8]) -> crate::Result<()>, { loop { + // An attempt means trying multiple times + // and succeeded to send or not. + sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed); + for _ in 0..10_000 { match producer.grant(total_length) { Ok(mut grant) => { @@ -666,6 +736,10 @@ where return Err(Error::InternalError(InternalError::AbortedIndexation)); } + // We made an attempt to send a message in the + // bbqueue channel but it didn't succeeded. + blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed); + // We prefer to yield and allow the writing thread // to do its job, especially beneficial when there // is only one CPU core available. From 6112bd8caa6daff524a0f1aa905d42812c481130 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 19 Dec 2024 16:57:17 +0100 Subject: [PATCH 2/6] Display the channel congestion --- crates/milli/src/update/new/indexer/write.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index d1cc2038c..7f0bb6926 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -72,7 +72,19 @@ pub(super) fn write_to_db( &mut aligned_embedding, )?; } + write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?; + + let direct_attempts = writer_receiver.sent_messages_attempts(); + let blocking_attempts = writer_receiver.blocking_sent_messages_attempts(); + let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0; + tracing::debug!( + "Channel congestion metrics - \ + Direct send attempts: {direct_attempts}, \ + Blocking send attempts: {blocking_attempts} \ + ({congestion_pct:.1}% congestion)" + ); + Ok(()) } From a00796c46afde721ad94fb988414a022ee2e7ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 29 Jan 2025 14:09:44 +0100 Subject: [PATCH 3/6] Improve the naming in the log message --- crates/milli/src/update/new/indexer/write.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index 7f0bb6926..23009dc98 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -80,8 +80,8 @@ pub(super) fn write_to_db( let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0; tracing::debug!( "Channel congestion metrics - \ - Direct send attempts: {direct_attempts}, \ - Blocking send attempts: {blocking_attempts} \ + Attempts: {direct_attempts}, \ + Blocked attempts: {blocking_attempts} \ ({congestion_pct:.1}% congestion)" ); From db032079d87ba9deac5482953eeb23eb51d131e9 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 29 Jan 2025 14:16:36 +0100 Subject: [PATCH 4/6] Show indexation allocated memory --- crates/milli/src/update/new/indexer/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index b65750030..fd9e02b84 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -93,6 +93,13 @@ where }, ); + tracing::debug!( + "Indexation allocated memory metrics - \ + Total BBQueue size: {total_bbbuffer_capacity}, \ + Total extractor memory: {:?}", + grenad_parameters.max_memory, + ); + let (extractor_sender, writer_receiver) = pool .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) .unwrap(); From a9d0f4a0021e1494272e2c96ac4a38cfd5af9e94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 29 Jan 2025 15:16:40 +0100 Subject: [PATCH 5/6] Improve english comments Co-authored-by: Louis Dureuil --- crates/milli/src/update/new/channel.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index f74dfb7c5..4fff31a35 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -127,7 +127,7 @@ pub struct WriterBbqueueReceiver<'a> { /// over the bbqueue channel. sent_messages_attempts: Arc, /// The number of times an attempt to send a - /// messages failed and we had to pause for a bit. + /// message failed and we had to pause for a bit. blocking_sent_messages_attempts: Arc, } @@ -710,7 +710,7 @@ where { loop { // An attempt means trying multiple times - // and succeeded to send or not. + // whether is succeeded or not. sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed); for _ in 0..10_000 { @@ -737,7 +737,7 @@ where } // We made an attempt to send a message in the - // bbqueue channel but it didn't succeeded. + // bbqueue channel but it didn't succeed. blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed); // We prefer to yield and allow the writing thread From cb1b7513af17b44370896e7ba031654e99d10fa3 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 29 Jan 2025 15:21:52 +0100 Subject: [PATCH 6/6] Log the memory metrics only once --- crates/milli/src/update/new/indexer/mod.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index fd9e02b84..8b98bfba3 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -21,6 +21,7 @@ use crate::progress::Progress; use crate::update::GrenadParameters; use crate::vector::{ArroyWrapper, EmbeddingConfigs}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; +use std::sync::Once; pub(crate) mod de; pub mod document_changes; @@ -33,6 +34,8 @@ mod post_processing; mod update_by_function; mod write; +static LOG_MEMORY_METRICS_ONCE: Once = Once::new(); + /// This is the main function of this crate. /// /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. @@ -93,12 +96,14 @@ where }, ); - tracing::debug!( - "Indexation allocated memory metrics - \ - Total BBQueue size: {total_bbbuffer_capacity}, \ - Total extractor memory: {:?}", - grenad_parameters.max_memory, - ); + LOG_MEMORY_METRICS_ONCE.call_once(|| { + tracing::debug!( + "Indexation allocated memory metrics - \ + Total BBQueue size: {total_bbbuffer_capacity}, \ + Total extractor memory: {:?}", + grenad_parameters.max_memory, + ); + }); let (extractor_sender, writer_receiver) = pool .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))