package serve import ( "context" "database/sql" "errors" "fmt" "log/slog" "net" "net/http" "os" "time" "connectrpc.com/connect" "connectrpc.com/otelconnect" "connectrpc.com/validate" sqldblogger "github.com/simukti/sqldb-logger" "github.com/spf13/cobra" "github.com/stephenafamo/bob" "github.com/tigorlazuardi/bluemage/go/api" "github.com/tigorlazuardi/bluemage/go/config" "github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1/v1connect" "github.com/tigorlazuardi/bluemage/go/pkg/errs" "github.com/tigorlazuardi/bluemage/go/pkg/log" "github.com/tigorlazuardi/bluemage/go/pkg/telemetry" "github.com/tigorlazuardi/bluemage/go/server" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" ) var Cmd = &cobra.Command{ Use: "serve", RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() cfg := config.FromContext(ctx) var cleanups []func() error defer func() { var e []error for _, c := range cleanups { e = append(e, c()) } if err := errors.Join(e...); err != nil { fmt.Println(err.Error()) } }() logHandler, cleanup := log.NewHandler(cfg) cleanups = append(cleanups, cleanup) slog.SetDefault(slog.New(logHandler)) tele, err := telemetry.New(ctx, cfg) if err != nil { return errs.Wrap(err, "failed to create telemetry") } cleanups = append(cleanups, tele.Close) if err := os.MkdirAll(cfg.String("download.directory"), 0755); err != nil { return errs.Wrapw(err, "failed to create download directory", "path", cfg.String("download.directory")) } dbPath := fmt.Sprintf("file:%s", cfg.String("db.path")) sqldb, err := sql.Open(cfg.String("db.driver"), dbPath) if err != nil { return errs.Wrapw(err, "failed to open database", "driver", cfg.String("db.driver"), "dsn", cfg.String("db.dsn"), ) } cleanups = append(cleanups, sqldb.Close) sqldb = sqldblogger.OpenDriver( dbPath, sqldb.Driver(), log.SQLLogger{}, sqldblogger.WithSQLQueryAsMessage(true), ) db := bob.New(sqldb) api := &api.API{ Executor: db, DB: sqldb, } handler := &server.Server{ DeviceHandler: server.DeviceHandler{ API: api, }, } validationInterceptor, err := validate.NewInterceptor() if err != nil { return errs.Wrap(err, "failed to create validation interceptor") } otelInterceptor, err := otelconnect.NewInterceptor() if err != nil { return errs.Wrap(err, "failed to create otel interceptor") } mux := http.NewServeMux() mux.Handle(v1connect.NewDeviceServiceHandler(handler, connect.WithInterceptors( validationInterceptor, otelInterceptor, server.LogInterceptor(), ))) server := &http.Server{ Addr: fmt.Sprintf("%s:%s", cfg.String("http.host"), cfg.String("http.port")), Handler: h2c.NewHandler(server.WithCORS(mux), &http2.Server{}), BaseContext: func(net.Listener) context.Context { return ctx }, } go func() { <-ctx.Done() slog.Info("Exit signal received. Shutting down server") shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() _ = server.Shutdown(shutdownCtx) }() slog.Info("ConnectRPC server started", "addr", server.Addr) err = server.ListenAndServe() if err != nil && !errors.Is(err, http.ErrServerClosed) { return errs.Wrap(err, "failed to serve") } slog.Info("ConnectRPC server stopped") return nil }, SilenceUsage: true, } func Register(cmd *cobra.Command, config *config.Config) { }