2024-04-25 12:31:20 +07:00
|
|
|
package api
|
|
|
|
|
|
|
|
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"net/http"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/tigorlazuardi/redmage/api/reddit"
	"github.com/tigorlazuardi/redmage/models"
	"github.com/tigorlazuardi/redmage/pkg/errs"
	"github.com/tigorlazuardi/redmage/pkg/log"
	"github.com/tigorlazuardi/redmage/pkg/telemetry"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)
|
|
|
|
|
2024-04-25 13:52:38 +07:00
|
|
|
func (api *API) StartSubredditDownloadPubsub(messages <-chan *message.Message) {
|
2024-04-25 12:31:20 +07:00
|
|
|
for msg := range messages {
|
|
|
|
api.subredditSemaphore <- struct{}{}
|
|
|
|
go func(msg *message.Message) {
|
|
|
|
defer func() {
|
|
|
|
msg.Ack()
|
|
|
|
<-api.subredditSemaphore
|
|
|
|
}()
|
2024-04-25 20:22:05 +07:00
|
|
|
var (
|
|
|
|
err error
|
|
|
|
subreddit *models.Subreddit
|
|
|
|
)
|
2024-04-25 12:31:20 +07:00
|
|
|
ctx, span := tracer.Start(context.Background(), "Download Subreddit Pubsub")
|
|
|
|
defer func() { telemetry.EndWithStatus(span, err) }()
|
|
|
|
span.AddEvent("pubsub." + downloadTopic)
|
|
|
|
|
2024-04-25 20:22:05 +07:00
|
|
|
err = json.Unmarshal(msg.Payload, &subreddit)
|
2024-04-25 12:31:20 +07:00
|
|
|
if err != nil {
|
2024-04-25 20:22:05 +07:00
|
|
|
log.New(ctx).Err(err).Error("failed to unmarshal json for download pubsub", "topic", downloadTopic)
|
2024-04-25 12:31:20 +07:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-04-25 13:28:35 +07:00
|
|
|
devices, err := models.Devices.Query(ctx, api.db).All()
|
2024-04-25 12:31:20 +07:00
|
|
|
if err != nil {
|
|
|
|
log.New(ctx).Err(err).Error("failed to query devices")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-04-25 20:22:05 +07:00
|
|
|
err = api.DownloadSubredditImages(ctx, subreddit.Name, DownloadSubredditParams{
|
2024-04-25 12:31:20 +07:00
|
|
|
Countback: int(subreddit.Countback),
|
|
|
|
Devices: devices,
|
|
|
|
SubredditType: reddit.SubredditType(subreddit.Subtype),
|
|
|
|
})
|
|
|
|
if err != nil {
|
2024-04-25 20:22:05 +07:00
|
|
|
log.New(ctx).Err(err).Error("failed to download subreddit images", "subreddit", subreddit)
|
2024-04-25 12:31:20 +07:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}(msg)
|
|
|
|
}
|
|
|
|
}
|
2024-04-25 20:22:05 +07:00
|
|
|
|
|
|
|
// PubsubStartDownloadSubredditParams is the input for
// PubsubStartDownloadSubreddit.
type PubsubStartDownloadSubredditParams struct {
	// Subreddit is the name of the subreddit to enqueue; it is serialized
	// under the JSON key "subreddit".
	Subreddit string `json:"subreddit"`
}
|
|
|
|
|
|
|
|
func (api *API) PubsubStartDownloadSubreddit(ctx context.Context, params PubsubStartDownloadSubredditParams) (err error) {
|
|
|
|
ctx, span := tracer.Start(ctx, "*API.PubsubStartDownloadSubreddit", trace.WithAttributes(attribute.String("subreddit", params.Subreddit)))
|
|
|
|
defer span.End()
|
|
|
|
|
|
|
|
subreddit, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.Name.EQ(params.Subreddit)).One()
|
|
|
|
if err != nil {
|
|
|
|
if err.Error() == "sql: no rows in result set" {
|
|
|
|
return errs.Wrapw(err, "subreddit not found", "params", params).Code(http.StatusNotFound)
|
|
|
|
}
|
|
|
|
return errs.Wrapw(err, "failed to verify subreddit existence", "params", params)
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, _ := json.Marshal(subreddit)
|
|
|
|
|
|
|
|
err = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), payload))
|
|
|
|
if err != nil {
|
|
|
|
return errs.Wrapw(err, "failed to enqueue reddit download", "params", params)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|