mirror of
https://github.com/Nanaloveyuki/BitLogger.git
synced 2026-05-30 15:42:25 +00:00
✨ Add async linger-based batch draining
This commit is contained in:
@@ -9,6 +9,7 @@ async test "shutdown drains pending records" {
|
|||||||
max_pending=4,
|
max_pending=4,
|
||||||
overflow=AsyncOverflowPolicy::Blocking,
|
overflow=AsyncOverflowPolicy::Blocking,
|
||||||
max_batch=4,
|
max_batch=4,
|
||||||
|
linger_ms=10,
|
||||||
flush=AsyncFlushPolicy::Batch,
|
flush=AsyncFlushPolicy::Batch,
|
||||||
),
|
),
|
||||||
min_level=@bitlogger.Level::Info,
|
min_level=@bitlogger.Level::Info,
|
||||||
@@ -91,12 +92,14 @@ test "async logger config stringify roundtrips stable fields" {
|
|||||||
max_pending=8,
|
max_pending=8,
|
||||||
overflow=AsyncOverflowPolicy::DropOldest,
|
overflow=AsyncOverflowPolicy::DropOldest,
|
||||||
max_batch=3,
|
max_batch=3,
|
||||||
|
linger_ms=25,
|
||||||
flush=AsyncFlushPolicy::Batch,
|
flush=AsyncFlushPolicy::Batch,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
let config = parse_async_logger_config_text(text)
|
let config = parse_async_logger_config_text(text)
|
||||||
inspect(config.max_pending, content="8")
|
inspect(config.max_pending, content="8")
|
||||||
inspect(config.max_batch, content="3")
|
inspect(config.max_batch, content="3")
|
||||||
|
inspect(config.linger_ms, content="25")
|
||||||
inspect(match config.overflow {
|
inspect(match config.overflow {
|
||||||
AsyncOverflowPolicy::Blocking => "Blocking"
|
AsyncOverflowPolicy::Blocking => "Blocking"
|
||||||
AsyncOverflowPolicy::DropOldest => "DropOldest"
|
AsyncOverflowPolicy::DropOldest => "DropOldest"
|
||||||
@@ -122,6 +125,7 @@ test "async build config stringify roundtrips nested logger and async fields" {
|
|||||||
max_pending=2,
|
max_pending=2,
|
||||||
overflow=AsyncOverflowPolicy::DropNewest,
|
overflow=AsyncOverflowPolicy::DropNewest,
|
||||||
max_batch=5,
|
max_batch=5,
|
||||||
|
linger_ms=40,
|
||||||
flush=AsyncFlushPolicy::Shutdown,
|
flush=AsyncFlushPolicy::Shutdown,
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
@@ -132,6 +136,7 @@ test "async build config stringify roundtrips nested logger and async fields" {
|
|||||||
inspect(config.logger.timestamp, content="true")
|
inspect(config.logger.timestamp, content="true")
|
||||||
inspect(config.async_config.max_pending, content="2")
|
inspect(config.async_config.max_pending, content="2")
|
||||||
inspect(config.async_config.max_batch, content="5")
|
inspect(config.async_config.max_batch, content="5")
|
||||||
|
inspect(config.async_config.linger_ms, content="40")
|
||||||
inspect(match config.async_config.overflow {
|
inspect(match config.async_config.overflow {
|
||||||
AsyncOverflowPolicy::Blocking => "Blocking"
|
AsyncOverflowPolicy::Blocking => "Blocking"
|
||||||
AsyncOverflowPolicy::DropOldest => "DropOldest"
|
AsyncOverflowPolicy::DropOldest => "DropOldest"
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ pub struct AsyncLoggerConfig {
|
|||||||
max_pending : Int
|
max_pending : Int
|
||||||
overflow : AsyncOverflowPolicy
|
overflow : AsyncOverflowPolicy
|
||||||
max_batch : Int
|
max_batch : Int
|
||||||
|
linger_ms : Int
|
||||||
flush : AsyncFlushPolicy
|
flush : AsyncFlushPolicy
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -25,12 +26,14 @@ pub fn AsyncLoggerConfig::new(
|
|||||||
max_pending~ : Int = 0,
|
max_pending~ : Int = 0,
|
||||||
overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking,
|
overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking,
|
||||||
max_batch~ : Int = 1,
|
max_batch~ : Int = 1,
|
||||||
|
linger_ms~ : Int = 0,
|
||||||
flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never,
|
flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never,
|
||||||
) -> AsyncLoggerConfig {
|
) -> AsyncLoggerConfig {
|
||||||
{
|
{
|
||||||
max_pending,
|
max_pending,
|
||||||
overflow,
|
overflow,
|
||||||
max_batch: if max_batch <= 1 { 1 } else { max_batch },
|
max_batch: if max_batch <= 1 { 1 } else { max_batch },
|
||||||
|
linger_ms: if linger_ms < 0 { 0 } else { linger_ms },
|
||||||
flush,
|
flush,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -82,6 +85,13 @@ pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise
|
|||||||
}
|
}
|
||||||
None => 1
|
None => 1
|
||||||
}
|
}
|
||||||
|
let linger_ms = match obj.get("linger_ms") {
|
||||||
|
Some(value) => match value.as_number() {
|
||||||
|
Some(number) => number.to_int()
|
||||||
|
None => raise Failure::Failure("Expected number at async_config.linger_ms")
|
||||||
|
}
|
||||||
|
None => 0
|
||||||
|
}
|
||||||
let flush = match obj.get("flush") {
|
let flush = match obj.get("flush") {
|
||||||
Some(value) => match value.as_string() {
|
Some(value) => match value.as_string() {
|
||||||
Some(text) => parse_async_flush(text)
|
Some(text) => parse_async_flush(text)
|
||||||
@@ -93,6 +103,7 @@ pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise
|
|||||||
max_pending=max_pending,
|
max_pending=max_pending,
|
||||||
overflow=overflow,
|
overflow=overflow,
|
||||||
max_batch=max_batch,
|
max_batch=max_batch,
|
||||||
|
linger_ms=linger_ms,
|
||||||
flush=flush,
|
flush=flush,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -101,6 +112,7 @@ pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.J
|
|||||||
@json_parser.JsonValue::Object({
|
@json_parser.JsonValue::Object({
|
||||||
"max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()),
|
"max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()),
|
||||||
"max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()),
|
"max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()),
|
||||||
|
"linger_ms": @json_parser.JsonValue::Number(config.linger_ms.to_double()),
|
||||||
"overflow": @json_parser.JsonValue::String(match config.overflow {
|
"overflow": @json_parser.JsonValue::String(match config.overflow {
|
||||||
AsyncOverflowPolicy::Blocking => "Blocking"
|
AsyncOverflowPolicy::Blocking => "Blocking"
|
||||||
AsyncOverflowPolicy::DropOldest => "DropOldest"
|
AsyncOverflowPolicy::DropOldest => "DropOldest"
|
||||||
@@ -179,6 +191,7 @@ pub struct AsyncLogger[S] {
|
|||||||
timestamp : Bool
|
timestamp : Bool
|
||||||
overflow : AsyncOverflowPolicy
|
overflow : AsyncOverflowPolicy
|
||||||
max_batch : Int
|
max_batch : Int
|
||||||
|
linger_ms : Int
|
||||||
flush_policy : AsyncFlushPolicy
|
flush_policy : AsyncFlushPolicy
|
||||||
sink : S
|
sink : S
|
||||||
flush_sink : (S) -> Int
|
flush_sink : (S) -> Int
|
||||||
@@ -207,6 +220,7 @@ pub fn[S] async_logger(
|
|||||||
timestamp: false,
|
timestamp: false,
|
||||||
overflow: config.overflow,
|
overflow: config.overflow,
|
||||||
max_batch: config.max_batch,
|
max_batch: config.max_batch,
|
||||||
|
linger_ms: config.linger_ms,
|
||||||
flush_policy: config.flush,
|
flush_policy: config.flush,
|
||||||
sink,
|
sink,
|
||||||
flush_sink: flush,
|
flush_sink: flush,
|
||||||
@@ -477,6 +491,22 @@ async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit {
|
|||||||
err => raise err
|
err => raise err
|
||||||
}
|
}
|
||||||
match next {
|
match next {
|
||||||
|
Some(next) => {
|
||||||
|
logger.sink.write(next)
|
||||||
|
if logger.pending_count.val > 0 {
|
||||||
|
logger.pending_count.val -= 1
|
||||||
|
}
|
||||||
|
continue drained + 1
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
if logger.linger_ms <= 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
let waited = @async.with_timeout_opt(logger.linger_ms, () => logger.queue.get()) catch {
|
||||||
|
err if err is AsyncLoggerClosed => None
|
||||||
|
err => raise err
|
||||||
|
}
|
||||||
|
match waited {
|
||||||
Some(next) => {
|
Some(next) => {
|
||||||
logger.sink.write(next)
|
logger.sink.write(next)
|
||||||
if logger.pending_count.val > 0 {
|
if logger.pending_count.val > 0 {
|
||||||
@@ -487,6 +517,8 @@ async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit {
|
|||||||
None => break
|
None => break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
match logger.flush_policy {
|
match logger.flush_policy {
|
||||||
AsyncFlushPolicy::Batch => ignore((logger.flush_sink)(logger.sink))
|
AsyncFlushPolicy::Batch => ignore((logger.flush_sink)(logger.sink))
|
||||||
_ => ()
|
_ => ()
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ pub struct AsyncLoggerConfig {
|
|||||||
max_pending : Int
|
max_pending : Int
|
||||||
overflow : AsyncOverflowPolicy
|
overflow : AsyncOverflowPolicy
|
||||||
max_batch : Int
|
max_batch : Int
|
||||||
|
linger_ms : Int
|
||||||
flush : AsyncFlushPolicy
|
flush : AsyncFlushPolicy
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -25,12 +26,14 @@ pub fn AsyncLoggerConfig::new(
|
|||||||
max_pending~ : Int = 0,
|
max_pending~ : Int = 0,
|
||||||
overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking,
|
overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking,
|
||||||
max_batch~ : Int = 1,
|
max_batch~ : Int = 1,
|
||||||
|
linger_ms~ : Int = 0,
|
||||||
flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never,
|
flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never,
|
||||||
) -> AsyncLoggerConfig {
|
) -> AsyncLoggerConfig {
|
||||||
{
|
{
|
||||||
max_pending,
|
max_pending,
|
||||||
overflow,
|
overflow,
|
||||||
max_batch: if max_batch <= 1 { 1 } else { max_batch },
|
max_batch: if max_batch <= 1 { 1 } else { max_batch },
|
||||||
|
linger_ms: if linger_ms < 0 { 0 } else { linger_ms },
|
||||||
flush,
|
flush,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user