From 7488e2b5e3303ad8a8fc243ac1fede5217dcce1a Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Mon, 13 May 2024 21:10:48 +0700 Subject: [PATCH] api: revamped scheduler to run every minute Schedule now triggered by a runner that loops every minute and check if cron expression passes the time check. --- api/api.go | 13 ++++--- api/scheduler.go | 76 ++++++++++++++++++++++++++++++++++++++ api/scheduler/scheduler.go | 3 ++ 3 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 api/scheduler.go create mode 100644 api/scheduler/scheduler.go diff --git a/api/api.go b/api/api.go index 1f20d6d..181fe09 100644 --- a/api/api.go +++ b/api/api.go @@ -24,8 +24,9 @@ type API struct { db bob.Executor sqldb *sql.DB - scheduler *cron.Cron - scheduleMap map[cron.EntryID]*models.Subreddit + scheduleStopper func() + scheduler *cron.Cron + scheduleMap map[cron.EntryID]*models.Subreddit downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage] @@ -67,9 +68,7 @@ func New(deps Dependencies) *API { publisher: deps.Publisher, } - if err := api.StartScheduler(context.Background()); err != nil { - panic(err) - } + api.scheduleStopper = api.startScheduler() go api.StartSubredditDownloadPubsub(ch) return api } @@ -106,3 +105,7 @@ func (api *API) scheduleSubreddit(subreddit *models.Subreddit) error { return nil } + +func (api *API) Close() { + api.scheduleStopper() +} diff --git a/api/scheduler.go b/api/scheduler.go new file mode 100644 index 0000000..130a8db --- /dev/null +++ b/api/scheduler.go @@ -0,0 +1,76 @@ +package api + +import ( + "context" + "time" + + "github.com/robfig/cron/v3" + "github.com/tigorlazuardi/redmage/models" + "github.com/tigorlazuardi/redmage/pkg/log" +) + +func (api *API) startScheduler() func() { + now := time.Now() + + stop := make(chan struct{}) + + parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + var ticker *time.Ticker + + nextMinute := now.Truncate(time.Minute).Add(time.Minute) + log.New(context.Background()).Infof("starting scheduler at %s", nextMinute) + timer := time.AfterFunc(nextMinute.Sub(now), func() { + api.scheduleRun(time.Now().Truncate(time.Second).Truncate(0), parser) + ticker = time.NewTicker(time.Minute) + go func() { + for { + select { + case <-stop: + return + case now := <-ticker.C: + api.scheduleRun(now.Truncate(time.Second).Truncate(0), parser) + } + } + }() + }) + return func() { + log.New(context.Background()).Info("scheduler: stop called") + timer.Stop() + if ticker != nil { + ticker.Stop() + } + stop <- struct{}{} + } +} + +func (api *API) scheduleRun(now time.Time, parser cron.Parser) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + ctx, span := tracer.Start(ctx, "scheduler:tick") + defer span.End() + + previous := now.Add(-time.Minute).Truncate(time.Minute) + + log.New(ctx).Info("scheduler: run") + subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All() + if err != nil { + log.New(ctx).Err(err).Error("scheduler: failed to query subreddits") + return + } + for _, subreddit := range subreddits { + schedule, err := parser.Parse(subreddit.Schedule) + if err != nil { + log.New(ctx).Err(err).Error("scheduler: failed to parse schedule") + continue + } + next := schedule.Next(previous) + log.New(ctx).Info("scheduler: check time", "subreddit", subreddit.Name, "trigger_time", next, "now", now, "should_run", now.After(next)) + if now.After(next) { + err := api.PubsubStartDownloadSubreddit(ctx, PubsubStartDownloadSubredditParams{Subreddit: subreddit.Name}) + if err != nil { + log.New(ctx).Err(err).Error("scheduler: failed to start download", "subreddit", subreddit.Name) + continue + } + } + } +} diff --git a/api/scheduler/scheduler.go b/api/scheduler/scheduler.go new file mode 100644 index 0000000..c710887 --- /dev/null +++ b/api/scheduler/scheduler.go @@ -0,0 +1,3 @@ +package scheduler + +type Scheduler struct{}