diff --git a/bitlogger/file_backend_native.mbt b/bitlogger/file_backend_native.mbt index f9245e5..472536c 100644 --- a/bitlogger/file_backend_native.mbt +++ b/bitlogger/file_backend_native.mbt @@ -32,28 +32,28 @@ fn string_to_c_bytes(str : String) -> Bytes { } #external -priv type NativeFileHandle +type NativeFileHandle #borrow(path, mode) -extern "c" fn file_open_ffi(path : Bytes, mode : Bytes) -> NativeFileHandle = "fopen" +extern "C" fn file_open_ffi(path : Bytes, mode : Bytes) -> NativeFileHandle = "fopen" -extern "c" fn file_is_null_ffi(handle : NativeFileHandle) -> Bool = "%null?" +extern "C" fn file_is_null_ffi(handle : NativeFileHandle) -> Bool = "moonbitlang_async_pointer_is_null" #borrow(buffer) -extern "c" fn file_write_ffi( +extern "C" fn file_write_ffi( buffer : Bytes, size : Int, count : Int, handle : NativeFileHandle, ) -> Int = "fwrite" -extern "c" fn file_flush_ffi(handle : NativeFileHandle) -> Int = "fflush" +extern "C" fn file_flush_ffi(handle : NativeFileHandle) -> Int = "fflush" -extern "c" fn file_close_ffi(handle : NativeFileHandle) -> Int = "fclose" +extern "C" fn file_close_ffi(handle : NativeFileHandle) -> Int = "fclose" pub struct FileHandle { - raw : NativeFileHandle path : String + raw : NativeFileHandle } fn open_file_handle_internal(path : String, append : Bool) -> FileHandle? { diff --git a/bitlogger/record.mbt b/bitlogger/record.mbt index ebe44d1..027b37e 100644 --- a/bitlogger/record.mbt +++ b/bitlogger/record.mbt @@ -15,7 +15,7 @@ pub struct Record { fields : Array[Field] } -fn record( +pub fn Record::new( level : Level, message : String, timestamp_ms~ : UInt64 = 0UL, @@ -24,3 +24,13 @@ fn record( ) -> Record { { level, timestamp_ms, target, message, fields } } + +fn record( + level : Level, + message : String, + timestamp_ms~ : UInt64 = 0UL, + target~ : String = "", + fields~ : Array[Field] = [], +) -> Record { + Record::new(level, message, timestamp_ms=timestamp_ms, target=target, fields=fields) +} diff --git a/bitlogger_async/async_logger_native.mbt b/bitlogger_async/async_logger_native.mbt new file mode 100644 index 0000000..eac2a87 --- /dev/null +++ b/bitlogger_async/async_logger_native.mbt @@ -0,0 +1,260 @@ +pub(all) suberror AsyncLoggerClosed { + AsyncLoggerClosed +} + +pub(all) enum AsyncOverflowPolicy { + Blocking + DropOldest + DropNewest +} + +pub struct AsyncLoggerConfig { + max_pending : Int + overflow : AsyncOverflowPolicy +} + +pub fn AsyncLoggerConfig::new( + max_pending~ : Int = 0, + overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, +) -> AsyncLoggerConfig { + { max_pending, overflow } +} + +pub struct AsyncLogger[S] { + min_level : @bitlogger.Level + target : String + timestamp : Bool + overflow : AsyncOverflowPolicy + sink : S + context_fields : Array[@bitlogger.Field] + filter : (@bitlogger.Record) -> Bool + patch : @bitlogger.RecordPatch + queue : @async.Queue[@bitlogger.Record] + pending_count : Ref[Int] + dropped_count : Ref[Int] +} + +pub fn[S] async_logger( + sink : S, + config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), + min_level~ : @bitlogger.Level = @bitlogger.Level::Info, + target~ : String = "", +) -> AsyncLogger[S] { + { + min_level, + target, + timestamp: false, + overflow: config.overflow, + sink, + context_fields: [], + filter: fn(_) { true }, + patch: @bitlogger.identity_patch(), + queue: @async.Queue::new(kind=queue_kind_of(config)), + pending_count: Ref::new(0), + dropped_count: Ref::new(0), + } +} + +fn queue_kind_of(config : AsyncLoggerConfig) -> @aqueue.Kind { + let limit = if config.max_pending < 0 { 0 } else { config.max_pending } + match config.overflow { + AsyncOverflowPolicy::Blocking => @aqueue.Kind::Blocking(limit) + AsyncOverflowPolicy::DropOldest => @aqueue.Kind::DiscardOldest(limit) + AsyncOverflowPolicy::DropNewest => @aqueue.Kind::DiscardLatest(limit) + } +} + +pub fn[S] AsyncLogger::with_timestamp(self : AsyncLogger[S], enabled~ : Bool = true) -> AsyncLogger[S] { + { ..self, timestamp: enabled } +} + +pub fn[S] AsyncLogger::with_target(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { + { ..self, target } +} + +pub fn[S] AsyncLogger::with_context_fields( + self : AsyncLogger[S], + fields : Array[@bitlogger.Field], +) -> AsyncLogger[S] { + { ..self, context_fields: fields } +} + +pub fn[S] AsyncLogger::with_filter( + self : AsyncLogger[S], + predicate : (@bitlogger.Record) -> Bool, +) -> AsyncLogger[S] { + let current = self.filter + { + ..self, + filter: fn(rec) { + current(rec) && predicate(rec) + }, + } +} + +pub fn[S] AsyncLogger::with_patch( + self : AsyncLogger[S], + patch : @bitlogger.RecordPatch, +) -> AsyncLogger[S] { + let current = self.patch + { + ..self, + patch: fn(rec) { + patch(current(rec)) + }, + } +} + +pub fn[S] AsyncLogger::with_min_level( + self : AsyncLogger[S], + min_level : @bitlogger.Level, +) -> AsyncLogger[S] { + { ..self, min_level } +} + +fn combine_targets(parent : String, child : String) -> String { + if parent == "" { + child + } else if child == "" { + parent + } else { + "\{parent}.\{child}" + } +} + +pub fn[S] AsyncLogger::child(self : AsyncLogger[S], target : String) -> AsyncLogger[S] { + { ..self, target: combine_targets(self.target, target) } +} + +pub fn[S] AsyncLogger::is_enabled(self : AsyncLogger[S], level : @bitlogger.Level) -> Bool { + level.enabled(self.min_level) +} + +pub async fn[S] AsyncLogger::log( + self : AsyncLogger[S], + level : @bitlogger.Level, + message : String, + fields~ : Array[@bitlogger.Field] = [], + target? : String = "", +) -> Unit { + guard self.is_enabled(level) else { + () + } + let actual_target = if target == "" { self.target } else { target } + let timestamp_ms = if self.timestamp { @env.now() } else { 0UL } + let rec = @bitlogger.Record::new( + level, + message, + timestamp_ms=timestamp_ms, + target=actual_target, + fields=merge_fields(self.context_fields, fields), + ) + let rec = (self.patch)(rec) + guard (self.filter)(rec) else { + () + } + let accepted = self.queue.try_put(rec) catch { + err if err is AsyncLoggerClosed => false + err => raise err + } + if accepted { + self.pending_count.val += 1 + } else { + match self.overflow { + AsyncOverflowPolicy::Blocking => { + self.queue.put(rec) catch { + err if err is AsyncLoggerClosed => () + err => raise err + } + self.pending_count.val += 1 + } + AsyncOverflowPolicy::DropOldest | AsyncOverflowPolicy::DropNewest => { + self.dropped_count.val += 1 + } + } + } +} + +fn merge_fields( + left : Array[@bitlogger.Field], + right : Array[@bitlogger.Field], +) -> Array[@bitlogger.Field] { + if left.length() == 0 { + right + } else if right.length() == 0 { + left + } else { + left + right + } +} + +pub async fn[S] AsyncLogger::trace( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Trace, message, fields=fields) +} + +pub async fn[S] AsyncLogger::debug( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Debug, message, fields=fields) +} + +pub async fn[S] AsyncLogger::info( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Info, message, fields=fields) +} + +pub async fn[S] AsyncLogger::warn( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Warn, message, fields=fields) +} + +pub async fn[S] AsyncLogger::error( + self : AsyncLogger[S], + message : String, + fields~ : Array[@bitlogger.Field] = [], +) -> Unit { + self.log(@bitlogger.Level::Error, message, fields=fields) +} + +pub fn[S] AsyncLogger::pending_count(self : AsyncLogger[S]) -> Int { + self.pending_count.val +} + +pub fn[S] AsyncLogger::dropped_count(self : AsyncLogger[S]) -> Int { + self.dropped_count.val +} + +pub fn[S] AsyncLogger::close(self : AsyncLogger[S], clear? : Bool = false) -> Unit { + self.queue.close(error=AsyncLoggerClosed, clear=clear) +} + +pub async fn[S] AsyncLogger::wait_idle(self : AsyncLogger[S]) -> Unit { + while self.pending_count() > 0 { + @async.pause() + } +} + +pub async fn[S : @bitlogger.Sink] AsyncLogger::run(self : AsyncLogger[S]) -> Unit { + while true { + let rec = self.queue.get() catch { + err if err is AsyncLoggerClosed => break + err => raise err + } + self.sink.write(rec) + if self.pending_count.val > 0 { + self.pending_count.val -= 1 + } + } +} diff --git a/bitlogger_async/async_logger_stub.mbt b/bitlogger_async/async_logger_stub.mbt new file mode 100644 index 0000000..19fc113 --- /dev/null +++ b/bitlogger_async/async_logger_stub.mbt @@ -0,0 +1,36 @@ +pub(all) suberror AsyncLoggerClosed { + AsyncLoggerClosed +} + +pub(all) enum AsyncOverflowPolicy { + Blocking + DropOldest + DropNewest +} + +pub struct AsyncLoggerConfig { + max_pending : Int + overflow : AsyncOverflowPolicy +} + +pub fn AsyncLoggerConfig::new( + max_pending~ : Int = 0, + overflow~ : AsyncOverflowPolicy = AsyncOverflowPolicy::Blocking, +) -> AsyncLoggerConfig { + { max_pending, overflow } +} + +pub struct AsyncLogger[S] {} + +pub fn async_logger[S : @bitlogger.Sink]( + sink : S, + config~ : AsyncLoggerConfig = AsyncLoggerConfig::new(), + min_level~ : @bitlogger.Level = @bitlogger.Level::Info, + target~ : String = "", +) -> AsyncLogger[S] { + ignore(sink) + ignore(config) + ignore(min_level) + ignore(target) + abort("bitlogger_async currently only supports native/llvm backends") +} diff --git a/bitlogger_async/moon.pkg b/bitlogger_async/moon.pkg new file mode 100644 index 0000000..daafd9f --- /dev/null +++ b/bitlogger_async/moon.pkg @@ -0,0 +1,16 @@ +import { + "Nanaloveyuki/BitLogger/bitlogger" @bitlogger, + "moonbitlang/async" @async, + "moonbitlang/async/aqueue" @aqueue, + "moonbitlang/core/env" @env, + "moonbitlang/core/ref", +} + +supported_targets = "+native" + +options( + targets: { + "async_logger_native.mbt": [ "native", "llvm" ], + "async_logger_stub.mbt": [ "js", "wasm", "wasm-gc" ], + }, +) diff --git a/examples/async_basic/main.mbt b/examples/async_basic/main.mbt new file mode 100644 index 0000000..23a1942 --- /dev/null +++ b/examples/async_basic/main.mbt @@ -0,0 +1,27 @@ +async fn main { + let logger = @lib_async.async_logger( + @lib.text_console_sink(@lib.text_formatter(show_timestamp=false, separator=" | ")), + config=@lib_async.AsyncLoggerConfig::new( + max_pending=2, + overflow=@lib_async.AsyncOverflowPolicy::DropOldest, + ), + min_level=@lib.Level::Info, + target="async.demo", + ) + .with_timestamp() + .with_context_fields([@lib.field("service", "bitlogger")]) + .with_filter(@lib.target_has_prefix("async")) + .with_patch(@lib.compose_patches([ + @lib.prefix_message("[async] "), + @lib.redact_fields(["token"]), + ])) + + @async.with_task_group(group => { + group.spawn_bg(allow_failure=true, () => logger.run()) + logger.info("one", fields=[@lib.field("token", "secret")]) + logger.child("worker").info("two") + logger.with_target("skip.demo").info("three") + logger.wait_idle() + logger.close() + }) +} diff --git a/examples/async_basic/moon.pkg b/examples/async_basic/moon.pkg new file mode 100644 index 0000000..f67357c --- /dev/null +++ b/examples/async_basic/moon.pkg @@ -0,0 +1,11 @@ +import { + "Nanaloveyuki/BitLogger/bitlogger" @lib, + "Nanaloveyuki/BitLogger/bitlogger_async" @lib_async, + "moonbitlang/async" @async, +} + +supported_targets = "+native" + +options( + is_main: true, +) diff --git a/moon.mod.json b/moon.mod.json index 4599e91..5769993 100644 --- a/moon.mod.json +++ b/moon.mod.json @@ -2,7 +2,8 @@ "name": "Nanaloveyuki/BitLogger", "version": "0.2.0", "deps": { - "maria/json_parser": "0.1.1" + "maria/json_parser": "0.1.1", + "moonbitlang/async": "0.18.1" }, "readme": "README.mbt.md", "repository": "https://github.com/Nanaloveyuki/BitLogger", @@ -13,4 +14,4 @@ "moonbit" ], "description": "A structured logger for MoonBit." -} +} \ No newline at end of file