package logx import ( "bytes" "context" "encoding/json" "fmt" "net/http" "os" "time" "git.liteyuki.org/LiteyukiStudio/folium/osx" "github.com/prometheus/client_golang/prometheus" ) type ConsoleCollector struct{} func NewConsoleCollector() *ConsoleCollector { return &ConsoleCollector{} } func (c *ConsoleCollector) Collect(ctx context.Context, e Entry) error { // 简单输出 JSON 行,供外部 log collector(Fluent/FluentBit)抓取 b, _ := json.Marshal(e) fmt.Fprintln(os.Stdout, string(b)) return nil } func (c *ConsoleCollector) Close() error { return nil } // LokiCollector:把日志推送到 Loki push API(基本实现) type LokiCollector struct { url string client *http.Client labels string // e.g. `{job="myapp"}` } func NewLokiCollector(collectorURL string, labels map[string]string) *LokiCollector { // 构建 labels 字符串 lbl := "{" first := true for k, v := range labels { if !first { lbl += "," } lbl += fmt.Sprintf("%s=\"%s\"", k, v) first = false } lbl += "}" return &LokiCollector{ url: collectorURL, client: &http.Client{Timeout: 5 * time.Second}, labels: lbl, } } type lokiStream struct { Stream map[string]string `json:"stream"` Values [][2]string `json:"values"` } type lokiPush struct { Streams []lokiStream `json:"streams"` } func (c *LokiCollector) Collect(ctx context.Context, e Entry) error { // Loki requires timestamp in nanoseconds as string ts := fmt.Sprintf("%d", e.Time.UnixNano()) lineB, _ := json.Marshal(e) stream := lokiStream{ Stream: map[string]string{"level": string(e.Level)}, Values: [][2]string{{ts, string(lineB)}}, } push := lokiPush{Streams: []lokiStream{stream}} body, _ := json.Marshal(push) req, _ := http.NewRequestWithContext(ctx, "POST", c.url+"/loki/api/v1/push", bytes.NewBuffer(body)) req.Header.Set("Content-Type", "application/json") resp, err := c.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("loki push status %d", resp.StatusCode) } return nil } func (c *LokiCollector) Close() error { return nil } // ESCollector:把日志写入 Elasticsearch 简单实现(使用 index API) type ESCollector struct { url string // e.g. http://es:9200 index string // index name client *http.Client auth *basicAuth // optional } type basicAuth struct { user string pass string } func NewESCollector(url, index string, authUser, authPass string) *ESCollector { var auth *basicAuth if authUser != "" { auth = &basicAuth{user: authUser, pass: authPass} } return &ESCollector{ url: url, index: index, client: &http.Client{Timeout: 5 * time.Second}, auth: auth, } } func (c *ESCollector) Collect(ctx context.Context, e Entry) error { b, _ := json.Marshal(e) endpoint := fmt.Sprintf("%s/%s/_doc", c.url, c.index) req, _ := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(b)) req.Header.Set("Content-Type", "application/json") if c.auth != nil { req.SetBasicAuth(c.auth.user, c.auth.pass) } resp, err := c.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode >= 300 { return fmt.Errorf("es index status %d", resp.StatusCode) } return nil } func (c *ESCollector) Close() error { return nil } // PromCollector:记录日志计数到 Prometheus(便于监控日志量) type PromCollector struct { counter *prometheus.CounterVec } func NewPromCollector(namespace string) *PromCollector { cv := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, Name: "app_logs_total", Help: "Total application logs by level", }, []string{"level"}) // 忽略重复注册错误(如果已存在,使用已注册的) _ = prometheus.Register(cv) return &PromCollector{ counter: cv, } } func (p *PromCollector) Collect(ctx context.Context, e Entry) error { p.counter.WithLabelValues(string(e.Level)).Inc() return nil } func (p *PromCollector) Close() error { return nil } // LoadCollectorsFromEnv 根据环境变量 LOG_COLLECTORS 加载对应的 collectors func LoadCollectorsFromEnv() []Collector { collectors := osx.GetEnv("LOG_COLLECTORS", "") var result []Collector if collectors == "" { return result } types := bytes.Split([]byte(collectors), []byte(",")) for _, t := range types { switch string(bytes.TrimSpace(t)) { case "console": result = append(result, NewConsoleCollector()) case "loki": lokiURL := osx.GetEnv("LOKI_URL", "http://localhost:3100") labels := map[string]string{ "job": osx.GetEnv("LOKI_JOB", "myapp"), } result = append(result, NewLokiCollector(lokiURL, labels)) case "elasticsearch": esURL := osx.GetEnv("ES_URL", "http://localhost:9200") esIndex := osx.GetEnv("ES_INDEX", "app-logs") esUser := osx.GetEnv("ES_USER", "") esPass := osx.GetEnv("ES_PASS", "") result = append(result, NewESCollector(esURL, esIndex, esUser, esPass)) case "prometheus": namespace := osx.GetEnv("PROM_NAMESPACE", "myapp") result = append(result, NewPromCollector(namespace)) // 可扩展更多 collector 类型 } } return result }