Schedule now triggered by a runner that loops every minute and check if cron expression passes the time check.
This commit is contained in:
parent
a17fdf9551
commit
7488e2b5e3
13
api/api.go
13
api/api.go
|
@ -24,8 +24,9 @@ type API struct {
|
|||
db bob.Executor
|
||||
sqldb *sql.DB
|
||||
|
||||
scheduler *cron.Cron
|
||||
scheduleMap map[cron.EntryID]*models.Subreddit
|
||||
scheduleStopper func()
|
||||
scheduler *cron.Cron
|
||||
scheduleMap map[cron.EntryID]*models.Subreddit
|
||||
|
||||
downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage]
|
||||
|
||||
|
@ -67,9 +68,7 @@ func New(deps Dependencies) *API {
|
|||
publisher: deps.Publisher,
|
||||
}
|
||||
|
||||
if err := api.StartScheduler(context.Background()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
api.scheduleStopper = api.startScheduler()
|
||||
go api.StartSubredditDownloadPubsub(ch)
|
||||
return api
|
||||
}
|
||||
|
@ -106,3 +105,7 @@ func (api *API) scheduleSubreddit(subreddit *models.Subreddit) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (api *API) Close() {
|
||||
api.scheduleStopper()
|
||||
}
|
||||
|
|
76
api/scheduler.go
Normal file
76
api/scheduler.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/tigorlazuardi/redmage/models"
|
||||
"github.com/tigorlazuardi/redmage/pkg/log"
|
||||
)
|
||||
|
||||
func (api *API) startScheduler() func() {
|
||||
now := time.Now()
|
||||
|
||||
stop := make(chan struct{})
|
||||
|
||||
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
|
||||
var ticker *time.Ticker
|
||||
|
||||
nextMinute := now.Truncate(time.Minute).Add(time.Minute)
|
||||
log.New(context.Background()).Infof("starting scheduler at %s", nextMinute)
|
||||
timer := time.AfterFunc(nextMinute.Sub(now), func() {
|
||||
api.scheduleRun(time.Now().Truncate(time.Second).Truncate(0), parser)
|
||||
ticker = time.NewTicker(time.Minute)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case now := <-ticker.C:
|
||||
api.scheduleRun(now.Truncate(time.Second).Truncate(0), parser)
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
return func() {
|
||||
log.New(context.Background()).Info("scheduler: stop called")
|
||||
timer.Stop()
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
}
|
||||
stop <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) scheduleRun(now time.Time, parser cron.Parser) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
ctx, span := tracer.Start(ctx, "scheduler:tick")
|
||||
defer span.End()
|
||||
|
||||
previous := now.Add(-time.Minute).Truncate(time.Minute)
|
||||
|
||||
log.New(ctx).Info("scheduler: run")
|
||||
subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All()
|
||||
if err != nil {
|
||||
log.New(ctx).Err(err).Error("scheduler: failed to query subreddits")
|
||||
return
|
||||
}
|
||||
for _, subreddit := range subreddits {
|
||||
schedule, err := parser.Parse(subreddit.Schedule)
|
||||
if err != nil {
|
||||
log.New(ctx).Err(err).Error("scheduler: failed to parse schedule")
|
||||
continue
|
||||
}
|
||||
next := schedule.Next(previous)
|
||||
log.New(ctx).Info("scheduler: check time", "subreddit", subreddit.Name, "trigger_time", next, "now", now, "should_run", now.After(next))
|
||||
if now.After(next) {
|
||||
err := api.PubsubStartDownloadSubreddit(ctx, PubsubStartDownloadSubredditParams{Subreddit: subreddit.Name})
|
||||
if err != nil {
|
||||
log.New(ctx).Err(err).Error("scheduler: failed to start download", "subreddit", subreddit.Name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
3
api/scheduler/scheduler.go
Normal file
3
api/scheduler/scheduler.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
package scheduler
|
||||
|
||||
type Scheduler struct{}
|
Loading…
Reference in a new issue