api: added telemetry and migrate to jet for devices endpoint
This commit is contained in:
parent
385d498ab3
commit
4872e653ef
|
@ -1,9 +1,9 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"sync"
|
||||
|
||||
"github.com/go-jet/jet/v2/qrm"
|
||||
"github.com/stephenafamo/bob"
|
||||
"github.com/tigorlazuardi/bluemage/go/gen/converter"
|
||||
)
|
||||
|
@ -13,7 +13,7 @@ var convert converter.DeviceConverterImpl
|
|||
type API struct {
|
||||
mu sync.Mutex
|
||||
Executor bob.Executor
|
||||
DB qrm.Queryable
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
func (api *API) lockf(f func()) {
|
||||
|
|
|
@ -10,6 +10,10 @@ import (
|
|||
"github.com/tigorlazuardi/bluemage/go/gen/models"
|
||||
device "github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1"
|
||||
"github.com/tigorlazuardi/bluemage/go/pkg/errs"
|
||||
"github.com/tigorlazuardi/bluemage/go/pkg/telemetry"
|
||||
|
||||
. "github.com/go-jet/jet/v2/sqlite"
|
||||
. "github.com/tigorlazuardi/bluemage/go/gen/jet/table"
|
||||
)
|
||||
|
||||
func queryFromCountDevicesRequest(req *device.CountDevicesRequest) (expr []bob.Mod[*dialect.SelectQuery]) {
|
||||
|
@ -30,10 +34,29 @@ func queryFromCountDevicesRequest(req *device.CountDevicesRequest) (expr []bob.M
|
|||
return expr
|
||||
}
|
||||
|
||||
func (api *API) DevicesCount(ctx context.Context, request *device.CountDevicesRequest) (uint64, error) {
|
||||
count, err := models.Devices.Query(ctx, api.Executor, queryFromCountDevicesRequest(request)...).Count()
|
||||
if err != nil {
|
||||
return 0, errs.Wrapw(err, "failed to count devices", "request", request)
|
||||
func (api *API) DevicesCount(ctx context.Context, request *device.CountDevicesRequest) (count uint64, err error) {
|
||||
ctx, span := tracer.Start(ctx, "DevicesCount")
|
||||
defer func() { telemetry.EndWithStatus(span, err) }()
|
||||
|
||||
cond := Bool(true)
|
||||
switch request.Disabled {
|
||||
case device.DisabledFilter_DISABLED_FILTER_TRUE:
|
||||
cond.AND(Devices.Disabled.EQ(Int(1)))
|
||||
case device.DisabledFilter_DISABLED_FILTER_FALSE:
|
||||
cond.AND(Devices.Disabled.EQ(Int(0)))
|
||||
}
|
||||
return uint64(count), nil
|
||||
if request.Search != "" {
|
||||
cond.AND(Devices.Name.LIKE(String("%" + request.Search + "%")))
|
||||
}
|
||||
|
||||
stmt := SELECT(COUNT(Int(1))).WHERE(cond)
|
||||
query, args := stmt.Sql()
|
||||
err = api.DB.QueryRowContext(ctx, query, args...).Scan(&count)
|
||||
if err != nil {
|
||||
return count, errs.Wrapw(err, "failed to count devices",
|
||||
"request", request,
|
||||
"query", stmt.DebugSql(),
|
||||
)
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
|
|
@ -6,43 +6,54 @@ import (
|
|||
"time"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"github.com/aarondl/opt/omit"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
"github.com/tigorlazuardi/bluemage/go/gen/models"
|
||||
"github.com/tigorlazuardi/bluemage/go/pkg/errs"
|
||||
"github.com/tigorlazuardi/bluemage/go/pkg/log"
|
||||
"github.com/tigorlazuardi/bluemage/go/pkg/telemetry"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
func (api *API) DevicesCreate(ctx context.Context, params *models.Device) (device *models.Device, err error) {
|
||||
var (
|
||||
deviceCreatedCounter, _ = meter.Int64Counter("devices.created",
|
||||
metric.WithDescription("The number of devices have created."),
|
||||
)
|
||||
deviceCreatedHistogram, _ = meter.Int64Histogram(
|
||||
"devices.creating.duration",
|
||||
metric.WithDescription("The duration for creating devices."),
|
||||
metric.WithUnit("ms"),
|
||||
)
|
||||
)
|
||||
|
||||
func (api *API) DevicesCreate(ctx context.Context, params *models.DeviceSetter) (device *models.Device, err error) {
|
||||
ctx, span := tracer.Start(ctx, "DevicesCreate")
|
||||
defer func() { telemetry.EndWithStatus(span, err) }()
|
||||
start := time.Now()
|
||||
|
||||
ctx, coll := log.WithQueryCollector(ctx)
|
||||
now := time.Now()
|
||||
api.lockf(func() {
|
||||
device, err = models.Devices.Insert(ctx, api.Executor, &models.DeviceSetter{
|
||||
Slug: omit.From(params.Slug),
|
||||
Name: omit.From(params.Name),
|
||||
ResolutionX: omit.From(params.ResolutionX),
|
||||
ResolutionY: omit.From(params.ResolutionY),
|
||||
AspectRatioTolerance: omit.From(params.AspectRatioTolerance),
|
||||
MinX: omit.From(params.MinX),
|
||||
MinY: omit.From(params.MinY),
|
||||
MaxX: omit.From(params.MaxX),
|
||||
MaxY: omit.From(params.MaxY),
|
||||
NSFW: omit.From(params.NSFW),
|
||||
SingleFolderMode: omit.From(params.SingleFolderMode),
|
||||
Disabled: omit.From(params.Disabled),
|
||||
CreatedAt: omit.From(now.Unix()),
|
||||
UpdatedAt: omit.From(now.Unix()),
|
||||
})
|
||||
device, err = models.Devices.Insert(ctx, api.Executor, params)
|
||||
})
|
||||
if err != nil {
|
||||
if sqliteErr := new(sqlite3.Error); errors.As(err, sqliteErr) {
|
||||
if sqliteErr.Code == sqlite3.ErrConstraint {
|
||||
return nil, errs.
|
||||
Wrapw(sqliteErr, "device already exists", "params", params).
|
||||
Wrapw(sqliteErr, "device already exists", "params", params, "query", coll).
|
||||
Code(connect.CodeAlreadyExists)
|
||||
}
|
||||
}
|
||||
return nil, errs.Wrapw(err, "failed to create device", "params", params, "query", coll)
|
||||
return nil, errs.
|
||||
Wrapw(err, "failed to create device",
|
||||
"params", params,
|
||||
"query", coll,
|
||||
)
|
||||
}
|
||||
dur := time.Since(start)
|
||||
deviceCreatedCounter.Add(ctx, 1,
|
||||
metric.WithAttributes(
|
||||
attribute.String("slug", params.Slug.GetOrZero()),
|
||||
))
|
||||
deviceCreatedHistogram.Record(ctx, dur.Milliseconds())
|
||||
return device, nil
|
||||
}
|
||||
|
|
|
@ -5,23 +5,16 @@ import (
|
|||
"errors"
|
||||
|
||||
"connectrpc.com/connect"
|
||||
"github.com/aarondl/opt/omit"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
"github.com/tigorlazuardi/bluemage/go/gen/models"
|
||||
"github.com/tigorlazuardi/bluemage/go/pkg/errs"
|
||||
)
|
||||
|
||||
func (api *API) SubredditCreate(ctx context.Context, subreddit *models.Subreddit) (err error) {
|
||||
func (api *API) SubredditCreate(ctx context.Context, subreddit *models.SubredditSetter) (err error) {
|
||||
// TODO: add check to Reddit API to see if subreddit exists.
|
||||
|
||||
api.lockf(func() {
|
||||
_, err = models.Subreddits.Insert(ctx, api.Executor, &models.SubredditSetter{
|
||||
Name: omit.From(subreddit.Name),
|
||||
DisableScheduler: omit.From(subreddit.DisableScheduler),
|
||||
Type: omit.From(subreddit.Type),
|
||||
Schedule: omit.From(subreddit.Schedule),
|
||||
Countback: omit.From(subreddit.Countback),
|
||||
})
|
||||
_, err = models.Subreddits.Insert(ctx, api.Executor, subreddit)
|
||||
})
|
||||
if err != nil {
|
||||
if sqlite3err := new(sqlite3.Error); errors.As(err, &sqlite3err) {
|
||||
|
|
|
@ -10,13 +10,23 @@ import (
|
|||
// goverter:extend BoolToInt8
|
||||
// goverter:extend Int8ToBool
|
||||
// goverter:extend PtrBoolToOmitInt8
|
||||
// goverter:extend BoolToOmitInt8
|
||||
// goverter:extend PtrStringToOmitString
|
||||
// goverter:extend PtrFloat64ToOmitFloat64
|
||||
// goverter:extend PtrInt32ToOmitInt32
|
||||
// goverter:extend PtrInt64ToOmitInt64
|
||||
// goverter:extend PtrIntToOmitInt
|
||||
// goverter:extend PtrInt8ToOmitInt8
|
||||
// goverter:extend IntToOmitInt
|
||||
// goverter:extend Int8ToOmitInt8
|
||||
// goverter:extend Int32ToOmitInt32
|
||||
// goverter:extend Int64ToOmitInt64
|
||||
// goverter:extend Float64ToOmitFloat64
|
||||
// goverter:extend StringToOmitString
|
||||
type DeviceConverter interface {
|
||||
// goverter:ignore CreatedAt UpdatedAt R
|
||||
// goverter:ignore CreatedAt UpdatedAt
|
||||
// goverter:map Nsfw NSFW
|
||||
CreateDeviceRequestToModelsDevice(*device.CreateDeviceRequest) *models.Device
|
||||
CreateDeviceRequestToModelsDeviceSetter(*device.CreateDeviceRequest) *models.DeviceSetter
|
||||
// goverter:ignore state sizeCache unknownFields
|
||||
ModelsDeviceToCreateDeviceResponse(*models.Device) *device.CreateDeviceResponse
|
||||
// goverter:ignore state sizeCache unknownFields
|
||||
|
@ -50,18 +60,57 @@ func PtrStringToOmitString(s *string) omit.Val[string] {
|
|||
return omit.FromPtr(s)
|
||||
}
|
||||
|
||||
func StringToOmitString(s string) omit.Val[string] {
|
||||
return omit.From(s)
|
||||
}
|
||||
|
||||
func PtrFloat64ToOmitFloat64(f *float64) omit.Val[float64] {
|
||||
return omit.FromPtr(f)
|
||||
}
|
||||
|
||||
func Float64ToOmitFloat64(f float64) omit.Val[float64] {
|
||||
return omit.From(f)
|
||||
}
|
||||
|
||||
func PtrIntToOmitInt(i *int) omit.Val[int] {
|
||||
return omit.FromPtr(i)
|
||||
}
|
||||
|
||||
func IntToOmitInt(i int) omit.Val[int] {
|
||||
return omit.From(i)
|
||||
}
|
||||
|
||||
func PtrInt8ToOmitInt8(i *int8) omit.Val[int8] {
|
||||
return omit.FromPtr(i)
|
||||
}
|
||||
|
||||
func Int8ToOmitInt8(i int8) omit.Val[int8] {
|
||||
return omit.From(i)
|
||||
}
|
||||
|
||||
func PtrInt32ToOmitInt32(i *int32) omit.Val[int32] {
|
||||
return omit.FromPtr(i)
|
||||
}
|
||||
|
||||
func Int32ToOmitInt32(i int32) omit.Val[int32] {
|
||||
return omit.From(i)
|
||||
}
|
||||
|
||||
func PtrInt64ToOmitInt64(i *int64) omit.Val[int64] {
|
||||
return omit.FromPtr(i)
|
||||
}
|
||||
|
||||
func Int64ToOmitInt64(i int64) omit.Val[int64] {
|
||||
return omit.From(i)
|
||||
}
|
||||
|
||||
func BoolToOmitInt8(b bool) omit.Val[int8] {
|
||||
if b {
|
||||
return omit.From(int8(1))
|
||||
}
|
||||
return omit.From(int8(0))
|
||||
}
|
||||
|
||||
func Int8ToBool(i int8) bool {
|
||||
return i > 0
|
||||
}
|
||||
|
|
|
@ -1,16 +1,33 @@
|
|||
package converts
|
||||
|
||||
import (
|
||||
"github.com/aarondl/opt/omit"
|
||||
"github.com/tigorlazuardi/bluemage/go/gen/models"
|
||||
subreddits "github.com/tigorlazuardi/bluemage/go/gen/proto/subreddits/v1"
|
||||
)
|
||||
|
||||
// goverter:converter
|
||||
// goverter:extend BoolToInt8 SubredditTypeToString Int8ToBool PtrBoolToOmitInt8
|
||||
// goverter:extend PtrStringToOmitString PtrFloat64ToOmitFloat64 PtrInt32ToOmitInt32
|
||||
// goverter:extend BoolToInt8
|
||||
// goverter:extend Int8ToBool
|
||||
// goverter:extend PtrBoolToOmitInt8
|
||||
// goverter:extend BoolToOmitInt8
|
||||
// goverter:extend PtrStringToOmitString
|
||||
// goverter:extend PtrFloat64ToOmitFloat64
|
||||
// goverter:extend PtrInt32ToOmitInt32
|
||||
// goverter:extend PtrInt64ToOmitInt64
|
||||
// goverter:extend PtrIntToOmitInt
|
||||
// goverter:extend PtrInt8ToOmitInt8
|
||||
// goverter:extend IntToOmitInt
|
||||
// goverter:extend Int8ToOmitInt8
|
||||
// goverter:extend Int32ToOmitInt32
|
||||
// goverter:extend Int64ToOmitInt64
|
||||
// goverter:extend Float64ToOmitFloat64
|
||||
// goverter:extend StringToOmitString
|
||||
// goverter:extend SubredditTypeToString
|
||||
// goverter:extend SubredditTypeToOmitString
|
||||
type SubredditConverter interface {
|
||||
// goverter:ignore CreatedAt UpdatedAt R CoverImageID
|
||||
CreateSubredditRequestToModelsSubreddit(*subreddits.CreateSubredditRequest) *models.Subreddit
|
||||
// goverter:ignore CreatedAt UpdatedAt CoverImageID
|
||||
CreateSubredditRequestToModelsSubredditSetter(*subreddits.CreateSubredditRequest) *models.SubredditSetter
|
||||
}
|
||||
|
||||
func SubredditTypeToString(subType subreddits.SubredditType) string {
|
||||
|
@ -19,3 +36,10 @@ func SubredditTypeToString(subType subreddits.SubredditType) string {
|
|||
}
|
||||
return "r"
|
||||
}
|
||||
|
||||
func SubredditTypeToOmitString(subType subreddits.SubredditType) omit.Val[string] {
|
||||
if subType == subreddits.SubredditType_SUBREDDIT_TYPE_USER {
|
||||
return omit.From("user")
|
||||
}
|
||||
return omit.From("r")
|
||||
}
|
||||
|
|
|
@ -23,8 +23,8 @@ var deviceConvert converter.DeviceConverterImpl
|
|||
|
||||
// CreateDevice implements v1connect.DeviceServiceHandler.
|
||||
func (d *DeviceHandler) CreateDevice(ctx context.Context, request *connect.Request[device.CreateDeviceRequest]) (*connect.Response[device.CreateDeviceResponse], error) {
|
||||
dev := deviceConvert.CreateDeviceRequestToModelsDevice(request.Msg)
|
||||
dev, err := d.API.DevicesCreate(ctx, dev)
|
||||
set := deviceConvert.CreateDeviceRequestToModelsDeviceSetter(request.Msg)
|
||||
dev, err := d.API.DevicesCreate(ctx, set)
|
||||
if err != nil {
|
||||
return nil, errs.IntoConnectError(err)
|
||||
}
|
||||
|
|
|
@ -27,13 +27,13 @@ type SubredditHandler struct {
|
|||
// - connect.CodeInvalidArgument if validation failed, e.g. Invalid schedule cron format.
|
||||
// - connect.CodeNotFound if the subreddit does not exist.
|
||||
func (su *SubredditHandler) CreateSubreddit(ctx context.Context, request *connect.Request[subreddits.CreateSubredditRequest]) (*connect.Response[subreddits.CreateSubredditResponse], error) {
|
||||
data := subredditConverter.CreateSubredditRequestToModelsSubreddit(request.Msg)
|
||||
data := subredditConverter.CreateSubredditRequestToModelsSubredditSetter(request.Msg)
|
||||
if err := su.API.SubredditCreate(ctx, data); err != nil {
|
||||
return nil, errs.IntoConnectError(err)
|
||||
}
|
||||
|
||||
resp := &subreddits.CreateSubredditResponse{
|
||||
Name: data.Name,
|
||||
Name: data.Name.GetOrZero(),
|
||||
}
|
||||
|
||||
return connect.NewResponse(resp), nil
|
||||
|
|
Loading…
Reference in a new issue