// Package serve provides the "serve" sub-command, which starts the ConnectRPC
// HTTP server.
package serve

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"connectrpc.com/connect"
	"connectrpc.com/otelconnect"
	"connectrpc.com/validate"
	"github.com/XSAM/otelsql"
	sqldblogger "github.com/simukti/sqldb-logger"
	"github.com/spf13/cobra"
	"github.com/stephenafamo/bob"
	"github.com/tigorlazuardi/bluemage/go/api"
	"github.com/tigorlazuardi/bluemage/go/config"
	v1DeviceConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1/devicev1connect"
	v1SubredditsConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/subreddits/v1/subredditsv1connect"
	"github.com/tigorlazuardi/bluemage/go/gen/reddit"
	"github.com/tigorlazuardi/bluemage/go/pkg/errs"
	"github.com/tigorlazuardi/bluemage/go/pkg/log"
	"github.com/tigorlazuardi/bluemage/go/pkg/telemetry"
	"github.com/tigorlazuardi/bluemage/go/server"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"
)

// Cmd is the "serve" sub-command.
//
// Its RunE reads the configuration stored in the command context, then sets
// up, in order: the default slog logger, telemetry, the download and database
// directories, the database handle (otelsql driver wrapped by sqldb-logger),
// the reddit client and the ConnectRPC handlers. It then serves HTTP (with h2c)
// on "http.host":"http.port" until the command context is cancelled, at which
// point the server is shut down with a 5 second grace period.
//
// Any setup failure is returned as a wrapped error. Resources acquired along
// the way are released when RunE returns.
var Cmd = &cobra.Command{
	Use: "serve",
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := cmd.Context()
		cfg := config.FromContext(ctx)

		// Cleanups are unwound in reverse registration order (like defer) so
		// that resources created later are released before the ones they may
		// depend on; in particular the log handler is closed last.
		var cleanups []func() error
		defer func() {
			closeErrs := make([]error, 0, len(cleanups))
			for i := len(cleanups) - 1; i >= 0; i-- {
				closeErrs = append(closeErrs, cleanups[i]())
			}
			if err := errors.Join(closeErrs...); err != nil {
				// The log handler is already closed at this point, so report
				// directly on stderr.
				fmt.Fprintln(os.Stderr, err.Error())
			}
		}()

		logHandler, closeLog := log.NewHandler(cfg)
		cleanups = append(cleanups, closeLog)
		slog.SetDefault(slog.New(logHandler))

		tele, err := telemetry.New(ctx, cfg)
		if err != nil {
			return errs.Wrap(err, "failed to create telemetry")
		}
		cleanups = append(cleanups, tele.Close)

		downloadDir := cfg.String("download.directory")
		if err := os.MkdirAll(downloadDir, 0o755); err != nil {
			return errs.Wrapw(err, "failed to create download directory", "path", downloadDir)
		}

		dbPath := cfg.String("db.path")
		dbDir := filepath.Dir(dbPath)
		if err := os.MkdirAll(dbDir, 0o755); err != nil {
			return errs.Wrapw(err, "failed to create database directory", "path", dbDir)
		}

		driverName := cfg.String("db.driver")
		dsn := fmt.Sprintf("file:%s", dbPath)

		// baseDB exists only to obtain the otelsql-instrumented driver; the
		// pool the application uses is the sqldb-logger one created below.
		baseDB, err := otelsql.Open(driverName, dsn, otelsql.WithAttributes(semconv.DBSystemSqlite))
		if err != nil {
			return errs.Wrapw(err, "failed to open database",
				"driver", driverName,
				"dsn", dsn,
			)
		}
		// Register the close right away so a later setup failure does not leak
		// the handle.
		cleanups = append(cleanups, baseDB.Close)

		sqldb := sqldblogger.OpenDriver(
			dsn,
			baseDB.Driver(),
			log.SQLLogger{},
			sqldblogger.WithSQLQueryAsMessage(true),
		)
		// OpenDriver returns a separate pool; it needs its own Close.
		cleanups = append(cleanups, sqldb.Close)

		// Pragmas are executed on the pool handed to the API rather than on
		// baseDB, otherwise connection-scoped settings would be applied to a
		// pool that is never used again.
		// NOTE(review): Exec on a pool reaches a single connection; if more
		// than one connection is ever opened, connection-scoped pragmas may
		// need to be moved into the DSN or a connect hook — confirm.
		for i, pragma := range cfg.Strings("db.pragma") {
			key, value, ok := strings.Cut(pragma, "=")
			if !ok {
				return errs.Failw("failed to parse db pragma. expected key=value format",
					"index", i+1,
					"pragma", pragma,
				)
			}
			query := fmt.Sprintf("pragma %s = %s", key, value)
			if _, err := sqldb.ExecContext(ctx, query); err != nil {
				return errs.Wrapw(err, "failed to execute pragma",
					"index", i+1,
					"pragma", pragma,
					"query", query,
				)
			}
		}

		// Report pool statistics for the pool that actually serves queries.
		if err := otelsql.RegisterDBStatsMetrics(sqldb, otelsql.WithAttributes(semconv.DBSystemSqlite)); err != nil {
			return errs.Wrapw(
				err, "failed to register database stats metrics",
				"driver", driverName,
				"dsn", dsn,
			)
		}

		db := bob.New(sqldb)

		redditClient, err := reddit.NewClient("https://reddit.com", reddit.WithClient(&http.Client{
			Transport: log.NewRoundTripper(http.DefaultTransport),
		}))
		if err != nil {
			return errs.Wrap(err, "failed to create reddit client")
		}

		apiSvc := &api.API{
			Executor: db,
			DB:       sqldb,
			Reddit:   redditClient,
		}

		validationInterceptor, err := validate.NewInterceptor()
		if err != nil {
			return errs.Wrap(err, "failed to create validation interceptor")
		}
		otelInterceptor, err := otelconnect.NewInterceptor()
		if err != nil {
			return errs.Wrap(err, "failed to create otel interceptor")
		}

		handlerOpts := []connect.HandlerOption{
			connect.WithInterceptors(
				validationInterceptor,
				otelInterceptor,
				server.ErrorMessageInterceptor(),
				server.LogInterceptor(),
			),
		}

		mux := http.NewServeMux()
		mux.Handle(v1DeviceConnect.NewDeviceServiceHandler(&server.DeviceHandler{API: apiSvc}, handlerOpts...))
		mux.Handle(v1SubredditsConnect.NewSubredditsServiceHandler(&server.SubredditHandler{API: apiSvc}, handlerOpts...))

		srv := &http.Server{
			Addr:    fmt.Sprintf("%s:%s", cfg.String("http.host"), cfg.String("http.port")),
			Handler: h2c.NewHandler(server.WithCORS(mux), &http2.Server{}),
			// Bound only the header read so slow-header clients cannot pin
			// connections, while request/response bodies stay unbounded.
			ReadHeaderTimeout: 10 * time.Second,
			BaseContext:       func(net.Listener) context.Context { return ctx },
		}

		// ListenAndServe returns as soon as Shutdown is called, so RunE must
		// wait on shutdownDone before returning; otherwise the cleanups would
		// close the database while in-flight requests are still draining.
		shutdownDone := make(chan struct{})
		go func() {
			defer close(shutdownDone)
			<-ctx.Done()
			slog.Info("Exit signal received. Shutting down server")
			shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			if err := srv.Shutdown(shutdownCtx); err != nil {
				slog.Error("failed to gracefully shut down server", "error", err)
			}
		}()

		slog.Info("ConnectRPC server started", "addr", srv.Addr)
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			return errs.Wrap(err, "failed to serve")
		}
		// ErrServerClosed is only produced by the Shutdown call above, so the
		// goroutine is guaranteed to finish.
		<-shutdownDone
		slog.Info("ConnectRPC server stopped")
		return nil
	},
	SilenceUsage: true,
}

// Register is currently a no-op: it accepts the command and configuration but
// does nothing with either.
func Register(cmd *cobra.Command, config *config.Config) {
}