api: integrated scheduler
This commit is contained in:
parent
8698d61e1d
commit
ee2e307f6a
68
api/api.go
68
api/api.go
|
@ -3,20 +3,16 @@ package api
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/robfig/cron/v3"
|
|
||||||
"github.com/stephenafamo/bob"
|
"github.com/stephenafamo/bob"
|
||||||
"github.com/teivah/broadcast"
|
"github.com/teivah/broadcast"
|
||||||
"github.com/tigorlazuardi/redmage/api/bmessage"
|
"github.com/tigorlazuardi/redmage/api/bmessage"
|
||||||
"github.com/tigorlazuardi/redmage/api/reddit"
|
"github.com/tigorlazuardi/redmage/api/reddit"
|
||||||
|
"github.com/tigorlazuardi/redmage/api/scheduler"
|
||||||
"github.com/tigorlazuardi/redmage/config"
|
"github.com/tigorlazuardi/redmage/config"
|
||||||
"github.com/tigorlazuardi/redmage/models"
|
|
||||||
"github.com/tigorlazuardi/redmage/pkg/errs"
|
|
||||||
"github.com/tigorlazuardi/redmage/pkg/log"
|
"github.com/tigorlazuardi/redmage/pkg/log"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
|
||||||
"github.com/ThreeDotsLabs/watermill"
|
|
||||||
"github.com/ThreeDotsLabs/watermill/message"
|
"github.com/ThreeDotsLabs/watermill/message"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -24,9 +20,7 @@ type API struct {
|
||||||
db bob.Executor
|
db bob.Executor
|
||||||
sqldb *sql.DB
|
sqldb *sql.DB
|
||||||
|
|
||||||
scheduleStopper func()
|
scheduler *scheduler.Scheduler
|
||||||
scheduler *cron.Cron
|
|
||||||
scheduleMap map[cron.EntryID]*models.Subreddit
|
|
||||||
|
|
||||||
downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage]
|
downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage]
|
||||||
|
|
||||||
|
@ -55,11 +49,10 @@ func New(deps Dependencies) *API {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
api := &API{
|
api := &API{
|
||||||
db: bob.New(deps.DB),
|
db: bob.New(deps.DB),
|
||||||
sqldb: deps.DB,
|
sqldb: deps.DB,
|
||||||
scheduler: cron.New(),
|
|
||||||
scheduleMap: make(map[cron.EntryID]*models.Subreddit, 8),
|
|
||||||
downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](),
|
downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](),
|
||||||
config: deps.Config,
|
config: deps.Config,
|
||||||
imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")),
|
imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")),
|
||||||
|
@ -68,44 +61,29 @@ func New(deps Dependencies) *API {
|
||||||
publisher: deps.Publisher,
|
publisher: deps.Publisher,
|
||||||
}
|
}
|
||||||
|
|
||||||
api.scheduleStopper = api.startScheduler()
|
api.scheduler = scheduler.New(api.scheduleRun)
|
||||||
|
|
||||||
|
if err := api.scheduler.Sync(context.Background(), api.db); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
api.scheduler.Start()
|
||||||
|
|
||||||
go api.StartSubredditDownloadPubsub(ch)
|
go api.StartSubredditDownloadPubsub(ch)
|
||||||
return api
|
return api
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *API) StartScheduler(ctx context.Context) error {
|
func (api *API) scheduleRun(subreddit string) {
|
||||||
subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All()
|
ctx, cancel := context.WithTimeout(context.Background(), api.config.Duration("scheduler.timeout"))
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
ctx, span := tracer.Start(ctx, "*API.scheduleRun")
|
||||||
|
defer span.End()
|
||||||
|
span.SetAttributes(attribute.String("subreddit", subreddit))
|
||||||
|
|
||||||
|
log.New(ctx).Info("api: schedule run", "subreddit", subreddit)
|
||||||
|
|
||||||
|
err := api.PubsubStartDownloadSubreddit(ctx, PubsubStartDownloadSubredditParams{Subreddit: subreddit})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errs.Wrapw(err, "failed to get all subreddits")
|
log.New(ctx).Err(err).Error("api: failed to start download subreddit", "subreddit", subreddit)
|
||||||
}
|
|
||||||
|
|
||||||
for _, subreddit := range subreddits {
|
|
||||||
err := api.scheduleSubreddit(subreddit)
|
|
||||||
if err != nil {
|
|
||||||
log.New(ctx).Err(err).Error(
|
|
||||||
fmt.Sprintf("failed to start scheduler for subreddit '%s'", subreddit.Name),
|
|
||||||
"subreddit", subreddit,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (api *API) scheduleSubreddit(subreddit *models.Subreddit) error {
|
|
||||||
id, err := api.scheduler.AddFunc(subreddit.Schedule, func() {
|
|
||||||
payload, _ := json.Marshal(subreddit)
|
|
||||||
_ = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload))
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return errs.Wrap(err)
|
|
||||||
}
|
|
||||||
api.scheduleMap[id] = subreddit
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (api *API) Close() {
|
|
||||||
api.scheduleStopper()
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,76 +0,0 @@
|
||||||
package api
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/robfig/cron/v3"
|
|
||||||
"github.com/tigorlazuardi/redmage/models"
|
|
||||||
"github.com/tigorlazuardi/redmage/pkg/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (api *API) startScheduler() func() {
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
stop := make(chan struct{})
|
|
||||||
|
|
||||||
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
|
|
||||||
var ticker *time.Ticker
|
|
||||||
|
|
||||||
nextMinute := now.Truncate(time.Minute).Add(time.Minute)
|
|
||||||
log.New(context.Background()).Infof("starting scheduler at %s", nextMinute)
|
|
||||||
timer := time.AfterFunc(nextMinute.Sub(now), func() {
|
|
||||||
api.scheduleRun(time.Now().Truncate(time.Second).Truncate(0), parser)
|
|
||||||
ticker = time.NewTicker(time.Minute)
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-stop:
|
|
||||||
return
|
|
||||||
case now := <-ticker.C:
|
|
||||||
api.scheduleRun(now.Truncate(time.Second).Truncate(0), parser)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
})
|
|
||||||
return func() {
|
|
||||||
log.New(context.Background()).Info("scheduler: stop called")
|
|
||||||
timer.Stop()
|
|
||||||
if ticker != nil {
|
|
||||||
ticker.Stop()
|
|
||||||
}
|
|
||||||
stop <- struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (api *API) scheduleRun(now time.Time, parser cron.Parser) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
ctx, span := tracer.Start(ctx, "scheduler:tick")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
previous := now.Add(-time.Minute).Truncate(time.Minute)
|
|
||||||
|
|
||||||
log.New(ctx).Info("scheduler: run")
|
|
||||||
subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All()
|
|
||||||
if err != nil {
|
|
||||||
log.New(ctx).Err(err).Error("scheduler: failed to query subreddits")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, subreddit := range subreddits {
|
|
||||||
schedule, err := parser.Parse(subreddit.Schedule)
|
|
||||||
if err != nil {
|
|
||||||
log.New(ctx).Err(err).Error("scheduler: failed to parse schedule")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
next := schedule.Next(previous)
|
|
||||||
log.New(ctx).Info("scheduler: check time", "subreddit", subreddit.Name, "trigger_time", next, "now", now, "should_run", now.After(next))
|
|
||||||
if now.After(next) {
|
|
||||||
err := api.PubsubStartDownloadSubreddit(ctx, PubsubStartDownloadSubredditParams{Subreddit: subreddit.Name})
|
|
||||||
if err != nil {
|
|
||||||
log.New(ctx).Err(err).Error("scheduler: failed to start download", "subreddit", subreddit.Name)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue