Bluemage/go/pkg/telemetry/open_observe.go

219 lines
5.1 KiB
Go
Raw Normal View History

package telemetry
import (
"bytes"
"context"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"
)
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
type OpenObserveHandler struct {
opts OpenObserveHandlerOptions
semaphore chan struct{}
withAttrs []slog.Attr
withGroup []string
buffer *bytes.Buffer
mu sync.Mutex
sendDebounceFunc *time.Timer
}
func (sl *OpenObserveHandler) clone() *OpenObserveHandler {
return &OpenObserveHandler{
opts: sl.opts,
semaphore: sl.semaphore,
withAttrs: sl.withAttrs,
withGroup: sl.withGroup,
buffer: &bytes.Buffer{},
}
}
func (sl *OpenObserveHandler) Enabled(ctx context.Context, lvl slog.Level) bool {
return sl.opts.HandlerOptions.Level.Level() <= lvl
}
func (sl *OpenObserveHandler) Handle(ctx context.Context, record slog.Record) error {
sl.mu.Lock()
defer sl.mu.Unlock()
if sl.sendDebounceFunc == nil {
sl.sendDebounceFunc = time.AfterFunc(sl.opts.BufferTimeout, func() {
sl.mu.Lock()
defer sl.mu.Unlock()
sl.sendDebounceFunc = nil
if sl.buffer.Len() < 1 {
return
}
b := sl.extractBuffer().Bytes()
if b[len(b)-1] == ',' {
b = b[:len(b)-1]
}
if b[len(b)-1] == '\n' {
b = b[:len(b)-1]
}
b = append(b, ']')
sl.semaphore <- struct{}{}
go func() {
defer func() { <-sl.semaphore }()
sl.postLog(bytes.NewReader(b))
}()
})
}
if sl.buffer.Len() < 1 {
sl.buffer.WriteRune('[')
}
jsonHandler := sl.jsonHandler(sl.buffer)
if err := jsonHandler.Handle(ctx, record); err != nil {
return err
}
if sl.buffer.Len() < sl.opts.BufferSize-1 {
sl.buffer.WriteRune(',')
} else {
sl.sendDebounceFunc.Stop()
sl.sendDebounceFunc = nil
sl.buffer.WriteRune(']')
buf := sl.extractBuffer()
sl.semaphore <- struct{}{}
go func() {
defer func() { <-sl.semaphore }()
sl.postLog(buf)
}()
}
return nil
}
func (sl *OpenObserveHandler) extractBuffer() *bytes.Buffer {
b := sl.buffer.Bytes()
newb := make([]byte, len(b))
copy(newb, b)
if sl.buffer.Cap() > 512*1024 {
sl.buffer = &bytes.Buffer{}
} else {
sl.buffer.Reset()
}
return bytes.NewBuffer(newb)
}
func noopReplaceAttr(_ []string, attr slog.Attr) slog.Attr { return attr }
func (sl *OpenObserveHandler) jsonHandler(w io.Writer) slog.Handler {
handler := slog.NewJSONHandler(w, wrapHandlerOptions(sl.opts.HandlerOptions)).
WithAttrs(sl.opts.WithAttrs).
WithAttrs(sl.withAttrs)
for _, name := range sl.withGroup {
handler = handler.WithGroup(name)
}
return handler
}
func (sl *OpenObserveHandler) postLog(buf io.Reader) {
req, err := http.NewRequest(http.MethodPost, sl.opts.Endpoint, buf)
if err != nil {
fmt.Printf("openobserve: failed to create request: %s\n", err)
return
}
req.SetBasicAuth(sl.opts.Username, sl.opts.Password)
req.Header.Set("Content-Type", "application/json")
resp, err := sl.opts.HTTPClient.Do(req)
if err != nil {
fmt.Printf("openobserve: failed to execute request: %s\n", err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
fmt.Printf("openobserve: unexpected %d status code from openobserve instance when sending logs\n", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
fmt.Println(string(body))
}
}
func (sl *OpenObserveHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
sl2 := sl.clone()
sl2.withAttrs = append(sl2.withAttrs, attrs...)
return sl2
}
func (sl *OpenObserveHandler) WithGroup(name string) slog.Handler {
if name == "" {
return sl
}
sl2 := sl.clone()
sl2.withGroup = append(sl2.withGroup, name)
return sl2
}
func wrapHandlerOptions(in *slog.HandlerOptions) *slog.HandlerOptions {
return &slog.HandlerOptions{
AddSource: in.AddSource,
Level: in.Level,
ReplaceAttr: wrapReplaceAttr(in.ReplaceAttr),
}
}
type replaceAttrFunc = func(group []string, attr slog.Attr) slog.Attr
func wrapReplaceAttr(replaceAttr replaceAttrFunc) replaceAttrFunc {
return func(group []string, attr slog.Attr) slog.Attr {
if len(group) > 0 {
return replaceAttr(group, attr)
}
if attr.Key == slog.TimeKey {
return slog.Attr{
Key: "_timestamp",
Value: attr.Value,
}
}
return replaceAttr(group, attr)
}
}
type OpenObserveHandlerOptions struct {
HandlerOptions *slog.HandlerOptions
// Maximum size for the buffer to store log messages before flushing.
BufferSize int
// Maximum time to wait before flushing the buffer.
BufferTimeout time.Duration
// Maximum number of concurrent requests to send logs.
Concurrency int
// Endpoint to send logs to.
Endpoint string
// HTTPClient to use for sending logs.
HTTPClient HTTPClient
// Attributes to include in every log message.
WithAttrs []slog.Attr
Username string
Password string
}
func NewOpenObserveHandler(opts OpenObserveHandlerOptions) *OpenObserveHandler {
if opts.HandlerOptions == nil {
opts.HandlerOptions = &slog.HandlerOptions{}
}
if opts.HandlerOptions.ReplaceAttr == nil {
opts.HandlerOptions.ReplaceAttr = noopReplaceAttr
}
return &OpenObserveHandler{
opts: opts,
semaphore: make(chan struct{}, opts.Concurrency),
buffer: &bytes.Buffer{},
}
}