152 lines
4 KiB
Go
152 lines
4 KiB
Go
package serve
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
"connectrpc.com/otelconnect"
|
|
"connectrpc.com/validate"
|
|
"github.com/XSAM/otelsql"
|
|
sqldblogger "github.com/simukti/sqldb-logger"
|
|
"github.com/spf13/cobra"
|
|
"github.com/stephenafamo/bob"
|
|
"github.com/tigorlazuardi/bluemage/go/api"
|
|
"github.com/tigorlazuardi/bluemage/go/config"
|
|
"github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1/v1connect"
|
|
"github.com/tigorlazuardi/bluemage/go/pkg/errs"
|
|
"github.com/tigorlazuardi/bluemage/go/pkg/log"
|
|
"github.com/tigorlazuardi/bluemage/go/pkg/telemetry"
|
|
"github.com/tigorlazuardi/bluemage/go/server"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
|
"golang.org/x/net/http2"
|
|
"golang.org/x/net/http2/h2c"
|
|
)
|
|
|
|
var Cmd = &cobra.Command{
|
|
Use: "serve",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
ctx := cmd.Context()
|
|
cfg := config.FromContext(ctx)
|
|
var cleanups []func() error
|
|
defer func() {
|
|
var e []error
|
|
for _, c := range cleanups {
|
|
e = append(e, c())
|
|
}
|
|
if err := errors.Join(e...); err != nil {
|
|
fmt.Println(err.Error())
|
|
}
|
|
}()
|
|
|
|
logHandler, cleanup := log.NewHandler(cfg)
|
|
cleanups = append(cleanups, cleanup)
|
|
slog.SetDefault(slog.New(logHandler))
|
|
|
|
tele, err := telemetry.New(ctx, cfg)
|
|
if err != nil {
|
|
return errs.Wrap(err, "failed to create telemetry")
|
|
}
|
|
cleanups = append(cleanups, tele.Close)
|
|
|
|
if err := os.MkdirAll(cfg.String("download.directory"), 0755); err != nil {
|
|
return errs.Wrapw(err, "failed to create download directory", "path", cfg.String("download.directory"))
|
|
}
|
|
|
|
dbpath := cfg.String("db.path")
|
|
dir := filepath.Dir(dbpath)
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return errs.Wrapw(err, "failed to create database directory", "path", dir)
|
|
}
|
|
|
|
dsn := fmt.Sprintf("file:%s", cfg.String("db.path"))
|
|
sqldb, err := otelsql.Open(cfg.String("db.driver"), dsn, otelsql.WithAttributes(semconv.DBSystemSqlite))
|
|
if err != nil {
|
|
return errs.Wrapw(err, "failed to open database",
|
|
"driver", cfg.String("db.driver"),
|
|
"dsn", dsn,
|
|
)
|
|
}
|
|
cleanups = append(cleanups, sqldb.Close)
|
|
if err := otelsql.RegisterDBStatsMetrics(sqldb, otelsql.WithAttributes(semconv.DBSystemSqlite)); err != nil {
|
|
return errs.Wrapw(
|
|
err, "failed to register database stats metrics",
|
|
"driver", cfg.String("db.driver"),
|
|
"dsn", dsn,
|
|
)
|
|
}
|
|
|
|
sqldb = sqldblogger.OpenDriver(
|
|
dsn,
|
|
sqldb.Driver(),
|
|
log.SQLLogger{},
|
|
sqldblogger.WithSQLQueryAsMessage(true),
|
|
)
|
|
db := bob.New(sqldb)
|
|
|
|
api := &api.API{
|
|
Executor: db,
|
|
DB: sqldb,
|
|
}
|
|
|
|
handler := &server.Server{
|
|
DeviceHandler: server.DeviceHandler{
|
|
API: api,
|
|
},
|
|
}
|
|
|
|
validationInterceptor, err := validate.NewInterceptor()
|
|
if err != nil {
|
|
return errs.Wrap(err, "failed to create validation interceptor")
|
|
}
|
|
|
|
otelInterceptor, err := otelconnect.NewInterceptor()
|
|
if err != nil {
|
|
return errs.Wrap(err, "failed to create otel interceptor")
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
mux.Handle(v1connect.NewDeviceServiceHandler(handler, connect.WithInterceptors(
|
|
validationInterceptor,
|
|
otelInterceptor,
|
|
server.ErrorMessageInterceptor(),
|
|
server.LogInterceptor(),
|
|
)))
|
|
|
|
server := &http.Server{
|
|
Addr: fmt.Sprintf("%s:%s", cfg.String("http.host"), cfg.String("http.port")),
|
|
Handler: h2c.NewHandler(server.WithCORS(mux), &http2.Server{}),
|
|
BaseContext: func(net.Listener) context.Context {
|
|
return ctx
|
|
},
|
|
}
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
slog.Info("Exit signal received. Shutting down server")
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
_ = server.Shutdown(shutdownCtx)
|
|
}()
|
|
|
|
slog.Info("ConnectRPC server started", "addr", server.Addr)
|
|
err = server.ListenAndServe()
|
|
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
return errs.Wrap(err, "failed to serve")
|
|
}
|
|
slog.Info("ConnectRPC server stopped")
|
|
return nil
|
|
},
|
|
SilenceUsage: true,
|
|
}
|
|
|
|
func Register(cmd *cobra.Command, config *config.Config) {
|
|
}
|