From 87e5491801c22aaa9a414e09c0d5fbbe5700339c Mon Sep 17 00:00:00 2001 From: Nanaloveyuki Date: Fri, 8 May 2026 18:52:50 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20async=20batching=20and=20flus?= =?UTF-8?q?h=20policies?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bitlogger_async/BitLoggerAsync_test.mbt | 29 ++++++++ bitlogger_async/async_logger_native.mbt | 90 ++++++++++++++++++++++++- bitlogger_async/async_logger_stub.mbt | 24 ++++++- 3 files changed, 140 insertions(+), 3 deletions(-) diff --git a/bitlogger_async/BitLoggerAsync_test.mbt b/bitlogger_async/BitLoggerAsync_test.mbt index fb1fd4f..5e48dc3 100644 --- a/bitlogger_async/BitLoggerAsync_test.mbt +++ b/bitlogger_async/BitLoggerAsync_test.mbt @@ -1,5 +1,6 @@ async test "shutdown drains pending records" { let written : Ref[Array[String]] = Ref::new([]) + let flushes : Ref[Int] = Ref::new(0) let logger = async_logger( @bitlogger.callback_sink(fn(rec) { written.val.push(rec.message) @@ -7,9 +8,15 @@ async test "shutdown drains pending records" { config=AsyncLoggerConfig::new( max_pending=4, overflow=AsyncOverflowPolicy::Blocking, + max_batch=4, + flush=AsyncFlushPolicy::Batch, ), min_level=@bitlogger.Level::Info, target="async.test", + flush=fn(_) { + flushes.val += 1 + 1 + }, ) @async.with_task_group(group => { @@ -23,9 +30,15 @@ async test "shutdown drains pending records" { inspect(logger.is_running(), content="false") inspect(logger.has_failed(), content="false") inspect(logger.pending_count(), content="0") + inspect(match logger.flush_policy() { + AsyncFlushPolicy::Never => "Never" + AsyncFlushPolicy::Batch => "Batch" + AsyncFlushPolicy::Shutdown => "Shutdown" + }, content="Batch") inspect(written.val.length(), content="2") inspect(written.val[0], content="one") inspect(written.val[1], content="two") + inspect(flushes.val, content="1") } async test "close clear counts abandoned records as dropped" { @@ -77,15 +90,23 @@ test "async logger config stringify roundtrips stable fields" { AsyncLoggerConfig::new( max_pending=8, overflow=AsyncOverflowPolicy::DropOldest, + max_batch=3, + flush=AsyncFlushPolicy::Batch, ), ) let config = parse_async_logger_config_text(text) inspect(config.max_pending, content="8") + inspect(config.max_batch, content="3") inspect(match config.overflow { AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::DropOldest => "DropOldest" AsyncOverflowPolicy::DropNewest => "DropNewest" }, content="DropOldest") + inspect(match config.flush { + AsyncFlushPolicy::Never => "Never" + AsyncFlushPolicy::Batch => "Batch" + AsyncFlushPolicy::Shutdown => "Shutdown" + }, content="Batch") } test "async build config stringify roundtrips nested logger and async fields" { @@ -100,6 +121,8 @@ test "async build config stringify roundtrips nested logger and async fields" { async_config=AsyncLoggerConfig::new( max_pending=2, overflow=AsyncOverflowPolicy::DropNewest, + max_batch=5, + flush=AsyncFlushPolicy::Shutdown, ), ), ) @@ -108,9 +131,15 @@ test "async build config stringify roundtrips nested logger and async fields" { inspect(config.logger.target, content="async.roundtrip") inspect(config.logger.timestamp, content="true") inspect(config.async_config.max_pending, content="2") + inspect(config.async_config.max_batch, content="5") inspect(match config.async_config.overflow { AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::DropOldest => "DropOldest" AsyncOverflowPolicy::DropNewest => "DropNewest" }, content="DropNewest") + inspect(match config.async_config.flush { + AsyncFlushPolicy::Never => "Never" + AsyncFlushPolicy::Batch => "Batch" + AsyncFlushPolicy::Shutdown => "Shutdown" + }, content="Shutdown") } diff --git a/bitlogger_async/async_logger_native.mbt b/bitlogger_async/async_logger_native.mbt index eb2fd2b..abc5e8d 100644 --- a/bitlogger_async/async_logger_native.mbt +++ b/bitlogger_async/async_logger_native.mbt @@ -8,16 +8,31 @@ pub(all) enum AsyncOverflowPolicy { DropNewest } +pub(all) enum AsyncFlushPolicy { + Never + Batch + Shutdown +} + pub struct AsyncLoggerConfig { max_pending : Int overflow : AsyncOverflowPolicy + max_batch : Int + flush : AsyncFlushPolicy } pub fn AsyncLoggerConfig::new( max_pending~ : Int = 0, overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, + max_batch~ : Int = 1, + flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never, ) -> AsyncLoggerConfig { - { max_pending, overflow } + { + max_pending, + overflow, + max_batch: if max_batch <= 1 { 1 } else { max_batch }, + flush, + } } fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise { @@ -30,6 +45,16 @@ fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise { } } +fn parse_async_flush(name : String) -> AsyncFlushPolicy raise { + match name.to_upper() { + "NEVER" => AsyncFlushPolicy::Never + "NONE" => AsyncFlushPolicy::Never + "BATCH" => AsyncFlushPolicy::Batch + "SHUTDOWN" => AsyncFlushPolicy::Shutdown + _ => raise Failure::Failure("Unsupported async flush policy: " + name) + } +} + pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise { let root = @json_parser.parse(input) let obj = match root.as_object() { @@ -50,17 +75,42 @@ pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise } None => AsyncOverflowPolicy::Blocking } - AsyncLoggerConfig::new(max_pending=max_pending, overflow=overflow) + let max_batch = match obj.get("max_batch") { + Some(value) => match value.as_number() { + Some(number) => number.to_int() + None => raise Failure::Failure("Expected number at async_config.max_batch") + } + None => 1 + } + let flush = match obj.get("flush") { + Some(value) => match value.as_string() { + Some(text) => parse_async_flush(text) + None => raise Failure::Failure("Expected string at async_config.flush") + } + None => AsyncFlushPolicy::Never + } + AsyncLoggerConfig::new( + max_pending=max_pending, + overflow=overflow, + max_batch=max_batch, + flush=flush, + ) } pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.JsonValue { @json_parser.JsonValue::Object({ "max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()), + "max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()), "overflow": @json_parser.JsonValue::String(match config.overflow { AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::DropOldest => "DropOldest" AsyncOverflowPolicy::DropNewest => "DropNewest" }), + "flush": @json_parser.JsonValue::String(match config.flush { + AsyncFlushPolicy::Never => "Never" + AsyncFlushPolicy::Batch => "Batch" + AsyncFlushPolicy::Shutdown => "Shutdown" + }), }) } @@ -128,7 +178,10 @@ pub struct AsyncLogger[S] { target : String timestamp : Bool overflow : AsyncOverflowPolicy + max_batch : Int + flush_policy : AsyncFlushPolicy sink : S + flush_sink : (S) -> Int context_fields : Array[@bitlogger.Field] filter : (@bitlogger.Record) -> Bool patch : @bitlogger.RecordPatch @@ -146,13 +199,17 @@ pub fn[S] async_logger( config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), min_level~ : @bitlogger.Level = @bitlogger.Level::Info, target~ : String = "", + flush~ : (S) -> Int = fn(_) { 0 }, ) -> AsyncLogger[S] { { min_level, target, timestamp: false, overflow: config.overflow, + max_batch: config.max_batch, + flush_policy: config.flush, sink, + flush_sink: flush, context_fields: [], filter: fn(_) { true }, patch: @bitlogger.identity_patch(), @@ -363,6 +420,10 @@ pub fn[S] AsyncLogger::last_error(self : AsyncLogger[S]) -> String { self.last_error.val } +pub fn[S] AsyncLogger::flush_policy(self : AsyncLogger[S]) -> AsyncFlushPolicy { + self.flush_policy +} + pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { self.is_closed.val = true if clear { @@ -410,6 +471,30 @@ async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit { if logger.pending_count.val > 0 { logger.pending_count.val -= 1 } + for drained = 1; drained < logger.max_batch; { + let next = logger.queue.try_get() catch { + err if err is AsyncLoggerClosed => None + err => raise err + } + match next { + Some(next) => { + logger.sink.write(next) + if logger.pending_count.val > 0 { + logger.pending_count.val -= 1 + } + continue drained + 1 + } + None => break + } + } + match logger.flush_policy { + AsyncFlushPolicy::Batch => ignore((logger.flush_sink)(logger.sink)) + _ => () + } + } + match logger.flush_policy { + AsyncFlushPolicy::Shutdown => ignore((logger.flush_sink)(logger.sink)) + _ => () } } @@ -437,6 +522,7 @@ pub fn build_async_logger( config=config.async_config, min_level=logger.min_level, target=logger.target, + flush=fn(sink) { sink.flush() }, ).with_timestamp(enabled=logger.timestamp) } diff --git a/bitlogger_async/async_logger_stub.mbt b/bitlogger_async/async_logger_stub.mbt index b5c605e..351fedf 100644 --- a/bitlogger_async/async_logger_stub.mbt +++ b/bitlogger_async/async_logger_stub.mbt @@ -8,16 +8,31 @@ pub(all) enum AsyncOverflowPolicy { DropNewest } +pub(all) enum AsyncFlushPolicy { + Never + Batch + Shutdown +} + pub struct AsyncLoggerConfig { max_pending : Int overflow : AsyncOverflowPolicy + max_batch : Int + flush : AsyncFlushPolicy } pub fn AsyncLoggerConfig::new( max_pending~ : Int = 0, overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, + max_batch~ : Int = 1, + flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never, ) -> AsyncLoggerConfig { - { max_pending, overflow } + { + max_pending, + overflow, + max_batch: if max_batch <= 1 { 1 } else { max_batch }, + flush, + } } pub struct AsyncLogger[S] {} @@ -76,11 +91,13 @@ pub fn async_logger[S : @bitlogger.Sink]( config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), min_level~ : @bitlogger.Level = @bitlogger.Level::Info, target~ : String = "", + flush~ : (S) -> Int = fn(_) { 0 }, ) -> AsyncLogger[S] { ignore(sink) ignore(config) ignore(min_level) ignore(target) + ignore(flush) abort("bitlogger_async currently only supports native/llvm backends") } @@ -174,6 +191,11 @@ pub fn[S] AsyncLogger::last_error(self : AsyncLogger[S]) -> String { abort("bitlogger_async currently only supports native/llvm backends") } +pub fn[S] AsyncLogger::flush_policy(self : AsyncLogger[S]) -> AsyncFlushPolicy { + ignore(self) + abort("bitlogger_async currently only supports native/llvm backends") +} + pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { ignore(self) ignore(clear)