122 lines
3.6 KiB
Go
122 lines
3.6 KiB
Go
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/tigorlazuardi/bluemage/go/config"
|
|
"github.com/tigorlazuardi/bluemage/go/pkg/errs"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
"go.opentelemetry.io/otel/sdk/metric"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
|
)
|
|
|
|
type Telemetry struct {
|
|
tracer *sdktrace.TracerProvider
|
|
metric *metric.MeterProvider
|
|
}
|
|
|
|
func New(ctx context.Context, cfg *config.Config) (tele Telemetry, err error) {
|
|
otel.SetTextMapPropagator(createPropagator())
|
|
res := newResource(cfg)
|
|
tracerProvider, err := createTracerProvider(ctx, cfg, res)
|
|
if err != nil {
|
|
return tele, err
|
|
}
|
|
if cfg.Bool("telemetry.openobserve.enable") && cfg.Bool("telemetry.openobserve.metrics.enable") {
|
|
metricsProvider, err := createMetricsProvider(ctx, cfg, res)
|
|
if err != nil {
|
|
return tele, err
|
|
}
|
|
otel.SetMeterProvider(metricsProvider)
|
|
tele.metric = metricsProvider
|
|
}
|
|
tele.tracer = tracerProvider
|
|
otel.SetTracerProvider(tracerProvider)
|
|
return tele, err
|
|
}
|
|
|
|
func createPropagator() propagation.TextMapPropagator {
|
|
return propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
|
|
}
|
|
|
|
func createTracerProvider(ctx context.Context, cfg *config.Config, res *resource.Resource) (*sdktrace.TracerProvider, error) {
|
|
opts := []sdktrace.TracerProviderOption{
|
|
sdktrace.WithResource(res),
|
|
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(cfg.Float64("telemetry.traces.ratio"))),
|
|
}
|
|
|
|
if cfg.Bool("telemetry.openobserve.enable") && cfg.Bool("telemetry.openobserve.traces.enable") {
|
|
url := cfg.String("telemetry.openobserve.traces.url")
|
|
o2exporter, err := otlptracehttp.New(ctx,
|
|
otlptracehttp.WithEndpointURL(url),
|
|
otlptracehttp.WithHeaders(map[string]string{
|
|
"Authorization": cfg.String("telemetry.openobserve.traces.auth"),
|
|
}),
|
|
)
|
|
if err != nil {
|
|
return nil, errs.Wrap(err, "failed to create otlptracehttp exporter")
|
|
}
|
|
opts = append(opts, sdktrace.WithBatcher(o2exporter))
|
|
}
|
|
|
|
return sdktrace.NewTracerProvider(opts...), nil
|
|
}
|
|
|
|
func createMetricsProvider(ctx context.Context, cfg *config.Config, res *resource.Resource) (*metric.MeterProvider, error) {
|
|
exp, err := otlpmetrichttp.New(ctx,
|
|
otlpmetrichttp.WithEndpointURL(cfg.String("telemetry.openobserve.metrics.url")),
|
|
otlpmetrichttp.WithHeaders(map[string]string{
|
|
"Authorization": cfg.String("telemetry.openobserve.metrics.auth"),
|
|
}),
|
|
)
|
|
if err != nil {
|
|
return nil, errs.Wrap(err, "failed to create otlpmetrichttp exporter")
|
|
}
|
|
|
|
return metric.NewMeterProvider(
|
|
metric.WithReader(
|
|
metric.NewPeriodicReader(
|
|
exp,
|
|
metric.WithInterval(cfg.Duration("telemetry.openobserve.metrics.interval")),
|
|
),
|
|
),
|
|
metric.WithResource(res),
|
|
), nil
|
|
}
|
|
|
|
func (te *Telemetry) Close() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
return errs.Close(
|
|
func() error { return te.tracer.Shutdown(ctx) },
|
|
func() error {
|
|
if te.metric != nil {
|
|
return te.metric.Shutdown(ctx)
|
|
}
|
|
return nil
|
|
},
|
|
)
|
|
}
|
|
|
|
func newResource(cfg *config.Config) *resource.Resource {
|
|
res, err := resource.Merge(resource.Default(), resource.NewWithAttributes(
|
|
semconv.SchemaURL,
|
|
semconv.ServiceNameKey.String(cfg.String("telemetry.service.name")),
|
|
semconv.ServiceVersionKey.String(cfg.String("runtime.version")),
|
|
attribute.String("environment", cfg.String("runtime.environment")),
|
|
))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return res
|
|
}
|