diff --git a/api/pubsub_download.go b/api/pubsub_download.go index 687309a..e47d09f 100644 --- a/api/pubsub_download.go +++ b/api/pubsub_download.go @@ -96,26 +96,28 @@ func (api *API) PubsubStartDownloadSubreddit(ctx context.Context, params PubsubS return errs.Wrapw(err, "failed to verify subreddit existence", "params", params) } - err = api.withTransaction(ctx, func(exec bob.Executor) error { - _, err := api.scheduleSet(ctx, exec, ScheduleSetParams{ - Subreddit: subreddit.Name, - Status: ScheduleStatusEnqueued, + api.lockf(func() { + err = api.withTransaction(ctx, func(exec bob.Executor) error { + _, err := api.scheduleSet(ctx, exec, ScheduleSetParams{ + Subreddit: subreddit.Name, + Status: ScheduleStatusEnqueued, + }) + if err != nil { + return err + } + + payload, err := json.Marshal(subreddit) + if err != nil { + return errs.Wrapw(err, "failed to marshal subreddit") + } + + err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload)) + if err != nil { + return errs.Wrapw(err, "failed to enqueue reddit download", "params", params) + } + return nil }) - if err != nil { - return err - } - - payload, err := json.Marshal(subreddit) - if err != nil { - return errs.Wrapw(err, "failed to marshal subreddit") - } - - err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload)) - if err != nil { - return errs.Wrapw(err, "failed to enqueue reddit download", "params", params) - } - return nil }) - return nil + return err }