Redmage/api/api.go

131 lines
3.5 KiB
Go
Raw Normal View History

2024-04-08 23:18:09 +07:00
package api
import (
2024-04-09 21:49:23 +07:00
"context"
2024-04-08 23:18:09 +07:00
"database/sql"
2024-04-09 21:49:23 +07:00
"fmt"
2024-04-25 12:31:20 +07:00
"os"
2024-04-25 13:05:41 +07:00
"os/signal"
2024-04-08 23:18:09 +07:00
2024-04-09 21:49:23 +07:00
"github.com/robfig/cron/v3"
2024-04-24 13:01:13 +07:00
"github.com/stephenafamo/bob"
2024-04-09 21:49:23 +07:00
"github.com/teivah/broadcast"
2024-04-10 22:38:19 +07:00
"github.com/tigorlazuardi/redmage/api/bmessage"
2024-04-14 00:32:55 +07:00
"github.com/tigorlazuardi/redmage/api/reddit"
"github.com/tigorlazuardi/redmage/config"
2024-04-25 12:31:20 +07:00
"github.com/tigorlazuardi/redmage/models"
2024-04-09 21:49:23 +07:00
"github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/pkg/log"
2024-04-25 12:31:20 +07:00
2024-04-25 13:52:38 +07:00
"github.com/ThreeDotsLabs/watermill"
2024-04-25 12:31:20 +07:00
watermillSql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
2024-04-25 13:05:41 +07:00
watermillSqlite "github.com/walterwanderley/watermill-sqlite"
2024-04-08 23:18:09 +07:00
)
type API struct {
2024-04-25 13:28:35 +07:00
db bob.Executor
2024-04-09 21:49:23 +07:00
scheduler *cron.Cron
2024-04-25 12:31:20 +07:00
scheduleMap map[cron.EntryID]*models.Subreddit
2024-04-09 21:49:23 +07:00
2024-04-11 16:04:13 +07:00
downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage]
config *config.Config
2024-04-14 00:32:55 +07:00
imageSemaphore chan struct{}
subredditSemaphore chan struct{}
reddit *reddit.Reddit
2024-04-25 12:31:20 +07:00
subscriber message.Subscriber
publisher message.Publisher
2024-04-14 00:32:55 +07:00
}
type Dependencies struct {
2024-04-25 13:05:41 +07:00
DB *sql.DB
PubsubDB *sql.DB
Config *config.Config
Reddit *reddit.Reddit
2024-04-09 21:49:23 +07:00
}
2024-04-25 13:05:41 +07:00
const downloadTopic = "subreddit_download"
2024-04-25 13:22:08 +07:00
var watermillLogger = &log.WatermillLogger{}
2024-04-25 12:31:20 +07:00
2024-04-14 00:32:55 +07:00
func New(deps Dependencies) *API {
2024-04-25 12:31:20 +07:00
ackDeadline := deps.Config.Duration("download.pubsub.ack.deadline")
2024-04-25 13:05:41 +07:00
subscriber, err := watermillSql.NewSubscriber(deps.PubsubDB, watermillSql.SubscriberConfig{
2024-04-25 12:31:20 +07:00
AckDeadline: &ackDeadline,
2024-04-25 13:05:41 +07:00
SchemaAdapter: watermillSqlite.DefaultSQLiteSchema{},
OffsetsAdapter: watermillSqlite.DefaultSQLiteOffsetsAdapter{},
2024-04-25 12:31:20 +07:00
InitializeSchema: true,
2024-04-25 13:05:41 +07:00
}, watermillLogger)
2024-04-25 12:31:20 +07:00
if err != nil {
panic(err)
}
2024-04-25 13:05:41 +07:00
publisher, err := watermillSql.NewPublisher(deps.PubsubDB, watermillSql.PublisherConfig{
SchemaAdapter: watermillSqlite.DefaultSQLiteSchema{},
2024-04-25 12:31:20 +07:00
AutoInitializeSchema: true,
2024-04-25 13:05:41 +07:00
}, watermillLogger)
2024-04-25 12:31:20 +07:00
if err != nil {
panic(err)
}
2024-04-25 13:05:41 +07:00
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
ch, err := subscriber.Subscribe(ctx, downloadTopic)
if err != nil {
panic(err)
}
api := &API{
2024-04-25 13:28:35 +07:00
db: bob.New(deps.DB),
2024-04-14 00:32:55 +07:00
scheduler: cron.New(),
2024-04-25 12:31:20 +07:00
scheduleMap: make(map[cron.EntryID]*models.Subreddit, 8),
2024-04-14 00:32:55 +07:00
downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](),
config: deps.Config,
imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")),
subredditSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.subreddits")),
reddit: deps.Reddit,
2024-04-25 12:31:20 +07:00
subscriber: subscriber,
publisher: publisher,
2024-04-09 21:49:23 +07:00
}
2024-04-25 13:05:41 +07:00
2024-04-25 13:52:38 +07:00
if err := api.StartScheduler(ctx); err != nil {
panic(err)
}
api.StartSubredditDownloadPubsub(ch)
2024-04-25 13:05:41 +07:00
return api
2024-04-09 21:49:23 +07:00
}
func (api *API) StartScheduler(ctx context.Context) error {
2024-04-25 13:52:38 +07:00
subreddits, err := models.Subreddits.Query(ctx, api.db, models.SelectWhere.Subreddits.Enable.EQ(1)).All()
2024-04-09 21:49:23 +07:00
if err != nil {
return errs.Wrapw(err, "failed to get all subreddits")
}
for _, subreddit := range subreddits {
2024-04-12 01:32:06 +07:00
err := api.scheduleSubreddit(subreddit)
2024-04-09 21:49:23 +07:00
if err != nil {
2024-04-12 01:32:06 +07:00
log.New(ctx).Err(err).Error(
fmt.Sprintf("failed to start scheduler for subreddit '%s'", subreddit.Name),
"subreddit", subreddit,
)
2024-04-09 21:49:23 +07:00
continue
}
}
return nil
2024-04-08 23:18:09 +07:00
}
2024-04-12 01:32:06 +07:00
2024-04-25 12:31:20 +07:00
func (api *API) scheduleSubreddit(subreddit *models.Subreddit) error {
2024-04-12 01:32:06 +07:00
id, err := api.scheduler.AddFunc(subreddit.Schedule, func() {
2024-04-25 13:52:38 +07:00
_ = api.publisher.Publish(downloadTopic, message.NewMessage(watermill.NewUUID(), []byte(subreddit.Name)))
2024-04-12 01:32:06 +07:00
})
if err != nil {
return errs.Wrap(err)
}
api.scheduleMap[id] = subreddit
return nil
}