diff --git a/go/pkg/log/log.go b/go/pkg/log/log.go index 507f067..4d955a6 100644 --- a/go/pkg/log/log.go +++ b/go/pkg/log/log.go @@ -35,7 +35,14 @@ func NewHandler(cfg *config.Config) (slog.Handler, func() error) { } if cfg.Bool("telemetry.openobserve.enable") && cfg.Bool("telemetry.openobserve.log.enable") { - handlers = append(handlers, createO2Logger(cfg)) + log, clean := createO2Logger(cfg) + cl := cleanup + handlers = append(handlers, log) + cleanup = func() error { + err := cl() + clean() + return err + } } if len(handlers) == 0 { @@ -88,14 +95,14 @@ func createStandardLogger(cfg *config.Config) (slog.Handler, func() error) { } } -func createO2Logger(cfg *config.Config) slog.Handler { +func createO2Logger(cfg *config.Config) (slog.Handler, func()) { var lvl slog.Level _ = lvl.UnmarshalText(cfg.Bytes("telemetry.openobserve.log.level")) opts := &slog.HandlerOptions{ AddSource: cfg.Bool("telemetry.openobserve.log.source"), Level: lvl, } - return telemetry.NewOpenObserveHandler(telemetry.OpenObserveHandlerOptions{ + handler := telemetry.NewOpenObserveHandler(telemetry.OpenObserveHandlerOptions{ HandlerOptions: opts, BufferSize: cfg.Int("telemetry.openobserve.log.buffer.size"), BufferTimeout: cfg.Duration("telemetry.openobserve.log.buffer.timeout"), @@ -105,4 +112,5 @@ func createO2Logger(cfg *config.Config) slog.Handler { Username: cfg.String("telemetry.openobserve.log.username"), Password: cfg.String("telemetry.openobserve.log.password"), }) + return handler, handler.Flush } diff --git a/go/pkg/telemetry/open_observe.go b/go/pkg/telemetry/open_observe.go index 6112750..839cc21 100644 --- a/go/pkg/telemetry/open_observe.go +++ b/go/pkg/telemetry/open_observe.go @@ -47,22 +47,7 @@ func (sl *OpenObserveHandler) Handle(ctx context.Context, record slog.Record) er sl.mu.Lock() defer sl.mu.Unlock() sl.sendDebounceFunc = nil - if sl.buffer.Len() < 1 { - return - } - b := sl.extractBuffer().Bytes() - if b[len(b)-1] == ',' { - b = b[:len(b)-1] - } - if b[len(b)-1] == '\n' { - b = b[:len(b)-1] - } - b = append(b, ']') - sl.semaphore <- struct{}{} - go func() { - defer func() { <-sl.semaphore }() - sl.postLog(bytes.NewReader(b)) - }() + sl.flushAsync() }) } @@ -92,6 +77,42 @@ func (sl *OpenObserveHandler) Handle(ctx context.Context, record slog.Record) er return nil } +func (sl *OpenObserveHandler) Flush() { + sl.mu.Lock() + defer sl.mu.Unlock() + if sl.buffer.Len() < 1 { + return + } + b := sl.extractBuffer().Bytes() + if b[len(b)-1] == ',' { + b = b[:len(b)-1] + } + if b[len(b)-1] == '\n' { + b = b[:len(b)-1] + } + b = append(b, ']') + sl.postLog(bytes.NewReader(b)) +} + +func (sl *OpenObserveHandler) flushAsync() { + if sl.buffer.Len() < 1 { + return + } + b := sl.extractBuffer().Bytes() + if b[len(b)-1] == ',' { + b = b[:len(b)-1] + } + if b[len(b)-1] == '\n' { + b = b[:len(b)-1] + } + b = append(b, ']') + sl.semaphore <- struct{}{} + go func() { + defer func() { <-sl.semaphore }() + sl.postLog(bytes.NewReader(b)) + }() +} + func (sl *OpenObserveHandler) extractBuffer() *bytes.Buffer { b := sl.buffer.Bytes() newb := make([]byte, len(b))