scheduler: implemented more proper scheduler api

This commit is contained in:
Tigor Hutasuhut 2024-05-14 09:51:53 +07:00
parent a5192c949b
commit f3ce9950a3
6 changed files with 185 additions and 1 deletions

14
api/scheduler/delete.go Normal file
View file

@ -0,0 +1,14 @@
package scheduler
// Delete removes a job from the scheduler.
//
// If job does not exist, it will be a no-op.
func (scheduler *Scheduler) Delete(subreddit string) {
scheduler.mu.Lock()
defer scheduler.mu.Unlock()
job := scheduler.Get(subreddit)
if job != nil {
scheduler.scheduler.Remove(job.ID)
}
}

39
api/scheduler/get.go Normal file
View file

@ -0,0 +1,39 @@
package scheduler
// List returns a copy of the list of jobs.
func (scheduler *Scheduler) List() map[string]*Job {
scheduler.mu.RLock()
defer scheduler.mu.RUnlock()
m := make(map[string]*Job, len(scheduler.list))
for k, v := range scheduler.list {
m[k] = v.clone()
}
return m
}
// Get returns the job for the given subreddit.
//
// Returns nil if the subreddit is not found or active.
func (scheduler *Scheduler) Get(subreddit string) *Job {
scheduler.mu.RLock()
defer scheduler.mu.RUnlock()
schedule := scheduler.list[subreddit]
if schedule != nil {
return schedule.clone()
}
return nil
}
func (scheduler *Scheduler) Iter(f func(string, *Job) bool) {
scheduler.mu.RLock()
defer scheduler.mu.RUnlock()
for k, v := range scheduler.list {
if !f(k, v.clone()) {
break
}
}
}

34
api/scheduler/put.go Normal file
View file

@ -0,0 +1,34 @@
package scheduler
import (
"net/http"
"github.com/robfig/cron/v3"
"github.com/tigorlazuardi/redmage/pkg/errs"
)
// Put adds a job to the scheduler.
//
// If job already exists, it will be replaced.
func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) {
sched, err := cron.ParseStandard(schedule)
if err != nil {
return nil, errs.
Wrapw(err, "scheduler: failed to parse schedule", "subreddit", subreddit, "schedule", schedule).
Code(http.StatusBadRequest)
}
scheduler.Delete(subreddit)
id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) }))
e := scheduler.scheduler.Entry(id)
job = &Job{ID: id, Entry: e}
scheduler.mu.Lock()
defer scheduler.mu.Unlock()
scheduler.list[subreddit] = job
return job, nil
}

View file

@ -0,0 +1,47 @@
package scheduler
import (
"context"
"errors"
"github.com/stephenafamo/bob"
"github.com/tigorlazuardi/redmage/models"
"github.com/tigorlazuardi/redmage/pkg/errs"
)
// Sync empties the scheduler and re-adds all enabled jobs from the database.
//
// This is costly but ensures that the scheduler is in sync with the database.
//
// For simpler operation consider using Put and Delete instead.
func (scheduler *Scheduler) Sync(ctx context.Context, db bob.Executor) (err error) {
ctx, span := tracer.Start(ctx, "*Scheduler.Rebalance")
defer span.End()
subs, err := models.Subreddits.Query(ctx, db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All()
if err != nil {
return errs.Wrapw(err, "scheduler: rebalance: failed to query subreddits")
}
scheduler.mu.Lock()
defer scheduler.mu.Unlock()
for _, entry := range scheduler.scheduler.Entries() {
scheduler.scheduler.Remove(entry.ID)
}
var errcoll error
for _, sub := range subs {
_, err := scheduler.Put(sub.Name, sub.Schedule)
if err != nil {
errcoll = errors.Join(errcoll, errs.Wrapw(err, "scheduler: rebalance: failed to add job", "subreddit", sub.Name, "schedule", sub.Schedule))
}
}
if errcoll != nil {
return errs.Wrapw(errcoll, "scheduler: encountered errors while rebalancing jobs")
}
return nil
}

View file

@ -1,3 +1,46 @@
package scheduler
type Scheduler struct{}
import (
"sync"
"github.com/robfig/cron/v3"
)
type Runner = func(subreddit string)
type Scheduler struct {
scheduler *cron.Cron
mu sync.RWMutex
list map[string]*Job
run Runner
}
type Job struct {
ID cron.EntryID
Entry cron.Entry
}
func (job *Job) clone() *Job {
return &Job{
ID: job.ID,
Entry: job.Entry,
}
}
func New(runner Runner) *Scheduler {
return &Scheduler{
scheduler: cron.New(),
list: make(map[string]*Job),
run: runner,
}
}
// Start starts the scheduler in the background.
func (s *Scheduler) Start() {
s.scheduler.Start()
}
// Stop stops the scheduler.
func (s *Scheduler) Stop() {
s.scheduler.Stop()
}

7
api/scheduler/tracer.go Normal file
View file

@ -0,0 +1,7 @@
package scheduler
import (
"go.opentelemetry.io/otel"
)
var tracer = otel.Tracer("scheduler")