diff --git a/src-async/async_logger_native.mbt b/src-async/async_logger_native.mbt index 8318cba..7f4cf1f 100644 --- a/src-async/async_logger_native.mbt +++ b/src-async/async_logger_native.mbt @@ -1,685 +1,20 @@ -pub(all) suberror AsyncLoggerClosed { - AsyncLoggerClosed -} - -pub(all) enum AsyncOverflowPolicy { - Blocking - DropOldest - DropNewest -} - -pub(all) enum AsyncFlushPolicy { - Never - Batch - Shutdown -} - -pub enum AsyncRuntimeMode { - NativeWorker - Compatibility -} - pub fn async_runtime_mode() -> AsyncRuntimeMode { AsyncRuntimeMode::NativeWorker } -pub fn async_runtime_mode_label(mode : AsyncRuntimeMode) -> String { - match mode { - AsyncRuntimeMode::NativeWorker => "native_worker" - AsyncRuntimeMode::Compatibility => "compatibility" - } -} - -fn all_async_runtime_modes() -> Array[AsyncRuntimeMode] { - [AsyncRuntimeMode::NativeWorker, AsyncRuntimeMode::Compatibility] -} - pub fn async_runtime_supports_background_worker() -> Bool { ignore(all_async_runtime_modes()) true } -pub struct AsyncRuntimeState { - mode : AsyncRuntimeMode - background_worker : Bool +fn async_runtime_guard_closed_on_log() -> Bool { + false } -pub struct AsyncLoggerState { - runtime : AsyncRuntimeState - pending_count : Int - dropped_count : Int - is_closed : Bool - is_running : Bool - has_failed : Bool - last_error : String - flush_policy : AsyncFlushPolicy +fn async_runtime_shutdown_clears_pending_after_wait_idle() -> Bool { + true } -pub fn async_runtime_state() -> AsyncRuntimeState { - { - mode: async_runtime_mode(), - background_worker: async_runtime_supports_background_worker(), - } -} - -pub fn async_runtime_state_to_json(state : AsyncRuntimeState) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "mode": @json_parser.JsonValue::String(async_runtime_mode_label(state.mode)), - "background_worker": @json_parser.JsonValue::Bool(state.background_worker), - }) -} - -pub fn stringify_async_runtime_state( - state : AsyncRuntimeState, - pretty~ : Bool = false, -) -> String { - let value = async_runtime_state_to_json(state) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -fn async_flush_policy_label(policy : AsyncFlushPolicy) -> String { - match policy { - AsyncFlushPolicy::Never => "Never" - AsyncFlushPolicy::Batch => "Batch" - AsyncFlushPolicy::Shutdown => "Shutdown" - } -} - -fn async_logger_state_to_json_value(state : AsyncLoggerState) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "runtime": async_runtime_state_to_json(state.runtime), - "pending_count": @json_parser.JsonValue::Number(state.pending_count.to_double()), - "dropped_count": @json_parser.JsonValue::Number(state.dropped_count.to_double()), - "is_closed": @json_parser.JsonValue::Bool(state.is_closed), - "is_running": @json_parser.JsonValue::Bool(state.is_running), - "has_failed": @json_parser.JsonValue::Bool(state.has_failed), - "last_error": @json_parser.JsonValue::String(state.last_error), - "flush_policy": @json_parser.JsonValue::String(async_flush_policy_label(state.flush_policy)), - }) -} - -pub fn async_logger_state_to_json(state : AsyncLoggerState) -> @json_parser.JsonValue { - async_logger_state_to_json_value(state) -} - -pub fn stringify_async_logger_state( - state : AsyncLoggerState, - pretty~ : Bool = false, -) -> String { - let value = async_logger_state_to_json_value(state) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub struct AsyncLoggerConfig { - max_pending : Int - overflow : AsyncOverflowPolicy - max_batch : Int - linger_ms : Int - flush : AsyncFlushPolicy -} - -pub fn AsyncLoggerConfig::new( - max_pending~ : Int = 0, - overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, - max_batch~ : Int = 1, - linger_ms~ : Int = 0, - flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never, -) -> AsyncLoggerConfig { - { - max_pending, - overflow, - max_batch: if max_batch <= 1 { 1 } else { max_batch }, - linger_ms: if linger_ms < 0 { 0 } else { linger_ms }, - flush, - } -} - -fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise { - match name.to_upper() { - "BLOCKING" => AsyncOverflowPolicy::Blocking - "DROPOLDEST" => AsyncOverflowPolicy::DropOldest - "DROPLATEST" => AsyncOverflowPolicy::DropNewest - "DROPNEWEST" => AsyncOverflowPolicy::DropNewest - _ => raise Failure::Failure("Unsupported async overflow policy: " + name) - } -} - -fn parse_async_flush(name : String) -> AsyncFlushPolicy raise { - match name.to_upper() { - "NEVER" => AsyncFlushPolicy::Never - "NONE" => AsyncFlushPolicy::Never - "BATCH" => AsyncFlushPolicy::Batch - "SHUTDOWN" => AsyncFlushPolicy::Shutdown - _ => raise Failure::Failure("Unsupported async flush policy: " + name) - } -} - -pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise { - let root = @json_parser.parse(input) - let obj = match root.as_object() { - Some(obj) => obj - None => raise Failure::Failure("Expected object for async logger config") - } - let max_pending = match obj.get("max_pending") { - Some(value) => match value.as_number() { - Some(number) => number.to_int() - None => raise Failure::Failure("Expected number at async_config.max_pending") - } - None => 0 - } - let overflow = match obj.get("overflow") { - Some(value) => match value.as_string() { - Some(text) => parse_async_overflow(text) - None => raise Failure::Failure("Expected string at async_config.overflow") - } - None => AsyncOverflowPolicy::Blocking - } - let max_batch = match obj.get("max_batch") { - Some(value) => match value.as_number() { - Some(number) => number.to_int() - None => raise Failure::Failure("Expected number at async_config.max_batch") - } - None => 1 - } - let linger_ms = match obj.get("linger_ms") { - Some(value) => match value.as_number() { - Some(number) => number.to_int() - None => raise Failure::Failure("Expected number at async_config.linger_ms") - } - None => 0 - } - let flush = match obj.get("flush") { - Some(value) => match value.as_string() { - Some(text) => parse_async_flush(text) - None => raise Failure::Failure("Expected string at async_config.flush") - } - None => AsyncFlushPolicy::Never - } - AsyncLoggerConfig::new( - max_pending=max_pending, - overflow=overflow, - max_batch=max_batch, - linger_ms=linger_ms, - flush=flush, - ) -} - -pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()), - "max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()), - "linger_ms": @json_parser.JsonValue::Number(config.linger_ms.to_double()), - "overflow": @json_parser.JsonValue::String(match config.overflow { - AsyncOverflowPolicy::Blocking => "Blocking" - AsyncOverflowPolicy::DropOldest => "DropOldest" - AsyncOverflowPolicy::DropNewest => "DropNewest" - }), - "flush": @json_parser.JsonValue::String(match config.flush { - AsyncFlushPolicy::Never => "Never" - AsyncFlushPolicy::Batch => "Batch" - AsyncFlushPolicy::Shutdown => "Shutdown" - }), - }) -} - -pub fn stringify_async_logger_config(config : AsyncLoggerConfig, pretty~ : Bool = false) -> String { - let value = async_logger_config_to_json(config) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub struct AsyncLoggerBuildConfig { - logger : @bitlogger.LoggerConfig - async_config : AsyncLoggerConfig -} - -pub fn AsyncLoggerBuildConfig::new( - logger~ : @bitlogger.LoggerConfig = @bitlogger.default_logger_config(), - async_config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), -) -> AsyncLoggerBuildConfig { - { logger, async_config } -} - -pub fn parse_async_logger_build_config_text(input : String) -> AsyncLoggerBuildConfig raise { - let root = @json_parser.parse(input) - let obj = match root.as_object() { - Some(obj) => obj - None => raise Failure::Failure("Expected object at async logger build config root") - } - let logger = match obj.get("logger") { - Some(value) => @bitlogger.parse_logger_config_text(@json_parser.stringify(value)) - None => @bitlogger.default_logger_config() - } - let async_config = match obj.get("async_config") { - Some(value) => parse_async_logger_config_text(@json_parser.stringify(value)) - None => AsyncLoggerConfig::new() - } - AsyncLoggerBuildConfig::new(logger=logger, async_config=async_config) -} - -pub fn async_logger_build_config_to_json( - config : AsyncLoggerBuildConfig, -) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "logger": @bitlogger.logger_config_to_json(config.logger), - "async_config": async_logger_config_to_json(config.async_config), - }) -} - -pub fn stringify_async_logger_build_config( - config : AsyncLoggerBuildConfig, - pretty~ : Bool = false, -) -> String { - let value = async_logger_build_config_to_json(config) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub struct AsyncLogger[S] { - min_level : @bitlogger.Level - target : String - timestamp : Bool - overflow : AsyncOverflowPolicy - max_batch : Int - linger_ms : Int - flush_policy : AsyncFlushPolicy - sink : S - flush_sink : (S) -> Int - context_fields : Array[@bitlogger.Field] - filter : (@bitlogger.Record) -> Bool - patch : @bitlogger.RecordPatch - queue : @async.Queue[@bitlogger.Record] - pending_count : Ref[Int] - dropped_count : Ref[Int] - is_closed : Ref[Bool] - is_running : Ref[Bool] - has_failed : Ref[Bool] - last_error : Ref[String] -} - -pub fn[S] async_logger( - sink : S, - config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), - min_level~ : @bitlogger.Level = @bitlogger.Level::Info, - target~ : String = "", - flush~ : (S) -> Int = fn(_) { 0 }, -) -> AsyncLogger[S] { - { - min_level, - target, - timestamp: false, - overflow: config.overflow, - max_batch: config.max_batch, - linger_ms: config.linger_ms, - flush_policy: config.flush, - sink, - flush_sink: flush, - context_fields: [], - filter: fn(_) { true }, - patch: @bitlogger.identity_patch(), - queue: @async.Queue(kind=queue_kind_of(config)), - pending_count: Ref::new(0), - dropped_count: Ref::new(0), - is_closed: Ref::new(false), - is_running: Ref::new(false), - has_failed: Ref::new(false), - last_error: Ref::new(""), - } -} - -fn queue_kind_of(config : AsyncLoggerConfig) -> @aqueue.Kind { - let limit = if config.max_pending < 0 { 0 } else { config.max_pending } - match config.overflow { - AsyncOverflowPolicy::Blocking => @aqueue.Kind::Blocking(limit) - AsyncOverflowPolicy::DropOldest => @aqueue.Kind::DiscardOldest(limit) - AsyncOverflowPolicy::DropNewest => @aqueue.Kind::DiscardLatest(limit) - } -} - -pub fn[S] AsyncLogger::with_timestamp(self : AsyncLogger[S], enabled~ : Bool = true) -> AsyncLogger[S] { - { ..self, timestamp: enabled } -} - -pub fn[S] AsyncLogger::with_target(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { - { ..self, target } -} - -pub fn[S] AsyncLogger::with_context_fields( - self : AsyncLogger[S], - fields : Array[@bitlogger.Field], -) -> AsyncLogger[S] { - { ..self, context_fields: fields } -} - -pub fn[S] AsyncLogger::with_filter( - self : AsyncLogger[S], - predicate : (@bitlogger.Record) -> Bool, -) -> AsyncLogger[S] { - let current = self.filter - { - ..self, - filter: fn(rec) { - current(rec) && predicate(rec) - }, - } -} - -pub fn[S] AsyncLogger::with_patch( - self : AsyncLogger[S], - patch : @bitlogger.RecordPatch, -) -> AsyncLogger[S] { - let current = self.patch - { - ..self, - patch: fn(rec) { - patch(current(rec)) - }, - } -} - -pub fn[S] AsyncLogger::with_min_level( - self : AsyncLogger[S], - min_level : @bitlogger.Level, -) -> AsyncLogger[S] { - { ..self, min_level } -} - -fn combine_targets(parent : String, child : String) -> String { - if parent == "" { - child - } else if child == "" { - parent - } else { - "\{parent}.\{child}" - } -} - -pub fn[S] AsyncLogger::child(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { - { ..self, target: combine_targets(self.target, target) } -} - -pub fn[S] AsyncLogger::is_enabled(self : AsyncLogger[S], level : @bitlogger.Level) -> Bool { - level.enabled(self.min_level) -} - -pub async fn[S] AsyncLogger::log( - self : AsyncLogger[S], - level : @bitlogger.Level, - message : String, - fields~ : Array[@bitlogger.Field] = [], - target? : String = "", -) -> Unit { - guard self.is_enabled(level) else { - () - } - let actual_target = if target == "" { self.target } else { target } - let timestamp_ms = if self.timestamp { @env.now() } else { 0UL } - let rec = @bitlogger.Record::new( - level, - message, - timestamp_ms=timestamp_ms, - target=actual_target, - fields=merge_fields(self.context_fields, fields), - ) - let rec = (self.patch)(rec) - guard (self.filter)(rec) else { - () - } - let accepted = self.queue.try_put(rec) catch { - err if err is AsyncLoggerClosed => false - err => raise err - } - if accepted { - self.pending_count.val += 1 - } else { - match self.overflow { - AsyncOverflowPolicy::Blocking => { - self.queue.put(rec) catch { - err if err is AsyncLoggerClosed => () - err => raise err - } - self.pending_count.val += 1 - } - AsyncOverflowPolicy::DropOldest | AsyncOverflowPolicy::DropNewest => { - self.dropped_count.val += 1 - } - } - } -} - -fn merge_fields( - left : Array[@bitlogger.Field], - right : Array[@bitlogger.Field], -) -> Array[@bitlogger.Field] { - if left.length() == 0 { - right - } else if right.length() == 0 { - left - } else { - left + right - } -} - -pub async fn[S] AsyncLogger::trace( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Trace, message, fields=fields) -} - -pub async fn[S] AsyncLogger::debug( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Debug, message, fields=fields) -} - -pub async fn[S] AsyncLogger::info( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Info, message, fields=fields) -} - -pub async fn[S] AsyncLogger::warn( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Warn, message, fields=fields) -} - -pub async fn[S] AsyncLogger::error( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Error, message, fields=fields) -} - -pub fn[S] AsyncLogger::pending_count(self : AsyncLogger[S]) -> Int { - self.pending_count.val -} - -pub fn[S] AsyncLogger::dropped_count(self : AsyncLogger[S]) -> Int { - self.dropped_count.val -} - -pub fn[S] AsyncLogger::is_closed(self : AsyncLogger[S]) -> Bool { - self.is_closed.val -} - -pub fn[S] AsyncLogger::is_running(self : AsyncLogger[S]) -> Bool { - self.is_running.val -} - -pub fn[S] AsyncLogger::has_failed(self : AsyncLogger[S]) -> Bool { - self.has_failed.val -} - -pub fn[S] AsyncLogger::last_error(self : AsyncLogger[S]) -> String { - self.last_error.val -} - -pub fn[S] AsyncLogger::flush_policy(self : AsyncLogger[S]) -> AsyncFlushPolicy { - self.flush_policy -} - -pub fn[S] AsyncLogger::state(self : AsyncLogger[S]) -> AsyncLoggerState { - { - runtime: async_runtime_state(), - pending_count: self.pending_count(), - dropped_count: self.dropped_count(), - is_closed: self.is_closed(), - is_running: self.is_running(), - has_failed: self.has_failed(), - last_error: self.last_error(), - flush_policy: self.flush_policy(), - } -} - -pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { - self.is_closed.val = true - if clear { - let abandoned = self.pending_count.val - if abandoned > 0 { - self.dropped_count.val += abandoned - self.pending_count.val = 0 - } - } - self.queue.close(error=AsyncLoggerClosed, clear=clear) -} - -pub async fn[S] AsyncLogger::shutdown(self : AsyncLogger[S], clear? : Bool = false) -> Unit { - if clear { - self.close(clear=true) - } else { - self.wait_idle() - if self.pending_count() > 0 { - self.close(clear=true) - } else { - self.close() - } - } - while self.is_running() { - @async.pause() - } -} - -pub async fn[S] AsyncLogger::wait_idle(self : AsyncLogger[S]) -> Unit { - while self.pending_count() > 0 { - if self.has_failed() { - break - } - @async.pause() - } -} - -async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit { - while true { - let rec = logger.queue.get() catch { - err if err is AsyncLoggerClosed => break - err => raise err - } - logger.sink.write(rec) - if logger.pending_count.val > 0 { - logger.pending_count.val -= 1 - } - for drained = 1; drained < logger.max_batch; { - let next = logger.queue.try_get() catch { - err if err is AsyncLoggerClosed => None - err => raise err - } - match next { - Some(next) => { - logger.sink.write(next) - if logger.pending_count.val > 0 { - logger.pending_count.val -= 1 - } - continue drained + 1 - } - None => { - if logger.linger_ms <= 0 { - break - } - let waited = @async.with_timeout_opt(logger.linger_ms, () => logger.queue.get()) catch { - err if err is AsyncLoggerClosed => None - err => raise err - } - match waited { - Some(next) => { - logger.sink.write(next) - if logger.pending_count.val > 0 { - logger.pending_count.val -= 1 - } - continue drained + 1 - } - None => break - } - } - } - } - match logger.flush_policy { - AsyncFlushPolicy::Batch => ignore((logger.flush_sink)(logger.sink)) - _ => () - } - } - match logger.flush_policy { - AsyncFlushPolicy::Shutdown => ignore((logger.flush_sink)(logger.sink)) - _ => () - } -} - -pub async fn[S : @bitlogger.Sink] AsyncLogger::run(self : AsyncLogger[S]) -> Unit { - self.is_running.val = true - self.has_failed.val = false - self.last_error.val = "" - run_worker(self) catch { - err => { - self.has_failed.val = true - self.last_error.val = err.to_string() - self.is_running.val = false - raise err - } - } - self.is_running.val = false -} - -pub fn build_async_logger( - config : AsyncLoggerBuildConfig, -) -> AsyncLogger[@bitlogger.RuntimeSink] { - let logger = @bitlogger.build_logger(config.logger) - async_logger( - logger.sink, - config=config.async_config, - min_level=logger.min_level, - target=logger.target, - flush=fn(sink) { sink.flush() }, - ).with_timestamp(enabled=logger.timestamp) -} - -pub fn build_async_text_logger(config : AsyncLoggerBuildConfig) -> AsyncLogger[@bitlogger.FormattedConsoleSink] { - async_logger( - @bitlogger.text_console_sink(config.logger.sink.text_formatter.to_formatter()), - config=config.async_config, - min_level=config.logger.min_level, - target=config.logger.target, - ).with_timestamp(enabled=config.logger.timestamp) +fn async_runtime_shutdown_waits_for_worker() -> Bool { + true } diff --git a/src-async/async_logger_shared.mbt b/src-async/async_logger_shared.mbt new file mode 100644 index 0000000..b93bd0d --- /dev/null +++ b/src-async/async_logger_shared.mbt @@ -0,0 +1,681 @@ +pub(all) suberror AsyncLoggerClosed { + AsyncLoggerClosed +} + +pub(all) enum AsyncOverflowPolicy { + Blocking + DropOldest + DropNewest +} + +pub(all) enum AsyncFlushPolicy { + Never + Batch + Shutdown +} + +pub enum AsyncRuntimeMode { + NativeWorker + Compatibility +} + +pub fn async_runtime_mode_label(mode : AsyncRuntimeMode) -> String { + match mode { + AsyncRuntimeMode::NativeWorker => "native_worker" + AsyncRuntimeMode::Compatibility => "compatibility" + } +} + +fn all_async_runtime_modes() -> Array[AsyncRuntimeMode] { + [AsyncRuntimeMode::NativeWorker, AsyncRuntimeMode::Compatibility] +} + +pub struct AsyncRuntimeState { + mode : AsyncRuntimeMode + background_worker : Bool +} + +pub struct AsyncLoggerState { + runtime : AsyncRuntimeState + pending_count : Int + dropped_count : Int + is_closed : Bool + is_running : Bool + has_failed : Bool + last_error : String + flush_policy : AsyncFlushPolicy +} + +pub fn async_runtime_state() -> AsyncRuntimeState { + { + mode: async_runtime_mode(), + background_worker: async_runtime_supports_background_worker(), + } +} + +pub fn async_runtime_state_to_json(state : AsyncRuntimeState) -> @json_parser.JsonValue { + @json_parser.JsonValue::Object({ + "mode": @json_parser.JsonValue::String(async_runtime_mode_label(state.mode)), + "background_worker": @json_parser.JsonValue::Bool(state.background_worker), + }) +} + +pub fn stringify_async_runtime_state( + state : AsyncRuntimeState, + pretty~ : Bool = false, +) -> String { + let value = async_runtime_state_to_json(state) + if pretty { + @json_parser.stringify_pretty(value, 2) + } else { + @json_parser.stringify(value) + } +} + +fn async_flush_policy_label(policy : AsyncFlushPolicy) -> String { + match policy { + AsyncFlushPolicy::Never => "Never" + AsyncFlushPolicy::Batch => "Batch" + AsyncFlushPolicy::Shutdown => "Shutdown" + } +} + +fn async_logger_state_to_json_value(state : AsyncLoggerState) -> @json_parser.JsonValue { + @json_parser.JsonValue::Object({ + "runtime": async_runtime_state_to_json(state.runtime), + "pending_count": @json_parser.JsonValue::Number(state.pending_count.to_double()), + "dropped_count": @json_parser.JsonValue::Number(state.dropped_count.to_double()), + "is_closed": @json_parser.JsonValue::Bool(state.is_closed), + "is_running": @json_parser.JsonValue::Bool(state.is_running), + "has_failed": @json_parser.JsonValue::Bool(state.has_failed), + "last_error": @json_parser.JsonValue::String(state.last_error), + "flush_policy": @json_parser.JsonValue::String(async_flush_policy_label(state.flush_policy)), + }) +} + +pub fn async_logger_state_to_json(state : AsyncLoggerState) -> @json_parser.JsonValue { + async_logger_state_to_json_value(state) +} + +pub fn stringify_async_logger_state( + state : AsyncLoggerState, + pretty~ : Bool = false, +) -> String { + let value = async_logger_state_to_json_value(state) + if pretty { + @json_parser.stringify_pretty(value, 2) + } else { + @json_parser.stringify(value) + } +} + +pub struct AsyncLoggerConfig { + max_pending : Int + overflow : AsyncOverflowPolicy + max_batch : Int + linger_ms : Int + flush : AsyncFlushPolicy +} + +pub fn AsyncLoggerConfig::new( + max_pending~ : Int = 0, + overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, + max_batch~ : Int = 1, + linger_ms~ : Int = 0, + flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never, +) -> AsyncLoggerConfig { + { + max_pending, + overflow, + max_batch: if max_batch <= 1 { 1 } else { max_batch }, + linger_ms: if linger_ms < 0 { 0 } else { linger_ms }, + flush, + } +} + +fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise { + match name.to_upper() { + "BLOCKING" => AsyncOverflowPolicy::Blocking + "DROPOLDEST" => AsyncOverflowPolicy::DropOldest + "DROPLATEST" => AsyncOverflowPolicy::DropNewest + "DROPNEWEST" => AsyncOverflowPolicy::DropNewest + _ => raise Failure::Failure("Unsupported async overflow policy: " + name) + } +} + +fn parse_async_flush(name : String) -> AsyncFlushPolicy raise { + match name.to_upper() { + "NEVER" => AsyncFlushPolicy::Never + "NONE" => AsyncFlushPolicy::Never + "BATCH" => AsyncFlushPolicy::Batch + "SHUTDOWN" => AsyncFlushPolicy::Shutdown + _ => raise Failure::Failure("Unsupported async flush policy: " + name) + } +} + +pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise { + let root = @json_parser.parse(input) + let obj = match root.as_object() { + Some(obj) => obj + None => raise Failure::Failure("Expected object for async logger config") + } + let max_pending = match obj.get("max_pending") { + Some(value) => match value.as_number() { + Some(number) => number.to_int() + None => raise Failure::Failure("Expected number at async_config.max_pending") + } + None => 0 + } + let overflow = match obj.get("overflow") { + Some(value) => match value.as_string() { + Some(text) => parse_async_overflow(text) + None => raise Failure::Failure("Expected string at async_config.overflow") + } + None => AsyncOverflowPolicy::Blocking + } + let max_batch = match obj.get("max_batch") { + Some(value) => match value.as_number() { + Some(number) => number.to_int() + None => raise Failure::Failure("Expected number at async_config.max_batch") + } + None => 1 + } + let linger_ms = match obj.get("linger_ms") { + Some(value) => match value.as_number() { + Some(number) => number.to_int() + None => raise Failure::Failure("Expected number at async_config.linger_ms") + } + None => 0 + } + let flush = match obj.get("flush") { + Some(value) => match value.as_string() { + Some(text) => parse_async_flush(text) + None => raise Failure::Failure("Expected string at async_config.flush") + } + None => AsyncFlushPolicy::Never + } + AsyncLoggerConfig::new( + max_pending=max_pending, + overflow=overflow, + max_batch=max_batch, + linger_ms=linger_ms, + flush=flush, + ) +} + +pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.JsonValue { + @json_parser.JsonValue::Object({ + "max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()), + "max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()), + "linger_ms": @json_parser.JsonValue::Number(config.linger_ms.to_double()), + "overflow": @json_parser.JsonValue::String(match config.overflow { + AsyncOverflowPolicy::Blocking => "Blocking" + AsyncOverflowPolicy::DropOldest => "DropOldest" + AsyncOverflowPolicy::DropNewest => "DropNewest" + }), + "flush": @json_parser.JsonValue::String(match config.flush { + AsyncFlushPolicy::Never => "Never" + AsyncFlushPolicy::Batch => "Batch" + AsyncFlushPolicy::Shutdown => "Shutdown" + }), + }) +} + +pub fn stringify_async_logger_config(config : AsyncLoggerConfig, pretty~ : Bool = false) -> String { + let value = async_logger_config_to_json(config) + if pretty { + @json_parser.stringify_pretty(value, 2) + } else { + @json_parser.stringify(value) + } +} + +pub struct AsyncLoggerBuildConfig { + logger : @bitlogger.LoggerConfig + async_config : AsyncLoggerConfig +} + +pub fn AsyncLoggerBuildConfig::new( + logger~ : @bitlogger.LoggerConfig = @bitlogger.default_logger_config(), + async_config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), +) -> AsyncLoggerBuildConfig { + { logger, async_config } +} + +pub fn parse_async_logger_build_config_text(input : String) -> AsyncLoggerBuildConfig raise { + let root = @json_parser.parse(input) + let obj = match root.as_object() { + Some(obj) => obj + None => raise Failure::Failure("Expected object at async logger build config root") + } + let logger = match obj.get("logger") { + Some(value) => @bitlogger.parse_logger_config_text(@json_parser.stringify(value)) + None => @bitlogger.default_logger_config() + } + let async_config = match obj.get("async_config") { + Some(value) => parse_async_logger_config_text(@json_parser.stringify(value)) + None => AsyncLoggerConfig::new() + } + AsyncLoggerBuildConfig::new(logger=logger, async_config=async_config) +} + +pub fn async_logger_build_config_to_json( + config : AsyncLoggerBuildConfig, +) -> @json_parser.JsonValue { + @json_parser.JsonValue::Object({ + "logger": @bitlogger.logger_config_to_json(config.logger), + "async_config": async_logger_config_to_json(config.async_config), + }) +} + +pub fn stringify_async_logger_build_config( + config : AsyncLoggerBuildConfig, + pretty~ : Bool = false, +) -> String { + let value = async_logger_build_config_to_json(config) + if pretty { + @json_parser.stringify_pretty(value, 2) + } else { + @json_parser.stringify(value) + } +} + +pub struct AsyncLogger[S] { + min_level : @bitlogger.Level + target : String + timestamp : Bool + overflow : AsyncOverflowPolicy + max_batch : Int + linger_ms : Int + flush_policy : AsyncFlushPolicy + sink : S + flush_sink : (S) -> Int + context_fields : Array[@bitlogger.Field] + filter : (@bitlogger.Record) -> Bool + patch : @bitlogger.RecordPatch + queue : @async.Queue[@bitlogger.Record] + pending_count : Ref[Int] + dropped_count : Ref[Int] + is_closed : Ref[Bool] + is_running : Ref[Bool] + has_failed : Ref[Bool] + last_error : Ref[String] +} + +pub fn[S] async_logger( + sink : S, + config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), + min_level~ : @bitlogger.Level = @bitlogger.Level::Info, + target~ : String = "", + flush~ : (S) -> Int = fn(_) { 0 }, +) -> AsyncLogger[S] { + { + min_level, + target, + timestamp: false, + overflow: config.overflow, + max_batch: config.max_batch, + linger_ms: config.linger_ms, + flush_policy: config.flush, + sink, + flush_sink: flush, + context_fields: [], + filter: fn(_) { true }, + patch: @bitlogger.identity_patch(), + queue: @async.Queue(kind=queue_kind_of(config)), + pending_count: Ref::new(0), + dropped_count: Ref::new(0), + is_closed: Ref::new(false), + is_running: Ref::new(false), + has_failed: Ref::new(false), + last_error: Ref::new(""), + } +} + +fn queue_kind_of(config : AsyncLoggerConfig) -> @aqueue.Kind { + let limit = if config.max_pending < 0 { 0 } else { config.max_pending } + match config.overflow { + AsyncOverflowPolicy::Blocking => @aqueue.Kind::Blocking(limit) + AsyncOverflowPolicy::DropOldest => @aqueue.Kind::DiscardOldest(limit) + AsyncOverflowPolicy::DropNewest => @aqueue.Kind::DiscardLatest(limit) + } +} + +pub fn[S] AsyncLogger::with_timestamp(self : AsyncLogger[S], enabled~ : Bool = true) -> AsyncLogger[S] { + { ..self, timestamp: enabled } +} + +pub fn[S] AsyncLogger::with_target(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { + { ..self, target } +} + +pub fn[S] AsyncLogger::with_context_fields( + self : AsyncLogger[S], + fields : Array[@bitlogger.Field], +) -> AsyncLogger[S] { + { ..self, context_fields: fields } +} + +pub fn[S] AsyncLogger::with_filter( + self : AsyncLogger[S], + predicate : (@bitlogger.Record) -> Bool, +) -> AsyncLogger[S] { + let current = self.filter + { + ..self, + filter: fn(rec) { + current(rec) && predicate(rec) + }, + } +} + +pub fn[S] AsyncLogger::with_patch( + self : AsyncLogger[S], + patch : @bitlogger.RecordPatch, +) -> AsyncLogger[S] { + let current = self.patch + { + ..self, + patch: fn(rec) { + patch(current(rec)) + }, + } +} + +pub fn[S] AsyncLogger::with_min_level( + self : AsyncLogger[S], + min_level : @bitlogger.Level, +) -> AsyncLogger[S] { + { ..self, min_level } +} + +fn combine_targets(parent : String, child : String) -> String { + if parent == "" { + child + } else if child == "" { + parent + } else { + "\{parent}.\{child}" + } +} + +pub fn[S] AsyncLogger::child(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { + { ..self, target: combine_targets(self.target, target) } +} + +pub fn[S] AsyncLogger::is_enabled(self : AsyncLogger[S], level : @bitlogger.Level) -> Bool { + level.enabled(self.min_level) +} + +fn merge_fields( + left : Array[@bitlogger.Field], + right : Array[@bitlogger.Field], +) -> Array[@bitlogger.Field] { + if left.length() == 0 { + right + } else if right.length() == 0 { + left + } else { + left + right + } +} + +pub async fn[S] AsyncLogger::log( + self : AsyncLogger[S], + level : @bitlogger.Level, + message : String, + fields~ : Array[@bitlogger.Field] = [], + target? : String = "", +) -> Unit { + guard !(async_runtime_guard_closed_on_log() && self.is_closed()) else { + () + } + guard self.is_enabled(level) else { + () + } + let actual_target = if target == "" { self.target } else { target } + let timestamp_ms = if self.timestamp { @env.now() } else { 0UL } + let rec = @bitlogger.Record::new( + level, + message, + timestamp_ms=timestamp_ms, + target=actual_target, + fields=merge_fields(self.context_fields, fields), + ) + let rec = (self.patch)(rec) + guard (self.filter)(rec) else { + () + } + let accepted = self.queue.try_put(rec) catch { + err if err is AsyncLoggerClosed => false + err => raise err + } + if accepted { + self.pending_count.val += 1 + } else { + match self.overflow { + AsyncOverflowPolicy::Blocking => { + self.queue.put(rec) catch { + err if err is AsyncLoggerClosed => () + err => raise err + } + self.pending_count.val += 1 + } + AsyncOverflowPolicy::DropOldest | AsyncOverflowPolicy::DropNewest => { + self.dropped_count.val += 1 + } + } + } +} + +pub async fn[S] AsyncLogger::trace( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Trace, message, fields=fields) +} + +pub async fn[S] AsyncLogger::debug( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Debug, message, fields=fields) +} + +pub async fn[S] AsyncLogger::info( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Info, message, fields=fields) +} + +pub async fn[S] AsyncLogger::warn( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Warn, message, fields=fields) +} + +pub async fn[S] AsyncLogger::error( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Error, message, fields=fields) +} + +pub fn[S] AsyncLogger::pending_count(self : AsyncLogger[S]) -> Int { + self.pending_count.val +} + +pub fn[S] AsyncLogger::dropped_count(self : AsyncLogger[S]) -> Int { + self.dropped_count.val +} + +pub fn[S] AsyncLogger::is_closed(self : AsyncLogger[S]) -> Bool { + self.is_closed.val +} + +pub fn[S] AsyncLogger::is_running(self : AsyncLogger[S]) -> Bool { + self.is_running.val +} + +pub fn[S] AsyncLogger::has_failed(self : AsyncLogger[S]) -> Bool { + self.has_failed.val +} + +pub fn[S] AsyncLogger::last_error(self : AsyncLogger[S]) -> String { + self.last_error.val +} + +pub fn[S] AsyncLogger::flush_policy(self : AsyncLogger[S]) -> AsyncFlushPolicy { + self.flush_policy +} + +pub fn[S] AsyncLogger::state(self : AsyncLogger[S]) -> AsyncLoggerState { + { + runtime: async_runtime_state(), + pending_count: self.pending_count(), + dropped_count: self.dropped_count(), + is_closed: self.is_closed(), + is_running: self.is_running(), + has_failed: self.has_failed(), + last_error: self.last_error(), + flush_policy: self.flush_policy(), + } +} + +pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { + self.is_closed.val = true + if clear { + let abandoned = self.pending_count() + if abandoned > 0 { + self.dropped_count.val += abandoned + self.pending_count.val = 0 + } + } + self.queue.close(error=AsyncLoggerClosed, clear=clear) +} + +pub async fn[S] AsyncLogger::wait_idle(self : AsyncLogger[S]) -> Unit { + while self.pending_count() > 0 { + if self.has_failed() { + break + } + @async.pause() + } +} + +pub async fn[S] AsyncLogger::shutdown(self : AsyncLogger[S], clear? : Bool = false) -> Unit { + if clear { + self.close(clear=true) + } else { + self.wait_idle() + if async_runtime_shutdown_clears_pending_after_wait_idle() && self.pending_count() > 0 { + self.close(clear=true) + } else { + self.close() + } + } + if async_runtime_shutdown_waits_for_worker() { + while self.is_running() { + @async.pause() + } + } +} + +async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit { + while true { + let rec = logger.queue.get() catch { + err if err is AsyncLoggerClosed => break + err => raise err + } + logger.sink.write(rec) + if logger.pending_count.val > 0 { + logger.pending_count.val -= 1 + } + for drained = 1; drained < logger.max_batch; { + let next = logger.queue.try_get() catch { + err if err is AsyncLoggerClosed => None + err => raise err + } + match next { + Some(next) => { + logger.sink.write(next) + if logger.pending_count.val > 0 { + logger.pending_count.val -= 1 + } + continue drained + 1 + } + None => { + if logger.linger_ms <= 0 { + break + } + let waited = @async.with_timeout_opt(logger.linger_ms, () => logger.queue.get()) catch { + err if err is AsyncLoggerClosed => None + err => raise err + } + match waited { + Some(next) => { + logger.sink.write(next) + if logger.pending_count.val > 0 { + logger.pending_count.val -= 1 + } + continue drained + 1 + } + None => break + } + } + } + } + match logger.flush_policy { + AsyncFlushPolicy::Batch => ignore((logger.flush_sink)(logger.sink)) + _ => () + } + } + match logger.flush_policy { + AsyncFlushPolicy::Shutdown => ignore((logger.flush_sink)(logger.sink)) + _ => () + } +} + +pub async fn[S : @bitlogger.Sink] AsyncLogger::run(self : AsyncLogger[S]) -> Unit { + self.is_running.val = true + self.has_failed.val = false + self.last_error.val = "" + run_worker(self) catch { + err => { + self.has_failed.val = true + self.last_error.val = err.to_string() + self.is_running.val = false + raise err + } + } + self.is_running.val = false +} + +pub fn build_async_logger( + config : AsyncLoggerBuildConfig, +) -> AsyncLogger[@bitlogger.RuntimeSink] { + let logger = @bitlogger.build_logger(config.logger) + async_logger( + logger.sink, + config=config.async_config, + min_level=logger.min_level, + target=logger.target, + flush=fn(sink) { sink.flush() }, + ).with_timestamp(enabled=logger.timestamp) +} + +pub fn build_async_text_logger(config : AsyncLoggerBuildConfig) -> AsyncLogger[@bitlogger.FormattedConsoleSink] { + async_logger( + @bitlogger.text_console_sink(config.logger.sink.text_formatter.to_formatter()), + config=config.async_config, + min_level=config.logger.min_level, + target=config.logger.target, + ).with_timestamp(enabled=config.logger.timestamp) +} diff --git a/src-async/async_logger_stub.mbt b/src-async/async_logger_stub.mbt index 76f3662..07a974f 100644 --- a/src-async/async_logger_stub.mbt +++ b/src-async/async_logger_stub.mbt @@ -1,681 +1,20 @@ -pub(all) suberror AsyncLoggerClosed { - AsyncLoggerClosed -} - -pub(all) enum AsyncOverflowPolicy { - Blocking - DropOldest - DropNewest -} - -pub(all) enum AsyncFlushPolicy { - Never - Batch - Shutdown -} - -pub enum AsyncRuntimeMode { - NativeWorker - Compatibility -} - pub fn async_runtime_mode() -> AsyncRuntimeMode { AsyncRuntimeMode::Compatibility } -pub fn async_runtime_mode_label(mode : AsyncRuntimeMode) -> String { - match mode { - AsyncRuntimeMode::NativeWorker => "native_worker" - AsyncRuntimeMode::Compatibility => "compatibility" - } -} - -fn all_async_runtime_modes() -> Array[AsyncRuntimeMode] { - [AsyncRuntimeMode::NativeWorker, AsyncRuntimeMode::Compatibility] -} - pub fn async_runtime_supports_background_worker() -> Bool { ignore(all_async_runtime_modes()) false } -pub struct AsyncRuntimeState { - mode : AsyncRuntimeMode - background_worker : Bool +fn async_runtime_guard_closed_on_log() -> Bool { + true } -pub struct AsyncLoggerState { - runtime : AsyncRuntimeState - pending_count : Int - dropped_count : Int - is_closed : Bool - is_running : Bool - has_failed : Bool - last_error : String - flush_policy : AsyncFlushPolicy +fn async_runtime_shutdown_clears_pending_after_wait_idle() -> Bool { + false } -pub fn async_runtime_state() -> AsyncRuntimeState { - { - mode: async_runtime_mode(), - background_worker: async_runtime_supports_background_worker(), - } -} - -pub fn async_runtime_state_to_json(state : AsyncRuntimeState) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "mode": @json_parser.JsonValue::String(async_runtime_mode_label(state.mode)), - "background_worker": @json_parser.JsonValue::Bool(state.background_worker), - }) -} - -pub fn stringify_async_runtime_state( - state : AsyncRuntimeState, - pretty~ : Bool = false, -) -> String { - let value = async_runtime_state_to_json(state) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -fn async_flush_policy_label(policy : AsyncFlushPolicy) -> String { - match policy { - AsyncFlushPolicy::Never => "Never" - AsyncFlushPolicy::Batch => "Batch" - AsyncFlushPolicy::Shutdown => "Shutdown" - } -} - -fn async_logger_state_to_json_value(state : AsyncLoggerState) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "runtime": async_runtime_state_to_json(state.runtime), - "pending_count": @json_parser.JsonValue::Number(state.pending_count.to_double()), - "dropped_count": @json_parser.JsonValue::Number(state.dropped_count.to_double()), - "is_closed": @json_parser.JsonValue::Bool(state.is_closed), - "is_running": @json_parser.JsonValue::Bool(state.is_running), - "has_failed": @json_parser.JsonValue::Bool(state.has_failed), - "last_error": @json_parser.JsonValue::String(state.last_error), - "flush_policy": @json_parser.JsonValue::String(async_flush_policy_label(state.flush_policy)), - }) -} - -pub fn async_logger_state_to_json(state : AsyncLoggerState) -> @json_parser.JsonValue { - async_logger_state_to_json_value(state) -} - -pub fn stringify_async_logger_state( - state : AsyncLoggerState, - pretty~ : Bool = false, -) -> String { - let value = async_logger_state_to_json_value(state) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub struct AsyncLoggerConfig { - max_pending : Int - overflow : AsyncOverflowPolicy - max_batch : Int - linger_ms : Int - flush : AsyncFlushPolicy -} - -pub fn AsyncLoggerConfig::new( - max_pending~ : Int = 0, - overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, - max_batch~ : Int = 1, - linger_ms~ : Int = 0, - flush~ : AsyncFlushPolicy = AsyncFlushPolicy::Never, -) -> AsyncLoggerConfig { - { - max_pending, - overflow, - max_batch: if max_batch <= 1 { 1 } else { max_batch }, - linger_ms: if linger_ms < 0 { 0 } else { linger_ms }, - flush, - } -} - -fn parse_async_overflow(name : String) -> AsyncOverflowPolicy raise { - match name.to_upper() { - "BLOCKING" => AsyncOverflowPolicy::Blocking - "DROPOLDEST" => AsyncOverflowPolicy::DropOldest - "DROPLATEST" => AsyncOverflowPolicy::DropNewest - "DROPNEWEST" => AsyncOverflowPolicy::DropNewest - _ => raise Failure::Failure("Unsupported async overflow policy: " + name) - } -} - -fn parse_async_flush(name : String) -> AsyncFlushPolicy raise { - match name.to_upper() { - "NEVER" => AsyncFlushPolicy::Never - "NONE" => AsyncFlushPolicy::Never - "BATCH" => AsyncFlushPolicy::Batch - "SHUTDOWN" => AsyncFlushPolicy::Shutdown - _ => raise Failure::Failure("Unsupported async flush policy: " + name) - } -} - -pub fn parse_async_logger_config_text(input : String) -> AsyncLoggerConfig raise { - let root = @json_parser.parse(input) - let obj = match root.as_object() { - Some(obj) => obj - None => raise Failure::Failure("Expected object for async logger config") - } - let max_pending = match obj.get("max_pending") { - Some(value) => match value.as_number() { - Some(number) => number.to_int() - None => raise Failure::Failure("Expected number at async_config.max_pending") - } - None => 0 - } - let overflow = match obj.get("overflow") { - Some(value) => match value.as_string() { - Some(text) => parse_async_overflow(text) - None => raise Failure::Failure("Expected string at async_config.overflow") - } - None => AsyncOverflowPolicy::Blocking - } - let max_batch = match obj.get("max_batch") { - Some(value) => match value.as_number() { - Some(number) => number.to_int() - None => raise Failure::Failure("Expected number at async_config.max_batch") - } - None => 1 - } - let linger_ms = match obj.get("linger_ms") { - Some(value) => match value.as_number() { - Some(number) => number.to_int() - None => raise Failure::Failure("Expected number at async_config.linger_ms") - } - None => 0 - } - let flush = match obj.get("flush") { - Some(value) => match value.as_string() { - Some(text) => parse_async_flush(text) - None => raise Failure::Failure("Expected string at async_config.flush") - } - None => AsyncFlushPolicy::Never - } - AsyncLoggerConfig::new( - max_pending=max_pending, - overflow=overflow, - max_batch=max_batch, - linger_ms=linger_ms, - flush=flush, - ) -} - -pub fn async_logger_config_to_json(config : AsyncLoggerConfig) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "max_pending": @json_parser.JsonValue::Number(config.max_pending.to_double()), - "max_batch": @json_parser.JsonValue::Number(config.max_batch.to_double()), - "linger_ms": @json_parser.JsonValue::Number(config.linger_ms.to_double()), - "overflow": @json_parser.JsonValue::String(match config.overflow { - AsyncOverflowPolicy::Blocking => "Blocking" - AsyncOverflowPolicy::DropOldest => "DropOldest" - AsyncOverflowPolicy::DropNewest => "DropNewest" - }), - "flush": @json_parser.JsonValue::String(match config.flush { - AsyncFlushPolicy::Never => "Never" - AsyncFlushPolicy::Batch => "Batch" - AsyncFlushPolicy::Shutdown => "Shutdown" - }), - }) -} - -pub fn stringify_async_logger_config(config : AsyncLoggerConfig, pretty~ : Bool = false) -> String { - let value = async_logger_config_to_json(config) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub struct AsyncLoggerBuildConfig { - logger : @bitlogger.LoggerConfig - async_config : AsyncLoggerConfig -} - -pub fn AsyncLoggerBuildConfig::new( - logger~ : @bitlogger.LoggerConfig = @bitlogger.default_logger_config(), - async_config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), -) -> AsyncLoggerBuildConfig { - { logger, async_config } -} - -pub fn parse_async_logger_build_config_text(input : String) -> AsyncLoggerBuildConfig raise { - let root = @json_parser.parse(input) - let obj = match root.as_object() { - Some(obj) => obj - None => raise Failure::Failure("Expected object at async logger build config root") - } - let logger = match obj.get("logger") { - Some(value) => @bitlogger.parse_logger_config_text(@json_parser.stringify(value)) - None => @bitlogger.default_logger_config() - } - let async_config = match obj.get("async_config") { - Some(value) => parse_async_logger_config_text(@json_parser.stringify(value)) - None => AsyncLoggerConfig::new() - } - AsyncLoggerBuildConfig::new(logger=logger, async_config=async_config) -} - -pub fn async_logger_build_config_to_json( - config : AsyncLoggerBuildConfig, -) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "logger": @bitlogger.logger_config_to_json(config.logger), - "async_config": async_logger_config_to_json(config.async_config), - }) -} - -pub fn stringify_async_logger_build_config( - config : AsyncLoggerBuildConfig, - pretty~ : Bool = false, -) -> String { - let value = async_logger_build_config_to_json(config) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub struct AsyncLogger[S] { - min_level : @bitlogger.Level - target : String - timestamp : Bool - overflow : AsyncOverflowPolicy - max_batch : Int - linger_ms : Int - flush_policy : AsyncFlushPolicy - sink : S - flush_sink : (S) -> Int - context_fields : Array[@bitlogger.Field] - filter : (@bitlogger.Record) -> Bool - patch : @bitlogger.RecordPatch - queue : @async.Queue[@bitlogger.Record] - pending_count : Ref[Int] - dropped_count : Ref[Int] - is_closed : Ref[Bool] - is_running : Ref[Bool] - has_failed : Ref[Bool] - last_error : Ref[String] -} - -pub fn[S] async_logger( - sink : S, - config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), - min_level~ : @bitlogger.Level = @bitlogger.Level::Info, - target~ : String = "", - flush~ : (S) -> Int = fn(_) { 0 }, -) -> AsyncLogger[S] { - { - min_level, - target, - timestamp: false, - overflow: config.overflow, - max_batch: config.max_batch, - linger_ms: config.linger_ms, - flush_policy: config.flush, - sink, - flush_sink: flush, - context_fields: [], - filter: fn(_) { true }, - patch: @bitlogger.identity_patch(), - queue: @async.Queue(kind=queue_kind_of(config)), - pending_count: Ref::new(0), - dropped_count: Ref::new(0), - is_closed: Ref::new(false), - is_running: Ref::new(false), - has_failed: Ref::new(false), - last_error: Ref::new(""), - } -} - -fn queue_kind_of(config : AsyncLoggerConfig) -> @aqueue.Kind { - let limit = if config.max_pending < 0 { 0 } else { config.max_pending } - match config.overflow { - AsyncOverflowPolicy::Blocking => @aqueue.Kind::Blocking(limit) - AsyncOverflowPolicy::DropOldest => @aqueue.Kind::DiscardOldest(limit) - AsyncOverflowPolicy::DropNewest => @aqueue.Kind::DiscardLatest(limit) - } -} - -pub fn[S] AsyncLogger::with_timestamp(self : AsyncLogger[S], enabled~ : Bool = true) -> AsyncLogger[S] { - { ..self, timestamp: enabled } -} - -pub fn[S] AsyncLogger::with_target(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { - { ..self, target } -} - -pub fn[S] AsyncLogger::with_context_fields( - self : AsyncLogger[S], - fields : Array[@bitlogger.Field], -) -> AsyncLogger[S] { - { ..self, context_fields: fields } -} - -pub fn[S] AsyncLogger::with_filter( - self : AsyncLogger[S], - predicate : (@bitlogger.Record) -> Bool, -) -> AsyncLogger[S] { - let current = self.filter - { - ..self, - filter: fn(rec) { - current(rec) && predicate(rec) - }, - } -} - -pub fn[S] AsyncLogger::with_patch( - self : AsyncLogger[S], - patch : @bitlogger.RecordPatch, -) -> AsyncLogger[S] { - let current = self.patch - { - ..self, - patch: fn(rec) { - patch(current(rec)) - }, - } -} - -pub fn[S] AsyncLogger::with_min_level( - self : AsyncLogger[S], - min_level : @bitlogger.Level, -) -> AsyncLogger[S] { - { ..self, min_level } -} - -fn combine_targets(parent : String, child : String) -> String { - if parent == "" { - child - } else if child == "" { - parent - } else { - "\{parent}.\{child}" - } -} - -pub fn[S] AsyncLogger::child(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { - { ..self, target: combine_targets(self.target, target) } -} - -pub fn[S] AsyncLogger::is_enabled(self : AsyncLogger[S], level : @bitlogger.Level) -> Bool { - level.enabled(self.min_level) -} - -fn merge_fields( - left : Array[@bitlogger.Field], - right : Array[@bitlogger.Field], -) -> Array[@bitlogger.Field] { - if left.length() == 0 { - right - } else if right.length() == 0 { - left - } else { - left + right - } -} - -pub async fn[S] AsyncLogger::log( - self : AsyncLogger[S], - level : @bitlogger.Level, - message : String, - fields~ : Array[@bitlogger.Field] = [], - target? : String = "", -) -> Unit { - guard !self.is_closed() else { - () - } - guard self.is_enabled(level) else { - () - } - let actual_target = if target == "" { self.target } else { target } - let timestamp_ms = if self.timestamp { @env.now() } else { 0UL } - let rec = @bitlogger.Record::new( - level, - message, - timestamp_ms=timestamp_ms, - target=actual_target, - fields=merge_fields(self.context_fields, fields), - ) - let rec = (self.patch)(rec) - guard (self.filter)(rec) else { - () - } - let accepted = self.queue.try_put(rec) catch { - err if err is AsyncLoggerClosed => false - err => raise err - } - if accepted { - self.pending_count.val += 1 - } else { - match self.overflow { - AsyncOverflowPolicy::Blocking => { - self.queue.put(rec) catch { - err if err is AsyncLoggerClosed => () - err => raise err - } - self.pending_count.val += 1 - } - AsyncOverflowPolicy::DropOldest | AsyncOverflowPolicy::DropNewest => { - self.dropped_count.val += 1 - } - } - } -} - -pub async fn[S] AsyncLogger::trace( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Trace, message, fields=fields) -} - -pub async fn[S] AsyncLogger::debug( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Debug, message, fields=fields) -} - -pub async fn[S] AsyncLogger::info( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Info, message, fields=fields) -} - -pub async fn[S] AsyncLogger::warn( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Warn, message, fields=fields) -} - -pub async fn[S] AsyncLogger::error( - self : AsyncLogger[S], - message : String, - fields~ : Array[@bitlogger.Field] = [], -) -> Unit { - self.log(@bitlogger.Level::Error, message, fields=fields) -} - -pub fn[S] AsyncLogger::pending_count(self : AsyncLogger[S]) -> Int { - self.pending_count.val -} - -pub fn[S] AsyncLogger::dropped_count(self : AsyncLogger[S]) -> Int { - self.dropped_count.val -} - -pub fn[S] AsyncLogger::is_closed(self : AsyncLogger[S]) -> Bool { - self.is_closed.val -} - -pub fn[S] AsyncLogger::is_running(self : AsyncLogger[S]) -> Bool { - self.is_running.val -} - -pub fn[S] AsyncLogger::has_failed(self : AsyncLogger[S]) -> Bool { - self.has_failed.val -} - -pub fn[S] AsyncLogger::last_error(self : AsyncLogger[S]) -> String { - self.last_error.val -} - -pub fn[S] AsyncLogger::flush_policy(self : AsyncLogger[S]) -> AsyncFlushPolicy { - self.flush_policy -} - -pub fn[S] AsyncLogger::state(self : AsyncLogger[S]) -> AsyncLoggerState { - { - runtime: async_runtime_state(), - pending_count: self.pending_count(), - dropped_count: self.dropped_count(), - is_closed: self.is_closed(), - is_running: self.is_running(), - has_failed: self.has_failed(), - last_error: self.last_error(), - flush_policy: self.flush_policy(), - } -} - -pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { - self.is_closed.val = true - if clear { - let abandoned = self.pending_count() - if abandoned > 0 { - self.dropped_count.val += abandoned - self.pending_count.val = 0 - } - } - self.queue.close(error=AsyncLoggerClosed, clear=clear) -} - -pub async fn[S] AsyncLogger::wait_idle(self : AsyncLogger[S]) -> Unit { - while self.pending_count() > 0 { - if self.has_failed() { - break - } - @async.pause() - } -} - -pub async fn[S] AsyncLogger::shutdown(self : AsyncLogger[S], clear? : Bool = false) -> Unit { - if clear { - self.close(clear=true) - } else { - self.wait_idle() - self.close() - } -} - -pub async fn[S : @bitlogger.Sink] AsyncLogger::run(self : AsyncLogger[S]) -> Unit { - self.is_running.val = true - self.has_failed.val = false - self.last_error.val = "" - run_worker(self) catch { - err => { - self.has_failed.val = true - self.last_error.val = err.to_string() - self.is_running.val = false - raise err - } - } - self.is_running.val = false -} - -async fn[S : @bitlogger.Sink] run_worker(logger : AsyncLogger[S]) -> Unit { - while true { - let rec = logger.queue.get() catch { - err if err is AsyncLoggerClosed => break - err => raise err - } - logger.sink.write(rec) - if logger.pending_count.val > 0 { - logger.pending_count.val -= 1 - } - for drained = 1; drained < logger.max_batch; { - let next = logger.queue.try_get() catch { - err if err is AsyncLoggerClosed => None - err => raise err - } - match next { - Some(next) => { - logger.sink.write(next) - if logger.pending_count.val > 0 { - logger.pending_count.val -= 1 - } - continue drained + 1 - } - None => { - if logger.linger_ms <= 0 { - break - } - let waited = @async.with_timeout_opt(logger.linger_ms, () => logger.queue.get()) catch { - err if err is AsyncLoggerClosed => None - err => raise err - } - match waited { - Some(next) => { - logger.sink.write(next) - if logger.pending_count.val > 0 { - logger.pending_count.val -= 1 - } - continue drained + 1 - } - None => break - } - } - } - } - match logger.flush_policy { - AsyncFlushPolicy::Batch => ignore((logger.flush_sink)(logger.sink)) - _ => () - } - } - match logger.flush_policy { - AsyncFlushPolicy::Shutdown => ignore((logger.flush_sink)(logger.sink)) - _ => () - } -} - -pub fn build_async_logger( - config : AsyncLoggerBuildConfig, -) -> AsyncLogger[@bitlogger.RuntimeSink] { - let logger = @bitlogger.build_logger(config.logger) - async_logger( - logger.sink, - config=config.async_config, - min_level=logger.min_level, - target=logger.target, - flush=fn(sink) { sink.flush() }, - ).with_timestamp(enabled=logger.timestamp) -} - -pub fn build_async_text_logger(config : AsyncLoggerBuildConfig) -> AsyncLogger[@bitlogger.FormattedConsoleSink] { - async_logger( - @bitlogger.text_console_sink(config.logger.sink.text_formatter.to_formatter()), - config=config.async_config, - min_level=config.logger.min_level, - target=config.logger.target, - ).with_timestamp(enabled=config.logger.timestamp) +fn async_runtime_shutdown_waits_for_worker() -> Bool { + false } diff --git a/src/config.mbt b/src/config.mbt index de4c435..f949cbb 100644 --- a/src/config.mbt +++ b/src/config.mbt @@ -162,610 +162,6 @@ pub fn default_logger_config() -> LoggerConfig { LoggerConfig::new() } -pub(all) enum RuntimeSink { - Console(ConsoleSink) - JsonConsole(JsonConsoleSink) - TextConsole(FormattedConsoleSink) - File(FileSink) - QueuedConsole(QueuedSink[ConsoleSink]) - QueuedJsonConsole(QueuedSink[JsonConsoleSink]) - QueuedTextConsole(QueuedSink[FormattedConsoleSink]) - QueuedFile(QueuedSink[FileSink]) -} - -pub struct RuntimeFileState { - file : FileSinkState - queued : Bool - pending_count : Int - dropped_count : Int -} - -pub fn RuntimeFileState::new( - file : FileSinkState, - queued~ : Bool = false, - pending_count~ : Int = 0, - dropped_count~ : Int = 0, -) -> RuntimeFileState { - { file, queued, pending_count, dropped_count } -} - -fn file_sink_policy_to_json_value(policy : FileSinkPolicy) -> @json_parser.JsonValue { - let obj : Map[String, @json_parser.JsonValue] = { - "append": @json_parser.JsonValue::Bool(policy.append), - "auto_flush": @json_parser.JsonValue::Bool(policy.auto_flush), - } - match policy.rotation { - None => obj["rotation"] = @json_parser.JsonValue::Null - Some(rotation) => obj["rotation"] = file_rotation_config_to_json(rotation) - } - @json_parser.JsonValue::Object(obj) -} - -pub fn file_sink_policy_to_json(policy : FileSinkPolicy) -> @json_parser.JsonValue { - file_sink_policy_to_json_value(policy) -} - -pub fn stringify_file_sink_policy(policy : FileSinkPolicy, pretty~ : Bool = false) -> String { - let value = file_sink_policy_to_json_value(policy) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -fn file_sink_state_to_json_value(state : FileSinkState) -> @json_parser.JsonValue { - let obj : Map[String, @json_parser.JsonValue] = { - "path": @json_parser.JsonValue::String(state.path), - "available": @json_parser.JsonValue::Bool(state.available), - "append": @json_parser.JsonValue::Bool(state.append), - "auto_flush": @json_parser.JsonValue::Bool(state.auto_flush), - "open_failures": @json_parser.JsonValue::Number(state.open_failures.to_double()), - "write_failures": @json_parser.JsonValue::Number(state.write_failures.to_double()), - "flush_failures": @json_parser.JsonValue::Number(state.flush_failures.to_double()), - "rotation_failures": @json_parser.JsonValue::Number(state.rotation_failures.to_double()), - } - match state.rotation { - None => obj["rotation"] = @json_parser.JsonValue::Null - Some(rotation) => obj["rotation"] = file_rotation_config_to_json(rotation) - } - @json_parser.JsonValue::Object(obj) -} - -pub fn file_sink_state_to_json(state : FileSinkState) -> @json_parser.JsonValue { - file_sink_state_to_json_value(state) -} - -pub fn stringify_file_sink_state(state : FileSinkState, pretty~ : Bool = false) -> String { - let value = file_sink_state_to_json_value(state) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub fn runtime_file_state_to_json(state : RuntimeFileState) -> @json_parser.JsonValue { - @json_parser.JsonValue::Object({ - "file": file_sink_state_to_json_value(state.file), - "queued": @json_parser.JsonValue::Bool(state.queued), - "pending_count": @json_parser.JsonValue::Number(state.pending_count.to_double()), - "dropped_count": @json_parser.JsonValue::Number(state.dropped_count.to_double()), - }) -} - -pub fn stringify_runtime_file_state(state : RuntimeFileState, pretty~ : Bool = false) -> String { - let value = runtime_file_state_to_json(state) - if pretty { - @json_parser.stringify_pretty(value, 2) - } else { - @json_parser.stringify(value) - } -} - -pub impl Sink for RuntimeSink with write(self, rec) { - match self { - Console(sink) => sink.write(rec) - JsonConsole(sink) => sink.write(rec) - TextConsole(sink) => sink.write(rec) - File(sink) => sink.write(rec) - QueuedConsole(sink) => sink.write(rec) - QueuedJsonConsole(sink) => sink.write(rec) - QueuedTextConsole(sink) => sink.write(rec) - QueuedFile(sink) => sink.write(rec) - } -} - -pub fn RuntimeSink::flush(self : RuntimeSink) -> Int { - match self { - Console(_) => 0 - JsonConsole(_) => 0 - TextConsole(_) => 0 - File(sink) => if sink.flush() { 1 } else { 0 } - QueuedConsole(sink) => sink.flush() - QueuedJsonConsole(sink) => sink.flush() - QueuedTextConsole(sink) => sink.flush() - QueuedFile(sink) => sink.flush() - } -} - -pub fn RuntimeSink::drain(self : RuntimeSink, max_items~ : Int = -1) -> Int { - match self { - Console(_) => 0 - JsonConsole(_) => 0 - TextConsole(_) => 0 - File(sink) => if sink.flush() { 1 } else { 0 } - QueuedConsole(sink) => sink.drain(max_items=max_items) - QueuedJsonConsole(sink) => sink.drain(max_items=max_items) - QueuedTextConsole(sink) => sink.drain(max_items=max_items) - QueuedFile(sink) => sink.drain(max_items=max_items) - } -} - -pub fn RuntimeSink::close(self : RuntimeSink) -> Bool { - match self { - Console(_) => true - JsonConsole(_) => true - TextConsole(_) => true - File(sink) => sink.close() - QueuedConsole(_) => true - QueuedJsonConsole(_) => true - QueuedTextConsole(_) => true - QueuedFile(sink) => sink.sink.close() - } -} - -pub fn RuntimeSink::pending_count(self : RuntimeSink) -> Int { - match self { - Console(_) => 0 - JsonConsole(_) => 0 - TextConsole(_) => 0 - File(_) => 0 - QueuedConsole(sink) => sink.pending_count() - QueuedJsonConsole(sink) => sink.pending_count() - QueuedTextConsole(sink) => sink.pending_count() - QueuedFile(sink) => sink.pending_count() - } -} - -pub fn RuntimeSink::dropped_count(self : RuntimeSink) -> Int { - match self { - Console(_) => 0 - JsonConsole(_) => 0 - TextConsole(_) => 0 - File(_) => 0 - QueuedConsole(sink) => sink.dropped_count() - QueuedJsonConsole(sink) => sink.dropped_count() - QueuedTextConsole(sink) => sink.dropped_count() - QueuedFile(sink) => sink.dropped_count() - } -} - -pub fn RuntimeSink::file_available(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.is_available() - QueuedFile(sink) => sink.sink.is_available() - _ => false - } -} - -pub fn RuntimeSink::file_reopen(self : RuntimeSink, append~ : Bool? = None) -> Bool { - match self { - File(sink) => sink.reopen(append=append) - QueuedFile(sink) => sink.sink.reopen(append=append) - _ => false - } -} - -pub fn RuntimeSink::file_reopen_with_current_policy(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.reopen_with_current_policy() - QueuedFile(sink) => sink.sink.reopen_with_current_policy() - _ => false - } -} - -pub fn RuntimeSink::file_reopen_append(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.reopen_append() - QueuedFile(sink) => sink.sink.reopen_append() - _ => false - } -} - -pub fn RuntimeSink::file_reopen_truncate(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.reopen_truncate() - QueuedFile(sink) => sink.sink.reopen_truncate() - _ => false - } -} - -pub fn RuntimeSink::file_append_mode(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.append_mode() - QueuedFile(sink) => sink.sink.append_mode() - _ => false - } -} - -pub fn RuntimeSink::file_set_append_mode(self : RuntimeSink, append : Bool) -> Bool { - match self { - File(sink) => { - sink.set_append_mode(append) - true - } - QueuedFile(sink) => { - sink.sink.set_append_mode(append) - true - } - _ => false - } -} - -pub fn RuntimeSink::file_path(self : RuntimeSink) -> String { - match self { - File(sink) => sink.path() - QueuedFile(sink) => sink.sink.path() - _ => "" - } -} - -pub fn RuntimeSink::file_auto_flush(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.auto_flush_enabled() - QueuedFile(sink) => sink.sink.auto_flush_enabled() - _ => false - } -} - -pub fn RuntimeSink::file_rotation_enabled(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.rotation_enabled() - QueuedFile(sink) => sink.sink.rotation_enabled() - _ => false - } -} - -pub fn RuntimeSink::file_rotation_config(self : RuntimeSink) -> FileRotation? { - match self { - File(sink) => sink.rotation_config() - QueuedFile(sink) => sink.sink.rotation_config() - _ => None - } -} - -pub fn RuntimeSink::file_set_auto_flush(self : RuntimeSink, enabled : Bool) -> Bool { - match self { - File(sink) => { - sink.set_auto_flush(enabled) - true - } - QueuedFile(sink) => { - sink.sink.set_auto_flush(enabled) - true - } - _ => false - } -} - -pub fn RuntimeSink::file_set_policy(self : RuntimeSink, policy : FileSinkPolicy) -> Bool { - match self { - File(sink) => { - sink.set_policy(policy) - true - } - QueuedFile(sink) => { - sink.sink.set_policy(policy) - true - } - _ => false - } -} - -pub fn RuntimeSink::file_set_rotation(self : RuntimeSink, rotation : FileRotation?) -> Bool { - match self { - File(sink) => { - sink.set_rotation(rotation) - true - } - QueuedFile(sink) => { - sink.sink.set_rotation(rotation) - true - } - _ => false - } -} - -pub fn RuntimeSink::file_clear_rotation(self : RuntimeSink) -> Bool { - match self { - File(sink) => { - sink.clear_rotation() - true - } - QueuedFile(sink) => { - sink.sink.clear_rotation() - true - } - _ => false - } -} - -pub fn RuntimeSink::file_flush(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.flush() - QueuedFile(sink) => { - ignore(sink.flush()) - sink.sink.flush() - } - _ => false - } -} - -pub fn RuntimeSink::file_close(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.close() - QueuedFile(sink) => { - ignore(sink.flush()) - sink.sink.close() - } - _ => false - } -} - -pub fn RuntimeSink::file_open_failures(self : RuntimeSink) -> Int { - match self { - File(sink) => sink.open_failures() - QueuedFile(sink) => sink.sink.open_failures() - _ => 0 - } -} - -pub fn RuntimeSink::file_write_failures(self : RuntimeSink) -> Int { - match self { - File(sink) => sink.write_failures() - QueuedFile(sink) => sink.sink.write_failures() - _ => 0 - } -} - -pub fn RuntimeSink::file_flush_failures(self : RuntimeSink) -> Int { - match self { - File(sink) => sink.flush_failures() - QueuedFile(sink) => sink.sink.flush_failures() - _ => 0 - } -} - -pub fn RuntimeSink::file_rotation_failures(self : RuntimeSink) -> Int { - match self { - File(sink) => sink.rotation_failures() - QueuedFile(sink) => sink.sink.rotation_failures() - _ => 0 - } -} - -pub fn RuntimeSink::file_reset_failure_counters(self : RuntimeSink) -> Bool { - match self { - File(sink) => { - sink.reset_failure_counters() - true - } - QueuedFile(sink) => { - sink.sink.reset_failure_counters() - true - } - _ => false - } -} - -pub fn RuntimeSink::file_reset_policy(self : RuntimeSink) -> Bool { - match self { - File(sink) => { - sink.reset_policy() - true - } - QueuedFile(sink) => { - sink.sink.reset_policy() - true - } - _ => false - } -} - -pub fn RuntimeSink::file_policy(self : RuntimeSink) -> FileSinkPolicy { - match self { - File(sink) => sink.policy() - QueuedFile(sink) => sink.sink.policy() - _ => FileSinkPolicy::new(append=false, auto_flush=false, rotation=None) - } -} - -pub fn RuntimeSink::file_default_policy(self : RuntimeSink) -> FileSinkPolicy { - match self { - File(sink) => sink.default_policy() - QueuedFile(sink) => sink.sink.default_policy() - _ => FileSinkPolicy::new(append=false, auto_flush=false, rotation=None) - } -} - -pub fn RuntimeSink::file_policy_matches_default(self : RuntimeSink) -> Bool { - match self { - File(sink) => sink.policy_matches_default() - QueuedFile(sink) => sink.sink.policy_matches_default() - _ => false - } -} - -pub fn RuntimeSink::file_state(self : RuntimeSink) -> FileSinkState { - match self { - File(sink) => sink.state() - QueuedFile(sink) => sink.sink.state() - _ => FileSinkState::new( - "", - available=false, - append=false, - auto_flush=false, - rotation=None, - open_failures=0, - write_failures=0, - flush_failures=0, - rotation_failures=0, - ) - } -} - -pub fn RuntimeSink::file_runtime_state(self : RuntimeSink) -> RuntimeFileState? { - match self { - File(sink) => Some(RuntimeFileState::new(sink.state())) - QueuedFile(sink) => Some( - RuntimeFileState::new( - sink.sink.state(), - queued=true, - pending_count=sink.pending_count(), - dropped_count=sink.dropped_count(), - ), - ) - _ => None - } -} - -pub type ConfiguredLogger = Logger[RuntimeSink] - -pub fn ConfiguredLogger::flush(self : ConfiguredLogger) -> Int { - self.sink.flush() -} - -pub fn ConfiguredLogger::drain(self : ConfiguredLogger, max_items~ : Int = -1) -> Int { - self.sink.drain(max_items=max_items) -} - -pub fn ConfiguredLogger::close(self : ConfiguredLogger) -> Bool { - self.sink.close() -} - -pub fn ConfiguredLogger::pending_count(self : ConfiguredLogger) -> Int { - self.sink.pending_count() -} - -pub fn ConfiguredLogger::dropped_count(self : ConfiguredLogger) -> Int { - self.sink.dropped_count() -} - -pub fn ConfiguredLogger::file_available(self : ConfiguredLogger) -> Bool { - self.sink.file_available() -} - -pub fn ConfiguredLogger::file_reopen(self : ConfiguredLogger, append~ : Bool? = None) -> Bool { - self.sink.file_reopen(append=append) -} - -pub fn ConfiguredLogger::file_reopen_with_current_policy(self : ConfiguredLogger) -> Bool { - self.sink.file_reopen_with_current_policy() -} - -pub fn ConfiguredLogger::file_reopen_append(self : ConfiguredLogger) -> Bool { - self.sink.file_reopen_append() -} - -pub fn ConfiguredLogger::file_reopen_truncate(self : ConfiguredLogger) -> Bool { - self.sink.file_reopen_truncate() -} - -pub fn ConfiguredLogger::file_append_mode(self : ConfiguredLogger) -> Bool { - self.sink.file_append_mode() -} - -pub fn ConfiguredLogger::file_set_append_mode(self : ConfiguredLogger, append : Bool) -> Bool { - self.sink.file_set_append_mode(append) -} - -pub fn ConfiguredLogger::file_path(self : ConfiguredLogger) -> String { - self.sink.file_path() -} - -pub fn ConfiguredLogger::file_auto_flush(self : ConfiguredLogger) -> Bool { - self.sink.file_auto_flush() -} - -pub fn ConfiguredLogger::file_rotation_enabled(self : ConfiguredLogger) -> Bool { - self.sink.file_rotation_enabled() -} - -pub fn ConfiguredLogger::file_rotation_config(self : ConfiguredLogger) -> FileRotation? { - self.sink.file_rotation_config() -} - -pub fn ConfiguredLogger::file_set_auto_flush(self : ConfiguredLogger, enabled : Bool) -> Bool { - self.sink.file_set_auto_flush(enabled) -} - -pub fn ConfiguredLogger::file_set_policy(self : ConfiguredLogger, policy : FileSinkPolicy) -> Bool { - self.sink.file_set_policy(policy) -} - -pub fn ConfiguredLogger::file_set_rotation( - self : ConfiguredLogger, - rotation : FileRotation?, -) -> Bool { - self.sink.file_set_rotation(rotation) -} - -pub fn ConfiguredLogger::file_clear_rotation(self : ConfiguredLogger) -> Bool { - self.sink.file_clear_rotation() -} - -pub fn ConfiguredLogger::file_flush(self : ConfiguredLogger) -> Bool { - self.sink.file_flush() -} - -pub fn ConfiguredLogger::file_close(self : ConfiguredLogger) -> Bool { - self.sink.file_close() -} - -pub fn ConfiguredLogger::file_open_failures(self : ConfiguredLogger) -> Int { - self.sink.file_open_failures() -} - -pub fn ConfiguredLogger::file_write_failures(self : ConfiguredLogger) -> Int { - self.sink.file_write_failures() -} - -pub fn ConfiguredLogger::file_flush_failures(self : ConfiguredLogger) -> Int { - self.sink.file_flush_failures() -} - -pub fn ConfiguredLogger::file_rotation_failures(self : ConfiguredLogger) -> Int { - self.sink.file_rotation_failures() -} - -pub fn ConfiguredLogger::file_reset_failure_counters(self : ConfiguredLogger) -> Bool { - self.sink.file_reset_failure_counters() -} - -pub fn ConfiguredLogger::file_reset_policy(self : ConfiguredLogger) -> Bool { - self.sink.file_reset_policy() -} - -pub fn ConfiguredLogger::file_policy(self : ConfiguredLogger) -> FileSinkPolicy { - self.sink.file_policy() -} - -pub fn ConfiguredLogger::file_default_policy(self : ConfiguredLogger) -> FileSinkPolicy { - self.sink.file_default_policy() -} - -pub fn ConfiguredLogger::file_policy_matches_default(self : ConfiguredLogger) -> Bool { - self.sink.file_policy_matches_default() -} - -pub fn ConfiguredLogger::file_state(self : ConfiguredLogger) -> FileSinkState { - self.sink.file_state() -} - -pub fn ConfiguredLogger::file_runtime_state(self : ConfiguredLogger) -> RuntimeFileState? { - self.sink.file_runtime_state() -} fn expect_object( value : @json_parser.JsonValue, @@ -1139,59 +535,3 @@ pub fn stringify_logger_config(config : LoggerConfig, pretty~ : Bool = false) -> @json_parser.stringify(value) } } - -fn build_runtime_sink(config : SinkConfig) -> RuntimeSink { - match config.kind { - SinkKind::Console => RuntimeSink::Console(console_sink()) - SinkKind::JsonConsole => RuntimeSink::JsonConsole(json_console_sink()) - SinkKind::TextConsole => RuntimeSink::TextConsole( - text_console_sink(config.text_formatter.to_formatter()), - ) - SinkKind::File => RuntimeSink::File( - file_sink( - config.path, - append=config.append, - auto_flush=config.auto_flush, - rotation=config.rotation, - formatter=fn(rec) { - format_text(rec, formatter=config.text_formatter.to_formatter()) - }, - ), - ) - } -} - -fn apply_queue_config(sink : RuntimeSink, queue : QueueConfig) -> RuntimeSink { - match sink { - Console(inner) => RuntimeSink::QueuedConsole( - queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), - ) - JsonConsole(inner) => RuntimeSink::QueuedJsonConsole( - queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), - ) - TextConsole(inner) => RuntimeSink::QueuedTextConsole( - queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), - ) - File(inner) => RuntimeSink::QueuedFile( - queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), - ) - QueuedConsole(_) => sink - QueuedJsonConsole(_) => sink - QueuedTextConsole(_) => sink - QueuedFile(_) => sink - } -} - -pub fn build_logger(config : LoggerConfig) -> ConfiguredLogger { - let sink = build_runtime_sink(config.sink) - let actual_sink = match config.queue { - None => sink - Some(queue) => apply_queue_config(sink, queue) - } - Logger::new(actual_sink, min_level=config.min_level, target=config.target) - .with_timestamp(enabled=config.timestamp) -} - -pub fn parse_and_build_logger(input : String) -> ConfiguredLogger raise ConfigError { - build_logger(parse_logger_config_text(input)) -} diff --git a/src/runtime_logger.mbt b/src/runtime_logger.mbt new file mode 100644 index 0000000..55fbca3 --- /dev/null +++ b/src/runtime_logger.mbt @@ -0,0 +1,660 @@ +pub(all) enum RuntimeSink { + Console(ConsoleSink) + JsonConsole(JsonConsoleSink) + TextConsole(FormattedConsoleSink) + File(FileSink) + QueuedConsole(QueuedSink[ConsoleSink]) + QueuedJsonConsole(QueuedSink[JsonConsoleSink]) + QueuedTextConsole(QueuedSink[FormattedConsoleSink]) + QueuedFile(QueuedSink[FileSink]) +} + +pub struct RuntimeFileState { + file : FileSinkState + queued : Bool + pending_count : Int + dropped_count : Int +} + +pub fn RuntimeFileState::new( + file : FileSinkState, + queued~ : Bool = false, + pending_count~ : Int = 0, + dropped_count~ : Int = 0, +) -> RuntimeFileState { + { file, queued, pending_count, dropped_count } +} + +fn file_sink_policy_to_json_value(policy : FileSinkPolicy) -> @json_parser.JsonValue { + let obj : Map[String, @json_parser.JsonValue] = { + "append": @json_parser.JsonValue::Bool(policy.append), + "auto_flush": @json_parser.JsonValue::Bool(policy.auto_flush), + } + match policy.rotation { + None => obj["rotation"] = @json_parser.JsonValue::Null + Some(rotation) => obj["rotation"] = file_rotation_config_to_json(rotation) + } + @json_parser.JsonValue::Object(obj) +} + +pub fn file_sink_policy_to_json(policy : FileSinkPolicy) -> @json_parser.JsonValue { + file_sink_policy_to_json_value(policy) +} + +pub fn stringify_file_sink_policy(policy : FileSinkPolicy, pretty~ : Bool = false) -> String { + let value = file_sink_policy_to_json_value(policy) + if pretty { + @json_parser.stringify_pretty(value, 2) + } else { + @json_parser.stringify(value) + } +} + +fn file_sink_state_to_json_value(state : FileSinkState) -> @json_parser.JsonValue { + let obj : Map[String, @json_parser.JsonValue] = { + "path": @json_parser.JsonValue::String(state.path), + "available": @json_parser.JsonValue::Bool(state.available), + "append": @json_parser.JsonValue::Bool(state.append), + "auto_flush": @json_parser.JsonValue::Bool(state.auto_flush), + "open_failures": @json_parser.JsonValue::Number(state.open_failures.to_double()), + "write_failures": @json_parser.JsonValue::Number(state.write_failures.to_double()), + "flush_failures": @json_parser.JsonValue::Number(state.flush_failures.to_double()), + "rotation_failures": @json_parser.JsonValue::Number(state.rotation_failures.to_double()), + } + match state.rotation { + None => obj["rotation"] = @json_parser.JsonValue::Null + Some(rotation) => obj["rotation"] = file_rotation_config_to_json(rotation) + } + @json_parser.JsonValue::Object(obj) +} + +pub fn file_sink_state_to_json(state : FileSinkState) -> @json_parser.JsonValue { + file_sink_state_to_json_value(state) +} + +pub fn stringify_file_sink_state(state : FileSinkState, pretty~ : Bool = false) -> String { + let value = file_sink_state_to_json_value(state) + if pretty { + @json_parser.stringify_pretty(value, 2) + } else { + @json_parser.stringify(value) + } +} + +pub fn runtime_file_state_to_json(state : RuntimeFileState) -> @json_parser.JsonValue { + @json_parser.JsonValue::Object({ + "file": file_sink_state_to_json_value(state.file), + "queued": @json_parser.JsonValue::Bool(state.queued), + "pending_count": @json_parser.JsonValue::Number(state.pending_count.to_double()), + "dropped_count": @json_parser.JsonValue::Number(state.dropped_count.to_double()), + }) +} + +pub fn stringify_runtime_file_state(state : RuntimeFileState, pretty~ : Bool = false) -> String { + let value = runtime_file_state_to_json(state) + if pretty { + @json_parser.stringify_pretty(value, 2) + } else { + @json_parser.stringify(value) + } +} + +pub impl Sink for RuntimeSink with write(self, rec) { + match self { + Console(sink) => sink.write(rec) + JsonConsole(sink) => sink.write(rec) + TextConsole(sink) => sink.write(rec) + File(sink) => sink.write(rec) + QueuedConsole(sink) => sink.write(rec) + QueuedJsonConsole(sink) => sink.write(rec) + QueuedTextConsole(sink) => sink.write(rec) + QueuedFile(sink) => sink.write(rec) + } +} + +pub fn RuntimeSink::flush(self : RuntimeSink) -> Int { + match self { + Console(_) => 0 + JsonConsole(_) => 0 + TextConsole(_) => 0 + File(sink) => if sink.flush() { 1 } else { 0 } + QueuedConsole(sink) => sink.flush() + QueuedJsonConsole(sink) => sink.flush() + QueuedTextConsole(sink) => sink.flush() + QueuedFile(sink) => sink.flush() + } +} + +pub fn RuntimeSink::drain(self : RuntimeSink, max_items~ : Int = -1) -> Int { + match self { + Console(_) => 0 + JsonConsole(_) => 0 + TextConsole(_) => 0 + File(sink) => if sink.flush() { 1 } else { 0 } + QueuedConsole(sink) => sink.drain(max_items=max_items) + QueuedJsonConsole(sink) => sink.drain(max_items=max_items) + QueuedTextConsole(sink) => sink.drain(max_items=max_items) + QueuedFile(sink) => sink.drain(max_items=max_items) + } +} + +pub fn RuntimeSink::close(self : RuntimeSink) -> Bool { + match self { + Console(_) => true + JsonConsole(_) => true + TextConsole(_) => true + File(sink) => sink.close() + QueuedConsole(_) => true + QueuedJsonConsole(_) => true + QueuedTextConsole(_) => true + QueuedFile(sink) => sink.sink.close() + } +} + +pub fn RuntimeSink::pending_count(self : RuntimeSink) -> Int { + match self { + Console(_) => 0 + JsonConsole(_) => 0 + TextConsole(_) => 0 + File(_) => 0 + QueuedConsole(sink) => sink.pending_count() + QueuedJsonConsole(sink) => sink.pending_count() + QueuedTextConsole(sink) => sink.pending_count() + QueuedFile(sink) => sink.pending_count() + } +} + +pub fn RuntimeSink::dropped_count(self : RuntimeSink) -> Int { + match self { + Console(_) => 0 + JsonConsole(_) => 0 + TextConsole(_) => 0 + File(_) => 0 + QueuedConsole(sink) => sink.dropped_count() + QueuedJsonConsole(sink) => sink.dropped_count() + QueuedTextConsole(sink) => sink.dropped_count() + QueuedFile(sink) => sink.dropped_count() + } +} + +pub fn RuntimeSink::file_available(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.is_available() + QueuedFile(sink) => sink.sink.is_available() + _ => false + } +} + +pub fn RuntimeSink::file_reopen(self : RuntimeSink, append~ : Bool? = None) -> Bool { + match self { + File(sink) => sink.reopen(append=append) + QueuedFile(sink) => sink.sink.reopen(append=append) + _ => false + } +} + +pub fn RuntimeSink::file_reopen_with_current_policy(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.reopen_with_current_policy() + QueuedFile(sink) => sink.sink.reopen_with_current_policy() + _ => false + } +} + +pub fn RuntimeSink::file_reopen_append(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.reopen_append() + QueuedFile(sink) => sink.sink.reopen_append() + _ => false + } +} + +pub fn RuntimeSink::file_reopen_truncate(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.reopen_truncate() + QueuedFile(sink) => sink.sink.reopen_truncate() + _ => false + } +} + +pub fn RuntimeSink::file_append_mode(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.append_mode() + QueuedFile(sink) => sink.sink.append_mode() + _ => false + } +} + +pub fn RuntimeSink::file_set_append_mode(self : RuntimeSink, append : Bool) -> Bool { + match self { + File(sink) => { + sink.set_append_mode(append) + true + } + QueuedFile(sink) => { + sink.sink.set_append_mode(append) + true + } + _ => false + } +} + +pub fn RuntimeSink::file_path(self : RuntimeSink) -> String { + match self { + File(sink) => sink.path() + QueuedFile(sink) => sink.sink.path() + _ => "" + } +} + +pub fn RuntimeSink::file_auto_flush(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.auto_flush_enabled() + QueuedFile(sink) => sink.sink.auto_flush_enabled() + _ => false + } +} + +pub fn RuntimeSink::file_rotation_enabled(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.rotation_enabled() + QueuedFile(sink) => sink.sink.rotation_enabled() + _ => false + } +} + +pub fn RuntimeSink::file_rotation_config(self : RuntimeSink) -> FileRotation? { + match self { + File(sink) => sink.rotation_config() + QueuedFile(sink) => sink.sink.rotation_config() + _ => None + } +} + +pub fn RuntimeSink::file_set_auto_flush(self : RuntimeSink, enabled : Bool) -> Bool { + match self { + File(sink) => { + sink.set_auto_flush(enabled) + true + } + QueuedFile(sink) => { + sink.sink.set_auto_flush(enabled) + true + } + _ => false + } +} + +pub fn RuntimeSink::file_set_policy(self : RuntimeSink, policy : FileSinkPolicy) -> Bool { + match self { + File(sink) => { + sink.set_policy(policy) + true + } + QueuedFile(sink) => { + sink.sink.set_policy(policy) + true + } + _ => false + } +} + +pub fn RuntimeSink::file_set_rotation(self : RuntimeSink, rotation : FileRotation?) -> Bool { + match self { + File(sink) => { + sink.set_rotation(rotation) + true + } + QueuedFile(sink) => { + sink.sink.set_rotation(rotation) + true + } + _ => false + } +} + +pub fn RuntimeSink::file_clear_rotation(self : RuntimeSink) -> Bool { + match self { + File(sink) => { + sink.clear_rotation() + true + } + QueuedFile(sink) => { + sink.sink.clear_rotation() + true + } + _ => false + } +} + +pub fn RuntimeSink::file_flush(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.flush() + QueuedFile(sink) => { + ignore(sink.flush()) + sink.sink.flush() + } + _ => false + } +} + +pub fn RuntimeSink::file_close(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.close() + QueuedFile(sink) => { + ignore(sink.flush()) + sink.sink.close() + } + _ => false + } +} + +pub fn RuntimeSink::file_open_failures(self : RuntimeSink) -> Int { + match self { + File(sink) => sink.open_failures() + QueuedFile(sink) => sink.sink.open_failures() + _ => 0 + } +} + +pub fn RuntimeSink::file_write_failures(self : RuntimeSink) -> Int { + match self { + File(sink) => sink.write_failures() + QueuedFile(sink) => sink.sink.write_failures() + _ => 0 + } +} + +pub fn RuntimeSink::file_flush_failures(self : RuntimeSink) -> Int { + match self { + File(sink) => sink.flush_failures() + QueuedFile(sink) => sink.sink.flush_failures() + _ => 0 + } +} + +pub fn RuntimeSink::file_rotation_failures(self : RuntimeSink) -> Int { + match self { + File(sink) => sink.rotation_failures() + QueuedFile(sink) => sink.sink.rotation_failures() + _ => 0 + } +} + +pub fn RuntimeSink::file_reset_failure_counters(self : RuntimeSink) -> Bool { + match self { + File(sink) => { + sink.reset_failure_counters() + true + } + QueuedFile(sink) => { + sink.sink.reset_failure_counters() + true + } + _ => false + } +} + +pub fn RuntimeSink::file_reset_policy(self : RuntimeSink) -> Bool { + match self { + File(sink) => { + sink.reset_policy() + true + } + QueuedFile(sink) => { + sink.sink.reset_policy() + true + } + _ => false + } +} + +pub fn RuntimeSink::file_policy(self : RuntimeSink) -> FileSinkPolicy { + match self { + File(sink) => sink.policy() + QueuedFile(sink) => sink.sink.policy() + _ => FileSinkPolicy::new(append=false, auto_flush=false, rotation=None) + } +} + +pub fn RuntimeSink::file_default_policy(self : RuntimeSink) -> FileSinkPolicy { + match self { + File(sink) => sink.default_policy() + QueuedFile(sink) => sink.sink.default_policy() + _ => FileSinkPolicy::new(append=false, auto_flush=false, rotation=None) + } +} + +pub fn RuntimeSink::file_policy_matches_default(self : RuntimeSink) -> Bool { + match self { + File(sink) => sink.policy_matches_default() + QueuedFile(sink) => sink.sink.policy_matches_default() + _ => false + } +} + +pub fn RuntimeSink::file_state(self : RuntimeSink) -> FileSinkState { + match self { + File(sink) => sink.state() + QueuedFile(sink) => sink.sink.state() + _ => FileSinkState::new( + "", + available=false, + append=false, + auto_flush=false, + rotation=None, + open_failures=0, + write_failures=0, + flush_failures=0, + rotation_failures=0, + ) + } +} + +pub fn RuntimeSink::file_runtime_state(self : RuntimeSink) -> RuntimeFileState? { + match self { + File(sink) => Some(RuntimeFileState::new(sink.state())) + QueuedFile(sink) => Some( + RuntimeFileState::new( + sink.sink.state(), + queued=true, + pending_count=sink.pending_count(), + dropped_count=sink.dropped_count(), + ), + ) + _ => None + } +} + +pub type ConfiguredLogger = Logger[RuntimeSink] + +pub fn ConfiguredLogger::flush(self : ConfiguredLogger) -> Int { + self.sink.flush() +} + +pub fn ConfiguredLogger::drain(self : ConfiguredLogger, max_items~ : Int = -1) -> Int { + self.sink.drain(max_items=max_items) +} + +pub fn ConfiguredLogger::close(self : ConfiguredLogger) -> Bool { + self.sink.close() +} + +pub fn ConfiguredLogger::pending_count(self : ConfiguredLogger) -> Int { + self.sink.pending_count() +} + +pub fn ConfiguredLogger::dropped_count(self : ConfiguredLogger) -> Int { + self.sink.dropped_count() +} + +pub fn ConfiguredLogger::file_available(self : ConfiguredLogger) -> Bool { + self.sink.file_available() +} + +pub fn ConfiguredLogger::file_reopen(self : ConfiguredLogger, append~ : Bool? = None) -> Bool { + self.sink.file_reopen(append=append) +} + +pub fn ConfiguredLogger::file_reopen_with_current_policy(self : ConfiguredLogger) -> Bool { + self.sink.file_reopen_with_current_policy() +} + +pub fn ConfiguredLogger::file_reopen_append(self : ConfiguredLogger) -> Bool { + self.sink.file_reopen_append() +} + +pub fn ConfiguredLogger::file_reopen_truncate(self : ConfiguredLogger) -> Bool { + self.sink.file_reopen_truncate() +} + +pub fn ConfiguredLogger::file_append_mode(self : ConfiguredLogger) -> Bool { + self.sink.file_append_mode() +} + +pub fn ConfiguredLogger::file_set_append_mode(self : ConfiguredLogger, append : Bool) -> Bool { + self.sink.file_set_append_mode(append) +} + +pub fn ConfiguredLogger::file_path(self : ConfiguredLogger) -> String { + self.sink.file_path() +} + +pub fn ConfiguredLogger::file_auto_flush(self : ConfiguredLogger) -> Bool { + self.sink.file_auto_flush() +} + +pub fn ConfiguredLogger::file_rotation_enabled(self : ConfiguredLogger) -> Bool { + self.sink.file_rotation_enabled() +} + +pub fn ConfiguredLogger::file_rotation_config(self : ConfiguredLogger) -> FileRotation? { + self.sink.file_rotation_config() +} + +pub fn ConfiguredLogger::file_set_auto_flush(self : ConfiguredLogger, enabled : Bool) -> Bool { + self.sink.file_set_auto_flush(enabled) +} + +pub fn ConfiguredLogger::file_set_policy(self : ConfiguredLogger, policy : FileSinkPolicy) -> Bool { + self.sink.file_set_policy(policy) +} + +pub fn ConfiguredLogger::file_set_rotation( + self : ConfiguredLogger, + rotation : FileRotation?, +) -> Bool { + self.sink.file_set_rotation(rotation) +} + +pub fn ConfiguredLogger::file_clear_rotation(self : ConfiguredLogger) -> Bool { + self.sink.file_clear_rotation() +} + +pub fn ConfiguredLogger::file_flush(self : ConfiguredLogger) -> Bool { + self.sink.file_flush() +} + +pub fn ConfiguredLogger::file_close(self : ConfiguredLogger) -> Bool { + self.sink.file_close() +} + +pub fn ConfiguredLogger::file_open_failures(self : ConfiguredLogger) -> Int { + self.sink.file_open_failures() +} + +pub fn ConfiguredLogger::file_write_failures(self : ConfiguredLogger) -> Int { + self.sink.file_write_failures() +} + +pub fn ConfiguredLogger::file_flush_failures(self : ConfiguredLogger) -> Int { + self.sink.file_flush_failures() +} + +pub fn ConfiguredLogger::file_rotation_failures(self : ConfiguredLogger) -> Int { + self.sink.file_rotation_failures() +} + +pub fn ConfiguredLogger::file_reset_failure_counters(self : ConfiguredLogger) -> Bool { + self.sink.file_reset_failure_counters() +} + +pub fn ConfiguredLogger::file_reset_policy(self : ConfiguredLogger) -> Bool { + self.sink.file_reset_policy() +} + +pub fn ConfiguredLogger::file_policy(self : ConfiguredLogger) -> FileSinkPolicy { + self.sink.file_policy() +} + +pub fn ConfiguredLogger::file_default_policy(self : ConfiguredLogger) -> FileSinkPolicy { + self.sink.file_default_policy() +} + +pub fn ConfiguredLogger::file_policy_matches_default(self : ConfiguredLogger) -> Bool { + self.sink.file_policy_matches_default() +} + +pub fn ConfiguredLogger::file_state(self : ConfiguredLogger) -> FileSinkState { + self.sink.file_state() +} + +pub fn ConfiguredLogger::file_runtime_state(self : ConfiguredLogger) -> RuntimeFileState? { + self.sink.file_runtime_state() +} + +fn build_runtime_sink(config : SinkConfig) -> RuntimeSink { + match config.kind { + SinkKind::Console => RuntimeSink::Console(console_sink()) + SinkKind::JsonConsole => RuntimeSink::JsonConsole(json_console_sink()) + SinkKind::TextConsole => RuntimeSink::TextConsole( + text_console_sink(config.text_formatter.to_formatter()), + ) + SinkKind::File => RuntimeSink::File( + file_sink( + config.path, + append=config.append, + auto_flush=config.auto_flush, + rotation=config.rotation, + formatter=fn(rec) { + format_text(rec, formatter=config.text_formatter.to_formatter()) + }, + ), + ) + } +} + +fn apply_queue_config(sink : RuntimeSink, queue : QueueConfig) -> RuntimeSink { + match sink { + Console(inner) => RuntimeSink::QueuedConsole( + queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), + ) + JsonConsole(inner) => RuntimeSink::QueuedJsonConsole( + queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), + ) + TextConsole(inner) => RuntimeSink::QueuedTextConsole( + queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), + ) + File(inner) => RuntimeSink::QueuedFile( + queued_sink(inner, max_pending=queue.max_pending, overflow=queue.overflow), + ) + QueuedConsole(_) => sink + QueuedJsonConsole(_) => sink + QueuedTextConsole(_) => sink + QueuedFile(_) => sink + } +} + +pub fn build_logger(config : LoggerConfig) -> ConfiguredLogger { + let sink = build_runtime_sink(config.sink) + let actual_sink = match config.queue { + None => sink + Some(queue) => apply_queue_config(sink, queue) + } + Logger::new(actual_sink, min_level=config.min_level, target=config.target) + .with_timestamp(enabled=config.timestamp) +} + +pub fn parse_and_build_logger(input : String) -> ConfiguredLogger raise ConfigError { + build_logger(parse_logger_config_text(input)) +}