diff --git a/bitlogger_async/BitLoggerAsync_test.mbt b/bitlogger_async/BitLoggerAsync_test.mbt index 5e48dc3..c4c293d 100644 --- a/bitlogger_async/BitLoggerAsync_test.mbt +++ b/bitlogger_async/BitLoggerAsync_test.mbt @@ -9,6 +9,7 @@ async test "shutdown drains pending records" { max_pending=4, overflow=AsyncOverflowPolicy::Blocking, max_batch=4, + linger_ms=10, flush=AsyncFlushPolicy::Batch, ), min_level=@bitlogger.Level::Info, @@ -91,12 +92,14 @@ test "async logger config stringify roundtrips stable fields" { max_pending=8, overflow=AsyncOverflowPolicy::DropOldest, max_batch=3, + linger_ms=25, flush=AsyncFlushPolicy::Batch, ), ) let config = parse_async_logger_config_text(text) inspect(config.max_pending, content="8") inspect(config.max_batch, content="3") + inspect(config.linger_ms, content="25") inspect(match config.overflow { AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::DropOldest => "DropOldest" @@ -122,6 +125,7 @@ test "async build config stringify roundtrips nested logger and async fields" { max_pending=2, overflow=AsyncOverflowPolicy::DropNewest, max_batch=5, + linger_ms=40, flush=AsyncFlushPolicy::Shutdown, ), ), @@ -132,6 +136,7 @@ test "async build config stringify roundtrips nested logger and async fields" { inspect(config.logger.timestamp, content="true") inspect(config.async_config.max_pending, content="2") inspect(config.async_config.max_batch, content="5") + inspect(config.async_config.linger_ms, content="40") inspect(match config.async_config.overflow { AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::DropOldest => "DropOldest" diff --git a/bitlogger_async/async_logger_native.mbt b/bitlogger_async/async_logger_native.mbt index abc5e8d..10cdb9e 100644 --- a/bitlogger_async/async_logger_native.mbt +++ b/bitlogger_async/async_logger_native.mbt @@ -18,6 +18,7 @@ pub struct AsyncLoggerConfig { max_pending : Int overflow : AsyncOverflowPolicy max_batch : Int + linger_ms : Int flush : AsyncFlushPolicy } @@ -25,12 +26,14 @@ pub fn AsyncLoggerConfig::new( max_pending~ : Int = 0, overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, max_batch~ : Int = 1, + linger_ms~ : Int = 0, flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never, ) -> AsyncLoggerConfig { { max_pending, overflow, max_batch: if max_batch <= 1 { 1 } else { max_batch }, + linger_ms: if linger_ms < 0 { 0 } else { linger_ms }, flush, } } @@ -82,6 +85,13 @@ pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise } None => 1 } + let linger_ms = match obj.get("linger_ms") { + Some(value) => match value.as_number() { + Some(number) => number.to_int() + None => raise Failure::Failure("Expected number at async_config.linger_ms") + } + None => 0 + } let flush = match obj.get("flush") { Some(value) => match value.as_string() { Some(text) => parse_async_flush(text) @@ -93,6 +103,7 @@ pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise max_pending=max_pending, overflow=overflow, max_batch=max_batch, + linger_ms=linger_ms, flush=flush, ) } @@ -101,6 +112,7 @@ pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.J @json_parser.JsonValue::Object({ "max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()), "max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()), + "linger_ms": @json_parser.JsonValue::Number(config.linger_ms.to_double()), "overflow": @json_parser.JsonValue::String(match config.overflow { AsyncOverflowPolicy::Blocking => "Blocking" AsyncOverflowPolicy::DropOldest => "DropOldest" @@ -179,6 +191,7 @@ pub struct AsyncLogger[S] { timestamp : Bool overflow : AsyncOverflowPolicy max_batch : Int + linger_ms : Int flush_policy : AsyncFlushPolicy sink : S flush_sink : (S) -> Int @@ -207,6 +220,7 @@ pub fn[S] async_logger( timestamp: false, overflow: config.overflow, max_batch: config.max_batch, + linger_ms: config.linger_ms, flush_policy: config.flush, sink, flush_sink: flush, @@ -484,7 +498,25 @@ async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit { } continue drained + 1 } - None => break + None => { + if logger.linger_ms <= 0 { + break + } + let waited = @async.with_timeout_opt(logger.linger_ms, () => logger.queue.get()) catch { + err if err is AsyncLoggerClosed => None + err => raise err + } + match waited { + Some(next) => { + logger.sink.write(next) + if logger.pending_count.val > 0 { + logger.pending_count.val -= 1 + } + continue drained + 1 + } + None => break + } + } } } match logger.flush_policy { diff --git a/bitlogger_async/async_logger_stub.mbt b/bitlogger_async/async_logger_stub.mbt index 351fedf..7af964a 100644 --- a/bitlogger_async/async_logger_stub.mbt +++ b/bitlogger_async/async_logger_stub.mbt @@ -18,6 +18,7 @@ pub struct AsyncLoggerConfig { max_pending : Int overflow : AsyncOverflowPolicy max_batch : Int + linger_ms : Int flush : AsyncFlushPolicy } @@ -25,12 +26,14 @@ pub fn AsyncLoggerConfig::new( max_pending~ : Int = 0, overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, max_batch~ : Int = 1, + linger_ms~ : Int = 0, flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never, ) -> AsyncLoggerConfig { { max_pending, overflow, max_batch: if max_batch <= 1 { 1 } else { max_batch }, + linger_ms: if linger_ms < 0 { 0 } else { linger_ms }, flush, } }