From 64c3d57a6199daf9820f3b43fc228042fd6ab16b Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Fri, 3 May 2024 23:40:05 +0700 Subject: [PATCH] schedule: add schedule status and history tables --- api/api.go | 4 +- api/pubsub_download.go | 50 +++++++++++--- api/schedule_history_insert.go | 36 ++++++++++ api/schedule_set.go | 67 +++++++++++++++++++ api/schedule_status_upsert.go | 39 +++++++++++ api/subreddits_create.go | 8 +++ api/transaction.go | 25 +++++++ ...503212313_create_table_schedule_status.sql | 22 ++++++ ...03212902_create_table_schedule_history.sql | 23 +++++++ 9 files changed, 262 insertions(+), 12 deletions(-) create mode 100644 api/schedule_history_insert.go create mode 100644 api/schedule_set.go create mode 100644 api/schedule_status_upsert.go create mode 100644 api/transaction.go create mode 100644 db/migrations/20240503212313_create_table_schedule_status.sql create mode 100644 db/migrations/20240503212902_create_table_schedule_history.sql diff --git a/api/api.go b/api/api.go index a5452d4..f26e54f 100644 --- a/api/api.go +++ b/api/api.go @@ -21,7 +21,8 @@ import ( ) type API struct { - db bob.Executor + db bob.Executor + sqldb *sql.DB scheduler *cron.Cron scheduleMap map[cron.EntryID]*models.Subreddit @@ -56,6 +57,7 @@ func New(deps Dependencies) *API { } api := &API{ db: bob.New(deps.DB), + sqldb: deps.DB, scheduler: cron.New(), scheduleMap: make(map[cron.EntryID]*models.Subreddit, 8), downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](), diff --git a/api/pubsub_download.go b/api/pubsub_download.go index 28e106b..5e1781b 100644 --- a/api/pubsub_download.go +++ b/api/pubsub_download.go @@ -17,6 +17,18 @@ import ( func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) { for msg := range messages { + var subreddit *models.Subreddit + if err := json.Unmarshal(msg.Payload, &subreddit); err != nil { + log.New(context.Background()).Err(err).Error("failed to unmarshal json for download pubsub", "topic", downloadTopic) + return + } + ctx := context.Background() + if _, err := api.ScheduleSet(ctx, ScheduleSetParams{ + Subreddit: subreddit.Name, + Status: ScheduleStatusEnqueued, + }); err != nil { + log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusDownloading.String()) + } log.New(context.Background()).Debug("received pubsub message", "message", msg, "len", len(api.subredditSemaphore), @@ -24,23 +36,39 @@ func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) { "download.concurrency.subreddits", api.config.Int("download.concurrency.subreddits"), ) api.subredditSemaphore <- struct{}{} - go func(msg *message.Message) { + go func(msg *message.Message, subreddit *models.Subreddit) { defer func() { msg.Ack() <-api.subredditSemaphore }() - var ( - err error - subreddit *models.Subreddit - ) + var err error ctx, span := tracer.Start(context.Background(), "Download Subreddit Pubsub") - defer func() { telemetry.EndWithStatus(span, err) }() + defer func() { + if err != nil { + if _, err := api.ScheduleSet(ctx, ScheduleSetParams{ + Subreddit: subreddit.Name, + Status: ScheduleStatusError, + ErrorMessage: err.Error(), + }); err != nil { + log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusError.String()) + } + } else { + if _, err := api.ScheduleSet(ctx, ScheduleSetParams{ + Subreddit: subreddit.Name, + Status: ScheduleStatusStandby, + }); err != nil { + log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusStandby.String()) + } + } + telemetry.EndWithStatus(span, err) + }() span.AddEvent("pubsub." + downloadTopic) - - err = json.Unmarshal(msg.Payload, &subreddit) + _, err = api.ScheduleSet(ctx, ScheduleSetParams{ + Subreddit: subreddit.Name, + Status: ScheduleStatusDownloading, + }) if err != nil { - log.New(ctx).Err(err).Error("failed to unmarshal json for download pubsub", "topic", downloadTopic) - return + log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusDownloading.String()) } devices, err := models.Devices.Query(ctx, api.db).All() @@ -54,7 +82,7 @@ func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) { log.New(ctx).Err(err).Error("failed to download subreddit images", "subreddit", subreddit) return } - }(msg) + }(msg, subreddit) } } diff --git a/api/schedule_history_insert.go b/api/schedule_history_insert.go new file mode 100644 index 0000000..0267acd --- /dev/null +++ b/api/schedule_history_insert.go @@ -0,0 +1,36 @@ +package api + +import ( + "context" + "time" + + "github.com/aarondl/opt/omit" + "github.com/stephenafamo/bob" + "github.com/tigorlazuardi/redmage/models" + "github.com/tigorlazuardi/redmage/pkg/errs" +) + +func (api *API) ScheduleHistoryInsert(ctx context.Context, params ScheduleSetParams) (history *models.ScheduleHistory, err error) { + ctx, span := tracer.Start(ctx, "*API.ScheduleHistoryInsert") + defer span.End() + + return api.scheduleHistoryInsert(ctx, api.db, params) +} + +func (api *API) scheduleHistoryInsert(ctx context.Context, exec bob.Executor, params ScheduleSetParams) (history *models.ScheduleHistory, err error) { + ctx, span := tracer.Start(ctx, "*API.scheduleHistoryInsert") + defer span.End() + + now := time.Now() + + history, err = models.ScheduleHistories.Insert(ctx, exec, &models.ScheduleHistorySetter{ + Subreddit: omit.FromCond(params.Subreddit, params.Subreddit != ""), + Status: omit.From(params.Status.Int8()), + ErrorMessage: omit.FromCond(params.ErrorMessage, params.Status == ScheduleStatusError), + CreatedAt: omit.From(now.Unix()), + }) + if err != nil { + return history, errs.Wrapw(err, "failed to insert schedule history", "params", params) + } + return history, err +} diff --git a/api/schedule_set.go b/api/schedule_set.go new file mode 100644 index 0000000..9c79aa9 --- /dev/null +++ b/api/schedule_set.go @@ -0,0 +1,67 @@ +package api + +import ( + "context" + + "github.com/stephenafamo/bob" + "github.com/tigorlazuardi/redmage/models" + "github.com/tigorlazuardi/redmage/pkg/errs" +) + +type ScheduleStatus int8 + +const ( + ScheduleStatusDisabled ScheduleStatus = iota + ScheduleStatusStandby + ScheduleStatusEnqueued + ScheduleStatusDownloading + ScheduleStatusError +) + +func (ss ScheduleStatus) String() string { + switch ss { + case ScheduleStatusDisabled: + return "Disabled" + case ScheduleStatusStandby: + return "Standby" + case ScheduleStatusEnqueued: + return "Enqueued" + case ScheduleStatusDownloading: + return "Downloading" + case ScheduleStatusError: + return "Error" + } + return "Unknown" +} + +func (ss ScheduleStatus) Int8() int8 { + return int8(ss) +} + +type ScheduleSetParams struct { + Subreddit string + Status ScheduleStatus + ErrorMessage string +} + +func (api *API) ScheduleSet(ctx context.Context, params ScheduleSetParams) (schedule *models.ScheduleStatus, err error) { + ctx, span := tracer.Start(ctx, "*API.ScheduleSet") + defer span.End() + + errTx := api.withTransaction(ctx, func(exec bob.Executor) error { + schedule, err = api.ScheduleStatusUpsert(ctx, params) + if err != nil { + return errs.Wrapw(err, "failed to set schedule status", "params", params) + } + + _, err = api.ScheduleHistoryInsert(ctx, params) + if err != nil { + return errs.Wrapw(err, "failed to insert schedule history", "params", params) + } + + // TODO: Create cron job schedule rebalancing + return nil + }) + + return schedule, errTx +} diff --git a/api/schedule_status_upsert.go b/api/schedule_status_upsert.go new file mode 100644 index 0000000..b64f45d --- /dev/null +++ b/api/schedule_status_upsert.go @@ -0,0 +1,39 @@ +package api + +import ( + "context" + "time" + + "github.com/aarondl/opt/omit" + "github.com/stephenafamo/bob" + "github.com/tigorlazuardi/redmage/models" + "github.com/tigorlazuardi/redmage/pkg/errs" +) + +func (api *API) ScheduleStatusUpsert(ctx context.Context, params ScheduleSetParams) (schedule *models.ScheduleStatus, err error) { + ctx, span := tracer.Start(ctx, "*API.CreateNewScheduleStatus") + defer span.End() + return api.scheduleStatusUpsert(ctx, api.db, params) +} + +func (api *API) scheduleStatusUpsert(ctx context.Context, exec bob.Executor, params ScheduleSetParams) (schedule *models.ScheduleStatus, err error) { + ctx, span := tracer.Start(ctx, "*API.createNewScheduleStatus") + defer span.End() + now := time.Now() + ss, err := models.ScheduleStatuses.Upsert(ctx, exec, true, []string{"subreddit"}, []string{ + "subreddit", + "status", + "error_message", + "updated_at", + }, &models.ScheduleStatusSetter{ + Subreddit: omit.FromCond(params.Subreddit, params.Subreddit != ""), + Status: omit.From(params.Status.Int8()), + ErrorMessage: omit.From(params.ErrorMessage), + CreatedAt: omit.From(now.Unix()), + UpdatedAt: omit.From(now.Unix()), + }) + if err != nil { + return ss, errs.Wrapw(err, "failed to upsert schedule status", "params", params) + } + return ss, err +} diff --git a/api/subreddits_create.go b/api/subreddits_create.go index 538e9d6..b1ba41a 100644 --- a/api/subreddits_create.go +++ b/api/subreddits_create.go @@ -38,5 +38,13 @@ func (api *API) SubredditsCreate(ctx context.Context, params *models.Subreddit) } } + _, err = api.ScheduleSet(ctx, ScheduleSetParams{ + Subreddit: subreddit.Name, + Status: ScheduleStatus(params.EnableSchedule), // Possible value should only be 0 or 1 + }) + if err != nil { + return subreddit, errs.Wrapw(err, "failed to set schedule status") + } + return subreddit, nil } diff --git a/api/transaction.go b/api/transaction.go new file mode 100644 index 0000000..c1342f2 --- /dev/null +++ b/api/transaction.go @@ -0,0 +1,25 @@ +package api + +import ( + "context" + + "github.com/stephenafamo/bob" + "github.com/tigorlazuardi/redmage/pkg/errs" +) + +type executor func(exec bob.Executor) error + +func (api *API) withTransaction(ctx context.Context, f executor) (err error) { + tx, err := api.sqldb.BeginTx(ctx, nil) + if err != nil { + return errs.Wrapw(err, "failed to begin transaction") + } + + exec := bob.New(tx) + err = f(exec) + if err != nil { + _ = tx.Rollback() + return err + } + return tx.Commit() +} diff --git a/db/migrations/20240503212313_create_table_schedule_status.sql b/db/migrations/20240503212313_create_table_schedule_status.sql new file mode 100644 index 0000000..f59b6f3 --- /dev/null +++ b/db/migrations/20240503212313_create_table_schedule_status.sql @@ -0,0 +1,22 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE schedule_status( + id INTEGER PRIMARY KEY, + subreddit VARCHAR(255) NOT NULL, + status TINYINT NOT NULL DEFAULT 0, + error_message VARCHAR(255) NOT NULL DEFAULT '', + created_at BIGINT DEFAULT 0 NOT NULL, + updated_at BIGINT DEFAULT 0 NOT NULL, + CONSTRAINT fk_scheduler_status_subreddit + FOREIGN KEY (subreddit) + REFERENCES subreddits(name) + ON DELETE CASCADE +); + +CREATE UNIQUE INDEX idx_unique_schedule_status_per_subreddit ON schedule_status(subreddit); +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS schedule_status; +-- +goose StatementEnd diff --git a/db/migrations/20240503212902_create_table_schedule_history.sql b/db/migrations/20240503212902_create_table_schedule_history.sql new file mode 100644 index 0000000..3ecdab4 --- /dev/null +++ b/db/migrations/20240503212902_create_table_schedule_history.sql @@ -0,0 +1,23 @@ +-- +goose Up +-- +goose StatementBegin +CREATE TABLE schedule_histories( + id INTEGER PRIMARY KEY, + subreddit VARCHAR(255) NOT NULL, + status TINYINT NOT NULL DEFAULT 0, + error_message VARCHAR(255) NOT NULL DEFAULT '', + created_at BIGINT DEFAULT 0 NOT NULL, + CONSTRAINT fk_scheduler_histories_subreddit + FOREIGN KEY (subreddit) + REFERENCES subreddits(name) + ON DELETE CASCADE +); + +CREATE INDEX idx_schedule_histories_subreddit_created_at ON schedule_histories(subreddit, created_at DESC); +CREATE INDEX idx_schedule_histories_created_at ON schedule_histories(created_at DESC); +-- +goose StatementEnd + + +-- +goose Down +-- +goose StatementBegin +DROP TABLE IF EXISTS schedule_histories; +-- +goose StatementEnd