diff --git a/api/api.go b/api/api.go index 342fe38..8e9880a 100644 --- a/api/api.go +++ b/api/api.go @@ -17,6 +17,7 @@ import ( "github.com/tigorlazuardi/redmage/pkg/errs" "github.com/tigorlazuardi/redmage/pkg/log" + "github.com/ThreeDotsLabs/watermill" watermillSql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" watermillSqlite "github.com/walterwanderley/watermill-sqlite" @@ -89,12 +90,15 @@ func New(deps Dependencies) *API { publisher: publisher, } - api.startSubredditDownloadPubsub(ch) + if err := api.StartScheduler(ctx); err != nil { + panic(err) + } + api.StartSubredditDownloadPubsub(ch) return api } func (api *API) StartScheduler(ctx context.Context) error { - subreddits, err := models.Subreddits.Query(ctx, api.db, nil).All() + subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.Enable.EQ(1)).All() if err != nil { return errs.Wrapw(err, "failed to get all subreddits") } @@ -115,6 +119,7 @@ func (api *API) StartScheduler(ctx context.Context) error { func (api *API) scheduleSubreddit(subreddit *models.Subreddit) error { id, err := api.scheduler.AddFunc(subreddit.Schedule, func() { + _ = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), []byte(subreddit.Name))) }) if err != nil { return errs.Wrap(err) diff --git a/api/pubsub_download.go b/api/pubsub_download.go index 295b0de..c5abb42 100644 --- a/api/pubsub_download.go +++ b/api/pubsub_download.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/otel/attribute" ) -func (api *API) startSubredditDownloadPubsub(messages <-chan *message.Message) { +func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) { for msg := range messages { api.subredditSemaphore <- struct{}{} go func(msg *message.Message) {