163 lines
4.5 KiB
Go
163 lines
4.5 KiB
Go
package serve
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
"connectrpc.com/otelconnect"
|
|
"connectrpc.com/validate"
|
|
"github.com/XSAM/otelsql"
|
|
sqldblogger "github.com/simukti/sqldb-logger"
|
|
"github.com/spf13/cobra"
|
|
"github.com/stephenafamo/bob"
|
|
"github.com/tigorlazuardi/bluemage/go/api"
|
|
"github.com/tigorlazuardi/bluemage/go/config"
|
|
v1DeviceConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1/devicev1connect"
|
|
v1SubredditsConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/subreddits/v1/subredditsv1connect"
|
|
"github.com/tigorlazuardi/bluemage/go/gen/reddit"
|
|
"github.com/tigorlazuardi/bluemage/go/pkg/errs"
|
|
"github.com/tigorlazuardi/bluemage/go/pkg/log"
|
|
"github.com/tigorlazuardi/bluemage/go/pkg/telemetry"
|
|
"github.com/tigorlazuardi/bluemage/go/server"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
|
"golang.org/x/net/http2"
|
|
"golang.org/x/net/http2/h2c"
|
|
)
|
|
|
|
var Cmd = &cobra.Command{
|
|
Use: "serve",
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
ctx := cmd.Context()
|
|
cfg := config.FromContext(ctx)
|
|
var cleanups []func() error
|
|
defer func() {
|
|
var e []error
|
|
for _, c := range cleanups {
|
|
e = append(e, c())
|
|
}
|
|
if err := errors.Join(e...); err != nil {
|
|
fmt.Println(err.Error())
|
|
}
|
|
}()
|
|
|
|
logHandler, cleanup := log.NewHandler(cfg)
|
|
cleanups = append(cleanups, cleanup)
|
|
slog.SetDefault(slog.New(logHandler))
|
|
|
|
tele, err := telemetry.New(ctx, cfg)
|
|
if err != nil {
|
|
return errs.Wrap(err, "failed to create telemetry")
|
|
}
|
|
cleanups = append(cleanups, tele.Close)
|
|
|
|
if err := os.MkdirAll(cfg.String("download.directory"), 0755); err != nil {
|
|
return errs.Wrapw(err, "failed to create download directory", "path", cfg.String("download.directory"))
|
|
}
|
|
|
|
dbpath := cfg.String("db.path")
|
|
dir := filepath.Dir(dbpath)
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return errs.Wrapw(err, "failed to create database directory", "path", dir)
|
|
}
|
|
|
|
dsn := fmt.Sprintf("file:%s", cfg.String("db.path"))
|
|
sqldb, err := otelsql.Open(cfg.String("db.driver"), dsn, otelsql.WithAttributes(semconv.DBSystemSqlite))
|
|
if err != nil {
|
|
return errs.Wrapw(err, "failed to open database",
|
|
"driver", cfg.String("db.driver"),
|
|
"dsn", dsn,
|
|
)
|
|
}
|
|
cleanups = append(cleanups, sqldb.Close)
|
|
if err := otelsql.RegisterDBStatsMetrics(sqldb, otelsql.WithAttributes(semconv.DBSystemSqlite)); err != nil {
|
|
return errs.Wrapw(
|
|
err, "failed to register database stats metrics",
|
|
"driver", cfg.String("db.driver"),
|
|
"dsn", dsn,
|
|
)
|
|
}
|
|
|
|
sqldb = sqldblogger.OpenDriver(
|
|
dsn,
|
|
sqldb.Driver(),
|
|
log.SQLLogger{},
|
|
sqldblogger.WithSQLQueryAsMessage(true),
|
|
)
|
|
db := bob.New(sqldb)
|
|
|
|
client, err := reddit.NewClient("https://reddit.com", reddit.WithClient(&http.Client{
|
|
Transport: log.NewRoundTripper(http.DefaultTransport),
|
|
}))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
api := &api.API{
|
|
Executor: db,
|
|
DB: sqldb,
|
|
Reddit: client,
|
|
}
|
|
|
|
validationInterceptor, err := validate.NewInterceptor()
|
|
if err != nil {
|
|
return errs.Wrap(err, "failed to create validation interceptor")
|
|
}
|
|
|
|
otelInterceptor, err := otelconnect.NewInterceptor()
|
|
if err != nil {
|
|
return errs.Wrap(err, "failed to create otel interceptor")
|
|
}
|
|
|
|
interceptors := []connect.Interceptor{
|
|
validationInterceptor,
|
|
otelInterceptor,
|
|
server.ErrorMessageInterceptor(),
|
|
server.LogInterceptor(),
|
|
}
|
|
|
|
handlerOpts := []connect.HandlerOption{
|
|
connect.WithInterceptors(interceptors...),
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
mux.Handle(v1DeviceConnect.NewDeviceServiceHandler(&server.DeviceHandler{API: api}, handlerOpts...))
|
|
mux.Handle(v1SubredditsConnect.NewSubredditsServiceHandler(&server.SubredditHandler{API: api}, handlerOpts...))
|
|
|
|
server := &http.Server{
|
|
Addr: fmt.Sprintf("%s:%s", cfg.String("http.host"), cfg.String("http.port")),
|
|
Handler: h2c.NewHandler(server.WithCORS(mux), &http2.Server{}),
|
|
BaseContext: func(net.Listener) context.Context {
|
|
return ctx
|
|
},
|
|
}
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
slog.Info("Exit signal received. Shutting down server")
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
_ = server.Shutdown(shutdownCtx)
|
|
}()
|
|
|
|
slog.Info("ConnectRPC server started", "addr", server.Addr)
|
|
err = server.ListenAndServe()
|
|
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
return errs.Wrap(err, "failed to serve")
|
|
}
|
|
slog.Info("ConnectRPC server stopped")
|
|
return nil
|
|
},
|
|
SilenceUsage: true,
|
|
}
|
|
|
|
func Register(cmd *cobra.Command, config *config.Config) {
|
|
}
|