From 86c3a0d699d0290dcb2ee9957795479b65ef3153 Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Thu, 25 Apr 2024 13:52:38 +0700 Subject: [PATCH] api: started cron scheduler --- api/api.go | 9 +++++++-- api/pubsub_download.go | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/api/api.go b/api/api.go index 342fe38..8e9880a 100644 --- a/api/api.go +++ b/api/api.go @@ -17,6 +17,7 @@ import ( "github.com/tigorlazuardi/redmage/pkg/errs" "github.com/tigorlazuardi/redmage/pkg/log" + "github.com/ThreeDotsLabs/watermill" watermillSql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" watermillSqlite "github.com/walterwanderley/watermill-sqlite" @@ -89,12 +90,15 @@ func New(deps Dependencies) *API { publisher: publisher, } - api.startSubredditDownloadPubsub(ch) + if err := api.StartScheduler(ctx); err != nil { + panic(err) + } + api.StartSubredditDownloadPubsub(ch) return api } func (api *API) StartScheduler(ctx context.Context) error { - subreddits, err := models.Subreddits.Query(ctx, api.db, nil).All() + subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.Enable.EQ(1)).All() if err != nil { return errs.Wrapw(err, "failed to get all subreddits") } @@ -115,6 +119,7 @@ func (api *API) StartScheduler(ctx context.Context) error { func (api *API) scheduleSubreddit(subreddit *models.Subreddit) error { id, err := api.scheduler.AddFunc(subreddit.Schedule, func() { + _ = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), []byte(subreddit.Name))) }) if err != nil { return errs.Wrap(err) diff --git a/api/pubsub_download.go b/api/pubsub_download.go index 295b0de..c5abb42 100644 --- a/api/pubsub_download.go +++ b/api/pubsub_download.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/otel/attribute" ) -func (api *API) startSubredditDownloadPubsub(messages <-chan *message.Message) { +func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) { for msg := range messages { api.subredditSemaphore <- struct{}{} go func(msg *message.Message) {