pubsub: added watermill integration

This commit is contained in:
Tigor Hutasuhut 2024-04-25 13:05:41 +07:00
parent db432f4dc9
commit 7e2cb047c2
7 changed files with 79 additions and 25 deletions

View file

@ -9,14 +9,19 @@ cmd = "go build -o ./tmp/main ."
delay = 1000 delay = 1000
exclude_dir = ["assets", "tmp", "vendor", "testdata", "node_modules"] exclude_dir = ["assets", "tmp", "vendor", "testdata", "node_modules"]
exclude_file = [] exclude_file = []
exclude_regex = ["_test.go", "db/queries/.*\\.go", "_templ.go", "models/.*\\.go"] exclude_regex = [
"_test.go",
"db/queries/.*\\.go",
"_templ.go",
"models/.*\\.go",
]
exclude_unchanged = false exclude_unchanged = false
follow_symlink = false follow_symlink = false
full_bin = "" full_bin = ""
include_dir = [] include_dir = []
include_ext = ["go", "tpl", "tmpl", "html", "sql", "templ"] include_ext = ["go", "tpl", "tmpl", "html", "sql", "templ"]
include_file = [".env"] include_file = [".env"]
kill_delay = "500ms" kill_delay = "1000ms"
log = "build-errors.log" log = "build-errors.log"
poll = false poll = false
poll_interval = 0 poll_interval = 0

View file

