Bluemage/go/cmd/bluemage/serve/serve.go

163 lines
4.5 KiB
Go

package serve
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"path/filepath"
"time"
"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"connectrpc.com/validate"
"github.com/XSAM/otelsql"
sqldblogger "github.com/simukti/sqldb-logger"
"github.com/spf13/cobra"
"github.com/stephenafamo/bob"
"github.com/tigorlazuardi/bluemage/go/api"
"github.com/tigorlazuardi/bluemage/go/config"
v1DeviceConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1/devicev1connect"
v1SubredditsConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/subreddits/v1/subredditsv1connect"
"github.com/tigorlazuardi/bluemage/go/gen/reddit"
"github.com/tigorlazuardi/bluemage/go/pkg/errs"
"github.com/tigorlazuardi/bluemage/go/pkg/log"
"github.com/tigorlazuardi/bluemage/go/pkg/telemetry"
"github.com/tigorlazuardi/bluemage/go/server"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
var Cmd = &cobra.Command{
Use: "serve",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
cfg := config.FromContext(ctx)
var cleanups []func() error
defer func() {
var e []error
for _, c := range cleanups {
e = append(e, c())
}
if err := errors.Join(e...); err != nil {
fmt.Println(err.Error())
}
}()
logHandler, cleanup := log.NewHandler(cfg)
cleanups = append(cleanups, cleanup)
slog.SetDefault(slog.New(logHandler))
tele, err := telemetry.New(ctx, cfg)
if err != nil {
return errs.Wrap(err, "failed to create telemetry")
}
cleanups = append(cleanups, tele.Close)
if err := os.MkdirAll(cfg.String("download.directory"), 0755); err != nil {
return errs.Wrapw(err, "failed to create download directory", "path", cfg.String("download.directory"))
}
dbpath := cfg.String("db.path")
dir := filepath.Dir(dbpath)
if err := os.MkdirAll(dir, 0755); err != nil {
return errs.Wrapw(err, "failed to create database directory", "path", dir)
}
dsn := fmt.Sprintf("file:%s", cfg.String("db.path"))
sqldb, err := otelsql.Open(cfg.String("db.driver"), dsn, otelsql.WithAttributes(semconv.DBSystemSqlite))
if err != nil {
return errs.Wrapw(err, "failed to open database",
"driver", cfg.String("db.driver"),
"dsn", dsn,
)
}
cleanups = append(cleanups, sqldb.Close)
if err := otelsql.RegisterDBStatsMetrics(sqldb, otelsql.WithAttributes(semconv.DBSystemSqlite)); err != nil {
return errs.Wrapw(
err, "failed to register database stats metrics",
"driver", cfg.String("db.driver"),
"dsn", dsn,
)
}
sqldb = sqldblogger.OpenDriver(
dsn,
sqldb.Driver(),
log.SQLLogger{},
sqldblogger.WithSQLQueryAsMessage(true),
)
db := bob.New(sqldb)
client, err := reddit.NewClient("https://reddit.com", reddit.WithClient(&http.Client{
Transport: log.NewRoundTripper(http.DefaultTransport),
}))
if err != nil {
panic(err)
}
api := &api.API{
Executor: db,
DB: sqldb,
Reddit: client,
}
validationInterceptor, err := validate.NewInterceptor()
if err != nil {
return errs.Wrap(err, "failed to create validation interceptor")
}
otelInterceptor, err := otelconnect.NewInterceptor()
if err != nil {
return errs.Wrap(err, "failed to create otel interceptor")
}
interceptors := []connect.Interceptor{
validationInterceptor,
otelInterceptor,
server.ErrorMessageInterceptor(),
server.LogInterceptor(),
}
handlerOpts := []connect.HandlerOption{
connect.WithInterceptors(interceptors...),
}
mux := http.NewServeMux()
mux.Handle(v1DeviceConnect.NewDeviceServiceHandler(&server.DeviceHandler{API: api}, handlerOpts...))
mux.Handle(v1SubredditsConnect.NewSubredditsServiceHandler(&server.SubredditHandler{API: api}, handlerOpts...))
server := &http.Server{
Addr: fmt.Sprintf("%s:%s", cfg.String("http.host"), cfg.String("http.port")),
Handler: h2c.NewHandler(server.WithCORS(mux), &http2.Server{}),
BaseContext: func(net.Listener) context.Context {
return ctx
},
}
go func() {
<-ctx.Done()
slog.Info("Exit signal received. Shutting down server")
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_ = server.Shutdown(shutdownCtx)
}()
slog.Info("ConnectRPC server started", "addr", server.Addr)
err = server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return errs.Wrap(err, "failed to serve")
}
slog.Info("ConnectRPC server stopped")
return nil
},
SilenceUsage: true,
}
func Register(cmd *cobra.Command, config *config.Config) {
}