Redmage/api/pubsub_download.go

114 lines
3.9 KiB
Go
Raw Normal View History

2024-04-25 12:31:20 +07:00
package api
import (
"context"
2024-04-25 20:22:05 +07:00
"encoding/json"
"net/http"
2024-04-25 12:31:20 +07:00
2024-04-25 20:22:05 +07:00
"github.com/ThreeDotsLabs/watermill"
2024-04-25 12:31:20 +07:00
"github.com/ThreeDotsLabs/watermill/message"
"github.com/tigorlazuardi/redmage/models"
2024-04-25 20:22:05 +07:00
"github.com/tigorlazuardi/redmage/pkg/errs"
2024-04-25 12:31:20 +07:00
"github.com/tigorlazuardi/redmage/pkg/log"
"github.com/tigorlazuardi/redmage/pkg/telemetry"
"go.opentelemetry.io/otel/attribute"
2024-04-25 20:22:05 +07:00
"go.opentelemetry.io/otel/trace"
2024-04-25 12:31:20 +07:00
)
2024-04-25 13:52:38 +07:00
func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) {
2024-04-25 12:31:20 +07:00
for msg := range messages {
var subreddit *models.Subreddit
if err := json.Unmarshal(msg.Payload, &subreddit); err != nil {
log.New(context.Background()).Err(err).Error("failed to unmarshal json for download pubsub", "topic", downloadTopic)
return
}
ctx := context.Background()
if _, err := api.ScheduleSet(ctx, ScheduleSetParams{
Subreddit: subreddit.Name,
Status: ScheduleStatusEnqueued,
}); err != nil {
log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusDownloading.String())
}
log.New(context.Background()).Debug("received pubsub message",
2024-04-26 22:13:04 +07:00
"message", msg,
"len", len(api.subredditSemaphore),
"cap", cap(api.subredditSemaphore),
"download.concurrency.subreddits", api.config.Int("download.concurrency.subreddits"),
2024-04-26 22:13:04 +07:00
)
2024-04-25 12:31:20 +07:00
api.subredditSemaphore <- struct{}{}
go func(msg *message.Message, subreddit *models.Subreddit) {
2024-04-25 12:31:20 +07:00
defer func() {
msg.Ack()
<-api.subredditSemaphore
}()
var err error
2024-04-25 12:31:20 +07:00
ctx, span := tracer.Start(context.Background(), "Download Subreddit Pubsub")
defer func() {
if err != nil {
if _, err := api.ScheduleSet(ctx, ScheduleSetParams{
Subreddit: subreddit.Name,
Status: ScheduleStatusError,
ErrorMessage: err.Error(),
}); err != nil {
log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusError.String())
}
} else {
if _, err := api.ScheduleSet(ctx, ScheduleSetParams{
Subreddit: subreddit.Name,
Status: ScheduleStatusStandby,
}); err != nil {
log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusStandby.String())
}
}
telemetry.EndWithStatus(span, err)
}()
2024-04-25 12:31:20 +07:00
span.AddEvent("pubsub." + downloadTopic)
_, err = api.ScheduleSet(ctx, ScheduleSetParams{
Subreddit: subreddit.Name,
Status: ScheduleStatusDownloading,
})
2024-04-25 12:31:20 +07:00
if err != nil {
log.New(ctx).Err(err).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusDownloading.String())
2024-04-25 12:31:20 +07:00
}
2024-04-25 13:28:35 +07:00
devices, err := models.Devices.Query(ctx, api.db).All()
2024-04-25 12:31:20 +07:00
if err != nil {
log.New(ctx).Err(err).Error("failed to query devices")
return
}
err = api.DownloadSubredditImages(ctx, subreddit, devices)
2024-04-25 12:31:20 +07:00
if err != nil {
2024-04-25 20:22:05 +07:00
log.New(ctx).Err(err).Error("failed to download subreddit images", "subreddit", subreddit)
2024-04-25 12:31:20 +07:00
return
}
}(msg, subreddit)
2024-04-25 12:31:20 +07:00
}
}
2024-04-25 20:22:05 +07:00
type PubsubStartDownloadSubredditParams struct {
Subreddit string `json:"subreddit"`
}
func (api *API) PubsubStartDownloadSubreddit(ctx context.Context, params PubsubStartDownloadSubredditParams) (err error) {
ctx, span := tracer.Start(ctx, "*API.PubsubStartDownloadSubreddit", trace.WithAttributes(attribute.String("subreddit", params.Subreddit)))
defer span.End()
subreddit, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.Name.EQ(params.Subreddit)).One()
if err != nil {
if err.Error() == "sql: no rows in result set" {
return errs.Wrapw(err, "subreddit not registered", "params", params).Code(http.StatusNotFound)
2024-04-25 20:22:05 +07:00
}
return errs.Wrapw(err, "failed to verify subreddit existence", "params", params)
}
payload, _ := json.Marshal(subreddit)
err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload))
if err != nil {
return errs.Wrapw(err, "failed to enqueue reddit download", "params", params)
}
return nil
}