@ -5,6 +5,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"os" "os"
"os/signal"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/stephenafamo/bob" "github.com/stephenafamo/bob"
@ -19,6 +20,7 @@ import (
"github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill"
watermillSql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" watermillSql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message"
watermillSqlite "github.com/walterwanderley/watermill-sqlite"
) )
type API struct { type API struct {
@ -43,31 +45,40 @@ type API struct {
type Dependencies struct { type Dependencies struct {
DB *sql.DB DB *sql.DB
PubsubDB *sql.DB
Config *config.Config Config *config.Config
Reddit *reddit.Reddit Reddit *reddit.Reddit
} }
const downloadTopic = "subreddit.download" const downloadTopic = "subreddit_download"
var watermillLogger = watermill.NewStdLoggerWithOut(os.Stderr, false, false)
func New(deps Dependencies) *API { func New(deps Dependencies) *API {
ackDeadline := deps.Config.Duration("download.pubsub.ack.deadline") ackDeadline := deps.Config.Duration("download.pubsub.ack.deadline")
subscriber, err := watermillSql.NewSubscriber(deps.DB, watermillSql.SubscriberConfig{ subscriber, err := watermillSql.NewSubscriber(deps.PubsubDB, watermillSql.SubscriberConfig{
AckDeadline: &ackDeadline, AckDeadline: &ackDeadline,
SchemaAdapter: watermillSql.DefaultPostgreSQLSchema{}, SchemaAdapter: watermillSqlite.DefaultSQLiteSchema{},
OffsetsAdapter: watermillSql.DefaultPostgreSQLOffsetsAdapter{}, OffsetsAdapter: watermillSqlite.DefaultSQLiteOffsetsAdapter{},
InitializeSchema: true, InitializeSchema: true,
}, watermill.NewStdLoggerWithOut(os.Stderr, true, true)) }, watermillLogger)
if err != nil { if err != nil {
panic(err) panic(err)
} }
publisher, err := watermillSql.NewPublisher(deps.DB, watermillSql.PublisherConfig{ publisher, err := watermillSql.NewPublisher(deps.PubsubDB, watermillSql.PublisherConfig{
SchemaAdapter: watermillSql.DefaultPostgreSQLSchema{}, SchemaAdapter: watermillSqlite.DefaultSQLiteSchema{},
AutoInitializeSchema: true, AutoInitializeSchema: true,
}, watermill.NewStdLoggerWithOut(os.Stderr, true, true)) }, watermillLogger)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &API{ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
ch, err := subscriber.Subscribe(ctx, downloadTopic)
if err != nil {
panic(err)
}
api := &API{
db: deps.DB, db: deps.DB,
exec: bob.New(deps.DB), exec: bob.New(deps.DB),
scheduler: cron.New(), scheduler: cron.New(),
@ -80,6 +91,9 @@ func New(deps Dependencies) *API {
subscriber: subscriber, subscriber: subscriber,
publisher: publisher, publisher: publisher,
} }
api.startSubredditDownloadPubsub(ch)
return api
} }
func (api *API) StartScheduler(ctx context.Context) error { func (api *API) StartScheduler(ctx context.Context) error {

View file

@ -7,3 +7,5 @@ sqlite:
dsn: "data.db" dsn: "data.db"
except: except:
goose_db_version: goose_db_version:
watermill_offset_subreddit_download:
watermill_subreddit_download:

View file

@ -28,19 +28,25 @@ var serveCmd = &cobra.Command{
} }
defer tele.Close() defer tele.Close()
db, err := db.Open(cfg) database, err := db.Open(cfg)
if err != nil { if err != nil {
log.New(cmd.Context()).Err(err).Error("failed to connect database") log.New(cmd.Context()).Err(err).Error("failed to connect database")
os.Exit(1) os.Exit(1)
} }
pubsubDatabase, err := db.OpenSilent(cfg)
if err != nil {
log.New(cmd.Context()).Err(err).Error("failed to connect database")
os.Exit(1)
}
red := &reddit.Reddit{ red := &reddit.Reddit{
Client: http.DefaultClient, Client: http.DefaultClient,
Config: cfg, Config: cfg,
} }
api := api.New(api.Dependencies{ api := api.New(api.Dependencies{
DB: db, DB: database,
PubsubDB: pubsubDatabase,
Config: cfg, Config: cfg,
Reddit: red, Reddit: red,
}) })

View file

@ -17,6 +17,18 @@ import (
var Migrations fs.FS var Migrations fs.FS
func Open(cfg *config.Config) (*sql.DB, error) { func Open(cfg *config.Config) (*sql.DB, error) {
dsn := cfg.String("db.string")
db, err := OpenSilent(cfg)
if err != nil {
return db, err
}
db = sqldblogger.OpenDriver(dsn, db.Driver(), sqlLogger{},
sqldblogger.WithSQLQueryAsMessage(true),
)
return db, err
}
func OpenSilent(cfg *config.Config) (*sql.DB, error) {
driver := cfg.String("db.driver") driver := cfg.String("db.driver")
dsn := cfg.String("db.string") dsn := cfg.String("db.string")
db, err := otelsql.Open(driver, dsn, otelsql.WithAttributes( db, err := otelsql.Open(driver, dsn, otelsql.WithAttributes(
@ -38,9 +50,5 @@ func Open(cfg *config.Config) (*sql.DB, error) {
return db, errs.Wrapw(err, "failed to migrate database", "dialect", driver) return db, errs.Wrapw(err, "failed to migrate database", "dialect", driver)
} }
} }
db = sqldblogger.OpenDriver(dsn, db.Driver(), sqlLogger{},
sqldblogger.WithSQLQueryAsMessage(true),
)
return db, err return db, err
} }

5
go.mod
View file

@ -4,6 +4,7 @@ go 1.22.1
require ( require (
github.com/ThreeDotsLabs/watermill v1.3.5 github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.1
github.com/a-h/templ v0.2.648 github.com/a-h/templ v0.2.648
github.com/aarondl/opt v0.0.0-20240108180805-338d04d857dc github.com/aarondl/opt v0.0.0-20240108180805-338d04d857dc
github.com/adrg/xdg v0.4.0 github.com/adrg/xdg v0.4.0
@ -31,10 +32,10 @@ require (
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/stephenafamo/bob v0.25.0 github.com/stephenafamo/bob v0.25.0
github.com/teivah/broadcast v0.1.0 github.com/teivah/broadcast v0.1.0
github.com/walterwanderley/watermill-sqlite v0.0.0-20240320170051-3fffaf9a8167
) )
require ( require (
github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.1 // indirect
github.com/aarondl/json v0.0.0-20221020222930-8b0db17ef1bf // indirect github.com/aarondl/json v0.0.0-20221020222930-8b0db17ef1bf // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
@ -79,7 +80,7 @@ require (
go.opentelemetry.io/otel/trace v1.25.0 go.opentelemetry.io/otel/trace v1.25.0
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect
golang.org/x/sync v0.6.0 // indirect golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0 // indirect golang.org/x/sys v0.18.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

18
go.sum
View file

@ -108,10 +108,22 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8=
github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY=
github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgtype v1.4.2 h1:t+6LWm5eWPLX1H5Se702JSBcirq6uWa4jiG4wV1rAWY=
github.com/jackc/pgtype v1.4.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig=
github.com/jackc/pgx/v4 v4.8.1 h1:SUbCLP2pXvf/Sr/25KsuI4aTxiFYIvpfk4l6aTSdyCw=
github.com/jackc/pgx/v4 v4.8.1/go.mod h1:4HOLxrl8wToZJReD04/yB20GDwf4KBYETvlHciCnwW0=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
@ -150,6 +162,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libsql/sqlite-antlr4-parser v0.0.0-20230802215326-5cb5bb604475 h1:6PfEMwfInASh9hkN83aR0j4W/eKaAZt/AURtXAXlas0= github.com/libsql/sqlite-antlr4-parser v0.0.0-20230802215326-5cb5bb604475 h1:6PfEMwfInASh9hkN83aR0j4W/eKaAZt/AURtXAXlas0=
github.com/libsql/sqlite-antlr4-parser v0.0.0-20230802215326-5cb5bb604475/go.mod h1:20nXSmcf0nAscrzqsXeC2/tA3KkV2eCiJqYuyAgl+ss= github.com/libsql/sqlite-antlr4-parser v0.0.0-20230802215326-5cb5bb604475/go.mod h1:20nXSmcf0nAscrzqsXeC2/tA3KkV2eCiJqYuyAgl+ss=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
@ -257,6 +271,8 @@ github.com/volatiletech/inflect v0.0.1 h1:2a6FcMQyhmPZcLa+uet3VJ8gLn/9svWhJxJYwv
github.com/volatiletech/inflect v0.0.1/go.mod h1:IBti31tG6phkHitLlr5j7shC5SOo//x0AjDzaJU1PLA= github.com/volatiletech/inflect v0.0.1/go.mod h1:IBti31tG6phkHitLlr5j7shC5SOo//x0AjDzaJU1PLA=
github.com/volatiletech/strmangle v0.0.4 h1:CxrEPhobZL/PCZOTDSH1aq7s4Kv76hQpRoTVVlUOim4= github.com/volatiletech/strmangle v0.0.4 h1:CxrEPhobZL/PCZOTDSH1aq7s4Kv76hQpRoTVVlUOim4=
github.com/volatiletech/strmangle v0.0.4/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0= github.com/volatiletech/strmangle v0.0.4/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0=
github.com/walterwanderley/watermill-sqlite v0.0.0-20240320170051-3fffaf9a8167 h1:0oKJp4zW6HxKN6I1dLXJu5fz+Jwn0b9UKevpV7vDI2c=
github.com/walterwanderley/watermill-sqlite v0.0.0-20240320170051-3fffaf9a8167/go.mod h1:5EI8ouD5SfNdBka0Za07u1MfdYhZbdzy5sjsrtMWXCw=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
@ -311,6 +327,8 @@ golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo=
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0= google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0=