From 4fdc760554726389503db19fbb7bdef7f6dd0831 Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Fri, 12 Apr 2024 22:47:22 +0700 Subject: [PATCH] telemetry: added OpenObserve support --- config/default.go | 11 ++ go.mod | 3 + go.sum | 4 + pkg/log/log.go | 40 +++++- pkg/log/open_observe_handler.go | 214 ++++++++++++++++++++++++++++++++ 5 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 pkg/log/open_observe_handler.go diff --git a/config/default.go b/config/default.go index c716c03..3b67c9b 100644 --- a/config/default.go +++ b/config/default.go @@ -25,4 +25,15 @@ var DefaultConfig = map[string]any{ "http.host": "0.0.0.0", "http.shutdown_timeout": "5s", "http.hotreload": false, + + "telemetry.openobserve.enable": false, + "telemetry.openobserve.log.enable": true, + "telemetry.openobserve.log.level": "info", + "telemetry.openobserve.log.source": true, + "telemetry.openobserve.log.endpoint": "http://localhost:5080/api/default/default/_json", + "telemetry.openobserve.log.concurrency": 4, + "telemetry.openobserve.log.buffer.size": 2 * 1024, // 2kb + "telemetry.openobserve.log.buffer.timeout": "500ms", + "telemetry.openobserve.log.username": "root@example.com", + "telemetry.openobserve.log.password": "Complexpass#123", } diff --git a/go.mod b/go.mod index ee7a67e..bba163b 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,8 @@ require ( github.com/teivah/broadcast v0.1.0 ) +require github.com/samber/lo v1.38.1 // indirect + require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect @@ -41,6 +43,7 @@ require ( github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/riandyrn/otelchi v0.6.0 // indirect + github.com/samber/slog-multi v1.0.2 github.com/sethvargo/go-retry v0.2.4 // indirect go.opentelemetry.io/otel v1.25.0 // indirect go.opentelemetry.io/otel/metric v1.25.0 // indirect diff --git a/go.sum b/go.sum index 6fc195a..2120ffc 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,10 @@ github.com/riandyrn/otelchi v0.6.0/go.mod h1:BfwVxPKUNgJx12Z8XSrMGYT8/pTge+QNaoj github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/samber/slog-multi v1.0.2 h1:6BVH9uHGAsiGkbbtQgAOQJMpKgV8unMrHhhJaw+X1EQ= +github.com/samber/slog-multi v1.0.2/go.mod h1:uLAvHpGqbYgX4FSL0p1ZwoLuveIAJvBECtE07XmYvFo= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= diff --git a/pkg/log/log.go b/pkg/log/log.go index 5873509..b121077 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log/slog" + "net/http" "os" "strings" "time" @@ -14,15 +15,33 @@ import ( "github.com/tigorlazuardi/redmage/config" "github.com/tigorlazuardi/redmage/pkg/caller" "go.opentelemetry.io/otel/trace" + + slogmulti "github.com/samber/slog-multi" ) var handler slog.Handler = NullHandler{} func NewHandler(cfg *config.Config) slog.Handler { - if !cfg.Bool("log.enable") { + var handlers []slog.Handler + + if cfg.Bool("log.enable") { + handlers = append(handlers, createStandardLogger(cfg)) + } + + if cfg.Bool("telemetry.openobserve.enable") && cfg.Bool("telemetry.openobserve.log.enable") { + handlers = append(handlers, createO2Logger(cfg)) + } + + if len(handlers) == 0 { return NullHandler{} } + + return slogmulti.Fanout(handlers...) +} + +func createStandardLogger(cfg *config.Config) slog.Handler { var output io.Writer + if strings.ToLower(cfg.String("log.output")) == "stdout" { output = colorable.NewColorableStdout() } else { @@ -44,6 +63,25 @@ func NewHandler(cfg *config.Config) slog.Handler { } } +func createO2Logger(cfg *config.Config) slog.Handler { + var lvl slog.Level + _ = lvl.UnmarshalText(cfg.Bytes("telemetry.openobserve.log.level")) + opts := &slog.HandlerOptions{ + AddSource: cfg.Bool("telemetry.openobserve.log.source"), + Level: lvl, + } + return NewOpenObserveHandler(OpenObserveHandlerOptions{ + HandlerOptions: opts, + BufferSize: cfg.Int("telemetry.openobserve.log.buffer.size"), + BufferTimeout: cfg.Duration("telemetry.openobserve.log.buffer.timeout"), + Concurrency: cfg.Int("telemetry.openobserve.log.concurrency"), + Endpoint: cfg.String("telemetry.openobserve.log.endpoint"), + HTTPClient: http.DefaultClient, + Username: cfg.String("telemetry.openobserve.log.username"), + Password: cfg.String("telemetry.openobserve.log.password"), + }) +} + type Entry struct { ctx context.Context handler slog.Handler diff --git a/pkg/log/open_observe_handler.go b/pkg/log/open_observe_handler.go new file mode 100644 index 0000000..6d216fe --- /dev/null +++ b/pkg/log/open_observe_handler.go @@ -0,0 +1,214 @@ +package log + +import ( + "bytes" + "context" + "fmt" + "io" + "log/slog" + "net/http" + "sync" + "time" +) + +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) +} + +type OpenObserveHandler struct { + opts OpenObserveHandlerOptions + semaphore chan struct{} + withAttrs []slog.Attr + withGroup []string + buffer *bytes.Buffer + mu sync.Mutex + sendDebounceFunc *time.Timer +} + +func (sl *OpenObserveHandler) clone() *OpenObserveHandler { + return &OpenObserveHandler{ + opts: sl.opts, + semaphore: sl.semaphore, + withAttrs: sl.withAttrs, + withGroup: sl.withGroup, + buffer: &bytes.Buffer{}, + } +} + +func (sl *OpenObserveHandler) Enabled(ctx context.Context, lvl slog.Level) bool { + return sl.opts.HandlerOptions.Level.Level() <= lvl +} + +func (sl *OpenObserveHandler) Handle(ctx context.Context, record slog.Record) error { + sl.mu.Lock() + defer sl.mu.Unlock() + if sl.sendDebounceFunc == nil { + sl.sendDebounceFunc = time.AfterFunc(sl.opts.BufferTimeout, func() { + sl.mu.Lock() + defer sl.mu.Unlock() + if sl.buffer.Len() < 1 { + return + } + b := sl.extractBuffer().Bytes() + if b[len(b)-1] == ',' { + b = b[:len(b)-1] + } + b = append(b, ']') + sl.semaphore <- struct{}{} + go func() { + defer func() { <-sl.semaphore }() + sl.postLog(bytes.NewReader(b)) + }() + }) + } + + if sl.buffer.Len() < 1 { + sl.buffer.WriteRune('[') + } + + jsonHandler := sl.jsonHandler(sl.buffer) + if err := jsonHandler.Handle(ctx, record); err != nil { + return err + } + + if sl.buffer.Len() < sl.opts.BufferSize-1 { + sl.buffer.WriteRune(',') + } else { + sl.sendDebounceFunc.Stop() + sl.sendDebounceFunc = nil + sl.buffer.WriteRune(']') + buf := sl.extractBuffer() + sl.semaphore <- struct{}{} + go func() { + defer func() { <-sl.semaphore }() + sl.postLog(buf) + }() + } + + return nil +} + +func (sl *OpenObserveHandler) extractBuffer() *bytes.Buffer { + b := sl.buffer.Bytes() + newb := make([]byte, len(b)) + copy(newb, b) + if sl.buffer.Cap() > 512*1024 { + sl.buffer = &bytes.Buffer{} + } else { + sl.buffer.Reset() + } + return bytes.NewBuffer(newb) +} + +func noopReplaceAttr(_ []string, attr slog.Attr) slog.Attr { return attr } + +func (sl *OpenObserveHandler) jsonHandler(w io.Writer) slog.Handler { + handler := slog.NewJSONHandler(w, wrapHandlerOptions(sl.opts.HandlerOptions)). + WithAttrs(sl.opts.WithAttrs). + WithAttrs(sl.withAttrs) + for _, name := range sl.withGroup { + handler = handler.WithGroup(name) + } + return handler +} + +func (sl *OpenObserveHandler) postLog(buf io.Reader) { + req, err := http.NewRequest(http.MethodPost, sl.opts.Endpoint, buf) + if err != nil { + fmt.Printf("openobserve: failed to create request: %s\n", err) + return + } + req.SetBasicAuth(sl.opts.Username, sl.opts.Password) + req.Header.Set("Content-Type", "application/json") + + resp, err := sl.opts.HTTPClient.Do(req) + if err != nil { + fmt.Printf("openobserve: failed to execute request: %s\n", err) + return + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + fmt.Printf("openobserve: unexpected %d status code from openobserve instance when sending logs\n", resp.StatusCode) + body, _ := io.ReadAll(resp.Body) + fmt.Println(string(body)) + } +} + +func (sl *OpenObserveHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + sl2 := sl.clone() + sl2.withAttrs = append(sl2.withAttrs, attrs...) + return sl2 +} + +func (sl *OpenObserveHandler) WithGroup(name string) slog.Handler { + if name == "" { + return sl + } + sl2 := sl.clone() + sl2.withGroup = append(sl2.withGroup, name) + return sl2 +} + +func wrapHandlerOptions(in *slog.HandlerOptions) *slog.HandlerOptions { + return &slog.HandlerOptions{ + AddSource: in.AddSource, + Level: in.Level, + ReplaceAttr: wrapReplaceAttr(in.ReplaceAttr), + } +} + +type replaceAttrFunc = func(group []string, attr slog.Attr) slog.Attr + +func wrapReplaceAttr(replaceAttr replaceAttrFunc) replaceAttrFunc { + return func(group []string, attr slog.Attr) slog.Attr { + if len(group) > 0 { + return replaceAttr(group, attr) + } + if attr.Key == slog.TimeKey { + return slog.Attr{ + Key: "_timestamp", + Value: attr.Value, + } + } + return replaceAttr(group, attr) + } +} + +type OpenObserveHandlerOptions struct { + HandlerOptions *slog.HandlerOptions + + // Maximum size for the buffer to store log messages before flushing. + BufferSize int + + // Maximum time to wait before flushing the buffer. + BufferTimeout time.Duration + + // Maximum number of concurrent requests to send logs. + Concurrency int + + // Endpoint to send logs to. + Endpoint string + + // HTTPClient to use for sending logs. + HTTPClient HTTPClient + + // Attributes to include in every log message. + WithAttrs []slog.Attr + + Username string + Password string +} + +func NewOpenObserveHandler(opts OpenObserveHandlerOptions) *OpenObserveHandler { + if opts.HandlerOptions == nil { + opts.HandlerOptions = &slog.HandlerOptions{} + } + if opts.HandlerOptions.ReplaceAttr == nil { + opts.HandlerOptions.ReplaceAttr = noopReplaceAttr + } + return &OpenObserveHandler{ + opts: opts, + semaphore: make(chan struct{}, opts.Concurrency), + buffer: &bytes.Buffer{}, + } +}