Add async batching and flush policies

This commit is contained in:
Nanaloveyuki
2026-05-08 18:52:50 +08:00
parent 95ccb72093
commit 87e5491801
3 changed files with 140 additions and 3 deletions
+29
View File
@@ -1,5 +1,6 @@
async test "shutdown drains pending records" { async test "shutdown drains pending records" {
let written : Ref[Array[String]] = Ref::new([]) let written : Ref[Array[String]] = Ref::new([])
let flushes : Ref[Int] = Ref::new(0)
let logger = async_logger( let logger = async_logger(
@bitlogger.callback_sink(fn(rec) { @bitlogger.callback_sink(fn(rec) {
written.val.push(rec.message) written.val.push(rec.message)
@@ -7,9 +8,15 @@ async test "shutdown drains pending records" {
config=AsyncLoggerConfig::new( config=AsyncLoggerConfig::new(
max_pending=4, max_pending=4,
overflow=AsyncOverflowPolicy::Blocking, overflow=AsyncOverflowPolicy::Blocking,
max_batch=4,
flush=AsyncFlushPolicy::Batch,
), ),
min_level=@bitlogger.Level::Info, min_level=@bitlogger.Level::Info,
target="async.test", target="async.test",
flush=fn(_) {
flushes.val += 1
1
},
) )
@async.with_task_group(group => { @async.with_task_group(group => {
@@ -23,9 +30,15 @@ async test "shutdown drains pending records" {
inspect(logger.is_running(), content="false") inspect(logger.is_running(), content="false")
inspect(logger.has_failed(), content="false") inspect(logger.has_failed(), content="false")
inspect(logger.pending_count(), content="0") inspect(logger.pending_count(), content="0")
inspect(match logger.flush_policy() {
AsyncFlushPolicy::Never => "Never"
AsyncFlushPolicy::Batch => "Batch"
AsyncFlushPolicy::Shutdown => "Shutdown"
}, content="Batch")
inspect(written.val.length(), content="2") inspect(written.val.length(), content="2")
inspect(written.val[0], content="one") inspect(written.val[0], content="one")
inspect(written.val[1], content="two") inspect(written.val[1], content="two")
inspect(flushes.val, content="1")
} }
async test "close clear counts abandoned records as dropped" { async test "close clear counts abandoned records as dropped" {
@@ -77,15 +90,23 @@ test "async logger config stringify roundtrips stable fields" {
AsyncLoggerConfig::new( AsyncLoggerConfig::new(
max_pending=8, max_pending=8,
overflow=AsyncOverflowPolicy::DropOldest, overflow=AsyncOverflowPolicy::DropOldest,
max_batch=3,
flush=AsyncFlushPolicy::Batch,
), ),
) )
let config = parse_async_logger_config_text(text) let config = parse_async_logger_config_text(text)
inspect(config.max_pending, content="8") inspect(config.max_pending, content="8")
inspect(config.max_batch, content="3")
inspect(match config.overflow { inspect(match config.overflow {
AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::Blocking => "Blocking"
AsyncOverflowPolicy::DropOldest => "DropOldest" AsyncOverflowPolicy::DropOldest => "DropOldest"
AsyncOverflowPolicy::DropNewest => "DropNewest" AsyncOverflowPolicy::DropNewest => "DropNewest"
}, content="DropOldest") }, content="DropOldest")
inspect(match config.flush {
AsyncFlushPolicy::Never => "Never"
AsyncFlushPolicy::Batch => "Batch"
AsyncFlushPolicy::Shutdown => "Shutdown"
}, content="Batch")
} }
test "async build config stringify roundtrips nested logger and async fields" { test "async build config stringify roundtrips nested logger and async fields" {
@@ -100,6 +121,8 @@ test "async build config stringify roundtrips nested logger and async fields" {
async_config=AsyncLoggerConfig::new( async_config=AsyncLoggerConfig::new(
max_pending=2, max_pending=2,
overflow=AsyncOverflowPolicy::DropNewest, overflow=AsyncOverflowPolicy::DropNewest,
max_batch=5,
flush=AsyncFlushPolicy::Shutdown,
), ),
), ),
) )
@@ -108,9 +131,15 @@ test "async build config stringify roundtrips nested logger and async fields" {
inspect(config.logger.target, content="async.roundtrip") inspect(config.logger.target, content="async.roundtrip")
inspect(config.logger.timestamp, content="true") inspect(config.logger.timestamp, content="true")
inspect(config.async_config.max_pending, content="2") inspect(config.async_config.max_pending, content="2")
inspect(config.async_config.max_batch, content="5")
inspect(match config.async_config.overflow { inspect(match config.async_config.overflow {
AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::Blocking => "Blocking"
AsyncOverflowPolicy::DropOldest => "DropOldest" AsyncOverflowPolicy::DropOldest => "DropOldest"
AsyncOverflowPolicy::DropNewest => "DropNewest" AsyncOverflowPolicy::DropNewest => "DropNewest"
}, content="DropNewest") }, content="DropNewest")
inspect(match config.async_config.flush {
AsyncFlushPolicy::Never => "Never"
AsyncFlushPolicy::Batch => "Batch"
AsyncFlushPolicy::Shutdown => "Shutdown"
}, content="Shutdown")
} }
+88 -2
View File
@@ -8,16 +8,31 @@ pub(all) enum AsyncOverflowPolicy {
DropNewest DropNewest
} }
pub(all) enum AsyncFlushPolicy {
Never
Batch
Shutdown
}
pub struct AsyncLoggerConfig { pub struct AsyncLoggerConfig {
max_pending : Int max_pending : Int
overflow : AsyncOverflowPolicy overflow : AsyncOverflowPolicy
max_batch : Int
flush : AsyncFlushPolicy
} }
pub fn AsyncLoggerConfig::new( pub fn AsyncLoggerConfig::new(
max_pending~ : Int = 0, max_pending~ : Int = 0,
overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking,
max_batch~ : Int = 1,
flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never,
) -> AsyncLoggerConfig { ) -> AsyncLoggerConfig {
{ max_pending, overflow } {
max_pending,
overflow,
max_batch: if max_batch <= 1 { 1 } else { max_batch },
flush,
}
} }
fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise { fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise {
@@ -30,6 +45,16 @@ fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise {
} }
} }
fn parse_async_flush(name : String) -> AsyncFlushPolicy raise {
match name.to_upper() {
"NEVER" => AsyncFlushPolicy::Never
"NONE" => AsyncFlushPolicy::Never
"BATCH" => AsyncFlushPolicy::Batch
"SHUTDOWN" => AsyncFlushPolicy::Shutdown
_ => raise Failure::Failure("Unsupported async flush policy: " + name)
}
}
pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise { pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise {
let root = @json_parser.parse(input) let root = @json_parser.parse(input)
let obj = match root.as_object() { let obj = match root.as_object() {
@@ -50,17 +75,42 @@ pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise
} }
None => AsyncOverflowPolicy::Blocking None => AsyncOverflowPolicy::Blocking
} }
AsyncLoggerConfig::new(max_pending=max_pending, overflow=overflow) let max_batch = match obj.get("max_batch") {
Some(value) => match value.as_number() {
Some(number) => number.to_int()
None => raise Failure::Failure("Expected number at async_config.max_batch")
}
None => 1
}
let flush = match obj.get("flush") {
Some(value) => match value.as_string() {
Some(text) => parse_async_flush(text)
None => raise Failure::Failure("Expected string at async_config.flush")
}
None => AsyncFlushPolicy::Never
}
AsyncLoggerConfig::new(
max_pending=max_pending,
overflow=overflow,
max_batch=max_batch,
flush=flush,
)
} }
pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.JsonValue { pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.JsonValue {
@json_parser.JsonValue::Object({ @json_parser.JsonValue::Object({
"max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()), "max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()),
"max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()),
"overflow": @json_parser.JsonValue::String(match config.overflow { "overflow": @json_parser.JsonValue::String(match config.overflow {
AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::Blocking => "Blocking"
AsyncOverflowPolicy::DropOldest => "DropOldest" AsyncOverflowPolicy::DropOldest => "DropOldest"
AsyncOverflowPolicy::DropNewest => "DropNewest" AsyncOverflowPolicy::DropNewest => "DropNewest"
}), }),
"flush": @json_parser.JsonValue::String(match config.flush {
AsyncFlushPolicy::Never => "Never"
AsyncFlushPolicy::Batch => "Batch"
AsyncFlushPolicy::Shutdown => "Shutdown"
}),
}) })
} }
@@ -128,7 +178,10 @@ pub struct AsyncLogger[S] {
target : String target : String
timestamp : Bool timestamp : Bool
overflow : AsyncOverflowPolicy overflow : AsyncOverflowPolicy
max_batch : Int
flush_policy : AsyncFlushPolicy
sink : S sink : S
flush_sink : (S) -> Int
context_fields : Array[@bitlogger.Field] context_fields : Array[@bitlogger.Field]
filter : (@bitlogger.Record) -> Bool filter : (@bitlogger.Record) -> Bool
patch : @bitlogger.RecordPatch patch : @bitlogger.RecordPatch
@@ -146,13 +199,17 @@ pub fn[S] async_logger(
config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(),
min_level~ : @bitlogger.Level = @bitlogger.Level::Info, min_level~ : @bitlogger.Level = @bitlogger.Level::Info,
target~ : String = "", target~ : String = "",
flush~ : (S) -> Int = fn(_) { 0 },
) -> AsyncLogger[S] { ) -> AsyncLogger[S] {
{ {
min_level, min_level,
target, target,
timestamp: false, timestamp: false,
overflow: config.overflow, overflow: config.overflow,
max_batch: config.max_batch,
flush_policy: config.flush,
sink, sink,
flush_sink: flush,
context_fields: [], context_fields: [],
filter: fn(_) { true }, filter: fn(_) { true },
patch: @bitlogger.identity_patch(), patch: @bitlogger.identity_patch(),
@@ -363,6 +420,10 @@ pub fn[S] AsyncLogger::last_error(self : AsyncLogger[S]) -> String {
self.last_error.val self.last_error.val
} }
pub fn[S] AsyncLogger::flush_policy(self : AsyncLogger[S]) -> AsyncFlushPolicy {
self.flush_policy
}
pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit {
self.is_closed.val = true self.is_closed.val = true
if clear { if clear {
@@ -410,6 +471,30 @@ async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit {
if logger.pending_count.val > 0 { if logger.pending_count.val > 0 {
logger.pending_count.val -= 1 logger.pending_count.val -= 1
} }
for drained = 1; drained < logger.max_batch; {
let next = logger.queue.try_get() catch {
err if err is AsyncLoggerClosed => None
err => raise err
}
match next {
Some(next) => {
logger.sink.write(next)
if logger.pending_count.val > 0 {
logger.pending_count.val -= 1
}
continue drained + 1
}
None => break
}
}
match logger.flush_policy {
AsyncFlushPolicy::Batch => ignore((logger.flush_sink)(logger.sink))
_ => ()
}
}
match logger.flush_policy {
AsyncFlushPolicy::Shutdown => ignore((logger.flush_sink)(logger.sink))
_ => ()
} }
} }
@@ -437,6 +522,7 @@ pub fn build_async_logger(
config=config.async_config, config=config.async_config,
min_level=logger.min_level, min_level=logger.min_level,
target=logger.target, target=logger.target,
flush=fn(sink) { sink.flush() },
).with_timestamp(enabled=logger.timestamp) ).with_timestamp(enabled=logger.timestamp)
} }
+23 -1
View File
@@ -8,16 +8,31 @@ pub(all) enum AsyncOverflowPolicy {
DropNewest DropNewest
} }
pub(all) enum AsyncFlushPolicy {
Never
Batch
Shutdown
}
pub struct AsyncLoggerConfig { pub struct AsyncLoggerConfig {
max_pending : Int max_pending : Int
overflow : AsyncOverflowPolicy overflow : AsyncOverflowPolicy
max_batch : Int
flush : AsyncFlushPolicy
} }
pub fn AsyncLoggerConfig::new( pub fn AsyncLoggerConfig::new(
max_pending~ : Int = 0, max_pending~ : Int = 0,
overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking,
max_batch~ : Int = 1,
flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never,
) -> AsyncLoggerConfig { ) -> AsyncLoggerConfig {
{ max_pending, overflow } {
max_pending,
overflow,
max_batch: if max_batch <= 1 { 1 } else { max_batch },
flush,
}
} }
pub struct AsyncLogger[S] {} pub struct AsyncLogger[S] {}
@@ -76,11 +91,13 @@ pub fn async_logger[S : @bitlogger.Sink](
config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(),
min_level~ : @bitlogger.Level = @bitlogger.Level::Info, min_level~ : @bitlogger.Level = @bitlogger.Level::Info,
target~ : String = "", target~ : String = "",
flush~ : (S) -> Int = fn(_) { 0 },
) -> AsyncLogger[S] { ) -> AsyncLogger[S] {
ignore(sink) ignore(sink)
ignore(config) ignore(config)
ignore(min_level) ignore(min_level)
ignore(target) ignore(target)
ignore(flush)
abort("bitlogger_async currently only supports native/llvm backends") abort("bitlogger_async currently only supports native/llvm backends")
} }
@@ -174,6 +191,11 @@ pub fn[S] AsyncLogger::last_error(self : AsyncLogger[S]) -> String {
abort("bitlogger_async currently only supports native/llvm backends") abort("bitlogger_async currently only supports native/llvm backends")
} }
pub fn[S] AsyncLogger::flush_policy(self : AsyncLogger[S]) -> AsyncFlushPolicy {
ignore(self)
abort("bitlogger_async currently only supports native/llvm backends")
}
pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit {
ignore(self) ignore(self)
ignore(clear) ignore(clear)