feat: add logging and collector implementations
- Introduced `hertzx` package with `NewHertz` function for server initialization. - Implemented `logx` package with various log collectors: Console, Loki, Elasticsearch, and Prometheus. - Added `Logger` struct to manage logging levels and collectors. - Created environment variable loading functionality in `osx` package to support configuration. - Enhanced logging capabilities with structured log entries and asynchronous collection.
This commit is contained in:
187
logx/collector.go
Normal file
187
logx/collector.go
Normal file
@@ -0,0 +1,187 @@
|
||||
package logx
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.liteyuki.org/LiteyukiStudio/folium/osx"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type ConsoleCollector struct{}
|
||||
|
||||
func NewConsoleCollector() *ConsoleCollector { return &ConsoleCollector{} }
|
||||
|
||||
func (c *ConsoleCollector) Collect(ctx context.Context, e Entry) error {
|
||||
// 简单输出 JSON 行,供外部 log collector(Fluent/FluentBit)抓取
|
||||
b, _ := json.Marshal(e)
|
||||
fmt.Fprintln(os.Stdout, string(b))
|
||||
return nil
|
||||
}
|
||||
func (c *ConsoleCollector) Close() error { return nil }
|
||||
|
||||
// LokiCollector:把日志推送到 Loki push API(基本实现)
|
||||
type LokiCollector struct {
|
||||
url string
|
||||
client *http.Client
|
||||
labels string // e.g. `{job="myapp"}`
|
||||
}
|
||||
|
||||
func NewLokiCollector(collectorURL string, labels map[string]string) *LokiCollector {
|
||||
// 构建 labels 字符串
|
||||
lbl := "{"
|
||||
first := true
|
||||
for k, v := range labels {
|
||||
if !first {
|
||||
lbl += ","
|
||||
}
|
||||
lbl += fmt.Sprintf("%s=\"%s\"", k, v)
|
||||
first = false
|
||||
}
|
||||
lbl += "}"
|
||||
return &LokiCollector{
|
||||
url: collectorURL,
|
||||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
labels: lbl,
|
||||
}
|
||||
}
|
||||
|
||||
type lokiStream struct {
|
||||
Stream map[string]string `json:"stream"`
|
||||
Values [][2]string `json:"values"`
|
||||
}
|
||||
type lokiPush struct {
|
||||
Streams []lokiStream `json:"streams"`
|
||||
}
|
||||
|
||||
func (c *LokiCollector) Collect(ctx context.Context, e Entry) error {
|
||||
// Loki requires timestamp in nanoseconds as string
|
||||
ts := fmt.Sprintf("%d", e.Time.UnixNano())
|
||||
lineB, _ := json.Marshal(e)
|
||||
stream := lokiStream{
|
||||
Stream: map[string]string{"level": string(e.Level)},
|
||||
Values: [][2]string{{ts, string(lineB)}},
|
||||
}
|
||||
push := lokiPush{Streams: []lokiStream{stream}}
|
||||
body, _ := json.Marshal(push)
|
||||
req, _ := http.NewRequestWithContext(ctx, "POST", c.url+"/loki/api/v1/push", bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("loki push status %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (c *LokiCollector) Close() error { return nil }
|
||||
|
||||
// ESCollector:把日志写入 Elasticsearch 简单实现(使用 index API)
|
||||
type ESCollector struct {
|
||||
url string // e.g. http://es:9200
|
||||
index string // index name
|
||||
client *http.Client
|
||||
auth *basicAuth // optional
|
||||
}
|
||||
|
||||
type basicAuth struct {
|
||||
user string
|
||||
pass string
|
||||
}
|
||||
|
||||
func NewESCollector(url, index string, authUser, authPass string) *ESCollector {
|
||||
var auth *basicAuth
|
||||
if authUser != "" {
|
||||
auth = &basicAuth{user: authUser, pass: authPass}
|
||||
}
|
||||
return &ESCollector{
|
||||
url: url,
|
||||
index: index,
|
||||
client: &http.Client{Timeout: 5 * time.Second},
|
||||
auth: auth,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ESCollector) Collect(ctx context.Context, e Entry) error {
|
||||
b, _ := json.Marshal(e)
|
||||
endpoint := fmt.Sprintf("%s/%s/_doc", c.url, c.index)
|
||||
req, _ := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(b))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if c.auth != nil {
|
||||
req.SetBasicAuth(c.auth.user, c.auth.pass)
|
||||
}
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 300 {
|
||||
return fmt.Errorf("es index status %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (c *ESCollector) Close() error { return nil }
|
||||
|
||||
// PromCollector:记录日志计数到 Prometheus(便于监控日志量)
|
||||
type PromCollector struct {
|
||||
counter *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func NewPromCollector(namespace string) *PromCollector {
|
||||
cv := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Name: "app_logs_total",
|
||||
Help: "Total application logs by level",
|
||||
}, []string{"level"})
|
||||
// 忽略重复注册错误(如果已存在,使用已注册的)
|
||||
_ = prometheus.Register(cv)
|
||||
return &PromCollector{
|
||||
counter: cv,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PromCollector) Collect(ctx context.Context, e Entry) error {
|
||||
p.counter.WithLabelValues(string(e.Level)).Inc()
|
||||
return nil
|
||||
}
|
||||
func (p *PromCollector) Close() error { return nil }
|
||||
|
||||
// LoadCollectorsFromEnv 根据环境变量 LOG_COLLECTORS 加载对应的 collectors
|
||||
func LoadCollectorsFromEnv() []Collector {
|
||||
collectors := osx.GetEnv("LOG_COLLECTORS", "")
|
||||
var result []Collector
|
||||
if collectors == "" {
|
||||
return result
|
||||
}
|
||||
types := bytes.Split([]byte(collectors), []byte(","))
|
||||
for _, t := range types {
|
||||
switch string(bytes.TrimSpace(t)) {
|
||||
case "console":
|
||||
result = append(result, NewConsoleCollector())
|
||||
case "loki":
|
||||
lokiURL := osx.GetEnv("LOKI_URL", "http://localhost:3100")
|
||||
labels := map[string]string{
|
||||
"job": osx.GetEnv("LOKI_JOB", "myapp"),
|
||||
}
|
||||
result = append(result, NewLokiCollector(lokiURL, labels))
|
||||
case "elasticsearch":
|
||||
esURL := osx.GetEnv("ES_URL", "http://localhost:9200")
|
||||
esIndex := osx.GetEnv("ES_INDEX", "app-logs")
|
||||
esUser := osx.GetEnv("ES_USER", "")
|
||||
esPass := osx.GetEnv("ES_PASS", "")
|
||||
result = append(result, NewESCollector(esURL, esIndex, esUser, esPass))
|
||||
case "prometheus":
|
||||
namespace := osx.GetEnv("PROM_NAMESPACE", "myapp")
|
||||
result = append(result, NewPromCollector(namespace))
|
||||
// 可扩展更多 collector 类型
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
Reference in New Issue
Block a user