From ee2e307f6a8df2bf967f99688be5c078522db168 Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Tue, 14 May 2024 12:08:34 +0700 Subject: [PATCH] api: integrated scheduler --- api/api.go | 68 +++++++++++++++---------------------------- api/scheduler.go | 76 ------------------------------------------------ 2 files changed, 23 insertions(+), 121 deletions(-) delete mode 100644 api/scheduler.go diff --git a/api/api.go b/api/api.go index 181fe09..a2d5b6f 100644 --- a/api/api.go +++ b/api/api.go @@ -3,20 +3,16 @@ package api import ( "context" "database/sql" - "encoding/json" - "fmt" - "github.com/robfig/cron/v3" "github.com/stephenafamo/bob" "github.com/teivah/broadcast" "github.com/tigorlazuardi/redmage/api/bmessage" "github.com/tigorlazuardi/redmage/api/reddit" + "github.com/tigorlazuardi/redmage/api/scheduler" "github.com/tigorlazuardi/redmage/config" - "github.com/tigorlazuardi/redmage/models" - "github.com/tigorlazuardi/redmage/pkg/errs" "github.com/tigorlazuardi/redmage/pkg/log" + "go.opentelemetry.io/otel/attribute" - "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" ) @@ -24,9 +20,7 @@ type API struct { db bob.Executor sqldb *sql.DB - scheduleStopper func() - scheduler *cron.Cron - scheduleMap map[cron.EntryID]*models.Subreddit + scheduler *scheduler.Scheduler downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage] @@ -55,11 +49,10 @@ func New(deps Dependencies) *API { if err != nil { panic(err) } + api := &API{ db: bob.New(deps.DB), sqldb: deps.DB, - scheduler: cron.New(), - scheduleMap: make(map[cron.EntryID]*models.Subreddit, 8), downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](), config: deps.Config, imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")), @@ -68,44 +61,29 @@ func New(deps Dependencies) *API { publisher: deps.Publisher, } - api.scheduleStopper = api.startScheduler() + api.scheduler = scheduler.New(api.scheduleRun) + + if err := api.scheduler.Sync(context.Background(), api.db); err != nil { + panic(err) + } + api.scheduler.Start() + go api.StartSubredditDownloadPubsub(ch) return api } -func (api *API) StartScheduler(ctx context.Context) error { - subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All() +func (api *API) scheduleRun(subreddit string) { + ctx, cancel := context.WithTimeout(context.Background(), api.config.Duration("scheduler.timeout")) + defer cancel() + + ctx, span := tracer.Start(ctx, "*API.scheduleRun") + defer span.End() + span.SetAttributes(attribute.String("subreddit", subreddit)) + + log.New(ctx).Info("api: schedule run", "subreddit", subreddit) + + err := api.PubsubStartDownloadSubreddit(ctx, PubsubStartDownloadSubredditParams{Subreddit: subreddit}) if err != nil { - return errs.Wrapw(err, "failed to get all subreddits") + log.New(ctx).Err(err).Error("api: failed to start download subreddit", "subreddit", subreddit) } - - for _, subreddit := range subreddits { - err := api.scheduleSubreddit(subreddit) - if err != nil { - log.New(ctx).Err(err).Error( - fmt.Sprintf("failed to start scheduler for subreddit '%s'", subreddit.Name), - "subreddit", subreddit, - ) - continue - } - } - - return nil -} - -func (api *API) scheduleSubreddit(subreddit *models.Subreddit) error { - id, err := api.scheduler.AddFunc(subreddit.Schedule, func() { - payload, _ := json.Marshal(subreddit) - _ = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload)) - }) - if err != nil { - return errs.Wrap(err) - } - api.scheduleMap[id] = subreddit - - return nil -} - -func (api *API) Close() { - api.scheduleStopper() } diff --git a/api/scheduler.go b/api/scheduler.go deleted file mode 100644 index 130a8db..0000000 --- a/api/scheduler.go +++ /dev/null @@ -1,76 +0,0 @@ -package api - -import ( - "context" - "time" - - "github.com/robfig/cron/v3" - "github.com/tigorlazuardi/redmage/models" - "github.com/tigorlazuardi/redmage/pkg/log" -) - -func (api *API) startScheduler() func() { - now := time.Now() - - stop := make(chan struct{}) - - parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) - var ticker *time.Ticker - - nextMinute := now.Truncate(time.Minute).Add(time.Minute) - log.New(context.Background()).Infof("starting scheduler at %s", nextMinute) - timer := time.AfterFunc(nextMinute.Sub(now), func() { - api.scheduleRun(time.Now().Truncate(time.Second).Truncate(0), parser) - ticker = time.NewTicker(time.Minute) - go func() { - for { - select { - case <-stop: - return - case now := <-ticker.C: - api.scheduleRun(now.Truncate(time.Second).Truncate(0), parser) - } - } - }() - }) - return func() { - log.New(context.Background()).Info("scheduler: stop called") - timer.Stop() - if ticker != nil { - ticker.Stop() - } - stop <- struct{}{} - } -} - -func (api *API) scheduleRun(now time.Time, parser cron.Parser) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - ctx, span := tracer.Start(ctx, "scheduler:tick") - defer span.End() - - previous := now.Add(-time.Minute).Truncate(time.Minute) - - log.New(ctx).Info("scheduler: run") - subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All() - if err != nil { - log.New(ctx).Err(err).Error("scheduler: failed to query subreddits") - return - } - for _, subreddit := range subreddits { - schedule, err := parser.Parse(subreddit.Schedule) - if err != nil { - log.New(ctx).Err(err).Error("scheduler: failed to parse schedule") - continue - } - next := schedule.Next(previous) - log.New(ctx).Info("scheduler: check time", "subreddit", subreddit.Name, "trigger_time", next, "now", now, "should_run", now.After(next)) - if now.After(next) { - err := api.PubsubStartDownloadSubreddit(ctx, PubsubStartDownloadSubredditParams{Subreddit: subreddit.Name}) - if err != nil { - log.New(ctx).Err(err).Error("scheduler: failed to start download", "subreddit", subreddit.Name) - continue - } - } - } -}