telemetry: added OpenObserve support

This commit is contained in:
Tigor Hutasuhut 2024-04-12 22:47:22 +07:00
parent 407f1b9c70
commit 4fdc760554
5 changed files with 271 additions and 1 deletions

View file

@ -25,4 +25,15 @@ var DefaultConfig = map[string]any{
"http.host": "0.0.0.0",
"http.shutdown_timeout": "5s",
"http.hotreload": false,
"telemetry.openobserve.enable": false,
"telemetry.openobserve.log.enable": true,
"telemetry.openobserve.log.level": "info",
"telemetry.openobserve.log.source": true,
"telemetry.openobserve.log.endpoint": "http://localhost:5080/api/default/default/_json",
"telemetry.openobserve.log.concurrency": 4,
"telemetry.openobserve.log.buffer.size": 2 * 1024, // 2kb
"telemetry.openobserve.log.buffer.timeout": "500ms",
"telemetry.openobserve.log.username": "root@example.com",
"telemetry.openobserve.log.password": "Complexpass#123",
}

3
go.mod
View file

@ -28,6 +28,8 @@ require (
github.com/teivah/broadcast v0.1.0
)
require github.com/samber/lo v1.38.1 // indirect
require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
@ -41,6 +43,7 @@ require (
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/riandyrn/otelchi v0.6.0 // indirect
github.com/samber/slog-multi v1.0.2
github.com/sethvargo/go-retry v0.2.4 // indirect
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect

4
go.sum
View file

@ -170,6 +170,10 @@ github.com/riandyrn/otelchi v0.6.0/go.mod h1:BfwVxPKUNgJx12Z8XSrMGYT8/pTge+QNaoj
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/samber/slog-multi v1.0.2 h1:6BVH9uHGAsiGkbbtQgAOQJMpKgV8unMrHhhJaw+X1EQ=
github.com/samber/slog-multi v1.0.2/go.mod h1:uLAvHpGqbYgX4FSL0p1ZwoLuveIAJvBECtE07XmYvFo=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=

View file

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strings"
"time"
@ -14,15 +15,33 @@ import (
"github.com/tigorlazuardi/redmage/config"
"github.com/tigorlazuardi/redmage/pkg/caller"
"go.opentelemetry.io/otel/trace"
slogmulti "github.com/samber/slog-multi"
)
var handler slog.Handler = NullHandler{}
func NewHandler(cfg *config.Config) slog.Handler {
if !cfg.Bool("log.enable") {
var handlers []slog.Handler
if cfg.Bool("log.enable") {
handlers = append(handlers, createStandardLogger(cfg))
}
if cfg.Bool("telemetry.openobserve.enable") && cfg.Bool("telemetry.openobserve.log.enable") {
handlers = append(handlers, createO2Logger(cfg))
}
if len(handlers) == 0 {
return NullHandler{}
}
return slogmulti.Fanout(handlers...)
}
func createStandardLogger(cfg *config.Config) slog.Handler {
var output io.Writer
if strings.ToLower(cfg.String("log.output")) == "stdout" {
output = colorable.NewColorableStdout()
} else {
@ -44,6 +63,25 @@ func NewHandler(cfg *config.Config) slog.Handler {
}
}
func createO2Logger(cfg *config.Config) slog.Handler {
var lvl slog.Level
_ = lvl.UnmarshalText(cfg.Bytes("telemetry.openobserve.log.level"))
opts := &slog.HandlerOptions{
AddSource: cfg.Bool("telemetry.openobserve.log.source"),
Level: lvl,
}
return NewOpenObserveHandler(OpenObserveHandlerOptions{
HandlerOptions: opts,
BufferSize: cfg.Int("telemetry.openobserve.log.buffer.size"),
BufferTimeout: cfg.Duration("telemetry.openobserve.log.buffer.timeout"),
Concurrency: cfg.Int("telemetry.openobserve.log.concurrency"),
Endpoint: cfg.String("telemetry.openobserve.log.endpoint"),
HTTPClient: http.DefaultClient,
Username: cfg.String("telemetry.openobserve.log.username"),
Password: cfg.String("telemetry.openobserve.log.password"),
})
}
type Entry struct {
ctx context.Context
handler slog.Handler

View file

@ -0,0 +1,214 @@
package log
import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"
)
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
type OpenObserveHandler struct {
opts OpenObserveHandlerOptions
semaphore chan struct{}
withAttrs []slog.Attr
withGroup []string
buffer *bytes.Buffer
mu sync.Mutex
sendDebounceFunc *time.Timer
}
func (sl *OpenObserveHandler) clone() *OpenObserveHandler {
return &OpenObserveHandler{
opts: sl.opts,
semaphore: sl.semaphore,
withAttrs: sl.withAttrs,
withGroup: sl.withGroup,
buffer: &bytes.Buffer{},
}
}
func (sl *OpenObserveHandler) Enabled(ctx context.Context, lvl slog.Level) bool {
return sl.opts.HandlerOptions.Level.Level() <= lvl
}
func (sl *OpenObserveHandler) Handle(ctx context.Context, record slog.Record) error {
sl.mu.Lock()
defer sl.mu.Unlock()
if sl.sendDebounceFunc == nil {
sl.sendDebounceFunc = time.AfterFunc(sl.opts.BufferTimeout, func() {
sl.mu.Lock()
defer sl.mu.Unlock()
if sl.buffer.Len() < 1 {
return
}
b := sl.extractBuffer().Bytes()
if b[len(b)-1] == ',' {
b = b[:len(b)-1]
}
b = append(b, ']')
sl.semaphore <- struct{}{}
go func() {
defer func() { <-sl.semaphore }()
sl.postLog(bytes.NewReader(b))
}()
})
}
if sl.buffer.Len() < 1 {
sl.buffer.WriteRune('[')
}
jsonHandler := sl.jsonHandler(sl.buffer)
if err := jsonHandler.Handle(ctx, record); err != nil {
return err
}
if sl.buffer.Len() < sl.opts.BufferSize-1 {
sl.buffer.WriteRune(',')
} else {
sl.sendDebounceFunc.Stop()
sl.sendDebounceFunc = nil
sl.buffer.WriteRune(']')
buf := sl.extractBuffer()
sl.semaphore <- struct{}{}
go func() {
defer func() { <-sl.semaphore }()
sl.postLog(buf)
}()
}
return nil
}
func (sl *OpenObserveHandler) extractBuffer() *bytes.Buffer {
b := sl.buffer.Bytes()
newb := make([]byte, len(b))
copy(newb, b)
if sl.buffer.Cap() > 512*1024 {
sl.buffer = &bytes.Buffer{}
} else {
sl.buffer.Reset()
}
return bytes.NewBuffer(newb)
}
func noopReplaceAttr(_ []string, attr slog.Attr) slog.Attr { return attr }
func (sl *OpenObserveHandler) jsonHandler(w io.Writer) slog.Handler {
handler := slog.NewJSONHandler(w, wrapHandlerOptions(sl.opts.HandlerOptions)).
WithAttrs(sl.opts.WithAttrs).
WithAttrs(sl.withAttrs)
for _, name := range sl.withGroup {
handler = handler.WithGroup(name)
}
return handler
}
func (sl *OpenObserveHandler) postLog(buf io.Reader) {
req, err := http.NewRequest(http.MethodPost, sl.opts.Endpoint, buf)
if err != nil {
fmt.Printf("openobserve: failed to create request: %s\n", err)
return
}
req.SetBasicAuth(sl.opts.Username, sl.opts.Password)
req.Header.Set("Content-Type", "application/json")
resp, err := sl.opts.HTTPClient.Do(req)
if err != nil {
fmt.Printf("openobserve: failed to execute request: %s\n", err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
fmt.Printf("openobserve: unexpected %d status code from openobserve instance when sending logs\n", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
fmt.Println(string(body))
}
}
func (sl *OpenObserveHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
sl2 := sl.clone()
sl2.withAttrs = append(sl2.withAttrs, attrs...)
return sl2
}
func (sl *OpenObserveHandler) WithGroup(name string) slog.Handler {
if name == "" {
return sl
}
sl2 := sl.clone()
sl2.withGroup = append(sl2.withGroup, name)
return sl2
}
func wrapHandlerOptions(in *slog.HandlerOptions) *slog.HandlerOptions {
return &slog.HandlerOptions{
AddSource: in.AddSource,
Level: in.Level,
ReplaceAttr: wrapReplaceAttr(in.ReplaceAttr),
}
}
type replaceAttrFunc = func(group []string, attr slog.Attr) slog.Attr
func wrapReplaceAttr(replaceAttr replaceAttrFunc) replaceAttrFunc {
return func(group []string, attr slog.Attr) slog.Attr {
if len(group) > 0 {
return replaceAttr(group, attr)
}
if attr.Key == slog.TimeKey {
return slog.Attr{
Key: "_timestamp",
Value: attr.Value,
}
}
return replaceAttr(group, attr)
}
}
type OpenObserveHandlerOptions struct {
HandlerOptions *slog.HandlerOptions
// Maximum size for the buffer to store log messages before flushing.
BufferSize int
// Maximum time to wait before flushing the buffer.
BufferTimeout time.Duration
// Maximum number of concurrent requests to send logs.
Concurrency int
// Endpoint to send logs to.
Endpoint string
// HTTPClient to use for sending logs.
HTTPClient HTTPClient
// Attributes to include in every log message.
WithAttrs []slog.Attr
Username string
Password string
}
func NewOpenObserveHandler(opts OpenObserveHandlerOptions) *OpenObserveHandler {
if opts.HandlerOptions == nil {
opts.HandlerOptions = &slog.HandlerOptions{}
}
if opts.HandlerOptions.ReplaceAttr == nil {
opts.HandlerOptions.ReplaceAttr = noopReplaceAttr
}
return &OpenObserveHandler{
opts: opts,
semaphore: make(chan struct{}, opts.Concurrency),
buffer: &bytes.Buffer{},
}
}