scheduler: fix missing default sheduler timeout causing scheduler to
Some checks failed
/ push (push) Has been cancelled
Some checks failed
/ push (push) Has been cancelled
never runs
This commit is contained in:
parent
a98dd2d33e
commit
f78661aa6e
|
@ -4,15 +4,12 @@ package scheduler
|
||||||
//
|
//
|
||||||
// If job does not exist, it will be a no-op.
|
// If job does not exist, it will be a no-op.
|
||||||
func (scheduler *Scheduler) Delete(subreddit string) {
|
func (scheduler *Scheduler) Delete(subreddit string) {
|
||||||
scheduler.delete(subreddit, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (scheduler *Scheduler) delete(subreddit string, lock bool) {
|
|
||||||
if lock {
|
|
||||||
scheduler.mu.Lock()
|
scheduler.mu.Lock()
|
||||||
defer scheduler.mu.Unlock()
|
defer scheduler.mu.Unlock()
|
||||||
|
scheduler.delete(subreddit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (scheduler *Scheduler) delete(subreddit string) {
|
||||||
job := scheduler.get(subreddit, false)
|
job := scheduler.get(subreddit, false)
|
||||||
if job != nil {
|
if job != nil {
|
||||||
scheduler.scheduler.Remove(job.ID)
|
scheduler.scheduler.Remove(job.ID)
|
||||||
|
|
|
@ -11,10 +11,12 @@ import (
|
||||||
//
|
//
|
||||||
// If job already exists, it will be replaced.
|
// If job already exists, it will be replaced.
|
||||||
func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) {
|
func (scheduler *Scheduler) Put(subreddit string, schedule string) (job *Job, err error) {
|
||||||
return scheduler.put(subreddit, schedule, true)
|
scheduler.mu.Lock()
|
||||||
|
defer scheduler.mu.Unlock()
|
||||||
|
return scheduler.put(subreddit, schedule)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *Scheduler) put(subreddit string, schedule string, lock bool) (job *Job, err error) {
|
func (scheduler *Scheduler) put(subreddit string, schedule string) (job *Job, err error) {
|
||||||
sched, err := cron.ParseStandard(schedule)
|
sched, err := cron.ParseStandard(schedule)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errs.
|
return nil, errs.
|
||||||
|
@ -22,18 +24,13 @@ func (scheduler *Scheduler) put(subreddit string, schedule string, lock bool) (j
|
||||||
Code(http.StatusBadRequest)
|
Code(http.StatusBadRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduler.delete(subreddit, false)
|
scheduler.delete(subreddit)
|
||||||
|
|
||||||
id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) }))
|
id := scheduler.scheduler.Schedule(sched, cron.FuncJob(func() { scheduler.run(subreddit) }))
|
||||||
|
|
||||||
e := scheduler.scheduler.Entry(id)
|
e := scheduler.scheduler.Entry(id)
|
||||||
job = &Job{ID: id, Entry: e}
|
job = &Job{ID: id, Entry: e}
|
||||||
|
|
||||||
if lock {
|
|
||||||
scheduler.mu.Lock()
|
|
||||||
defer scheduler.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
scheduler.entries[subreddit] = job
|
scheduler.entries[subreddit] = job
|
||||||
|
|
||||||
return job, nil
|
return job, nil
|
||||||
|
|
|
@ -30,17 +30,15 @@ func (scheduler *Scheduler) Sync(ctx context.Context, db bob.Executor) (err erro
|
||||||
scheduler.scheduler.Remove(entry.ID)
|
scheduler.scheduler.Remove(entry.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
var errcoll error
|
errcoll := make([]error, 0, len(subs))
|
||||||
|
|
||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
_, err := scheduler.put(sub.Name, sub.Schedule, false)
|
_, err := scheduler.put(sub.Name, sub.Schedule)
|
||||||
if err != nil {
|
errcoll = append(errcoll, err)
|
||||||
errcoll = errors.Join(errcoll, errs.Wrapw(err, "scheduler: rebalance: failed to add job", "subreddit", sub.Name, "schedule", sub.Schedule))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if errcoll != nil {
|
if err := errors.Join(errcoll...); err != nil {
|
||||||
return errs.Wrapw(errcoll, "scheduler: encountered errors while rebalancing jobs")
|
return errs.Wrapw(err, "scheduler: encountered errors while rebalancing jobs")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -46,5 +46,12 @@ func (api *API) SubredditsCreate(ctx context.Context, params *models.Subreddit)
|
||||||
return subreddit, errs.Wrapw(err, "failed to set schedule status")
|
return subreddit, errs.Wrapw(err, "failed to set schedule status")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if params.EnableSchedule == 1 {
|
||||||
|
_, err = api.scheduler.Put(subreddit.Name, subreddit.Schedule)
|
||||||
|
if err != nil {
|
||||||
|
return subreddit, errs.Wrapw(err, "failed to put job to scheduler")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return subreddit, nil
|
return subreddit, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"path"
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/adrg/xdg"
|
"github.com/adrg/xdg"
|
||||||
)
|
)
|
||||||
|
@ -64,4 +65,6 @@ var DefaultConfig = map[string]any{
|
||||||
|
|
||||||
"runtime.version": Version,
|
"runtime.version": Version,
|
||||||
"runtime.environment": "development",
|
"runtime.environment": "development",
|
||||||
|
|
||||||
|
"scheduler.timeout": time.Second * 10,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue