telemetry: update metrics method

This commit is contained in:
Tigor Hutasuhut 2024-08-12 22:00:21 +07:00
parent 62aee841cd
commit 50011ca3ac
5 changed files with 109 additions and 8 deletions

View file

@ -58,7 +58,8 @@ var Cmd = &cobra.Command{
return errs.Wrapw(err, "failed to create download directory", "path", cfg.String("download.directory"))
}
sqldb, err := sql.Open(cfg.String("db.driver"), fmt.Sprintf("file:%s", cfg.String("db.path")))
dbPath := fmt.Sprintf("file:%s", cfg.String("db.path"))
sqldb, err := sql.Open(cfg.String("db.driver"), dbPath)
if err != nil {
return errs.Wrapw(err, "failed to open database",
"driver", cfg.String("db.driver"),
@ -68,7 +69,7 @@ var Cmd = &cobra.Command{
cleanups = append(cleanups, sqldb.Close)
sqldb = sqldblogger.OpenDriver(
"file:data.db",
dbPath,
sqldb.Driver(),
log.SQLLogger{},
sqldblogger.WithSQLQueryAsMessage(true),

View file

@ -66,6 +66,7 @@ var DefaultConfig = Entries{
{"telemetry.openobserve.metrics.enable", true, "Wether to enable sending metrics to OpenObserve server", false},
{"telemetry.openobserve.metrics.url", "http://localhost:5080/api/default/v1/metrics", "Endpoint url to send traces to", false},
{"telemetry.openobserve.metrics.auth", "Basic AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "Authorization token for the Endpoint URL", false},
{"telemetry.openobserve.metrics.interval", "5s", "Interval of metrics being sent", false},
{"telemetry.traces.ratio", float64(1), "Sampling ratio between 0 to 1 on how many traces are sent to the server. Value of 1 will send everything", false},
{"telemetry.service.name", "bluemage", "Name of the service to send to telemetry server", true},

84
go/pkg/errs/multi.go Normal file
View file

@ -0,0 +1,84 @@
package errs
import (
"log/slog"
"strconv"
"strings"
"sync"
)
type Multi []error
func (mu Multi) LogValue() slog.Value {
vals := make([]slog.Attr, len(mu))
for i, e := range mu {
vals[i] = slog.Any(strconv.Itoa(i+1), e)
}
return slog.GroupValue(vals...)
}
func (mu Multi) Error() string {
s := strings.Builder{}
for i, err := range mu.Clean() {
if i > 0 {
s.WriteString("; ")
}
s.WriteString(strconv.Itoa(i + 1))
s.WriteString(". ")
s.WriteString(err.Error())
}
return s.String()
}
func (mu Multi) Clean() Multi {
out := make(Multi, 0, len(mu))
for _, err := range mu {
if err != nil {
out = append(out, err)
}
}
return out
}
func (mu Multi) Resolve() error {
out := mu.Clean()
if len(out) == 0 {
return nil
}
return out
}
// Close closes multiple close functions in go routines
// and returns all that errors in the order they are defined.
//
// All errors that are nil will be discarded from the list.
//
// If no errors met, Close returns nil.
func Close(funcs ...func() error) error {
if len(funcs) == 0 {
return nil
}
errs := make(Multi, len(funcs))
wg := sync.WaitGroup{}
wg.Add(len(funcs))
for i, fn := range funcs {
go func(i int, fn func() error) {
defer wg.Done()
errs[i] = fn()
}(i, fn)
}
wg.Wait()
return errs.Resolve()
}
// Join joins the error into one error.
//
// Join discards errors that are nil.
//
// If all errs are nil, Join returns nil.
func Join(errs ...error) error {
if len(errs) == 0 {
return nil
}
return Multi(errs).Resolve()
}

View file

@ -20,6 +20,7 @@ import (
type Telemetry struct {
tracer *sdktrace.TracerProvider
metric *metric.MeterProvider
}
func New(ctx context.Context, cfg *config.Config) (tele Telemetry, err error) {
@ -35,6 +36,7 @@ func New(ctx context.Context, cfg *config.Config) (tele Telemetry, err error) {
return tele, err
}
otel.SetMeterProvider(metricsProvider)
tele.metric = metricsProvider
}
tele.tracer = tracerProvider
otel.SetTracerProvider(tracerProvider)
@ -78,8 +80,14 @@ func createMetricsProvider(ctx context.Context, cfg *config.Config, res *resourc
if err != nil {
return nil, errs.Wrap(err, "failed to create otlpmetrichttp exporter")
}
return metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(exp)),
metric.WithReader(
metric.NewPeriodicReader(
exp,
metric.WithInterval(cfg.Duration("telemetry.openobserve.metrics.interval")),
),
),
metric.WithResource(res),
), nil
}
@ -88,13 +96,21 @@ func (te *Telemetry) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return te.tracer.Shutdown(ctx)
return errs.Close(
func() error { return te.tracer.Shutdown(ctx) },
func() error {
if te.metric != nil {
return te.metric.Shutdown(ctx)
}
return nil
},
)
}
func newResource(cfg *config.Config) *resource.Resource {
res, err := resource.Merge(resource.Default(), resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("bluemage"),
semconv.ServiceNameKey.String(cfg.String("telemetry.service.name")),
semconv.ServiceVersionKey.String(cfg.String("runtime.version")),
attribute.String("environment", cfg.String("runtime.environment")),
))

View file

@ -21,12 +21,11 @@ func LogInterceptor() connect.UnaryInterceptorFunc {
start := time.Now()
resp, err = next(ctx, ar)
dur := time.Since(start)
durFloat := float64(dur) / float64(time.Second)
if err != nil {
slog.ErrorContext(ctx, "RPC Error",
"procedure", ar.Spec().Procedure,
"method", ar.HTTPMethod(),
"duration", fmt.Sprintf("%.3fs", durFloat),
"duration", fmt.Sprintf("%.3fs", dur.Seconds()),
"error", errs.DrillToError(err),
"span.id", span.SpanContext().SpanID().String(),
"trace.id", span.SpanContext().TraceID().String(),
@ -35,7 +34,7 @@ func LogInterceptor() connect.UnaryInterceptorFunc {
slog.InfoContext(ctx, "RPC Call",
"procedure", ar.Spec().Procedure,
"method", ar.HTTPMethod(),
"duration", fmt.Sprintf("%.3fs", durFloat),
"duration", fmt.Sprintf("%.3fs", dur.Seconds()),
"span.id", span.SpanContext().SpanID().String(),
"trace.id", span.SpanContext().TraceID().String(),
)