From 50013f7f0d832ad9c022b8d5fecd768018d4fe42 Mon Sep 17 00:00:00 2001 From: Nanaloveyuki Date: Fri, 8 May 2026 16:24:20 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20composable=20logger=20core=20?= =?UTF-8?q?utilities?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bitlogger/BitLogger_wbtest.mbt | 345 ++++++++++++++++++++++++++++++ bitlogger/file_backend_native.mbt | 85 ++++++++ bitlogger/file_backend_stub.mbt | 31 +++ bitlogger/filters.mbt | 75 +++++++ bitlogger/formatter.mbt | 54 +++-- bitlogger/level.mbt | 4 +- bitlogger/logger.mbt | 31 +++ bitlogger/moon.pkg | 8 + bitlogger/patchers.mbt | 69 ++++++ bitlogger/sinks.mbt | 242 ++++++++++++++++++++- moon.mod.json | 2 +- 11 files changed, 928 insertions(+), 18 deletions(-) create mode 100644 bitlogger/file_backend_native.mbt create mode 100644 bitlogger/file_backend_stub.mbt create mode 100644 bitlogger/filters.mbt create mode 100644 bitlogger/patchers.mbt diff --git a/bitlogger/BitLogger_wbtest.mbt b/bitlogger/BitLogger_wbtest.mbt index 7e34081..0081285 100644 --- a/bitlogger/BitLogger_wbtest.mbt +++ b/bitlogger/BitLogger_wbtest.mbt @@ -12,6 +12,78 @@ test "logger can enable timestamps" { inspect(logger.timestamp, content="true") } +test "text formatter can customize visible parts" { + let rec = record( + Level::Info, + "hello", + timestamp_ms=123UL, + target="svc.api", + fields=[field("user", "alice"), field("request_id", "42")], + ) + let compact = text_formatter(show_timestamp=false, show_target=false, field_separator=",") + inspect(format_text(rec, formatter=compact), content="[INFO] hello user=alice,request_id=42") +} + +test "text formatter can emit message only" { + let rec = record( + Level::Warn, + "just message", + timestamp_ms=999UL, + target="svc", + fields=[field("ignored", "yes")], + ) + let message_only = text_formatter( + show_timestamp=false, + show_level=false, + show_target=false, + show_fields=false, + ) + inspect(format_text(rec, formatter=message_only), content="just message") +} + +test "formatted callback sink receives rendered text" { + let rendered : Ref[String] = Ref::new("") + let sink = text_callback_sink( + text_formatter(show_timestamp=false, separator=" | "), + fn(text) { + rendered.val = text + }, + ) + let logger = Logger::new(sink, min_level=Level::Info, target="svc") + logger.info("hello", fields=[field("user", "alice")]) + inspect(rendered.val, content="[INFO] | [svc] | hello | user=alice") +} + +test "native file support flag is queryable" { + inspect(native_files_supported() == true || native_files_supported() == false, content="true") +} + +test "file sink availability reflects backend support" { + let sink = file_sink("bitlogger-test.log") + inspect(sink.is_available() == native_files_supported(), content="true") + if sink.is_available() { + inspect(sink.flush(), content="true") + inspect(sink.close(), content="true") + } else { + inspect(sink.flush(), content="false") + inspect(sink.close(), content="false") + } +} + +test "json formatter keeps structured shape" { + let rec = record( + Level::Error, + "failed", + timestamp_ms=55UL, + target="svc", + fields=[field("code", "500")], + ) + inspect( + format_json(rec), + content="{\"level\":\"ERROR\",\"message\":\"failed\",\"fields\":{\"code\":\"500\"},\"timestamp_ms\":\"55\",\"target\":\"svc\"}", + ) +} + test "callback sink receives record" { let captured_target : Ref[String] = Ref::new("") let captured_message : Ref[String] = Ref::new("") @@ -52,3 +124,276 @@ test "callback sink sees child target and context logger shape" { inspect(captured_field_count.val, content="2") inspect(captured_timestamp.val > 0UL, content="true") } + +test "buffered sink flushes manually" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let sink = buffered_sink( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + flush_limit=10, + ) + let logger = Logger::new(sink, min_level=Level::Info, target="buffered") + logger.info("one") + logger.info("two") + inspect(sink.pending_count(), content="2") + inspect(flushed_messages.val.length(), content="0") + sink.flush() + inspect(sink.pending_count(), content="0") + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="one") + inspect(flushed_messages.val[1], content="two") +} + +test "buffered sink flushes automatically at limit" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let sink = buffered_sink( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + flush_limit=2, + ) + let logger = Logger::new(sink, min_level=Level::Info, target="buffered") + logger.info("one") + inspect(sink.pending_count(), content="1") + logger.info("two") + inspect(sink.pending_count(), content="0") + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="one") + inspect(flushed_messages.val[1], content="two") +} + +test "filter sink only forwards matching records" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let sink = filter_sink( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + fn(rec) { + rec.target == "kept" + }, + ) + let kept = Logger::new(sink, min_level=Level::Info, target="kept") + let dropped = Logger::new(sink, min_level=Level::Info, target="dropped") + kept.info("one") + dropped.info("two") + kept.info("three") + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="one") + inspect(flushed_messages.val[1], content="three") +} + +test "logger with_filter composes naturally" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let logger = Logger::new( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + min_level=Level::Info, + target="app", + ) + .with_filter(fn(rec) { + rec.target == "app.worker" + }) + logger.info("drop at app") + logger.child("worker").info("keep at worker") + inspect(flushed_messages.val.length(), content="1") + inspect(flushed_messages.val[0], content="keep at worker") +} + +test "filter helpers support target level and message composition" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let logger = Logger::new( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + min_level=Level::Trace, + target="service", + ).with_filter(all_of([ + target_has_prefix("service"), + level_at_least(Level::Info), + message_contains("visible"), + ])) + logger.debug("visible debug") + logger.info("hidden info") + logger.child("api").info("visible info") + inspect(flushed_messages.val.length(), content="1") + inspect(flushed_messages.val[0], content="visible info") +} + +test "field helpers can match and negate records" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let logger = Logger::new( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + min_level=Level::Info, + target="fields", + ).with_filter(all_of([ + has_field("request_id"), + field_equals("kind", "audit"), + not_(target_is("fields.drop")), + ])) + logger.info("missing field") + logger.info("wrong kind", fields=[field("request_id", "1"), field("kind", "trace")]) + logger.child("drop").info("blocked target", fields=[field("request_id", "2"), field("kind", "audit")]) + logger.info("kept", fields=[field("request_id", "3"), field("kind", "audit")]) + inspect(flushed_messages.val.length(), content="1") + inspect(flushed_messages.val[0], content="kept") +} + +test "any_of helper accepts multiple predicates" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let logger = Logger::new( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + min_level=Level::Info, + target="multi", + ).with_filter(any_of([ + target_is("multi.keep"), + field_equals("force", "true"), + ])) + logger.info("drop") + logger.child("keep").info("keep by target") + logger.info("keep by field", fields=[field("force", "true")]) + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="keep by target") + inspect(flushed_messages.val[1], content="keep by field") +} + +test "patch sink can rewrite message target and fields" { + let captured_target : Ref[String] = Ref::new("") + let captured_message : Ref[String] = Ref::new("") + let captured_fields : Ref[Array[Field]] = Ref::new([]) + let logger = Logger::new( + callback_sink(fn(rec) { + captured_target.val = rec.target + captured_message.val = rec.message + captured_fields.val = rec.fields + }), + min_level=Level::Info, + target="auth", + ).with_patch(compose_patches([ + set_target("audit.auth"), + prefix_message("[safe] "), + redact_field("token"), + append_fields([field("service", "bitlogger")]), + ])) + logger.info("login", fields=[field("token", "secret"), field("user", "alice")]) + inspect(captured_target.val, content="audit.auth") + inspect(captured_message.val, content="[safe] login") + inspect(captured_fields.val.length(), content="3") + inspect(captured_fields.val[0].key, content="token") + inspect(captured_fields.val[0].value, content="***") + inspect(captured_fields.val[1].key, content="user") + inspect(captured_fields.val[1].value, content="alice") + inspect(captured_fields.val[2].key, content="service") + inspect(captured_fields.val[2].value, content="bitlogger") +} + +test "patch helpers can redact multiple fields" { + let captured_fields : Ref[Array[Field]] = Ref::new([]) + let logger = Logger::new( + callback_sink(fn(rec) { + captured_fields.val = rec.fields + }), + min_level=Level::Info, + target="audit", + ).with_patch(redact_fields(["token", "password"], placeholder="[redacted]")) + logger.info( + "credentials", + fields=[field("token", "abc"), field("password", "123"), field("user", "alice")], + ) + inspect(captured_fields.val.length(), content="3") + inspect(captured_fields.val[0].value, content="[redacted]") + inspect(captured_fields.val[1].value, content="[redacted]") + inspect(captured_fields.val[2].value, content="alice") +} + +test "queued sink drains in order" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let sink = queued_sink( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + ) + let logger = Logger::new(sink, min_level=Level::Info, target="queue") + logger.info("one") + logger.info("two") + logger.info("three") + inspect(sink.pending_count(), content="3") + inspect(sink.dropped_count(), content="0") + inspect(sink.drain(max_items=2), content="2") + inspect(sink.pending_count(), content="1") + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="one") + inspect(flushed_messages.val[1], content="two") + inspect(sink.flush(), content="1") + inspect(sink.pending_count(), content="0") + inspect(flushed_messages.val[2], content="three") +} + +test "queued sink can drop newest when full" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let sink = queued_sink( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + max_pending=2, + overflow=QueueOverflowPolicy::DropNewest, + ) + let logger = Logger::new(sink, min_level=Level::Info, target="queue") + logger.info("one") + logger.info("two") + logger.info("three") + inspect(sink.pending_count(), content="2") + inspect(sink.dropped_count(), content="1") + inspect(sink.flush(), content="2") + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="one") + inspect(flushed_messages.val[1], content="two") +} + +test "queued sink can drop oldest when full" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let sink = queued_sink( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + max_pending=2, + overflow=QueueOverflowPolicy::DropOldest, + ) + let logger = Logger::new(sink, min_level=Level::Info, target="queue") + logger.info("one") + logger.info("two") + logger.info("three") + inspect(sink.pending_count(), content="2") + inspect(sink.dropped_count(), content="1") + inspect(sink.flush(), content="2") + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="two") + inspect(flushed_messages.val[1], content="three") +} + +test "logger with_queue preserves chaining ergonomics" { + let flushed_messages : Ref[Array[String]] = Ref::new([]) + let logger = Logger::new( + callback_sink(fn(rec) { + flushed_messages.val.push(rec.message) + }), + min_level=Level::Info, + target="service", + ) + .with_patch(prefix_message("[queued] ")) + .with_queue(max_pending=2, overflow=QueueOverflowPolicy::DropOldest) + logger.info("one") + logger.child("api").info("two") + logger.info("three") + inspect(logger.sink.pending_count(), content="2") + inspect(logger.sink.dropped_count(), content="1") + inspect(logger.sink.flush(), content="2") + inspect(flushed_messages.val.length(), content="2") + inspect(flushed_messages.val[0], content="[queued] two") + inspect(flushed_messages.val[1], content="[queued] three") +} diff --git a/bitlogger/file_backend_native.mbt b/bitlogger/file_backend_native.mbt new file mode 100644 index 0000000..f9245e5 --- /dev/null +++ b/bitlogger/file_backend_native.mbt @@ -0,0 +1,85 @@ +fn string_to_c_bytes(str : String) -> Bytes { + let res : Array[Byte] = [] + let len = str.length() + let mut i = 0 + while i < len { + let mut c = str.code_unit_at(i).to_int() + if 0xD800 <= c && c <= 0xDBFF { + c -= 0xD800 + i = i + 1 + let l = str.code_unit_at(i).to_int() - 0xDC00 + c = (c << 10) + l + 0x10000 + } + if c < 0x80 { + res.push(c.to_byte()) + } else if c < 0x800 { + res.push((0xc0 + (c >> 6)).to_byte()) + res.push((0x80 + (c & 0x3f)).to_byte()) + } else if c < 0x10000 { + res.push((0xe0 + (c >> 12)).to_byte()) + res.push((0x80 + ((c >> 6) & 0x3f)).to_byte()) + res.push((0x80 + (c & 0x3f)).to_byte()) + } else { + res.push((0xf0 + (c >> 18)).to_byte()) + res.push((0x80 + ((c >> 12) & 0x3f)).to_byte()) + res.push((0x80 + ((c >> 6) & 0x3f)).to_byte()) + res.push((0x80 + (c & 0x3f)).to_byte()) + } + i = i + 1 + } + res.push((0).to_byte()) + Bytes::from_array(res) +} + +#external +priv type NativeFileHandle + +#borrow(path, mode) +extern "c" fn file_open_ffi(path : Bytes, mode : Bytes) -> NativeFileHandle = "fopen" + +extern "c" fn file_is_null_ffi(handle : NativeFileHandle) -> Bool = "%null?" + +#borrow(buffer) +extern "c" fn file_write_ffi( + buffer : Bytes, + size : Int, + count : Int, + handle : NativeFileHandle, +) -> Int = "fwrite" + +extern "c" fn file_flush_ffi(handle : NativeFileHandle) -> Int = "fflush" + +extern "c" fn file_close_ffi(handle : NativeFileHandle) -> Int = "fclose" + +pub struct FileHandle { + raw : NativeFileHandle + path : String +} + +fn open_file_handle_internal(path : String, append : Bool) -> FileHandle? { + let mode = if append { "ab" } else { "wb" } + let raw = file_open_ffi(string_to_c_bytes(path), string_to_c_bytes(mode)) + if file_is_null_ffi(raw) { + None + } else { + Some({ raw, path }) + } +} + +fn write_file_handle_internal(handle : FileHandle, content : String) -> Bool { + let bytes = string_to_c_bytes(content) + let written = file_write_ffi(bytes, 1, bytes.length() - 1, handle.raw) + written == bytes.length() - 1 +} + +fn flush_file_handle_internal(handle : FileHandle) -> Bool { + file_flush_ffi(handle.raw) == 0 +} + +fn close_file_handle_internal(handle : FileHandle) -> Bool { + file_close_ffi(handle.raw) == 0 +} + +fn native_files_supported_internal() -> Bool { + true +} diff --git a/bitlogger/file_backend_stub.mbt b/bitlogger/file_backend_stub.mbt new file mode 100644 index 0000000..aa623a4 --- /dev/null +++ b/bitlogger/file_backend_stub.mbt @@ -0,0 +1,31 @@ +pub struct FileHandle { + path : String +} + +fn open_file_handle_internal(path : String, append : Bool) -> FileHandle? { + ignore(append) + ignore(path) + let _unused : FileHandle = { path: "" } + ignore(_unused) + None +} + +fn write_file_handle_internal(handle : FileHandle, content : String) -> Bool { + ignore(handle) + ignore(content) + false +} + +fn flush_file_handle_internal(handle : FileHandle) -> Bool { + ignore(handle) + false +} + +fn close_file_handle_internal(handle : FileHandle) -> Bool { + ignore(handle) + false +} + +fn native_files_supported_internal() -> Bool { + false +} diff --git a/bitlogger/filters.mbt b/bitlogger/filters.mbt new file mode 100644 index 0000000..0fcfee1 --- /dev/null +++ b/bitlogger/filters.mbt @@ -0,0 +1,75 @@ +pub type RecordPredicate = (Record) -> Bool + +pub fn level_at_least(min_level : Level) -> RecordPredicate { + fn(rec) { + rec.level.priority() >= min_level.priority() + } +} + +pub fn target_is(target : String) -> RecordPredicate { + fn(rec) { + rec.target == target + } +} + +pub fn target_has_prefix(prefix : String) -> RecordPredicate { + fn(rec) { + rec.target.has_prefix(prefix) + } +} + +pub fn message_contains(fragment : String) -> RecordPredicate { + fn(rec) { + rec.message.contains(fragment) + } +} + +pub fn has_field(key : String) -> RecordPredicate { + fn(rec) { + for field in rec.fields { + if field.key == key { + return true + } + } + false + } +} + +pub fn field_equals(key : String, value : String) -> RecordPredicate { + fn(rec) { + for field in rec.fields { + if field.key == key && field.value == value { + return true + } + } + false + } +} + +pub fn not_(predicate : RecordPredicate) -> RecordPredicate { + fn(rec) { + !(predicate(rec)) + } +} + +pub fn all_of(predicates : Array[RecordPredicate]) -> RecordPredicate { + fn(rec) { + for predicate in predicates { + if !(predicate(rec)) { + return false + } + } + true + } +} + +pub fn any_of(predicates : Array[RecordPredicate]) -> RecordPredicate { + fn(rec) { + for predicate in predicates { + if predicate(rec) { + return true + } + } + false + } +} diff --git a/bitlogger/formatter.mbt b/bitlogger/formatter.mbt index cc0f9a0..0598361 100644 --- a/bitlogger/formatter.mbt +++ b/bitlogger/formatter.mbt @@ -1,3 +1,25 @@ +pub type RecordFormatter = (Record) -> String + +pub struct TextFormatter { + show_timestamp : Bool + show_level : Bool + show_target : Bool + show_fields : Bool + separator : String + field_separator : String +} + +pub fn text_formatter( + show_timestamp~ : Bool = true, + show_level~ : Bool = true, + show_target~ : Bool = true, + show_fields~ : Bool = true, + separator~ : String = " ", + field_separator~ : String = " ", +) -> TextFormatter { + { show_timestamp, show_level, show_target, show_fields, separator, field_separator } +} + fn fields_to_json(fields : Array[Field]) -> Json { let obj : Map[String, Json] = {} for item in fields { @@ -6,26 +28,32 @@ fn fields_to_json(fields : Array[Field]) -> Json { Json::object(obj) } -fn format_record(rec : Record) -> String { - let prefix = if rec.timestamp_ms == 0UL { - "[\{rec.level.label()}]" - } else { - "[\{rec.timestamp_ms.to_string()}] [\{rec.level.label()}]" +fn format_fields(fields : Array[Field], separator : String) -> String { + fields.map(fn(f) { "\{f.key}=\{f.value}" }).join(separator) +} + +pub fn format_text(rec : Record, formatter~ : TextFormatter = text_formatter()) -> String { + let parts : Array[String] = [] + if formatter.show_timestamp && rec.timestamp_ms != 0UL { + parts.push("[\{rec.timestamp_ms.to_string()}]") } - let base = if rec.target == "" { - "\{prefix} \{rec.message}" - } else { - "\{prefix} [\{rec.target}] \{rec.message}" + if formatter.show_level { + parts.push("[\{rec.level.label()}]") } - if rec.fields.length() == 0 { + if formatter.show_target && rec.target != "" { + parts.push("[\{rec.target}]") + } + parts.push(rec.message) + let base = parts.join(formatter.separator) + if !formatter.show_fields || rec.fields.length() == 0 { base } else { - let details = rec.fields.map(fn(f) { "\{f.key}=\{f.value}" }).join(" ") - "\{base} \{details}" + let details = format_fields(rec.fields, formatter.field_separator) + "\{base}\{formatter.separator}\{details}" } } -fn format_record_json(rec : Record) -> String { +pub fn format_json(rec : Record) -> String { let obj : Map[String, Json] = { "level": Json::string(rec.level.label()), "message": Json::string(rec.message), diff --git a/bitlogger/level.mbt b/bitlogger/level.mbt index 83f99d1..e81ea30 100644 --- a/bitlogger/level.mbt +++ b/bitlogger/level.mbt @@ -6,7 +6,7 @@ pub(all) enum Level { Error } -fn Level::priority(self : Level) -> Int { +pub fn Level::priority(self : Level) -> Int { match self { Level::Trace => 10 Level::Debug => 20 @@ -26,6 +26,6 @@ pub fn Level::label(self : Level) -> String { } } -fn Level::enabled(self : Level, min_level : Level) -> Bool { +pub fn Level::enabled(self : Level, min_level : Level) -> Bool { self.priority() >= min_level.priority() } diff --git a/bitlogger/logger.mbt b/bitlogger/logger.mbt index 684361a..2305b67 100644 --- a/bitlogger/logger.mbt +++ b/bitlogger/logger.mbt @@ -36,6 +36,37 @@ pub fn[S] Logger::with_context_fields(self : Logger[S], fields : Array[Field]) - } } +pub fn[S] Logger::with_filter(self : Logger[S], predicate : (Record) -> Bool) -> Logger[FilterSink[S]] { + { + min_level: self.min_level, + sink: filter_sink(self.sink, predicate), + target: self.target, + timestamp: self.timestamp, + } +} + +pub fn[S] Logger::with_patch(self : Logger[S], patch : RecordPatch) -> Logger[PatchSink[S]] { + { + min_level: self.min_level, + sink: patch_sink(self.sink, patch), + target: self.target, + timestamp: self.timestamp, + } +} + +pub fn[S] Logger::with_queue( + self : Logger[S], + max_pending~ : Int = 0, + overflow~ : QueueOverflowPolicy = QueueOverflowPolicy::DropNewest, +) -> Logger[QueuedSink[S]] { + { + min_level: self.min_level, + sink: queued_sink(self.sink, max_pending=max_pending, overflow=overflow), + target: self.target, + timestamp: self.timestamp, + } +} + pub fn[S] Logger::with_min_level(self : Logger[S], min_level : Level) -> Logger[S] { { ..self, min_level } } diff --git a/bitlogger/moon.pkg b/bitlogger/moon.pkg index 5ebe50c..76c5852 100644 --- a/bitlogger/moon.pkg +++ b/bitlogger/moon.pkg @@ -3,5 +3,13 @@ import { "moonbitlang/core/builtin", "moonbitlang/core/env" @env, "moonbitlang/core/json", + "moonbitlang/core/queue" @queue, "moonbitlang/core/ref", } + +options( + targets: { + "file_backend_native.mbt": [ "native", "llvm" ], + "file_backend_stub.mbt": [ "js", "wasm", "wasm-gc" ], + }, +) diff --git a/bitlogger/patchers.mbt b/bitlogger/patchers.mbt new file mode 100644 index 0000000..e5f348e --- /dev/null +++ b/bitlogger/patchers.mbt @@ -0,0 +1,69 @@ +pub type RecordPatch = (Record) -> Record + +pub fn identity_patch() -> RecordPatch { + fn(rec) { rec } +} + +pub fn set_target(target : String) -> RecordPatch { + fn(rec) { + { ..rec, target } + } +} + +pub fn prefix_message(prefix : String) -> RecordPatch { + fn(rec) { + { ..rec, message: "\{prefix}\{rec.message}" } + } +} + +pub fn append_fields(extra_fields : Array[Field]) -> RecordPatch { + fn(rec) { + if extra_fields.length() == 0 { + rec + } else if rec.fields.length() == 0 { + { ..rec, fields: extra_fields } + } else { + { ..rec, fields: rec.fields + extra_fields } + } + } +} + +pub fn redact_field(key : String, placeholder~ : String = "***") -> RecordPatch { + fn(rec) { + { + ..rec, + fields: rec.fields.map(fn(field) { + if field.key == key { + { ..field, value: placeholder } + } else { + field + } + }), + } + } +} + +pub fn redact_fields(keys : Array[String], placeholder~ : String = "***") -> RecordPatch { + fn(rec) { + { + ..rec, + fields: rec.fields.map(fn(field) { + if keys.contains(field.key) { + { ..field, value: placeholder } + } else { + field + } + }), + } + } +} + +pub fn compose_patches(patches : Array[RecordPatch]) -> RecordPatch { + fn(rec) { + let mut current = rec + for patch in patches { + current = patch(current) + } + current + } +} diff --git a/bitlogger/sinks.mbt b/bitlogger/sinks.mbt index e91f556..7543a15 100644 --- a/bitlogger/sinks.mbt +++ b/bitlogger/sinks.mbt @@ -12,7 +12,7 @@ pub fn console_sink() -> ConsoleSink { pub impl Sink for ConsoleSink with write(self, rec) { ignore(self) - println(format_record(rec)) + println(format_text(rec)) } pub struct ContextSink[S] { @@ -41,7 +41,110 @@ pub fn json_console_sink() -> JsonConsoleSink { pub impl Sink for JsonConsoleSink with write(self, rec) { ignore(self) - println(format_record_json(rec)) + println(format_json(rec)) +} + +pub struct FileSink { + handle : Ref[FileHandle?] + formatter : RecordFormatter + auto_flush : Bool +} + +pub fn native_files_supported() -> Bool { + native_files_supported_internal() +} + +pub fn file_sink( + path : String, + append~ : Bool = true, + auto_flush~ : Bool = true, + formatter~ : RecordFormatter = fn(rec) { + format_text(rec) + }, +) -> FileSink { + { + handle: Ref::new(open_file_handle_internal(path, append)), + formatter, + auto_flush, + } +} + +pub fn FileSink::is_available(self : FileSink) -> Bool { + self.handle.val is Some(_) +} + +pub fn FileSink::flush(self : FileSink) -> Bool { + match self.handle.val { + None => false + Some(handle) => flush_file_handle_internal(handle) + } +} + +pub fn FileSink::close(self : FileSink) -> Bool { + match self.handle.val { + None => false + Some(handle) => { + let ok = close_file_handle_internal(handle) + self.handle.val = None + ok + } + } +} + +pub impl Sink for FileSink with write(self, rec) { + match self.handle.val { + None => () + Some(handle) => { + let line = "\{(self.formatter)(rec)}\n" + ignore(write_file_handle_internal(handle, line)) + if self.auto_flush { + ignore(flush_file_handle_internal(handle)) + } + } + } +} + +pub struct FormattedConsoleSink { + formatter : RecordFormatter +} + +pub fn formatted_console_sink(formatter : RecordFormatter) -> FormattedConsoleSink { + { formatter, } +} + +pub fn text_console_sink(formatter : TextFormatter) -> FormattedConsoleSink { + formatted_console_sink(fn(rec) { + format_text(rec, formatter=formatter) + }) +} + +pub impl Sink for FormattedConsoleSink with write(self, rec) { + println((self.formatter)(rec)) +} + +pub struct FormattedCallbackSink { + formatter : RecordFormatter + callback : (String) -> Unit +} + +pub fn formatted_callback_sink( + formatter : RecordFormatter, + callback : (String) -> Unit, +) -> FormattedCallbackSink { + { formatter, callback } +} + +pub fn text_callback_sink( + formatter : TextFormatter, + callback : (String) -> Unit, +) -> FormattedCallbackSink { + formatted_callback_sink(fn(rec) { + format_text(rec, formatter=formatter) + }, callback) +} + +pub impl Sink for FormattedCallbackSink with write(self, rec) { + (self.callback)((self.formatter)(rec)) } pub struct FanoutSink[A, B] { @@ -69,3 +172,138 @@ pub fn callback_sink(callback : (Record) -> Unit) -> CallbackSink { pub impl Sink for CallbackSink with write(self, rec) { (self.callback)(rec) } + +pub struct BufferedSink[S] { + sink : S + buffer : Ref[Array[Record]] + flush_limit : Int +} + +pub fn[S] buffered_sink(sink : S, flush_limit~ : Int = 1) -> BufferedSink[S] { + let actual_limit = if flush_limit <= 0 { 1 } else { flush_limit } + { sink, buffer: Ref::new([]), flush_limit: actual_limit } +} + +pub fn[S] BufferedSink::pending_count(self : BufferedSink[S]) -> Int { + self.buffer.val.length() +} + +pub fn[S : Sink] BufferedSink::flush(self : BufferedSink[S]) -> Unit { + if self.buffer.val.length() == 0 { + () + } else { + let pending = self.buffer.val + self.buffer.val = [] + for rec in pending { + self.sink.write(rec) + } + } +} + +pub impl[S : Sink] Sink for BufferedSink[S] with write(self, rec) { + self.buffer.val.push(rec) + if self.buffer.val.length() >= self.flush_limit { + self.flush() + } +} + +pub(all) enum QueueOverflowPolicy { + DropNewest + DropOldest +} + +pub struct QueuedSink[S] { + sink : S + queue : @queue.Queue[Record] + max_pending : Int + overflow : QueueOverflowPolicy + dropped_count : Ref[Int] +} + +pub fn[S] queued_sink( + sink : S, + max_pending~ : Int = 0, + overflow~ : QueueOverflowPolicy = QueueOverflowPolicy::DropNewest, +) -> QueuedSink[S] { + { + sink, + queue: @queue.Queue::new(), + max_pending, + overflow, + dropped_count: Ref::new(0), + } +} + +pub fn[S] QueuedSink::pending_count(self : QueuedSink[S]) -> Int { + self.queue.length() +} + +pub fn[S] QueuedSink::dropped_count(self : QueuedSink[S]) -> Int { + self.dropped_count.val +} + +pub fn[S : Sink] QueuedSink::drain(self : QueuedSink[S], max_items~ : Int = -1) -> Int { + if max_items == 0 { + return 0 + } + let limit = if max_items < 0 { self.pending_count() } else { max_items } + for drained = 0; drained < limit; { + match self.queue.pop() { + None => break drained + Some(rec) => { + self.sink.write(rec) + continue drained + 1 + } + } + } nobreak { + limit + } +} + +pub fn[S : Sink] QueuedSink::flush(self : QueuedSink[S]) -> Int { + self.drain() +} + +pub impl[S] Sink for QueuedSink[S] with write(self, rec) { + let full = self.max_pending > 0 && self.pending_count() >= self.max_pending + if !full { + self.queue.push(rec) + } else { + self.dropped_count.val += 1 + match self.overflow { + QueueOverflowPolicy::DropNewest => () + QueueOverflowPolicy::DropOldest => { + ignore(self.queue.pop()) + self.queue.push(rec) + } + } + } +} + +pub struct FilterSink[S] { + sink : S + predicate : (Record) -> Bool +} + +pub fn[S] filter_sink(sink : S, predicate : (Record) -> Bool) -> FilterSink[S] { + { sink, predicate } +} + +pub impl[S : Sink] Sink for FilterSink[S] with write(self, rec) { + if (self.predicate)(rec) { + self.sink.write(rec) + } +} + +pub struct PatchSink[S] { + sink : S + patch : RecordPatch +} + +pub fn[S] patch_sink(sink : S, patch : RecordPatch) -> PatchSink[S] { + { sink, patch } +} + +pub impl[S : Sink] Sink for PatchSink[S] with write(self, rec) { + self.sink.write((self.patch)(rec)) +} diff --git a/moon.mod.json b/moon.mod.json index 75a50f3..0f5f036 100644 --- a/moon.mod.json +++ b/moon.mod.json @@ -3,7 +3,7 @@ "version": "0.1.0", "readme": "README.mbt.md", "repository": "", - "license": "Apache-2.0", + "license": "MIT", "keywords": [ "logger", "logging",