o2logger: flush buffer on cleanup

This commit is contained in:
Tigor Hutasuhut 2024-08-11 21:57:36 +07:00
parent 528e408f4d
commit 9056b48e54
2 changed files with 48 additions and 19 deletions

View file

@ -35,7 +35,14 @@ func NewHandler(cfg *config.Config) (slog.Handler, func() error) {
} }
if cfg.Bool("telemetry.openobserve.enable") && cfg.Bool("telemetry.openobserve.log.enable") { if cfg.Bool("telemetry.openobserve.enable") && cfg.Bool("telemetry.openobserve.log.enable") {
handlers = append(handlers, createO2Logger(cfg)) log, clean := createO2Logger(cfg)
cl := cleanup
handlers = append(handlers, log)
cleanup = func() error {
err := cl()
clean()
return err
}
} }
if len(handlers) == 0 { if len(handlers) == 0 {
@ -88,14 +95,14 @@ func createStandardLogger(cfg *config.Config) (slog.Handler, func() error) {
} }
} }
func createO2Logger(cfg *config.Config) slog.Handler { func createO2Logger(cfg *config.Config) (slog.Handler, func()) {
var lvl slog.Level var lvl slog.Level
_ = lvl.UnmarshalText(cfg.Bytes("telemetry.openobserve.log.level")) _ = lvl.UnmarshalText(cfg.Bytes("telemetry.openobserve.log.level"))
opts := &slog.HandlerOptions{ opts := &slog.HandlerOptions{
AddSource: cfg.Bool("telemetry.openobserve.log.source"), AddSource: cfg.Bool("telemetry.openobserve.log.source"),
Level: lvl, Level: lvl,
} }
return telemetry.NewOpenObserveHandler(telemetry.OpenObserveHandlerOptions{ handler := telemetry.NewOpenObserveHandler(telemetry.OpenObserveHandlerOptions{
HandlerOptions: opts, HandlerOptions: opts,
BufferSize: cfg.Int("telemetry.openobserve.log.buffer.size"), BufferSize: cfg.Int("telemetry.openobserve.log.buffer.size"),
BufferTimeout: cfg.Duration("telemetry.openobserve.log.buffer.timeout"), BufferTimeout: cfg.Duration("telemetry.openobserve.log.buffer.timeout"),
@ -105,4 +112,5 @@ func createO2Logger(cfg *config.Config) slog.Handler {
Username: cfg.String("telemetry.openobserve.log.username"), Username: cfg.String("telemetry.openobserve.log.username"),
Password: cfg.String("telemetry.openobserve.log.password"), Password: cfg.String("telemetry.openobserve.log.password"),
}) })
return handler, handler.Flush
} }

View file

@ -47,22 +47,7 @@ func (sl *OpenObserveHandler) Handle(ctx context.Context, record slog.Record) er
sl.mu.Lock() sl.mu.Lock()
defer sl.mu.Unlock() defer sl.mu.Unlock()
sl.sendDebounceFunc = nil sl.sendDebounceFunc = nil
if sl.buffer.Len() < 1 { sl.flushAsync()
return
}
b := sl.extractBuffer().Bytes()
if b[len(b)-1] == ',' {
b = b[:len(b)-1]
}
if b[len(b)-1] == '\n' {
b = b[:len(b)-1]
}
b = append(b, ']')
sl.semaphore <- struct{}{}
go func() {
defer func() { <-sl.semaphore }()
sl.postLog(bytes.NewReader(b))
}()
}) })
} }
@ -92,6 +77,42 @@ func (sl *OpenObserveHandler) Handle(ctx context.Context, record slog.Record) er
return nil return nil
} }
func (sl *OpenObserveHandler) Flush() {
sl.mu.Lock()
defer sl.mu.Unlock()
if sl.buffer.Len() < 1 {
return
}
b := sl.extractBuffer().Bytes()
if b[len(b)-1] == ',' {
b = b[:len(b)-1]
}
if b[len(b)-1] == '\n' {
b = b[:len(b)-1]
}
b = append(b, ']')
sl.postLog(bytes.NewReader(b))
}
func (sl *OpenObserveHandler) flushAsync() {
if sl.buffer.Len() < 1 {
return
}
b := sl.extractBuffer().Bytes()
if b[len(b)-1] == ',' {
b = b[:len(b)-1]
}
if b[len(b)-1] == '\n' {
b = b[:len(b)-1]
}
b = append(b, ']')
sl.semaphore <- struct{}{}
go func() {
defer func() { <-sl.semaphore }()
sl.postLog(bytes.NewReader(b))
}()
}
func (sl *OpenObserveHandler) extractBuffer() *bytes.Buffer { func (sl *OpenObserveHandler) extractBuffer() *bytes.Buffer {
b := sl.buffer.Bytes() b := sl.buffer.Bytes()
newb := make([]byte, len(b)) newb := make([]byte, len(b))