diff --git a/api/scheduler/delete.go b/api/scheduler/delete.go new file mode 100644 index 0000000..95f1c31 --- /dev/null +++ b/api/scheduler/delete.go @@ -0,0 +1,14 @@ +package scheduler + +// Delete removes a job from the scheduler. +// +// If job does not exist, it will be a no-op. +func (scheduler *Scheduler) Delete(subreddit string) { + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + + job := scheduler.Get(subreddit) + if job != nil { + scheduler.scheduler.Remove(job.ID) + } +} diff --git a/api/scheduler/get.go b/api/scheduler/get.go new file mode 100644 index 0000000..32bf2b4 --- /dev/null +++ b/api/scheduler/get.go @@ -0,0 +1,39 @@ +package scheduler + +// List returns a copy of the list of jobs. +func (scheduler *Scheduler) List() map[string]*Job { + scheduler.mu.RLock() + defer scheduler.mu.RUnlock() + + m := make(map[string]*Job, len(scheduler.list)) + for k, v := range scheduler.list { + m[k] = v.clone() + } + + return m +} + +// Get returns the job for the given subreddit. +// +// Returns nil if the subreddit is not found or active. +func (scheduler *Scheduler) Get(subreddit string) *Job { + scheduler.mu.RLock() + defer scheduler.mu.RUnlock() + + schedule := scheduler.list[subreddit] + if schedule != nil { + return schedule.clone() + } + return nil +} + +func (scheduler *Scheduler) Iter(f func(string, *Job) bool) { + scheduler.mu.RLock() + defer scheduler.mu.RUnlock() + + for k, v := range scheduler.list { + if !f(k, v.clone()) { + break + } + } +} diff --git a/api/scheduler/put.go b/api/scheduler/put.go new file mode 100644 index 0000000..cd1f558 --- /dev/null +++ b/api/scheduler/put.go @@ -0,0 +1,34 @@ +package scheduler + +import ( + "net/http" + + "github.com/robfig/cron/v3" + "github.com/tigorlazuardi/redmage/pkg/errs" +) + +// Put adds a job to the scheduler. +// +// If job already exists, it will be replaced. +func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) { + sched, err := cron.ParseStandard(schedule) + if err != nil { + return nil, errs. + Wrapw(err, "scheduler: failed to parse schedule", "subreddit", subreddit, "schedule", schedule). + Code(http.StatusBadRequest) + } + + scheduler.Delete(subreddit) + + id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) })) + + e := scheduler.scheduler.Entry(id) + job = &Job{ID: id, Entry: e} + + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + + scheduler.list[subreddit] = job + + return job, nil +} diff --git a/api/scheduler/rebalance.go b/api/scheduler/rebalance.go new file mode 100644 index 0000000..007e3b4 --- /dev/null +++ b/api/scheduler/rebalance.go @@ -0,0 +1,47 @@ +package scheduler + +import ( + "context" + "errors" + + "github.com/stephenafamo/bob" + "github.com/tigorlazuardi/redmage/models" + "github.com/tigorlazuardi/redmage/pkg/errs" +) + +// Sync empties the scheduler and re-adds all enabled jobs from the database. +// +// This is costly but ensures that the scheduler is in sync with the database. +// +// For simpler operation consider using Put and Delete instead. +func (scheduler *Scheduler) Sync(ctx context.Context, db bob.Executor) (err error) { + ctx, span := tracer.Start(ctx, "*Scheduler.Rebalance") + defer span.End() + + subs, err := models.Subreddits.Query(ctx, db, models.SelectWhere.Subreddits.EnableSchedule.EQ(1)).All() + if err != nil { + return errs.Wrapw(err, "scheduler: rebalance: failed to query subreddits") + } + + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + + for _, entry := range scheduler.scheduler.Entries() { + scheduler.scheduler.Remove(entry.ID) + } + + var errcoll error + + for _, sub := range subs { + _, err := scheduler.Put(sub.Name, sub.Schedule) + if err != nil { + errcoll = errors.Join(errcoll, errs.Wrapw(err, "scheduler: rebalance: failed to add job", "subreddit", sub.Name, "schedule", sub.Schedule)) + } + } + + if errcoll != nil { + return errs.Wrapw(errcoll, "scheduler: encountered errors while rebalancing jobs") + } + + return nil +} diff --git a/api/scheduler/scheduler.go b/api/scheduler/scheduler.go index c710887..287e290 100644 --- a/api/scheduler/scheduler.go +++ b/api/scheduler/scheduler.go @@ -1,3 +1,46 @@ package scheduler -type Scheduler struct{} +import ( + "sync" + + "github.com/robfig/cron/v3" +) + +type Runner = func(subreddit string) + +type Scheduler struct { + scheduler *cron.Cron + mu sync.RWMutex + list map[string]*Job + run Runner +} + +type Job struct { + ID cron.EntryID + Entry cron.Entry +} + +func (job *Job) clone() *Job { + return &Job{ + ID: job.ID, + Entry: job.Entry, + } +} + +func New(runner Runner) *Scheduler { + return &Scheduler{ + scheduler: cron.New(), + list: make(map[string]*Job), + run: runner, + } +} + +// Start starts the scheduler in the background. +func (s *Scheduler) Start() { + s.scheduler.Start() +} + +// Stop stops the scheduler. +func (s *Scheduler) Stop() { + s.scheduler.Stop() +} diff --git a/api/scheduler/tracer.go b/api/scheduler/tracer.go new file mode 100644 index 0000000..1a3db06 --- /dev/null +++ b/api/scheduler/tracer.go @@ -0,0 +1,7 @@ +package scheduler + +import ( + "go.opentelemetry.io/otel" +) + +var tracer = otel.Tracer("scheduler")