From 50011ca3acebea2d76c641b5da8fc831ae02a4b8 Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Mon, 12 Aug 2024 22:00:21 +0700 Subject: [PATCH] telemetry: update metrics method --- go/cmd/bluemage/serve/serve.go | 5 +- go/config/default.go | 1 + go/pkg/errs/multi.go | 84 ++++++++++++++++++++++++++++++++++ go/pkg/telemetry/telemetry.go | 22 +++++++-- go/server/interceptor.go | 5 +- 5 files changed, 109 insertions(+), 8 deletions(-) create mode 100644 go/pkg/errs/multi.go diff --git a/go/cmd/bluemage/serve/serve.go b/go/cmd/bluemage/serve/serve.go index afdea58..f715962 100644 --- a/go/cmd/bluemage/serve/serve.go +++ b/go/cmd/bluemage/serve/serve.go @@ -58,7 +58,8 @@ var Cmd = &cobra.Command{ return errs.Wrapw(err, "failed to create download directory", "path", cfg.String("download.directory")) } - sqldb, err := sql.Open(cfg.String("db.driver"), fmt.Sprintf("file:%s", cfg.String("db.path"))) + dbPath := fmt.Sprintf("file:%s", cfg.String("db.path")) + sqldb, err := sql.Open(cfg.String("db.driver"), dbPath) if err != nil { return errs.Wrapw(err, "failed to open database", "driver", cfg.String("db.driver"), @@ -68,7 +69,7 @@ var Cmd = &cobra.Command{ cleanups = append(cleanups, sqldb.Close) sqldb = sqldblogger.OpenDriver( - "file:data.db", + dbPath, sqldb.Driver(), log.SQLLogger{}, sqldblogger.WithSQLQueryAsMessage(true), diff --git a/go/config/default.go b/go/config/default.go index 60bf516..514d42d 100644 --- a/go/config/default.go +++ b/go/config/default.go @@ -66,6 +66,7 @@ var DefaultConfig = Entries{ {"telemetry.openobserve.metrics.enable", true, "Wether to enable sending metrics to OpenObserve server", false}, {"telemetry.openobserve.metrics.url", "http://localhost:5080/api/default/v1/metrics", "Endpoint url to send traces to", false}, {"telemetry.openobserve.metrics.auth", "Basic AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "Authorization token for the Endpoint URL", false}, + {"telemetry.openobserve.metrics.interval", "5s", "Interval of metrics being sent", false}, {"telemetry.traces.ratio", float64(1), "Sampling ratio between 0 to 1 on how many traces are sent to the server. Value of 1 will send everything", false}, {"telemetry.service.name", "bluemage", "Name of the service to send to telemetry server", true}, diff --git a/go/pkg/errs/multi.go b/go/pkg/errs/multi.go new file mode 100644 index 0000000..b52852c --- /dev/null +++ b/go/pkg/errs/multi.go @@ -0,0 +1,84 @@ +package errs + +import ( + "log/slog" + "strconv" + "strings" + "sync" +) + +type Multi []error + +func (mu Multi) LogValue() slog.Value { + vals := make([]slog.Attr, len(mu)) + for i, e := range mu { + vals[i] = slog.Any(strconv.Itoa(i+1), e) + } + return slog.GroupValue(vals...) +} + +func (mu Multi) Error() string { + s := strings.Builder{} + for i, err := range mu.Clean() { + if i > 0 { + s.WriteString("; ") + } + s.WriteString(strconv.Itoa(i + 1)) + s.WriteString(". ") + s.WriteString(err.Error()) + } + return s.String() +} + +func (mu Multi) Clean() Multi { + out := make(Multi, 0, len(mu)) + for _, err := range mu { + if err != nil { + out = append(out, err) + } + } + return out +} + +func (mu Multi) Resolve() error { + out := mu.Clean() + if len(out) == 0 { + return nil + } + return out +} + +// Close closes multiple close functions in go routines +// and returns all that errors in the order they are defined. +// +// All errors that are nil will be discarded from the list. +// +// If no errors met, Close returns nil. +func Close(funcs ...func() error) error { + if len(funcs) == 0 { + return nil + } + errs := make(Multi, len(funcs)) + wg := sync.WaitGroup{} + wg.Add(len(funcs)) + for i, fn := range funcs { + go func(i int, fn func() error) { + defer wg.Done() + errs[i] = fn() + }(i, fn) + } + wg.Wait() + return errs.Resolve() +} + +// Join joins the error into one error. +// +// Join discards errors that are nil. +// +// If all errs are nil, Join returns nil. +func Join(errs ...error) error { + if len(errs) == 0 { + return nil + } + return Multi(errs).Resolve() +} diff --git a/go/pkg/telemetry/telemetry.go b/go/pkg/telemetry/telemetry.go index 8da4c03..758acdd 100644 --- a/go/pkg/telemetry/telemetry.go +++ b/go/pkg/telemetry/telemetry.go @@ -20,6 +20,7 @@ import ( type Telemetry struct { tracer *sdktrace.TracerProvider + metric *metric.MeterProvider } func New(ctx context.Context, cfg *config.Config) (tele Telemetry, err error) { @@ -35,6 +36,7 @@ func New(ctx context.Context, cfg *config.Config) (tele Telemetry, err error) { return tele, err } otel.SetMeterProvider(metricsProvider) + tele.metric = metricsProvider } tele.tracer = tracerProvider otel.SetTracerProvider(tracerProvider) @@ -78,8 +80,14 @@ func createMetricsProvider(ctx context.Context, cfg *config.Config, res *resourc if err != nil { return nil, errs.Wrap(err, "failed to create otlpmetrichttp exporter") } + return metric.NewMeterProvider( - metric.WithReader(metric.NewPeriodicReader(exp)), + metric.WithReader( + metric.NewPeriodicReader( + exp, + metric.WithInterval(cfg.Duration("telemetry.openobserve.metrics.interval")), + ), + ), metric.WithResource(res), ), nil } @@ -88,13 +96,21 @@ func (te *Telemetry) Close() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return te.tracer.Shutdown(ctx) + return errs.Close( + func() error { return te.tracer.Shutdown(ctx) }, + func() error { + if te.metric != nil { + return te.metric.Shutdown(ctx) + } + return nil + }, + ) } func newResource(cfg *config.Config) *resource.Resource { res, err := resource.Merge(resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceNameKey.String("bluemage"), + semconv.ServiceNameKey.String(cfg.String("telemetry.service.name")), semconv.ServiceVersionKey.String(cfg.String("runtime.version")), attribute.String("environment", cfg.String("runtime.environment")), )) diff --git a/go/server/interceptor.go b/go/server/interceptor.go index e8315f9..e99cd41 100644 --- a/go/server/interceptor.go +++ b/go/server/interceptor.go @@ -21,12 +21,11 @@ func LogInterceptor() connect.UnaryInterceptorFunc { start := time.Now() resp, err = next(ctx, ar) dur := time.Since(start) - durFloat := float64(dur) / float64(time.Second) if err != nil { slog.ErrorContext(ctx, "RPC Error", "procedure", ar.Spec().Procedure, "method", ar.HTTPMethod(), - "duration", fmt.Sprintf("%.3fs", durFloat), + "duration", fmt.Sprintf("%.3fs", dur.Seconds()), "error", errs.DrillToError(err), "span.id", span.SpanContext().SpanID().String(), "trace.id", span.SpanContext().TraceID().String(), @@ -35,7 +34,7 @@ func LogInterceptor() connect.UnaryInterceptorFunc { slog.InfoContext(ctx, "RPC Call", "procedure", ar.Spec().Procedure, "method", ar.HTTPMethod(), - "duration", fmt.Sprintf("%.3fs", durFloat), + "duration", fmt.Sprintf("%.3fs", dur.Seconds()), "span.id", span.SpanContext().SpanID().String(), "trace.id", span.SpanContext().TraceID().String(), )