From af2fd593b489fa653a6efeba3cecf33abd23af9e Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Thu, 15 Aug 2024 20:39:04 +0700 Subject: [PATCH] api: implemented resolve subreddit name api --- go.mod | 1 + go.sum | 2 + go/api/api.go | 2 + go/api/subreddits_create.go | 10 +- go/api/subreddits_resolve.go | 76 ++++++++++ go/cmd/bluemage/serve/serve.go | 31 ++-- go/converts/subreddit.go | 6 + go/pkg/errs/errs.go | 9 ++ go/pkg/log/http_transport.go | 141 +++++++++++++++++++ go/server/subreddit_handlers.go | 23 ++- schemas/openapi/reddit.yaml | 48 ++----- schemas/proto/subreddits/v1/exist.proto | 16 +++ schemas/proto/subreddits/v1/resolve.proto | 2 + schemas/proto/subreddits/v1/subreddits.proto | 8 ++ 14 files changed, 323 insertions(+), 52 deletions(-) create mode 100644 go/api/subreddits_resolve.go create mode 100644 go/pkg/log/http_transport.go create mode 100644 schemas/proto/subreddits/v1/exist.proto diff --git a/go.mod b/go.mod index c2725b5..53f34cf 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/XSAM/otelsql v0.32.0 // indirect github.com/aarondl/json v0.0.0-20221020222930-8b0db17ef1bf // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect + github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dlclark/regexp2 v1.11.0 // indirect diff --git a/go.sum b/go.sum index c92d61b..b75ec1e 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8 github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/bufbuild/protovalidate-go v0.6.3 h1:wxQyzW035zM16Binbaz/nWAzS12dRIXhZdSUWRY7Fv0= github.com/bufbuild/protovalidate-go v0.6.3/go.mod h1:J4PtwP9Z2YAGgB0+o+tTWEDtLtXvz/gfhFZD8pbzM/U= +github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 h1:6lhrsTEnloDPXyeZBvSYvQf8u86jbKehZPVDDlkgDl4= +github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= diff --git a/go/api/api.go b/go/api/api.go index 3b80fee..707622b 100644 --- a/go/api/api.go +++ b/go/api/api.go @@ -5,12 +5,14 @@ import ( "sync" "github.com/stephenafamo/bob" + "github.com/tigorlazuardi/bluemage/go/gen/reddit" ) type API struct { mu sync.Mutex Executor bob.Executor DB *sql.DB + Reddit *reddit.Client } func (api *API) lockf(f func()) { diff --git a/go/api/subreddits_create.go b/go/api/subreddits_create.go index cd2f7a5..c257bb7 100644 --- a/go/api/subreddits_create.go +++ b/go/api/subreddits_create.go @@ -8,19 +8,21 @@ import ( "github.com/mattn/go-sqlite3" "github.com/tigorlazuardi/bluemage/go/gen/models" "github.com/tigorlazuardi/bluemage/go/pkg/errs" + "github.com/tigorlazuardi/bluemage/go/pkg/telemetry" ) -func (api *API) SubredditCreate(ctx context.Context, subreddit *models.SubredditSetter) (err error) { - // TODO: add check to Reddit API to see if subreddit exists. +func (api *API) SubredditCreate(ctx context.Context, request *models.SubredditSetter) (err error) { + ctx, span := tracer.Start(ctx, "SubredditCreate") + defer func() { telemetry.EndWithStatus(span, err) }() api.lockf(func() { - _, err = models.Subreddits.Insert(ctx, api.Executor, subreddit) + _, err = models.Subreddits.Insert(ctx, api.Executor, request) }) if err != nil { if sqlite3err := new(sqlite3.Error); errors.As(err, &sqlite3err) { if sqlite3err.Code == sqlite3.ErrConstraint { return errs. - Wrapw(err, "subreddit already exists", "input", subreddit). + Wrapw(err, "subreddit already exists", "input", request). Code(connect.CodeAlreadyExists) } } diff --git a/go/api/subreddits_resolve.go b/go/api/subreddits_resolve.go new file mode 100644 index 0000000..d8454a7 --- /dev/null +++ b/go/api/subreddits_resolve.go @@ -0,0 +1,76 @@ +package api + +import ( + "context" + + "connectrpc.com/connect" + "github.com/tigorlazuardi/bluemage/go/gen/reddit" + "github.com/tigorlazuardi/bluemage/go/pkg/errs" + "github.com/tigorlazuardi/bluemage/go/pkg/log" + "github.com/tigorlazuardi/bluemage/go/pkg/telemetry" +) + +type SubredditResolveNameRequest struct { + Name string + Type string +} + +func (api *API) SubredditResolveName(ctx context.Context, request SubredditResolveNameRequest) (resolved string, err error) { + ctx, span := tracer.Start(ctx, "SubredditResolveName") + defer func() { telemetry.EndWithStatus(span, err) }() + + ctx, httplog := log.ContextWithRoundTripCollector(ctx) + typ := reddit.GetListingTypeR + if request.Type == "user" { + typ = reddit.GetListingTypeUser + } + + resp, err := api.Reddit.GetListing(ctx, reddit.GetListingParams{ + Type: typ, + Name: request.Name, + }) + if err != nil { + err = errs.Wrapw(err, "failed to get listing from reddit", "round_trip", httplog) + return resolved, err + } + + switch resp := resp.(type) { + case *reddit.GetListingForbidden: + err = errs. + Failw( + "subreddit is private", + "round_trip", httplog, + ). + Code(connect.CodePermissionDenied) + return resolved, err + case *reddit.GetListingTooManyRequests: + err = errs. + Failw( + "too many requests error response from reddit", + "round_trip", httplog, + ). + Code(connect.CodeResourceExhausted) + return resolved, err + case *reddit.ListingResponse: + if !isValidSubreddit(resp) { + err = errs. + Failf("subreddit '%s' of type '%s' seems to be empty or not valid", request.Name, request.Type). + Details("round_trip", httplog). + Code(connect.CodeNotFound) + return resolved, err + } + data := resp.Data.Children[0].Data + if request.Type == "user" { + return data.Author, nil + } + return data.Subreddit, nil + default: + err = errs. + Failw("unexpected response from reddit", "round_trip", httplog) + return resolved, err + } +} + +func isValidSubreddit(list *reddit.ListingResponse) bool { + return list.Data.After.Null && len(list.Data.Children) == 0 +} diff --git a/go/cmd/bluemage/serve/serve.go b/go/cmd/bluemage/serve/serve.go index 93997e6..6a12807 100644 --- a/go/cmd/bluemage/serve/serve.go +++ b/go/cmd/bluemage/serve/serve.go @@ -20,7 +20,9 @@ import ( "github.com/stephenafamo/bob" "github.com/tigorlazuardi/bluemage/go/api" "github.com/tigorlazuardi/bluemage/go/config" - "github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1/v1connect" + v1DeviceConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/device/v1/v1connect" + v1SubredditsConnect "github.com/tigorlazuardi/bluemage/go/gen/proto/subreddits/v1/v1connect" + "github.com/tigorlazuardi/bluemage/go/gen/reddit" "github.com/tigorlazuardi/bluemage/go/pkg/errs" "github.com/tigorlazuardi/bluemage/go/pkg/log" "github.com/tigorlazuardi/bluemage/go/pkg/telemetry" @@ -91,15 +93,17 @@ var Cmd = &cobra.Command{ ) db := bob.New(sqldb) + client, err := reddit.NewClient("https://reddit.com", reddit.WithClient(&http.Client{ + Transport: log.NewRoundTripper(http.DefaultTransport), + })) + if err != nil { + panic(err) + } + api := &api.API{ Executor: db, DB: sqldb, - } - - handler := &server.Server{ - DeviceHandler: server.DeviceHandler{ - API: api, - }, + Reddit: client, } validationInterceptor, err := validate.NewInterceptor() @@ -112,13 +116,20 @@ var Cmd = &cobra.Command{ return errs.Wrap(err, "failed to create otel interceptor") } - mux := http.NewServeMux() - mux.Handle(v1connect.NewDeviceServiceHandler(handler, connect.WithInterceptors( + interceptors := []connect.Interceptor{ validationInterceptor, otelInterceptor, server.ErrorMessageInterceptor(), server.LogInterceptor(), - ))) + } + + handlerOpts := []connect.HandlerOption{ + connect.WithInterceptors(interceptors...), + } + + mux := http.NewServeMux() + mux.Handle(v1DeviceConnect.NewDeviceServiceHandler(&server.DeviceHandler{API: api}, handlerOpts...)) + mux.Handle(v1SubredditsConnect.NewSubredditsServiceHandler(&server.SubredditHandler{API: api}, handlerOpts...)) server := &http.Server{ Addr: fmt.Sprintf("%s:%s", cfg.String("http.host"), cfg.String("http.port")), diff --git a/go/converts/subreddit.go b/go/converts/subreddit.go index 7ba0149..ffe67c3 100644 --- a/go/converts/subreddit.go +++ b/go/converts/subreddit.go @@ -51,6 +51,12 @@ type SubredditConverter interface { // goverter:ignore Name Type CoverImageID CreatedAt UpdatedAt ProtoSubredditSetterToBobSubredditSetter(*subreddits.SubredditSetter) *models.SubredditSetter + + // goverter:useZeroValueOnPointerInconsistency + ProtoCreateSubredditRequestToAPISubredditResolveNameRequest(*subreddits.CreateSubredditRequest) api.SubredditResolveNameRequest + + // goverter:useZeroValueOnPointerInconsistency + ProtoResolveSubredditNameRequestToAPISubredditResolveName(*subreddits.ResolveSubredditNameRequest) api.SubredditResolveNameRequest } func SubredditTypeToString(subType subreddits.SubredditType) string { diff --git a/go/pkg/errs/errs.go b/go/pkg/errs/errs.go index 10e7523..bec9759 100644 --- a/go/pkg/errs/errs.go +++ b/go/pkg/errs/errs.go @@ -220,6 +220,15 @@ func Failf(message string, args ...any) Error { } } +func Failw(message string, details ...any) Error { + return &Err{ + origin: errors.New(message), + caller: caller.New(3), + code: connect.CodeInternal, + details: details, + } +} + func IntoConnectError(err error) error { if err == nil { return nil diff --git a/go/pkg/log/http_transport.go b/go/pkg/log/http_transport.go new file mode 100644 index 0000000..21d1d49 --- /dev/null +++ b/go/pkg/log/http_transport.go @@ -0,0 +1,141 @@ +package log + +import ( + "bytes" + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "time" + + "github.com/c2h5oh/datasize" +) + +type RoundTripCollector struct { + Request *http.Request + // HTTP Response. May be nil. + Response *http.Response + Start time.Time + End time.Time + RequestBody *bytes.Buffer + ResponseBody *bytes.Buffer +} + +func (ht RoundTripCollector) LogValue() slog.Value { + values := make([]slog.Attr, 0, 4) + if !ht.Start.IsZero() { + values = append(values, slog.Time("start", ht.Start)) + } + if !ht.End.IsZero() { + values = append(values, slog.Time("end", ht.End)) + } + if !ht.Start.IsZero() && !ht.End.IsZero() { + values = append(values, slog.Duration("duration", ht.End.Sub(ht.Start))) + } + if ht.Request != nil { + vals := make([]slog.Attr, 0, 5) + vals = append(vals, slog.String("url", ht.Request.URL.String())) + vals = append(vals, slog.String("method", ht.Request.Method)) + headers := []slog.Attr{} + for k := range ht.Request.Header { + headers = append(headers, slog.String(k, ht.Request.Header.Get(k))) + } + if len(headers) > 0 { + vals = append(vals, slog.Attr{Key: "headers", Value: slog.GroupValue(headers...)}) + } + if ht.RequestBody.Len() > 0 { + cl := datasize.ByteSize(ht.RequestBody.Len()) + vals = append(vals, slog.String("content_length", cl.HumanReadable())) + if ht.Request.Header.Get("Content-Type") == "application/json" { + vals = append(vals, slog.Any("body", json.RawMessage(ht.RequestBody.Bytes()))) + } else { + vals = append(vals, slog.String("body", ht.RequestBody.String())) + } + } + values = append(values, slog.Attr{Key: "request", Value: slog.GroupValue(vals...)}) + } + if ht.Response != nil { + vals := make([]slog.Attr, 0, 4) + vals = append(vals, slog.Int("code", ht.Response.StatusCode)) + cl := datasize.ByteSize(ht.Response.ContentLength) + vals = append(vals, slog.String("content_length", cl.HumanReadable())) + headers := []slog.Attr{} + for k := range ht.Response.Header { + headers = append(headers, slog.String(k, ht.Response.Header.Get(k))) + } + if ht.ResponseBody.Len() > 0 { + if ht.Response.Header.Get("Content-Type") == "application/json" { + vals = append(vals, slog.Any("body", json.RawMessage(ht.ResponseBody.Bytes()))) + } else { + vals = append(vals, slog.String("body", ht.ResponseBody.String())) + } + } + values = append(values, slog.Attr{Key: "response", Value: slog.GroupValue(vals...)}) + } + return slog.GroupValue(values...) +} + +type httpLogCollectorKey struct{} + +// ContextWithRoundTripCollector injects an *HTTPLogCollector into given context. +func ContextWithRoundTripCollector(ctx context.Context) (context.Context, *RoundTripCollector) { + coll := &RoundTripCollector{} + return context.WithValue(ctx, httpLogCollectorKey{}, coll), coll +} + +// RoundTripCollectorFromContext gets an *HTTPLogCollector instance. +// +// Returns nil if not found. +func RoundTripCollectorFromContext(ctx context.Context) *RoundTripCollector { + coll, _ := ctx.Value(httpLogCollectorKey{}).(*RoundTripCollector) + return coll +} + +type RoundTripper struct { + Next http.RoundTripper +} + +// NewRoundTripper creates a new http log collector round tripper. +// +// If next is nil, uses http.DefaultTransport instead. +func NewRoundTripper(next http.RoundTripper) *RoundTripper { + if next == nil { + next = http.DefaultTransport + } + return &RoundTripper{next} +} + +type bodyCloser struct { + io.Reader + close func() error +} + +func (b bodyCloser) Close() error { + return b.close() +} + +// RoundTrip implements http.RoundTripper +func (ht *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + coll := RoundTripCollectorFromContext(req.Context()) + if coll == nil { + return ht.Next.RoundTrip(req) + } + coll.RequestBody = new(bytes.Buffer) + coll.ResponseBody = new(bytes.Buffer) + coll.Request = req + if coll.Request.Body != nil { + tee := io.TeeReader(coll.Request.Body, coll.RequestBody) + req.Body = bodyCloser{tee, req.Body.Close} + } + coll.Start = time.Now() + resp, err := ht.Next.RoundTrip(req) + coll.End = time.Now() + coll.Response = resp + if resp != nil { + coll.Request = resp.Request + tee := io.TeeReader(resp.Body, coll.ResponseBody) + resp.Body = bodyCloser{tee, resp.Body.Close} + } + return resp, err +} diff --git a/go/server/subreddit_handlers.go b/go/server/subreddit_handlers.go index 6a95f8c..5913544 100644 --- a/go/server/subreddit_handlers.go +++ b/go/server/subreddit_handlers.go @@ -41,13 +41,20 @@ func (su *SubredditHandler) CountSubreddits(ctx context.Context, request *connec // - connect.CodeInvalidArgument if validation failed, e.g. Invalid schedule cron format. // - connect.CodeNotFound if the subreddit does not exist. func (su *SubredditHandler) CreateSubreddit(ctx context.Context, request *connect.Request[subreddits.CreateSubredditRequest]) (*connect.Response[subreddits.CreateSubredditResponse], error) { + resolveRequest := subredditConverter.ProtoCreateSubredditRequestToAPISubredditResolveNameRequest(request.Msg) + resolved, err := su.API.SubredditResolveName(ctx, resolveRequest) + if err != nil { + return nil, errs.IntoConnectError(err) + } + request.Msg.Name = resolved + data := subredditConverter.CreateSubredditRequestToModelsSubredditSetter(request.Msg) if err := su.API.SubredditCreate(ctx, data); err != nil { return nil, errs.IntoConnectError(err) } resp := &subreddits.CreateSubredditResponse{ - Name: data.Name.GetOrZero(), + Name: resolved, } return connect.NewResponse(resp), nil @@ -101,7 +108,7 @@ func (su *SubredditHandler) UpdateSubreddit(ctx context.Context, request *connec // DeleteSubreddit deletes a subreddit. // // Returns error with connect.CodeNotFound if subreddit does not exist. -func (su *SubredditHandler) DeleteSubreddit(_ context.Context, _ *connect.Request[subreddits.DeleteSubredditRequest]) (*connect.Response[subreddits.DeleteSubredditResponse], error) { +func (su *SubredditHandler) DeleteSubreddit(ctx context.Context, request *connect.Request[subreddits.DeleteSubredditRequest]) (*connect.Response[subreddits.DeleteSubredditResponse], error) { panic("not implemented") // TODO: Implement } @@ -112,6 +119,14 @@ func (su *SubredditHandler) DeleteSubreddit(_ context.Context, _ *connect.Reques // // Returns error with connect.CodeNotFound if subreddit does not exist. // So this rpc endpoint also acts to check subreddit's existence. -func (su *SubredditHandler) ResolveSubredditName(_ context.Context, _ *connect.Request[subreddits.ResolveSubredditNameRequest]) (*connect.Response[subreddits.ResolveSubredditNameResponse], error) { - panic("not implemented") // TODO: Implement +func (su *SubredditHandler) ResolveSubredditName(ctx context.Context, request *connect.Request[subreddits.ResolveSubredditNameRequest]) (*connect.Response[subreddits.ResolveSubredditNameResponse], error) { + req := subredditConverter.ProtoResolveSubredditNameRequestToAPISubredditResolveName(request.Msg) + resolved, err := su.API.SubredditResolveName(ctx, req) + if err != nil { + return nil, errs.IntoConnectError(err) + } + resp := &subreddits.ResolveSubredditNameResponse{ + Resolved: resolved, + } + return connect.NewResponse(resp), nil } diff --git a/schemas/openapi/reddit.yaml b/schemas/openapi/reddit.yaml index 8b446fe..8ff56cd 100644 --- a/schemas/openapi/reddit.yaml +++ b/schemas/openapi/reddit.yaml @@ -86,11 +86,9 @@ paths: description: |- `after` can be filled with post `name`. - Easiest to find this value is in the `response body` on - `data.after`. + Easiest to find this value is in the `response body` on `data.after`. - `after` tells Reddit to look up posts after - this value. + `after` tells Reddit to look up posts after this value. `after` cannot be used together with `before`. - in: query @@ -114,11 +112,6 @@ paths: Maximum value to fetch is `100`. example: 25 - - in: header - name: User-Agent - schema: - type: string - default: bluemage/v1 operationId: getListing responses: "200": @@ -133,6 +126,12 @@ paths: application/json: schema: type: object + "403": + description: Forbidden + content: + application/json: + schema: + type: object components: schemas: @@ -185,44 +184,25 @@ components: items: $ref: "#/components/schemas/ListingChildData" ListingChildData: - oneOf: - - $ref: "#/components/schemas/T1" - - $ref: "#/components/schemas/T3" - T1: type: object - description: |- - This is listed because this item type is a possibility. - - `t1` item type is unwanted for fetching image posts since - it only contains comment data. + required: + - kind + - data properties: kind: type: string enum: - t1 - T3: - type: object - description: |- - `t3` item type is a post (link) data. - - This is the item type to look for image posts. - - Not all fields are listed here, only fields that are relevant are - listed to reduce deserializing errors. - properties: - kind: - type: string - enum: - t3 data: - $ref: "#/components/schemas/T3Data" - - T3Data: + $ref: "#/components/schemas/ChildData" + ChildData: type: object required: - subreddit - title - name + - author properties: subreddit: type: string diff --git a/schemas/proto/subreddits/v1/exist.proto b/schemas/proto/subreddits/v1/exist.proto new file mode 100644 index 0000000..2857890 --- /dev/null +++ b/schemas/proto/subreddits/v1/exist.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package subreddits.v1; + +import "buf/validate/validate.proto"; + +option go_package = "github.com/tigorlazuardi/bluemage/go/gen/proto/subreddits/v1"; + +message SubredditExistsRequest { + // name of the subreddit. Case insensitive. + string name = 1 [(buf.validate.field).string.min_len = 1]; +} + +message SubredditExistsResponse { + bool exist = 1; +} diff --git a/schemas/proto/subreddits/v1/resolve.proto b/schemas/proto/subreddits/v1/resolve.proto index 4e7b832..39f2580 100644 --- a/schemas/proto/subreddits/v1/resolve.proto +++ b/schemas/proto/subreddits/v1/resolve.proto @@ -3,12 +3,14 @@ syntax = "proto3"; package subreddits.v1; import "buf/validate/validate.proto"; +import "subreddits/v1/types.proto"; option go_package = "github.com/tigorlazuardi/bluemage/go/gen/proto/subreddits/v1"; message ResolveSubredditNameRequest { // name of the subreddit to resolve (check existence and casing). string name = 1 [(buf.validate.field).string.min_len = 1]; + SubredditType type = 2; } message ResolveSubredditNameResponse { diff --git a/schemas/proto/subreddits/v1/subreddits.proto b/schemas/proto/subreddits/v1/subreddits.proto index 0fbecaa..a090e5c 100644 --- a/schemas/proto/subreddits/v1/subreddits.proto +++ b/schemas/proto/subreddits/v1/subreddits.proto @@ -5,6 +5,7 @@ package subreddits.v1; import "subreddits/v1/count.proto"; import "subreddits/v1/create.proto"; import "subreddits/v1/delete.proto"; +import "subreddits/v1/exist.proto"; import "subreddits/v1/get.proto"; import "subreddits/v1/list.proto"; import "subreddits/v1/resolve.proto"; @@ -56,4 +57,11 @@ service SubredditsService { // // Default values count all. rpc CountSubreddits(CountSubredditsRequest) returns (CountSubredditsResponse); + + // SubredditExists checks if the subreddits already handled in database. + // + // THIS ENDPOINT DOES NOT CALL REDDIT API, ONLY DATABASE. + // + // Use ResolveSubredditName to check if subreddit actually exists in Reddit. + rpc SubredditExists(SubredditExistsRequest) returns (SubredditExistsResponse); }