api: enhance the schedule status sets

This commit is contained in:
Tigor Hutasuhut 2024-05-04 09:53:27 +07:00
parent 7333e09e49
commit fea186924c
3 changed files with 45 additions and 36 deletions

View file

@ -7,6 +7,7 @@ import (
"github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message"
"github.com/stephenafamo/bob"
"github.com/tigorlazuardi/redmage/models" "github.com/tigorlazuardi/redmage/models"
"github.com/tigorlazuardi/redmage/pkg/errs" "github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/pkg/log" "github.com/tigorlazuardi/redmage/pkg/log"
@ -22,25 +23,11 @@ func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) {
log.New(context.Background()).Err(err).Error("failed to unmarshal json for download pubsub", "topic", downloadTopic) log.New(context.Background()).Err(err).Error("failed to unmarshal json for download pubsub", "topic", downloadTopic)
return return
} }
ctx := context.Background()
if _, err := api.ScheduleSet(ctx, ScheduleSetParams{
Subreddit: subreddit.Name,
Status: ScheduleStatusEnqueued,
}); err != nil {
log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusDownloading.String())
}
log.New(context.Background()).Debug("received pubsub message", log.New(context.Background()).Debug("received pubsub message",
"message", msg, "message", string(msg.Payload),
"len", len(api.subredditSemaphore),
"cap", cap(api.subredditSemaphore),
"download.concurrency.subreddits", api.config.Int("download.concurrency.subreddits"),
) )
api.subredditSemaphore <- struct{}{} func(msg *message.Message, subreddit *models.Subreddit) {
go func(msg *message.Message, subreddit *models.Subreddit) { defer msg.Ack()
defer func() {
msg.Ack()
<-api.subredditSemaphore
}()
var err error var err error
ctx, span := tracer.Start(context.Background(), "Download Subreddit Pubsub") ctx, span := tracer.Start(context.Background(), "Download Subreddit Pubsub")
defer func() { defer func() {
@ -102,12 +89,26 @@ func (api *API) PubsubStartDownloadSubreddit(ctx context.Context, params PubsubS
return errs.Wrapw(err, "failed to verify subreddit existence", "params", params) return errs.Wrapw(err, "failed to verify subreddit existence", "params", params)
} }
payload, _ := json.Marshal(subreddit) err = api.withTransaction(ctx, func(exec bob.Executor) error {
_, err := api.scheduleSet(ctx, exec, ScheduleSetParams{
Subreddit: subreddit.Name,
Status: ScheduleStatusEnqueued,
})
if err != nil {
return err
}
err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload)) payload, err := json.Marshal(subreddit)
if err != nil { if err != nil {
return errs.Wrapw(err, "failed to enqueue reddit download", "params", params) return errs.Wrapw(err, "failed to marshal subreddit")
} }
err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload))
if err != nil {
return errs.Wrapw(err, "failed to enqueue reddit download", "params", params)
}
return nil
})
return nil return nil
} }

View file

@ -13,9 +13,9 @@ type ScheduleStatus int8
const ( const (
ScheduleStatusDisabled ScheduleStatus = iota ScheduleStatusDisabled ScheduleStatus = iota
ScheduleStatusStandby ScheduleStatusStandby
ScheduleStatusError
ScheduleStatusEnqueued ScheduleStatusEnqueued
ScheduleStatusDownloading ScheduleStatusDownloading
ScheduleStatusError
) )
func (ss ScheduleStatus) String() string { func (ss ScheduleStatus) String() string {
@ -49,19 +49,27 @@ func (api *API) ScheduleSet(ctx context.Context, params ScheduleSetParams) (sche
defer span.End() defer span.End()
errTx := api.withTransaction(ctx, func(exec bob.Executor) error { errTx := api.withTransaction(ctx, func(exec bob.Executor) error {
schedule, err = api.ScheduleStatusUpsert(ctx, params) schedule, err = api.scheduleSet(ctx, exec, params)
if err != nil { return err
return errs.Wrapw(err, "failed to set schedule status", "params", params)
}
_, err = api.ScheduleHistoryInsert(ctx, params)
if err != nil {
return errs.Wrapw(err, "failed to insert schedule history", "params", params)
}
// TODO: Create cron job schedule rebalancing
return nil
}) })
return schedule, errTx return schedule, errTx
} }
func (api *API) scheduleSet(ctx context.Context, exec bob.Executor, params ScheduleSetParams) (schedule *models.ScheduleStatus, err error) {
ctx, span := tracer.Start(ctx, "*API.scheduleSet")
defer span.End()
schedule, err = api.scheduleStatusUpsert(ctx, exec, params)
if err != nil {
return schedule, errs.Wrapw(err, "failed to set schedule status", "params", params)
}
_, err = api.scheduleHistoryInsert(ctx, exec, params)
if err != nil {
return schedule, errs.Wrapw(err, "failed to insert schedule history", "params", params)
}
// TODO: Create cron job schedule rebalancing if schedule is set to disabled.
return schedule, nil
}

View file

@ -1,7 +1,7 @@
-- +goose Up -- +goose Up
-- +goose StatementBegin -- +goose StatementBegin
CREATE TABLE schedule_histories( CREATE TABLE schedule_histories(
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY AUTOINCREMENT,
subreddit VARCHAR(255) NOT NULL, subreddit VARCHAR(255) NOT NULL,
status TINYINT NOT NULL DEFAULT 0, status TINYINT NOT NULL DEFAULT 0,
error_message VARCHAR(255) NOT NULL DEFAULT '', error_message VARCHAR(255) NOT NULL DEFAULT '',