api: fix scheduler deadlocks
Some checks failed
/ push (push) Has been cancelled

This commit is contained in:
Tigor Hutasuhut 2024-05-14 18:49:45 +07:00
parent 0d849ec172
commit 2652933361
6 changed files with 51 additions and 21 deletions

View file

@ -66,7 +66,7 @@ func New(deps Dependencies) *API {
if err := api.scheduler.Sync(context.Background(), api.db); err != nil { if err := api.scheduler.Sync(context.Background(), api.db); err != nil {
panic(err) panic(err)
} }
api.scheduler.Start() go api.scheduler.Start()
go api.StartSubredditDownloadPubsub(ch) go api.StartSubredditDownloadPubsub(ch)
return api return api

View file

@ -4,10 +4,16 @@ package scheduler
// //
// If job does not exist, it will be a no-op. // If job does not exist, it will be a no-op.
func (scheduler *Scheduler) Delete(subreddit string) { func (scheduler *Scheduler) Delete(subreddit string) {
scheduler.mu.Lock() scheduler.delete(subreddit, true)
defer scheduler.mu.Unlock() }
job := scheduler.Get(subreddit) func (scheduler *Scheduler) delete(subreddit string, lock bool) {
if lock {
scheduler.mu.Lock()
defer scheduler.mu.Unlock()
}
job := scheduler.get(subreddit, false)
if job != nil { if job != nil {
scheduler.scheduler.Remove(job.ID) scheduler.scheduler.Remove(job.ID)
} }

View file

@ -2,11 +2,17 @@ package scheduler
// List returns a copy of the list of jobs. // List returns a copy of the list of jobs.
func (scheduler *Scheduler) List() map[string]*Job { func (scheduler *Scheduler) List() map[string]*Job {
scheduler.mu.RLock() return scheduler.list(true)
defer scheduler.mu.RUnlock() }
m := make(map[string]*Job, len(scheduler.list)) func (scheduler *Scheduler) list(lock bool) map[string]*Job {
for k, v := range scheduler.list { if lock {
scheduler.mu.RLock()
defer scheduler.mu.RUnlock()
}
m := make(map[string]*Job, len(scheduler.entries))
for k, v := range scheduler.entries {
m[k] = v.clone() m[k] = v.clone()
} }
@ -17,10 +23,16 @@ func (scheduler *Scheduler) List() map[string]*Job {
// //
// Returns nil if the subreddit is not found or active. // Returns nil if the subreddit is not found or active.
func (scheduler *Scheduler) Get(subreddit string) *Job { func (scheduler *Scheduler) Get(subreddit string) *Job {
scheduler.mu.RLock() return scheduler.get(subreddit, true)
defer scheduler.mu.RUnlock() }
schedule := scheduler.list[subreddit] func (scheduler *Scheduler) get(subreddit string, lock bool) *Job {
if lock {
scheduler.mu.RLock()
defer scheduler.mu.RUnlock()
}
schedule := scheduler.entries[subreddit]
if schedule != nil { if schedule != nil {
return schedule.clone() return schedule.clone()
} }
@ -28,10 +40,16 @@ func (scheduler *Scheduler) Get(subreddit string) *Job {
} }
func (scheduler *Scheduler) Iter(f func(string, *Job) bool) { func (scheduler *Scheduler) Iter(f func(string, *Job) bool) {
scheduler.mu.RLock() scheduler.iter(f, true)
defer scheduler.mu.RUnlock() }
for k, v := range scheduler.list { func (scheduler *Scheduler) iter(f func(string, *Job) bool, lock bool) {
if lock {
scheduler.mu.RLock()
defer scheduler.mu.RUnlock()
}
for k, v := range scheduler.entries {
if !f(k, v.clone()) { if !f(k, v.clone()) {
break break
} }

View file

@ -11,6 +11,10 @@ import (
// //
// If job already exists, it will be replaced. // If job already exists, it will be replaced.
func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) { func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) {
return scheduler.put(subreddit, schedule, true)
}
func (scheduler *Scheduler) put(subreddit string, schedule string, lock bool) (job *Job, err error) {
sched, err := cron.ParseStandard(schedule) sched, err := cron.ParseStandard(schedule)
if err != nil { if err != nil {
return nil, errs. return nil, errs.
@ -18,17 +22,19 @@ func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, er
Code(http.StatusBadRequest) Code(http.StatusBadRequest)
} }
scheduler.Delete(subreddit) scheduler.delete(subreddit, false)
id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) })) id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) }))
e := scheduler.scheduler.Entry(id) e := scheduler.scheduler.Entry(id)
job = &Job{ID: id, Entry: e} job = &Job{ID: id, Entry: e}
scheduler.mu.Lock() if lock {
defer scheduler.mu.Unlock() scheduler.mu.Lock()
defer scheduler.mu.Unlock()
}
scheduler.list[subreddit] = job scheduler.entries[subreddit] = job
return job, nil return job, nil
} }

View file

@ -33,7 +33,7 @@ func (scheduler *Scheduler) Sync(ctx context.Context, db bob.Executor) (err erro
var errcoll error var errcoll error
for _, sub := range subs { for _, sub := range subs {
_, err := scheduler.Put(sub.Name, sub.Schedule) _, err := scheduler.put(sub.Name, sub.Schedule, false)
if err != nil { if err != nil {
errcoll = errors.Join(errcoll, errs.Wrapw(err, "scheduler: rebalance: failed to add job", "subreddit", sub.Name, "schedule", sub.Schedule)) errcoll = errors.Join(errcoll, errs.Wrapw(err, "scheduler: rebalance: failed to add job", "subreddit", sub.Name, "schedule", sub.Schedule))
} }

View file

@ -11,7 +11,7 @@ type Runner = func(subreddit string)
type Scheduler struct { type Scheduler struct {
scheduler *cron.Cron scheduler *cron.Cron
mu sync.RWMutex mu sync.RWMutex
list map[string]*Job entries map[string]*Job
run Runner run Runner
} }
@ -30,7 +30,7 @@ func (job *Job) clone() *Job {
func New(runner Runner) *Scheduler { func New(runner Runner) *Scheduler {
return &Scheduler{ return &Scheduler{
scheduler: cron.New(), scheduler: cron.New(),
list: make(map[string]*Job), entries: make(map[string]*Job),
run: runner, run: runner,
} }
} }