From 26529333615ae65bf7ca7f6ffc5ee80970cf677c Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Tue, 14 May 2024 18:49:45 +0700 Subject: [PATCH] api: fix scheduler deadlocks --- api/api.go | 2 +- api/scheduler/delete.go | 12 +++++++++--- api/scheduler/get.go | 38 ++++++++++++++++++++++++++++---------- api/scheduler/put.go | 14 ++++++++++---- api/scheduler/rebalance.go | 2 +- api/scheduler/scheduler.go | 4 ++-- 6 files changed, 51 insertions(+), 21 deletions(-) diff --git a/api/api.go b/api/api.go index a2d5b6f..91a5c63 100644 --- a/api/api.go +++ b/api/api.go @@ -66,7 +66,7 @@ func New(deps Dependencies) *API { if err := api.scheduler.Sync(context.Background(), api.db); err != nil { panic(err) } - api.scheduler.Start() + go api.scheduler.Start() go api.StartSubredditDownloadPubsub(ch) return api diff --git a/api/scheduler/delete.go b/api/scheduler/delete.go index 95f1c31..a5a8f1d 100644 --- a/api/scheduler/delete.go +++ b/api/scheduler/delete.go @@ -4,10 +4,16 @@ package scheduler // // If job does not exist, it will be a no-op. func (scheduler *Scheduler) Delete(subreddit string) { - scheduler.mu.Lock() - defer scheduler.mu.Unlock() + scheduler.delete(subreddit, true) +} - job := scheduler.Get(subreddit) +func (scheduler *Scheduler) delete(subreddit string, lock bool) { + if lock { + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + } + + job := scheduler.get(subreddit, false) if job != nil { scheduler.scheduler.Remove(job.ID) } diff --git a/api/scheduler/get.go b/api/scheduler/get.go index 32bf2b4..e1bcbf2 100644 --- a/api/scheduler/get.go +++ b/api/scheduler/get.go @@ -2,11 +2,17 @@ package scheduler // List returns a copy of the list of jobs. func (scheduler *Scheduler) List() map[string]*Job { - scheduler.mu.RLock() - defer scheduler.mu.RUnlock() + return scheduler.list(true) +} - m := make(map[string]*Job, len(scheduler.list)) - for k, v := range scheduler.list { +func (scheduler *Scheduler) list(lock bool) map[string]*Job { + if lock { + scheduler.mu.RLock() + defer scheduler.mu.RUnlock() + } + + m := make(map[string]*Job, len(scheduler.entries)) + for k, v := range scheduler.entries { m[k] = v.clone() } @@ -17,10 +23,16 @@ func (scheduler *Scheduler) List() map[string]*Job { // // Returns nil if the subreddit is not found or active. func (scheduler *Scheduler) Get(subreddit string) *Job { - scheduler.mu.RLock() - defer scheduler.mu.RUnlock() + return scheduler.get(subreddit, true) +} - schedule := scheduler.list[subreddit] +func (scheduler *Scheduler) get(subreddit string, lock bool) *Job { + if lock { + scheduler.mu.RLock() + defer scheduler.mu.RUnlock() + } + + schedule := scheduler.entries[subreddit] if schedule != nil { return schedule.clone() } @@ -28,10 +40,16 @@ func (scheduler *Scheduler) Get(subreddit string) *Job { } func (scheduler *Scheduler) Iter(f func(string, *Job) bool) { - scheduler.mu.RLock() - defer scheduler.mu.RUnlock() + scheduler.iter(f, true) +} - for k, v := range scheduler.list { +func (scheduler *Scheduler) iter(f func(string, *Job) bool, lock bool) { + if lock { + scheduler.mu.RLock() + defer scheduler.mu.RUnlock() + } + + for k, v := range scheduler.entries { if !f(k, v.clone()) { break } diff --git a/api/scheduler/put.go b/api/scheduler/put.go index cd1f558..da8a987 100644 --- a/api/scheduler/put.go +++ b/api/scheduler/put.go @@ -11,6 +11,10 @@ import ( // // If job already exists, it will be replaced. func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) { + return scheduler.put(subreddit, schedule, true) +} + +func (scheduler *Scheduler) put(subreddit string, schedule string, lock bool) (job *Job, err error) { sched, err := cron.ParseStandard(schedule) if err != nil { return nil, errs. @@ -18,17 +22,19 @@ func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, er Code(http.StatusBadRequest) } - scheduler.Delete(subreddit) + scheduler.delete(subreddit, false) id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) })) e := scheduler.scheduler.Entry(id) job = &Job{ID: id, Entry: e} - scheduler.mu.Lock() - defer scheduler.mu.Unlock() + if lock { + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + } - scheduler.list[subreddit] = job + scheduler.entries[subreddit] = job return job, nil } diff --git a/api/scheduler/rebalance.go b/api/scheduler/rebalance.go index 007e3b4..9c21476 100644 --- a/api/scheduler/rebalance.go +++ b/api/scheduler/rebalance.go @@ -33,7 +33,7 @@ func (scheduler *Scheduler) Sync(ctx context.Context, db bob.Executor) (err erro var errcoll error for _, sub := range subs { - _, err := scheduler.Put(sub.Name, sub.Schedule) + _, err := scheduler.put(sub.Name, sub.Schedule, false) if err != nil { errcoll = errors.Join(errcoll, errs.Wrapw(err, "scheduler: rebalance: failed to add job", "subreddit", sub.Name, "schedule", sub.Schedule)) } diff --git a/api/scheduler/scheduler.go b/api/scheduler/scheduler.go index 287e290..d0145cc 100644 --- a/api/scheduler/scheduler.go +++ b/api/scheduler/scheduler.go @@ -11,7 +11,7 @@ type Runner = func(subreddit string) type Scheduler struct { scheduler *cron.Cron mu sync.RWMutex - list map[string]*Job + entries map[string]*Job run Runner } @@ -30,7 +30,7 @@ func (job *Job) clone() *Job { func New(runner Runner) *Scheduler { return &Scheduler{ scheduler: cron.New(), - list: make(map[string]*Job), + entries: make(map[string]*Job), run: runner, } }