From 7e2cb047c27966ca0880e58682453dfea3e32f96 Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Thu, 25 Apr 2024 13:05:41 +0700 Subject: [PATCH] pubsub: added watermill integration --- .air.toml | 11 ++++++++--- api/api.go | 38 ++++++++++++++++++++++++++------------ bobgen.yaml | 2 ++ cli/serve.go | 14 ++++++++++---- db/db.go | 16 ++++++++++++---- go.mod | 5 +++-- go.sum | 18 ++++++++++++++++++ 7 files changed, 79 insertions(+), 25 deletions(-) diff --git a/.air.toml b/.air.toml index cab8d46..f7e4655 100644 --- a/.air.toml +++ b/.air.toml @@ -9,19 +9,24 @@ cmd = "go build -o ./tmp/main ." delay = 1000 exclude_dir = ["assets", "tmp", "vendor", "testdata", "node_modules"] exclude_file = [] -exclude_regex = ["_test.go", "db/queries/.*\\.go", "_templ.go", "models/.*\\.go"] +exclude_regex = [ + "_test.go", + "db/queries/.*\\.go", + "_templ.go", + "models/.*\\.go", +] exclude_unchanged = false follow_symlink = false full_bin = "" include_dir = [] include_ext = ["go", "tpl", "tmpl", "html", "sql", "templ"] include_file = [".env"] -kill_delay = "500ms" +kill_delay = "1000ms" log = "build-errors.log" poll = false poll_interval = 0 post_cmd = [] -pre_cmd = [ "make prepare" ] +pre_cmd = ["make prepare"] rerun = false rerun_delay = 500 send_interrupt = true diff --git a/api/api.go b/api/api.go index 6c73aed..037a402 100644 --- a/api/api.go +++ b/api/api.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "os" + "os/signal" "github.com/robfig/cron/v3" "github.com/stephenafamo/bob" @@ -19,6 +20,7 @@ import ( "github.com/ThreeDotsLabs/watermill" watermillSql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" + watermillSqlite "github.com/walterwanderley/watermill-sqlite" ) type API struct { @@ -42,32 +44,41 @@ type API struct { } type Dependencies struct { - DB *sql.DB - Config *config.Config - Reddit *reddit.Reddit + DB *sql.DB + PubsubDB *sql.DB + Config *config.Config + Reddit *reddit.Reddit } -const downloadTopic = "subreddit.download" +const downloadTopic = "subreddit_download" + +var watermillLogger = watermill.NewStdLoggerWithOut(os.Stderr, false, false) func New(deps Dependencies) *API { ackDeadline := deps.Config.Duration("download.pubsub.ack.deadline") - subscriber, err := watermillSql.NewSubscriber(deps.DB, watermillSql.SubscriberConfig{ + subscriber, err := watermillSql.NewSubscriber(deps.PubsubDB, watermillSql.SubscriberConfig{ AckDeadline: &ackDeadline, - SchemaAdapter: watermillSql.DefaultPostgreSQLSchema{}, - OffsetsAdapter: watermillSql.DefaultPostgreSQLOffsetsAdapter{}, + SchemaAdapter: watermillSqlite.DefaultSQLiteSchema{}, + OffsetsAdapter: watermillSqlite.DefaultSQLiteOffsetsAdapter{}, InitializeSchema: true, - }, watermill.NewStdLoggerWithOut(os.Stderr, true, true)) + }, watermillLogger) if err != nil { panic(err) } - publisher, err := watermillSql.NewPublisher(deps.DB, watermillSql.PublisherConfig{ - SchemaAdapter: watermillSql.DefaultPostgreSQLSchema{}, + publisher, err := watermillSql.NewPublisher(deps.PubsubDB, watermillSql.PublisherConfig{ + SchemaAdapter: watermillSqlite.DefaultSQLiteSchema{}, AutoInitializeSchema: true, - }, watermill.NewStdLoggerWithOut(os.Stderr, true, true)) + }, watermillLogger) if err != nil { panic(err) } - return &API{ + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + ch, err := subscriber.Subscribe(ctx, downloadTopic) + if err != nil { + panic(err) + } + api := &API{ db: deps.DB, exec: bob.New(deps.DB), scheduler: cron.New(), @@ -80,6 +91,9 @@ func New(deps Dependencies) *API { subscriber: subscriber, publisher: publisher, } + + api.startSubredditDownloadPubsub(ch) + return api } func (api *API) StartScheduler(ctx context.Context) error { diff --git a/bobgen.yaml b/bobgen.yaml index 1aab4be..656a0c0 100644 --- a/bobgen.yaml +++ b/bobgen.yaml @@ -7,3 +7,5 @@ sqlite: dsn: "data.db" except: goose_db_version: + watermill_offset_subreddit_download: + watermill_subreddit_download: diff --git a/cli/serve.go b/cli/serve.go index 5ba18a8..f339b4b 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -28,21 +28,27 @@ var serveCmd = &cobra.Command{ } defer tele.Close() - db, err := db.Open(cfg) + database, err := db.Open(cfg) if err != nil { log.New(cmd.Context()).Err(err).Error("failed to connect database") os.Exit(1) } + pubsubDatabase, err := db.OpenSilent(cfg) + if err != nil { + log.New(cmd.Context()).Err(err).Error("failed to connect database") + os.Exit(1) + } red := &reddit.Reddit{ Client: http.DefaultClient, Config: cfg, } api := api.New(api.Dependencies{ - DB: db, - Config: cfg, - Reddit: red, + DB: database, + PubsubDB: pubsubDatabase, + Config: cfg, + Reddit: red, }) server := server.New(cfg, api, PublicDir) diff --git a/db/db.go b/db/db.go index 317b637..12d9602 100644 --- a/db/db.go +++ b/db/db.go @@ -17,6 +17,18 @@ import ( var Migrations fs.FS func Open(cfg *config.Config) (*sql.DB, error) { + dsn := cfg.String("db.string") + db, err := OpenSilent(cfg) + if err != nil { + return db, err + } + db = sqldblogger.OpenDriver(dsn, db.Driver(), sqlLogger{}, + sqldblogger.WithSQLQueryAsMessage(true), + ) + return db, err +} + +func OpenSilent(cfg *config.Config) (*sql.DB, error) { driver := cfg.String("db.driver") dsn := cfg.String("db.string") db, err := otelsql.Open(driver, dsn, otelsql.WithAttributes( @@ -38,9 +50,5 @@ func Open(cfg *config.Config) (*sql.DB, error) { return db, errs.Wrapw(err, "failed to migrate database", "dialect", driver) } } - - db = sqldblogger.OpenDriver(dsn, db.Driver(), sqlLogger{}, - sqldblogger.WithSQLQueryAsMessage(true), - ) return db, err } diff --git a/go.mod b/go.mod index 9fc4c1e..983fccf 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.22.1 require ( github.com/ThreeDotsLabs/watermill v1.3.5 + github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.1 github.com/a-h/templ v0.2.648 github.com/aarondl/opt v0.0.0-20240108180805-338d04d857dc github.com/adrg/xdg v0.4.0 @@ -31,10 +32,10 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stephenafamo/bob v0.25.0 github.com/teivah/broadcast v0.1.0 + github.com/walterwanderley/watermill-sqlite v0.0.0-20240320170051-3fffaf9a8167 ) require ( - github.com/ThreeDotsLabs/watermill-sql/v3 v3.0.1 // indirect github.com/aarondl/json v0.0.0-20221020222930-8b0db17ef1bf // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/google/uuid v1.6.0 // indirect @@ -79,7 +80,7 @@ require ( go.opentelemetry.io/otel/trace v1.25.0 go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 // indirect - golang.org/x/sync v0.6.0 // indirect + golang.org/x/sync v0.6.0 golang.org/x/sys v0.18.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6772fdb..86296ea 100644 --- a/go.sum +++ b/go.sum @@ -108,10 +108,22 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= +github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= +github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgtype v1.4.2 h1:t+6LWm5eWPLX1H5Se702JSBcirq6uWa4jiG4wV1rAWY= +github.com/jackc/pgtype v1.4.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig= +github.com/jackc/pgx/v4 v4.8.1 h1:SUbCLP2pXvf/Sr/25KsuI4aTxiFYIvpfk4l6aTSdyCw= +github.com/jackc/pgx/v4 v4.8.1/go.mod h1:4HOLxrl8wToZJReD04/yB20GDwf4KBYETvlHciCnwW0= github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= @@ -150,6 +162,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= +github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libsql/sqlite-antlr4-parser v0.0.0-20230802215326-5cb5bb604475 h1:6PfEMwfInASh9hkN83aR0j4W/eKaAZt/AURtXAXlas0= github.com/libsql/sqlite-antlr4-parser v0.0.0-20230802215326-5cb5bb604475/go.mod h1:20nXSmcf0nAscrzqsXeC2/tA3KkV2eCiJqYuyAgl+ss= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= @@ -257,6 +271,8 @@ github.com/volatiletech/inflect v0.0.1 h1:2a6FcMQyhmPZcLa+uet3VJ8gLn/9svWhJxJYwv github.com/volatiletech/inflect v0.0.1/go.mod h1:IBti31tG6phkHitLlr5j7shC5SOo//x0AjDzaJU1PLA= github.com/volatiletech/strmangle v0.0.4 h1:CxrEPhobZL/PCZOTDSH1aq7s4Kv76hQpRoTVVlUOim4= github.com/volatiletech/strmangle v0.0.4/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0= +github.com/walterwanderley/watermill-sqlite v0.0.0-20240320170051-3fffaf9a8167 h1:0oKJp4zW6HxKN6I1dLXJu5fz+Jwn0b9UKevpV7vDI2c= +github.com/walterwanderley/watermill-sqlite v0.0.0-20240320170051-3fffaf9a8167/go.mod h1:5EI8ouD5SfNdBka0Za07u1MfdYhZbdzy5sjsrtMWXCw= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= @@ -311,6 +327,8 @@ golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0=