From f78661aa6e51c6f2e86343f4d2624debcaa28b6f Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Tue, 21 May 2024 22:27:11 +0700 Subject: [PATCH] scheduler: fix missing default sheduler timeout causing scheduler to never runs --- api/scheduler/delete.go | 11 ++++------- api/scheduler/put.go | 13 +++++-------- api/scheduler/rebalance.go | 12 +++++------- api/subreddits_create.go | 7 +++++++ config/default.go | 3 +++ 5 files changed, 24 insertions(+), 22 deletions(-) diff --git a/api/scheduler/delete.go b/api/scheduler/delete.go index a5a8f1d..acc4417 100644 --- a/api/scheduler/delete.go +++ b/api/scheduler/delete.go @@ -4,15 +4,12 @@ package scheduler // // If job does not exist, it will be a no-op. func (scheduler *Scheduler) Delete(subreddit string) { - scheduler.delete(subreddit, true) + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + scheduler.delete(subreddit) } -func (scheduler *Scheduler) delete(subreddit string, lock bool) { - if lock { - scheduler.mu.Lock() - defer scheduler.mu.Unlock() - } - +func (scheduler *Scheduler) delete(subreddit string) { job := scheduler.get(subreddit, false) if job != nil { scheduler.scheduler.Remove(job.ID) diff --git a/api/scheduler/put.go b/api/scheduler/put.go index da8a987..b7aa4ae 100644 --- a/api/scheduler/put.go +++ b/api/scheduler/put.go @@ -11,10 +11,12 @@ import ( // // If job already exists, it will be replaced. func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) { - return scheduler.put(subreddit, schedule, true) + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + return scheduler.put(subreddit, schedule) } -func (scheduler *Scheduler) put(subreddit string, schedule string, lock bool) (job *Job, err error) { +func (scheduler *Scheduler) put(subreddit string, schedule string) (job *Job, err error) { sched, err := cron.ParseStandard(schedule) if err != nil { return nil, errs. @@ -22,18 +24,13 @@ func (scheduler *Scheduler) put(subreddit string, schedule string, lock bool) (j Code(http.StatusBadRequest) } - scheduler.delete(subreddit, false) + scheduler.delete(subreddit) id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) })) e := scheduler.scheduler.Entry(id) job = &Job{ID: id, Entry: e} - if lock { - scheduler.mu.Lock() - defer scheduler.mu.Unlock() - } - scheduler.entries[subreddit] = job return job, nil diff --git a/api/scheduler/rebalance.go b/api/scheduler/rebalance.go index 9c21476..ca17e7e 100644 --- a/api/scheduler/rebalance.go +++ b/api/scheduler/rebalance.go @@ -30,17 +30,15 @@ func (scheduler *Scheduler) Sync(ctx context.Context, db bob.Executor) (err erro scheduler.scheduler.Remove(entry.ID) } - var errcoll error + errcoll := make([]error, 0, len(subs)) for _, sub := range subs { - _, err := scheduler.put(sub.Name, sub.Schedule, false) - if err != nil { - errcoll = errors.Join(errcoll, errs.Wrapw(err, "scheduler: rebalance: failed to add job", "subreddit", sub.Name, "schedule", sub.Schedule)) - } + _, err := scheduler.put(sub.Name, sub.Schedule) + errcoll = append(errcoll, err) } - if errcoll != nil { - return errs.Wrapw(errcoll, "scheduler: encountered errors while rebalancing jobs") + if err := errors.Join(errcoll...); err != nil { + return errs.Wrapw(err, "scheduler: encountered errors while rebalancing jobs") } return nil diff --git a/api/subreddits_create.go b/api/subreddits_create.go index b1ba41a..2305139 100644 --- a/api/subreddits_create.go +++ b/api/subreddits_create.go @@ -46,5 +46,12 @@ func (api *API) SubredditsCreate(ctx context.Context, params *models.Subreddit) return subreddit, errs.Wrapw(err, "failed to set schedule status") } + if params.EnableSchedule == 1 { + _, err = api.scheduler.Put(subreddit.Name, subreddit.Schedule) + if err != nil { + return subreddit, errs.Wrapw(err, "failed to put job to scheduler") + } + } + return subreddit, nil } diff --git a/config/default.go b/config/default.go index df70d2f..0e0519a 100644 --- a/config/default.go +++ b/config/default.go @@ -2,6 +2,7 @@ package config import ( "path" + "time" "github.com/adrg/xdg" ) @@ -64,4 +65,6 @@ var DefaultConfig = map[string]any{ "runtime.version": Version, "runtime.environment": "development", + + "scheduler.timeout": time.Second * 10, }