diff --git a/api/pubsub_download.go b/api/pubsub_download.go index 5e1781b..d10970a 100644 --- a/api/pubsub_download.go +++ b/api/pubsub_download.go @@ -7,6 +7,7 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" + "github.com/stephenafamo/bob" "github.com/tigorlazuardi/redmage/models" "github.com/tigorlazuardi/redmage/pkg/errs" "github.com/tigorlazuardi/redmage/pkg/log" @@ -22,25 +23,11 @@ func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) { log.New(context.Background()).Err(err).Error("failed to unmarshal json for download pubsub", "topic", downloadTopic) return } - ctx := context.Background() - if _, err := api.ScheduleSet(ctx, ScheduleSetParams{ - Subreddit: subreddit.Name, - Status: ScheduleStatusEnqueued, - }); err != nil { - log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusDownloading.String()) - } log.New(context.Background()).Debug("received pubsub message", - "message", msg, - "len", len(api.subredditSemaphore), - "cap", cap(api.subredditSemaphore), - "download.concurrency.subreddits", api.config.Int("download.concurrency.subreddits"), + "message", string(msg.Payload), ) - api.subredditSemaphore <- struct{}{} - go func(msg *message.Message, subreddit *models.Subreddit) { - defer func() { - msg.Ack() - <-api.subredditSemaphore - }() + func(msg *message.Message, subreddit *models.Subreddit) { + defer msg.Ack() var err error ctx, span := tracer.Start(context.Background(), "Download Subreddit Pubsub") defer func() { @@ -102,12 +89,26 @@ func (api *API) PubsubStartDownloadSubreddit(ctx context.Context, params PubsubS return errs.Wrapw(err, "failed to verify subreddit existence", "params", params) } - payload, _ := json.Marshal(subreddit) + err = api.withTransaction(ctx, func(exec bob.Executor) error { + _, err := api.scheduleSet(ctx, exec, ScheduleSetParams{ + Subreddit: subreddit.Name, + Status: ScheduleStatusEnqueued, + }) + if err != nil { + return err + } - err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload)) - if err != nil { - return errs.Wrapw(err, "failed to enqueue reddit download", "params", params) - } + payload, err := json.Marshal(subreddit) + if err != nil { + return errs.Wrapw(err, "failed to marshal subreddit") + } + + err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload)) + if err != nil { + return errs.Wrapw(err, "failed to enqueue reddit download", "params", params) + } + return nil + }) return nil } diff --git a/api/schedule_set.go b/api/schedule_set.go index 9c79aa9..1451d2f 100644 --- a/api/schedule_set.go +++ b/api/schedule_set.go @@ -13,9 +13,9 @@ type ScheduleStatus int8 const ( ScheduleStatusDisabled ScheduleStatus = iota ScheduleStatusStandby + ScheduleStatusError ScheduleStatusEnqueued ScheduleStatusDownloading - ScheduleStatusError ) func (ss ScheduleStatus) String() string { @@ -49,19 +49,27 @@ func (api *API) ScheduleSet(ctx context.Context, params ScheduleSetParams) (sche defer span.End() errTx := api.withTransaction(ctx, func(exec bob.Executor) error { - schedule, err = api.ScheduleStatusUpsert(ctx, params) - if err != nil { - return errs.Wrapw(err, "failed to set schedule status", "params", params) - } - - _, err = api.ScheduleHistoryInsert(ctx, params) - if err != nil { - return errs.Wrapw(err, "failed to insert schedule history", "params", params) - } - - // TODO: Create cron job schedule rebalancing - return nil + schedule, err = api.scheduleSet(ctx, exec, params) + return err }) return schedule, errTx } + +func (api *API) scheduleSet(ctx context.Context, exec bob.Executor, params ScheduleSetParams) (schedule *models.ScheduleStatus, err error) { + ctx, span := tracer.Start(ctx, "*API.scheduleSet") + defer span.End() + + schedule, err = api.scheduleStatusUpsert(ctx, exec, params) + if err != nil { + return schedule, errs.Wrapw(err, "failed to set schedule status", "params", params) + } + + _, err = api.scheduleHistoryInsert(ctx, exec, params) + if err != nil { + return schedule, errs.Wrapw(err, "failed to insert schedule history", "params", params) + } + + // TODO: Create cron job schedule rebalancing if schedule is set to disabled. + return schedule, nil +} diff --git a/db/migrations/20240503212902_create_table_schedule_history.sql b/db/migrations/20240503212902_create_table_schedule_history.sql index 3ecdab4..1267785 100644 --- a/db/migrations/20240503212902_create_table_schedule_history.sql +++ b/db/migrations/20240503212902_create_table_schedule_history.sql @@ -1,7 +1,7 @@ -- +goose Up -- +goose StatementBegin CREATE TABLE schedule_histories( - id INTEGER PRIMARY KEY, + id INTEGER PRIMARY KEY AUTOINCREMENT, subreddit VARCHAR(255) NOT NULL, status TINYINT NOT NULL DEFAULT 0, error_message VARCHAR(255) NOT NULL DEFAULT '',