Files
folium/logx/collector.go
Snowykami b0d224dc64 feat: add logging and collector implementations
- Introduced `hertzx` package with `NewHertz` function for server initialization.
- Implemented `logx` package with various log collectors: Console, Loki, Elasticsearch, and Prometheus.
- Added `Logger` struct to manage logging levels and collectors.
- Created environment variable loading functionality in `osx` package to support configuration.
- Enhanced logging capabilities with structured log entries and asynchronous collection.
2025-11-01 13:20:02 +08:00

188 lines
5.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package logx
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"git.liteyuki.org/LiteyukiStudio/folium/osx"
"github.com/prometheus/client_golang/prometheus"
)
type ConsoleCollector struct{}
func NewConsoleCollector() *ConsoleCollector { return &ConsoleCollector{} }
func (c *ConsoleCollector) Collect(ctx context.Context, e Entry) error {
// 简单输出 JSON 行,供外部 log collectorFluent/FluentBit抓取
b, _ := json.Marshal(e)
fmt.Fprintln(os.Stdout, string(b))
return nil
}
func (c *ConsoleCollector) Close() error { return nil }
// LokiCollector把日志推送到 Loki push API基本实现
type LokiCollector struct {
url string
client *http.Client
labels string // e.g. `{job="myapp"}`
}
func NewLokiCollector(collectorURL string, labels map[string]string) *LokiCollector {
// 构建 labels 字符串
lbl := "{"
first := true
for k, v := range labels {
if !first {
lbl += ","
}
lbl += fmt.Sprintf("%s=\"%s\"", k, v)
first = false
}
lbl += "}"
return &LokiCollector{
url: collectorURL,
client: &http.Client{Timeout: 5 * time.Second},
labels: lbl,
}
}
type lokiStream struct {
Stream map[string]string `json:"stream"`
Values [][2]string `json:"values"`
}
type lokiPush struct {
Streams []lokiStream `json:"streams"`
}
func (c *LokiCollector) Collect(ctx context.Context, e Entry) error {
// Loki requires timestamp in nanoseconds as string
ts := fmt.Sprintf("%d", e.Time.UnixNano())
lineB, _ := json.Marshal(e)
stream := lokiStream{
Stream: map[string]string{"level": string(e.Level)},
Values: [][2]string{{ts, string(lineB)}},
}
push := lokiPush{Streams: []lokiStream{stream}}
body, _ := json.Marshal(push)
req, _ := http.NewRequestWithContext(ctx, "POST", c.url+"/loki/api/v1/push", bytes.NewBuffer(body))
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return fmt.Errorf("loki push status %d", resp.StatusCode)
}
return nil
}
func (c *LokiCollector) Close() error { return nil }
// ESCollector把日志写入 Elasticsearch 简单实现(使用 index API
type ESCollector struct {
url string // e.g. http://es:9200
index string // index name
client *http.Client
auth *basicAuth // optional
}
type basicAuth struct {
user string
pass string
}
func NewESCollector(url, index string, authUser, authPass string) *ESCollector {
var auth *basicAuth
if authUser != "" {
auth = &basicAuth{user: authUser, pass: authPass}
}
return &ESCollector{
url: url,
index: index,
client: &http.Client{Timeout: 5 * time.Second},
auth: auth,
}
}
func (c *ESCollector) Collect(ctx context.Context, e Entry) error {
b, _ := json.Marshal(e)
endpoint := fmt.Sprintf("%s/%s/_doc", c.url, c.index)
req, _ := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(b))
req.Header.Set("Content-Type", "application/json")
if c.auth != nil {
req.SetBasicAuth(c.auth.user, c.auth.pass)
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return fmt.Errorf("es index status %d", resp.StatusCode)
}
return nil
}
func (c *ESCollector) Close() error { return nil }
// PromCollector记录日志计数到 Prometheus便于监控日志量
type PromCollector struct {
counter *prometheus.CounterVec
}
func NewPromCollector(namespace string) *PromCollector {
cv := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "app_logs_total",
Help: "Total application logs by level",
}, []string{"level"})
// 忽略重复注册错误(如果已存在,使用已注册的)
_ = prometheus.Register(cv)
return &PromCollector{
counter: cv,
}
}
func (p *PromCollector) Collect(ctx context.Context, e Entry) error {
p.counter.WithLabelValues(string(e.Level)).Inc()
return nil
}
func (p *PromCollector) Close() error { return nil }
// LoadCollectorsFromEnv 根据环境变量 LOG_COLLECTORS 加载对应的 collectors
func LoadCollectorsFromEnv() []Collector {
collectors := osx.GetEnv("LOG_COLLECTORS", "")
var result []Collector
if collectors == "" {
return result
}
types := bytes.Split([]byte(collectors), []byte(","))
for _, t := range types {
switch string(bytes.TrimSpace(t)) {
case "console":
result = append(result, NewConsoleCollector())
case "loki":
lokiURL := osx.GetEnv("LOKI_URL", "http://localhost:3100")
labels := map[string]string{
"job": osx.GetEnv("LOKI_JOB", "myapp"),
}
result = append(result, NewLokiCollector(lokiURL, labels))
case "elasticsearch":
esURL := osx.GetEnv("ES_URL", "http://localhost:9200")
esIndex := osx.GetEnv("ES_INDEX", "app-logs")
esUser := osx.GetEnv("ES_USER", "")
esPass := osx.GetEnv("ES_PASS", "")
result = append(result, NewESCollector(esURL, esIndex, esUser, esPass))
case "prometheus":
namespace := osx.GetEnv("PROM_NAMESPACE", "myapp")
result = append(result, NewPromCollector(namespace))
// 可扩展更多 collector 类型
}
}
return result
}