Introduce a tracing tool to filter logs associated with high memory usage

Clément Renault
2024-02-08 15:20:05 +01:00
parent 0b1bb42753
commit 173aad6090
5 changed files with 331 additions and 12 deletions

View File

@ -13,11 +13,10 @@ serde_json = "1.0.111"
 tracing = "0.1.40"
 tracing-error = "0.2.0"
 tracing-subscriber = "0.3.18"
-byte-unit = { version = "4.0.19", default-features = false, features = [
-    "std",
-    "serde",
-] }
 tokio = { version = "1.35.1", features = ["sync"] }
+clap = { version = "4.4.18", features = ["derive"] }
+anyhow = "1.0.79"
+byte-unit = { version = "5.1.4", features = ["byte"] }
 
 [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
 libproc = "0.14.2"
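
The version bump matters because the new binary below parses its --memory-threshold flag into a byte_unit::Byte. A minimal sketch of the v5 parsing path, assuming Byte::parse_str (the size string is only an example):

use byte_unit::Byte;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Accept a human-readable size, as a user would pass it on the command line.
    let threshold = Byte::parse_str("2.5 GiB", /* ignore_case */ true)?;
    // 2.5 GiB expressed in bytes.
    assert_eq!(threshold.as_u64(), 2_684_354_560);
    Ok(())
}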

View File

@ -0,0 +1,89 @@
use std::collections::vec_deque::Drain;
use std::collections::VecDeque;
use std::io::{self, BufReader, Write};
use std::mem;

use anyhow::Context;
use byte_unit::Byte;
use clap::Parser;
use tracing_trace::entry::Entry;
/// A program that filters trace logs to keep only
/// the logs related to memory usage above the given threshold.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// The threshold that a log's memory usage must reach to be returned by this program.
    #[arg(short, long)]
    memory_threshold: Byte,

    /// Number of context lines to keep around high memory log lines.
    #[arg(long, default_value_t = 10)]
    context: usize,
}
fn main() -> anyhow::Result<()> {
    let Args { memory_threshold, context } = Args::parse();

    let mut context = EntryContext::new(context);
    let mut currently_in_threshold = false;

    let input = BufReader::new(io::stdin());
    let mut output = io::BufWriter::new(io::stdout());

    for result in tracing_trace::TraceReader::new(input) {
        let entry = result?;
        if entry.memory().map_or(true, |m| m.resident < memory_threshold.as_u64()) {
            // No memory stats or below the threshold: the entry only becomes
            // potential leading context for a later high-memory entry.
            if mem::replace(&mut currently_in_threshold, false) {
                for entry in context.drain() {
                    serde_json::to_writer(&mut output, &entry)
                        .context("while serializing and writing to stdout")?;
                }
            }
            context.push(entry);
        } else {
            // High memory usage: flush the buffered context, then write the entry itself.
            currently_in_threshold = true;
            for entry in context.drain() {
                serde_json::to_writer(&mut output, &entry)
                    .context("while serializing and writing to stdout")?;
            }
            serde_json::to_writer(&mut output, &entry)
                .context("while serializing and writing to stdout")?;
        }
    }

    for entry in context.drain() {
        serde_json::to_writer(&mut output, &entry)
            .context("while serializing and writing to stdout")?;
    }

    output.flush().context("flushing stdout")?;

    Ok(())
}
/// Keeps only the last `size` elements in memory.
/// It's basically a sliding window.
pub struct EntryContext {
    size: usize,
    queue: VecDeque<Entry>,
}

impl EntryContext {
    pub fn new(size: usize) -> EntryContext {
        EntryContext { size, queue: VecDeque::with_capacity(size) }
    }

    pub fn is_full(&self) -> bool {
        self.queue.len() >= self.size
    }

    pub fn push(&mut self, entry: Entry) {
        if self.queue.len() == self.size {
            // Drop the oldest entry so the window never exceeds `size`.
            self.queue.pop_front();
        }
        self.queue.push_back(entry);
    }

    pub fn drain(&mut self) -> Drain<Entry> {
        self.queue.drain(..)
    }
}
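
The same last-N window, shown standalone with integers instead of trace entries (purely illustrative values):

use std::collections::VecDeque;

fn main() {
    // Keep only the last `size` items, exactly like EntryContext::push does.
    let size = 3;
    let mut queue: VecDeque<u32> = VecDeque::with_capacity(size);
    for n in 1..=5 {
        if queue.len() == size {
            queue.pop_front(); // drop the oldest item once the window is full
        }
        queue.push_back(n);
    }
    assert_eq!(Vec::from(queue), vec![3, 4, 5]); // only the last three remain
}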

View File

@ -38,6 +38,20 @@ pub enum Entry {
    Event(Event),
}

impl Entry {
    pub fn memory(&self) -> Option<MemoryStats> {
        match self {
            Entry::NewCallsite(_)
            | Entry::NewThread(_)
            | Entry::NewSpan(_)
            | Entry::SpanClose(_) => None,
            Entry::SpanEnter(event) => event.memory,
            Entry::SpanExit(event) => event.memory,
            Entry::Event(event) => event.memory,
        }
    }
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct SpanId(u64);
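
As a usage sketch (assuming tracing-trace and anyhow as dependencies, mirroring the reader loop of the filter binary above), the new accessor makes it easy to scan a trace for its peak resident memory:

use std::io::{self, BufReader};

fn main() -> anyhow::Result<()> {
    // Read a trace from stdin and report the largest RSS seen in any entry.
    let input = BufReader::new(io::stdin());
    let mut peak = 0u64;
    for result in tracing_trace::TraceReader::new(input) {
        let entry = result?;
        if let Some(memory) = entry.memory() {
            peak = peak.max(memory.resident);
        }
    }
    println!("peak RSS: {peak} bytes");
    Ok(())
}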

View File

@ -189,7 +189,7 @@ fn print_duration(duration: std::time::Duration) -> String {
 /// Format only the allocated bytes, deallocated bytes and reallocated bytes in GiB, MiB, KiB, Bytes.
 fn print_memory(MemoryStats { resident }: MemoryStats) -> String {
-    use byte_unit::Byte;
-    let rss_bytes = Byte::from_bytes(resident).get_appropriate_unit(true);
+    use byte_unit::{Byte, UnitType};
+    let rss_bytes = Byte::from_u64(resident).get_appropriate_unit(UnitType::Binary);
     format!("RSS {rss_bytes:.2}")
 }
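
For a sense of the output, the same two v5 calls render a sample resident set size like this (exact rounding comes from byte-unit's Display implementation):

use byte_unit::{Byte, UnitType};

fn main() {
    let resident: u64 = 1_572_864; // 1.5 MiB expressed in bytes
    let rss_bytes = Byte::from_u64(resident).get_appropriate_unit(UnitType::Binary);
    println!("RSS {rss_bytes:.2}"); // prints something like "RSS 1.50 MiB"
}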