Compare commits

..

10 commits

Author SHA1 Message Date
Tigor Hutasuhut a648fa745a events: fix filters not handling empty filter query params
Some checks failed
/ push (push) Failing after 13m15s
2024-06-10 23:36:26 +07:00
Tigor Hutasuhut 8e45faba28 wip: update event system 2024-06-10 14:52:53 +07:00
Tigor Hutasuhut d902a2b8c1 wip: update event 2024-06-05 16:18:22 +07:00
Tigor Hutasuhut 0c623d1bf1 schedule-history: if current is latest, fetch more history if not full
Some checks failed
/ push (push) Has been cancelled
2024-06-05 14:20:21 +07:00
Tigor Hutasuhut bc005f8b30 refactor: direction is now revsered boolean 2024-06-05 11:12:20 +07:00
Tigor Hutasuhut f71cda7c92 schedule-history: revamp schedule history page
Some checks failed
/ push (push) Has been cancelled
2024-06-04 21:08:36 +07:00
Tigor Hutasuhut 4ae2b36b4b schedule-history: fix time zone issue in schedule history list by day
Some checks failed
/ push (push) Has been cancelled
2024-06-04 00:30:55 +07:00
Tigor Hutasuhut a3fb38821a fix: api deadlock
Some checks failed
/ push (push) Has been cancelled
2024-06-04 00:24:21 +07:00
Tigor Hutasuhut a5c2b337f4 Revert "fix: database lock by adding lock function when set schedule"
This reverts commit 0c784b4cc9.
2024-06-04 00:23:13 +07:00
Tigor Hutasuhut cb74b0d817 wip: prepare to implement server-sent events for image download progress 2024-06-04 00:15:07 +07:00
23 changed files with 883 additions and 183 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/stephenafamo/bob"
"github.com/teivah/broadcast"
"github.com/tigorlazuardi/redmage/api/bmessage"
"github.com/tigorlazuardi/redmage/api/events"
"github.com/tigorlazuardi/redmage/api/reddit"
"github.com/tigorlazuardi/redmage/api/scheduler"
"github.com/tigorlazuardi/redmage/config"
@ -24,6 +25,7 @@ type API struct {
scheduler *scheduler.Scheduler
downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage]
eventBroadcast *broadcast.Relay[events.Event]
config *config.Config
@ -57,6 +59,7 @@ func New(deps Dependencies) *API {
db: bob.New(deps.DB),
sqldb: deps.DB,
downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](),
eventBroadcast: broadcast.NewRelay[events.Event](),
config: deps.Config,
imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")),
reddit: deps.Reddit,
@ -91,3 +94,7 @@ func (api *API) scheduleRun(subreddit string) {
log.New(ctx).Err(err).Error("api: failed to start download subreddit", "subreddit", subreddit)
}
}
func (api *API) GetEventBroadcaster() *broadcast.Relay[events.Event] {
return api.eventBroadcast
}

18
api/events/events.go Normal file
View file

@ -0,0 +1,18 @@
package events
import (
"io"
"github.com/a-h/templ"
)
type Event interface {
templ.Component
// Event returns the event name
Event() string
// SerializeTo writes the event data to the writer.
//
// SerializeTo must not write multiple linebreaks (single linebreak is fine)
// in succession to the writer since it will mess up SSE events.
SerializeTo(w io.Writer) error
}

View file

@ -0,0 +1,113 @@
package events
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/teivah/broadcast"
"github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/views/components/progress"
)
type ImageDownloadEvent string
const (
ImageDownloadStart ImageDownloadEvent = "image.download.start"
ImageDownloadEnd ImageDownloadEvent = "image.download.end"
ImageDownloadError ImageDownloadEvent = "image.download.error"
ImageDownloadProgress ImageDownloadEvent = "image.download.progress"
)
type ImageDownload struct {
EventKind ImageDownloadEvent `json:"event,omitempty"`
ImageURL string `json:"image_url,omitempty"`
ImageHeight int32 `json:"image_height,omitempty"`
ImageWidth int32 `json:"image_width,omitempty"`
ContentLength int64 `json:"content_length,omitempty"`
Downloaded int64 `json:"downloaded,omitempty"`
Subreddit string `json:"subreddit,omitempty"`
PostURL string `json:"post_url,omitempty"`
PostName string `json:"post_name,omitempty"`
PostTitle string `json:"post_title,omitempty"`
PostCreated int64 `json:"post_created,omitempty"`
PostAuthor string `json:"post_author,omitempty"`
PostAuthorURL string `json:"post_author_url,omitempty"`
ImageRelativePath string `json:"image_relative_path,omitempty"`
ImageOriginalURL string `json:"image_original_url,omitempty"`
ImageSize int64 `json:"image_size,omitempty"`
ThumbnailRelativePath string `json:"thumbnail_relative_path,omitempty"`
NSFW int32 `json:"nsfw,omitempty"`
Error error `json:"error,omitempty"`
Device string `json:"device,omitempty"`
}
// Render the template.
func (im ImageDownload) Render(ctx context.Context, w io.Writer) error {
switch im.EventKind {
case ImageDownloadStart:
return progress.ImageDownloadStartNotification(progress.ImageDownloadStartNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostName: im.PostName,
PostTitle: im.PostTitle,
PostURL: im.PostURL,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
case ImageDownloadEnd:
return progress.ImageDownloadEndNotification(progress.ImageDownloadEndNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostURL: im.PostName,
PostName: im.PostTitle,
PostTitle: im.PostURL,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
case ImageDownloadError:
return progress.ImageDownloadErrorNotification(progress.ImageDownloadErrorNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostURL: im.PostName,
PostName: im.PostTitle,
PostTitle: im.PostURL,
Error: im.Error,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
case ImageDownloadProgress:
return progress.ImageDownloadProgressNotification(progress.ImageDownloadProgressNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostURL: im.PostName,
PostName: im.PostTitle,
PostTitle: im.PostURL,
ContentLength: im.ContentLength,
Downloaded: im.Downloaded,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
default:
return errs.Fail("events.ImageDownload: unknown event kind", "event", im)
}
}
// Event returns the event name
func (im ImageDownload) Event() string {
return string(im.EventKind)
}
// SerializeTo writes the event data to the writer.
//
// SerializeTo must not write multiple linebreaks (single linebreak is fine)
// in succession to the writer since it will mess up SSE events.
func (im ImageDownload) SerializeTo(w io.Writer) error {
return json.NewEncoder(w).Encode(im)
}
func PublishImageDownloadEvent(bc *broadcast.Relay[Event], event ImageDownload) {
bc.Broadcast(event)
bc.Broadcast(ImageDownloadSubreddit{event})
if event.EventKind == ImageDownloadEnd {
bc.Broadcast(ImageDownloadSubredditCard{event})
}
}

View file

@ -0,0 +1,54 @@
package events
import (
"context"
"io"
"time"
"github.com/tigorlazuardi/redmage/models"
"github.com/tigorlazuardi/redmage/views/components"
)
type ImageDownloadSubreddit struct {
ImageDownload
}
func (im ImageDownloadSubreddit) Event() string {
return string(im.EventKind) + "." + im.Subreddit
}
type ImageDownloadSubredditCard struct {
ImageDownload
}
func (im ImageDownloadSubredditCard) Event() string {
return string(im.EventKind) + "." + im.Subreddit + ".card"
}
func (im ImageDownloadSubredditCard) Render(ctx context.Context, w io.Writer) error {
if im.EventKind == ImageDownloadEnd {
now := time.Now().Unix()
data := &models.Image{
Subreddit: im.Subreddit,
Device: im.Device,
PostTitle: im.PostTitle,
PostName: im.PostName,
PostURL: im.PostURL,
PostCreated: im.PostCreated,
PostAuthor: im.PostAuthor,
PostAuthorURL: im.PostAuthor,
ImageRelativePath: im.ImageRelativePath,
ImageOriginalURL: im.ImageOriginalURL,
ImageHeight: im.ImageHeight,
ImageWidth: im.ImageWidth,
ImageSize: im.ImageSize,
ThumbnailRelativePath: im.ThumbnailRelativePath,
NSFW: im.NSFW,
CreatedAt: now,
UpdatedAt: now,
}
return components.ImageCard(data, 0).Render(ctx, w)
} else {
return im.ImageDownload.Render(ctx, w)
}
}

View file

@ -7,7 +7,6 @@ import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/stephenafamo/bob"
"github.com/tigorlazuardi/redmage/models"
"github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/pkg/log"
@ -96,14 +95,12 @@ func (api *API) PubsubStartDownloadSubreddit(ctx context.Context, params PubsubS
return errs.Wrapw(err, "failed to verify subreddit existence", "params", params)
}
api.lockf(func() {
err = api.withTransaction(ctx, func(exec bob.Executor) error {
_, err := api.scheduleSet(ctx, exec, ScheduleSetParams{
_, errSchedule := api.scheduleSet(ctx, api.db, ScheduleSetParams{
Subreddit: subreddit.Name,
Status: ScheduleStatusEnqueued,
})
if err != nil {
return err
if errSchedule != nil {
log.New(ctx).Err(errSchedule).Error("failed to set schedule status", "subreddit", subreddit.Name, "status", ScheduleStatusEnqueued.String())
}
payload, err := json.Marshal(subreddit)
@ -116,8 +113,4 @@ func (api *API) PubsubStartDownloadSubreddit(ctx context.Context, params PubsubS
return errs.Wrapw(err, "failed to enqueue reddit download", "params", params)
}
return nil
})
})
return err
}

View file

@ -0,0 +1,24 @@
package api
import (
"net/http"
"github.com/stephenafamo/bob/dialect/sqlite/sm"
"github.com/tigorlazuardi/redmage/models"
"github.com/tigorlazuardi/redmage/pkg/errs"
"golang.org/x/net/context"
)
func (api *API) ScheduleHistoryLatest(ctx context.Context) (result *models.ScheduleHistory, err error) {
ctx, span := tracer.Start(ctx, "*API.ScheduleHistoryLatest")
defer span.End()
result, err = models.ScheduleHistories.Query(ctx, api.db, sm.OrderBy(models.ScheduleHistoryColumns.CreatedAt).Desc()).One()
if err != nil {
if err.Error() == "sql: no rows in result set" {
return result, errs.Wrapw(err, "last schedule history not found").Code(http.StatusNotFound)
}
return result, errs.Wrapw(err, "failed to find last schedule history")
}
return result, nil
}

View file

@ -2,31 +2,29 @@ package api
import (
"context"
"slices"
"strconv"
"strings"
"time"
"github.com/stephenafamo/bob"
"github.com/stephenafamo/bob/dialect/sqlite"
"github.com/stephenafamo/bob/dialect/sqlite/dialect"
"github.com/stephenafamo/bob/dialect/sqlite/sm"
"github.com/tigorlazuardi/redmage/api/utils"
"github.com/tigorlazuardi/redmage/models"
"github.com/tigorlazuardi/redmage/pkg/errs"
)
type ScheduleHistoryListParams struct {
Subreddit string
After time.Time
Before time.Time
Time time.Time
Reversed bool
Limit int64
Offset int64
OrderBy string
Sort string
}
func (params *ScheduleHistoryListParams) FillFromQuery(query Queryable) {
params.Subreddit = query.Get("subreddit")
params.Reversed = query.Get("direction") == "before"
params.Limit, _ = strconv.ParseInt(query.Get("limit"), 10, 64)
if params.Limit < 1 {
params.Limit = 100
@ -35,61 +33,45 @@ func (params *ScheduleHistoryListParams) FillFromQuery(query Queryable) {
params.Limit = 1000
}
params.Offset, _ = strconv.ParseInt(query.Get("offset"), 10, 64)
if params.Offset < 0 {
params.Offset = 0
}
now := time.Now()
afterInt, _ := strconv.ParseInt(query.Get("after"), 10, 64)
if afterInt > 0 {
params.After = time.Unix(afterInt, 0)
} else if afterInt < 0 {
params.After = now.Add(time.Duration(afterInt) * time.Second)
timeInt, _ := strconv.ParseInt(query.Get("time"), 10, 64)
if timeInt > 0 {
params.Time = time.Unix(timeInt, 0)
} else if timeInt < 0 {
params.Time = now.Add(time.Duration(timeInt) * time.Second)
}
beforeInt, _ := strconv.ParseInt(query.Get("before"), 10, 64)
if beforeInt > 0 {
params.Before = time.Unix(beforeInt, 0)
} else if beforeInt < 0 {
params.Before = now.Add(time.Duration(beforeInt) * time.Second)
if params.Time.After(now) {
params.Time = time.Time{}
}
params.OrderBy = query.Get("order_by")
params.Sort = query.Get("sort")
}
func (params ScheduleHistoryListParams) CountQuery() (expr []bob.Mod[*dialect.SelectQuery]) {
if params.Subreddit != "" {
expr = append(expr, models.SelectWhere.ScheduleHistories.Subreddit.EQ(params.Subreddit))
}
if !params.After.IsZero() {
expr = append(expr, models.SelectWhere.ScheduleHistories.CreatedAt.GTE(params.After.Unix()))
}
if !params.Before.IsZero() {
expr = append(expr, models.SelectWhere.ScheduleHistories.CreatedAt.LTE(params.Before.Unix()))
}
return expr
}
func (params ScheduleHistoryListParams) Query() (expr []bob.Mod[*dialect.SelectQuery]) {
expr = append(expr, params.CountQuery()...)
if !params.Time.IsZero() {
if params.Reversed {
expr = append(expr,
models.SelectWhere.ScheduleHistories.CreatedAt.GT(params.Time.Unix()),
)
} else {
expr = append(expr, models.SelectWhere.ScheduleHistories.CreatedAt.LT(params.Time.Unix()))
}
}
if params.Limit > 0 {
expr = append(expr, sm.Limit(params.Limit))
}
if params.Offset > 0 {
expr = append(expr, sm.Offset(params.Offset))
}
if params.OrderBy != "" {
if strings.ToLower(params.Sort) == "desc" {
expr = append(expr, sm.OrderBy(sqlite.Quote(params.OrderBy)).Desc())
if params.Reversed {
expr = append(expr, sm.OrderBy(models.ScheduleHistoryColumns.CreatedAt).Asc())
} else {
expr = append(expr, sm.OrderBy(sqlite.Quote(params.OrderBy)).Asc())
}
} else {
expr = append(expr, sm.OrderBy(models.ScheduleHistoryColumns.CreatedAt).Desc(), sm.OrderBy(models.ScheduleHistoryColumns.Status).Desc())
expr = append(expr, sm.OrderBy(models.ScheduleHistoryColumns.CreatedAt).Desc())
}
return expr
@ -100,6 +82,71 @@ type ScheduleHistoryListResult struct {
Total int64 `json:"count"`
}
func (result ScheduleHistoryListResult) GetLast() *models.ScheduleHistory {
if len(result.Schedules) > 0 {
return result.Schedules[len(result.Schedules)-1]
}
return nil
}
func (result ScheduleHistoryListResult) GetLastTime() time.Time {
if schedule := result.GetLast(); schedule != nil {
return time.Unix(schedule.CreatedAt, 0)
}
return time.Now()
}
func (result ScheduleHistoryListResult) GetFirstTime() time.Time {
if schedule := result.GetFirst(); schedule != nil {
return time.Unix(schedule.CreatedAt, 0)
}
return time.Now()
}
func (result ScheduleHistoryListResult) GetFirst() *models.ScheduleHistory {
if len(result.Schedules) > 0 {
return result.Schedules[0]
}
return nil
}
func (result ScheduleHistoryListResult) SplitByDay() (out []ScheduleHistoryListResultDay) {
out = make([]ScheduleHistoryListResultDay, 0, 4)
var lastDay time.Time
var lastIdx int
for _, schedule := range result.Schedules {
t := utils.StartOfDay(time.Unix(schedule.CreatedAt, 0).In(time.Local))
if !t.Equal(lastDay) {
out = append(out, ScheduleHistoryListResultDay{
Date: t,
})
lastDay = t
lastIdx = len(out) - 1
out[lastIdx].Schedules = append(out[lastIdx].Schedules, schedule)
out[lastIdx].Total += 1
} else {
out[lastIdx].Schedules = append(out[lastIdx].Schedules, schedule)
out[lastIdx].Total += 1
}
}
return
}
type ScheduleHistoryListResultDay struct {
Date time.Time `json:"date"`
ScheduleHistoryListResult
}
func (resultDay ScheduleHistoryListResultDay) GetLast() *models.ScheduleHistory {
if len(resultDay.Schedules) > 0 {
return resultDay.Schedules[len(resultDay.Schedules)-1]
}
return nil
}
func (api *API) ScheduleHistoryList(ctx context.Context, params ScheduleHistoryListParams) (result ScheduleHistoryListResult, err error) {
ctx, span := tracer.Start(ctx, "*API.ScheduleHistoryList")
defer span.End()
@ -114,5 +161,9 @@ func (api *API) ScheduleHistoryList(ctx context.Context, params ScheduleHistoryL
return result, errs.Wrapw(err, "failed to count schedule histories", "query", params)
}
if params.Reversed {
slices.Reverse(result.Schedules)
}
return result, nil
}

View file

@ -29,8 +29,8 @@ func (params *ScheduleHistoryListByDateParams) FillFromQuery(query Queryable) {
}
func (params *ScheduleHistoryListByDateParams) CountQuery() (expr []bob.Mod[*dialect.SelectQuery]) {
unixTopTime := time.Date(params.Date.Year(), params.Date.Month(), params.Date.Day(), 23, 59, 59, 0, params.Date.Location()).Unix()
unixLowTime := time.Date(params.Date.Year(), params.Date.Month(), params.Date.Day(), 0, 0, 0, 0, params.Date.Location()).Unix()
unixTopTime := time.Date(params.Date.Year(), params.Date.Month(), params.Date.Day(), 23, 59, 59, 0, time.UTC).Unix()
unixLowTime := time.Date(params.Date.Year(), params.Date.Month(), params.Date.Day(), 0, 0, 0, 0, time.UTC).Unix()
expr = append(expr,
models.SelectWhere.ScheduleHistories.CreatedAt.GTE(unixLowTime),
models.SelectWhere.ScheduleHistories.CreatedAt.LTE(unixTopTime),

7
api/utils/utils.go Normal file
View file

@ -0,0 +1,7 @@
package utils
import "time"
func StartOfDay(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}

View file

@ -1,11 +1,11 @@
package pubsub
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-bolt/pkg/bolt"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/tigorlazuardi/redmage/config"
"github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/pkg/log"
"go.etcd.io/bbolt"
)
@ -23,7 +23,7 @@ func NewPublisher(db *bbolt.DB) (message.Publisher, error) {
return bolt.NewPublisher(db, bolt.PublisherConfig{
Common: bolt.CommonConfig{
Bucket: []bolt.BucketName{bolt.BucketName("watermill")},
Logger: &log.WatermillLogger{},
Logger: watermill.NopLogger{},
},
})
}
@ -33,7 +33,7 @@ func NewSubscriber(db *bbolt.DB) (message.Subscriber, error) {
Common: bolt.CommonConfig{
Bucket: []bolt.BucketName{bolt.BucketName("watermill")},
Marshaler: nil,
Logger: &log.WatermillLogger{},
Logger: watermill.NopLogger{},
},
})
}

View file

@ -0,0 +1,64 @@
package events
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/tigorlazuardi/redmage/pkg/log"
)
func (handler *Handler) HTMXEvents(rw http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "*Routes.HTMXEvents")
defer span.End()
flush, ok := rw.(http.Flusher)
if !ok {
rw.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(rw).Encode(map[string]string{"error": "response writer does not support streaming"})
return
}
var filters []string
if q := r.URL.Query().Get("filter"); q != "" {
filters = strings.Split(q, ",")
}
log.New(ctx).Info("new htmx event stream connection", "user_agent", r.UserAgent())
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.WriteHeader(200)
flush.Flush()
ev, close := handler.Subscribe()
defer close()
loop:
for {
select {
case <-r.Context().Done():
log.New(ctx).Info("HTMX event stream connection closed", "user_agent", r.UserAgent())
return
case event := <-ev:
msg := event.Event()
for _, filter := range filters {
if filter != msg {
continue loop
}
}
if _, err := fmt.Fprintf(rw, "event: %s\ndata: ", msg); err != nil {
return
}
if err := event.Render(ctx, rw); err != nil {
log.New(ctx).Err(err).Error("failed to render event", "user_agent", r.UserAgent())
return
}
if _, err := io.WriteString(rw, "\n\n"); err != nil {
return
}
flush.Flush()
}
}
}

View file

@ -0,0 +1,65 @@
package events
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/tigorlazuardi/redmage/pkg/log"
)
func (handler *Handler) JSONEvents(rw http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "*Routes.HTMXEvents")
defer span.End()
flush, ok := rw.(http.Flusher)
if !ok {
rw.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(rw).Encode(map[string]string{"error": "response writer does not support streaming"})
return
}
var filters []string
if q := r.URL.Query().Get("filter"); q != "" {
filters = strings.Split(q, ",")
}
log.New(ctx).Info("new json event stream connection", "user_agent", r.UserAgent())
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.WriteHeader(200)
flush.Flush()
ev, close := handler.Subscribe()
defer close()
loop:
for {
select {
case <-r.Context().Done():
log.New(ctx).Info("json event stream connection closed", "user_agent", r.UserAgent())
return
case event := <-ev:
msg := event.Event()
for _, filter := range filters {
if filter != msg {
continue loop
}
}
if _, err := fmt.Fprintf(rw, "event: %s\ndata: ", msg); err != nil {
return
}
if err := json.NewEncoder(rw).Encode(event); err != nil {
log.New(ctx).Err(err).Error("failed to send json event", "user_agent", r.UserAgent())
return
}
// single new line because JSON already has new line
if _, err := io.WriteString(rw, "\n"); err != nil {
return
}
flush.Flush()
}
}
}

View file

@ -0,0 +1,64 @@
package events
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/tigorlazuardi/redmage/pkg/log"
)
// SimpleEvents is a simple event stream for the purpose of
// notification that something did happen. Not what the content of the event is.
//
// Useful for simple notification whose client just need to know that something
// happened and do something that does not require the content of the event,
// like refreshing the list by calling another http request.
func (handler *Handler) SimpleEvents(rw http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "*Routes.SimpleDownloadEvent")
defer span.End()
flush, ok := rw.(http.Flusher)
if !ok {
rw.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(rw).Encode(map[string]string{"error": "response writer does not support streaming"})
return
}
var filters []string
if q := r.URL.Query().Get("filter"); q != "" {
filters = strings.Split(q, ",")
}
log.New(ctx).Info("new simple event stream connection", "user_agent", r.UserAgent())
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.WriteHeader(200)
flush.Flush()
ev, close := handler.Subscribe()
defer close()
loop:
for {
select {
case <-r.Context().Done():
log.New(ctx).Info("simple event stream connection closed", "user_agent", r.UserAgent())
return
case event := <-ev:
msg := event.Event()
for _, filter := range filters {
if filter != msg {
continue loop
}
}
if _, err := fmt.Fprintf(rw, "event: %s\ndata: %s\n\n", msg, msg); err != nil {
return
}
flush.Flush()
}
}
}

View file

@ -0,0 +1,24 @@
package events
import (
"github.com/teivah/broadcast"
apievents "github.com/tigorlazuardi/redmage/api/events"
"github.com/tigorlazuardi/redmage/config"
)
type Handler struct {
Config *config.Config
Broadcast *broadcast.Relay[apievents.Event]
}
func NewHandler(cfg *config.Config, broadcast *broadcast.Relay[apievents.Event]) *Handler {
return &Handler{
Config: cfg,
Broadcast: broadcast,
}
}
func (handler *Handler) Subscribe() (<-chan apievents.Event, func()) {
listener := handler.Broadcast.Listener(10)
return listener.Ch(), listener.Close
}

View file

@ -0,0 +1,17 @@
package events
import (
"github.com/go-chi/chi/v5"
"github.com/riandyrn/otelchi"
"github.com/tigorlazuardi/redmage/server/routes/middleware"
chimiddleware "github.com/go-chi/chi/v5/middleware"
)
func (handler *Handler) Route(router chi.Router) {
router.Use(otelchi.Middleware("redmage"))
router.Use(chimiddleware.RequestLogger(middleware.ChiLogger{}))
router.Get("/", handler.HTMXEvents)
router.Get("/simple", handler.SimpleEvents)
router.Get("/json", handler.JSONEvents)
}

View file

@ -0,0 +1,5 @@
package events
import "go.opentelemetry.io/otel"
var tracer = otel.Tracer("server/routes/events")

View file

@ -3,6 +3,7 @@ package routes
import (
"net/http"
"github.com/tigorlazuardi/redmage/api"
"github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/pkg/log"
"github.com/tigorlazuardi/redmage/views"
@ -18,7 +19,20 @@ func (routes *Routes) PageScheduleHistory(rw http.ResponseWriter, req *http.Requ
var data schedulehistories.Data
data.Params.FillFromQuery(req.URL.Query())
result, err := routes.API.ScheduleHistoryListByDate(ctx, data.Params)
result, err := routes.API.ScheduleHistoryList(ctx, data.Params)
if err != nil {
log.New(ctx).Err(err).Error("Failed to list schedule histories")
code, message := errs.HTTPMessage(err)
rw.WriteHeader(code)
data.Error = message
if err := schedulehistories.View(c, data).Render(ctx, rw); err != nil {
log.New(ctx).Err(err).Error("Failed to render schedule histories view")
}
return
}
data.ScheduleHistories = result
latest, err := routes.API.ScheduleHistoryLatest(ctx)
if err != nil {
log.New(ctx).Err(err).Error("Failed to list schedule histories")
code, message := errs.HTTPMessage(err)
@ -30,7 +44,29 @@ func (routes *Routes) PageScheduleHistory(rw http.ResponseWriter, req *http.Requ
return
}
data.ScheduleHistories = result.Schedules
if first := data.ScheduleHistories.GetFirst(); first != nil {
if first.ID == latest.ID {
data.IsCurrent = true
}
if data.IsCurrent && len(data.ScheduleHistories.Schedules) < int(data.ScheduleHistories.Total) {
data.Params = api.ScheduleHistoryListParams{
Subreddit: data.Params.Subreddit,
Limit: data.Params.Limit,
}
data.ScheduleHistories, err = routes.API.ScheduleHistoryList(ctx, data.Params)
if err != nil {
log.New(ctx).Err(err).Error("Failed to list schedule histories")
code, message := errs.HTTPMessage(err)
rw.WriteHeader(code)
data.Error = message
if err := schedulehistories.View(c, data).Render(ctx, rw); err != nil {
log.New(ctx).Err(err).Error("Failed to render schedule histories view")
}
return
}
}
}
if err := schedulehistories.View(c, data).Render(ctx, rw); err != nil {
log.New(ctx).Err(err).Error("Failed to render schedule histories view")

View file

@ -10,6 +10,7 @@ import (
"github.com/riandyrn/otelchi"
"github.com/tigorlazuardi/redmage/api"
"github.com/tigorlazuardi/redmage/config"
"github.com/tigorlazuardi/redmage/server/routes/events"
"github.com/tigorlazuardi/redmage/server/routes/middleware"
)
@ -30,6 +31,8 @@ func (routes *Routes) Register(router chi.Router) {
router.Route("/htmx", routes.registerHTMXRoutes)
router.Route("/api/v1", routes.registerV1APIRoutes)
eventHandler := events.NewHandler(routes.Config, routes.API.GetEventBroadcaster())
router.Route("/events", eventHandler.Route)
router.Group(routes.registerWWWRoutes)
}

View file

@ -6,20 +6,20 @@ templ ActionButton(components ...templ.Component) {
<div
class="max-xs:toast max-xs:z-40"
x-data="{ show: false }"
@click="show = !show; if (!show) document.activeElement.blur()"
@click.away="show = false"
>
<div class="dropdown dropdown-hover dropdown-top xs:dropdown-bottom dropdown-end">
<div
tabindex="0"
role="button"
class="btn btn-primary max-xs:btn-circle max-lg:btn-square xs:btn-outline m-1 max-xs:border-none"
x-ref="button"
>
@icons.Kebab("h-8 w-8")
</div>
<ul
tabindex="0"
class="dropdown-content z-[1] menu p-2 shadow bg-base-100 rounded-box w-52 m-0 border-primary border-2"
@click="document.activeElement.blur()"
>
for i, component := range components {
if i > 0 {

View file

@ -37,6 +37,7 @@ templ ImageCard(data *models.Image, opts ImageCardOption) {
},
}`, data.CreatedAt) }
class="not-prose card card-bordered bg-base-100 hover:bg-base-200 shadow-xl min-w-[16rem] max-w-[16rem] rounded-xl top-0 hover:-top-1 hover:drop-shadow-2xl transition-all"
id={ fmt.Sprintf("image-card-%s-%s", data.Subreddit, data.PostName) }
>
<figure>
<a

View file

@ -0,0 +1,134 @@
package progress
import "time"
import "fmt"
type ImageDownloadStartNotificationData struct {
ID string
Subreddit string
PostURL string
PostName string
PostTitle string
AutoRemoveDuration time.Duration
}
templ ImageDownloadStartNotification(data ImageDownloadStartNotificationData) {
<div
id={ data.ID }
if data.AutoRemoveDuration > 0 {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
}
onclick="this.remove()"
class="alert alert-info hover:bg-info-content transition-all"
>
<span>
<a
target="_blank"
href={ templ.SafeURL(fmt.Sprintf("https://www.reddit.com/r/%s", data.Subreddit)) }
>{ data.Subreddit }</a>:
Start Downloading
<a href={ templ.SafeURL(data.PostURL) }>{ truncateTitle(data.PostTitle) }</a>
</span>
</div>
}
func truncateTitle(s string) string {
if len(s) > 20 {
return s[:20] + "..."
}
return s
}
type ImageDownloadEndNotificationData struct {
ID string
Subreddit string
PostURL string
PostName string
PostTitle string
AutoRemoveDuration time.Duration
}
templ ImageDownloadEndNotification(data ImageDownloadEndNotificationData) {
<div
id={ data.ID }
if data.AutoRemoveDuration > 0 {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
}
onclick="this.remove()"
class="alert alert-success hover:bg-success-content transition-all"
>
<span>
<a
target="_blank"
href={ templ.SafeURL(fmt.Sprintf("https://www.reddit.com/r/%s", data.Subreddit)) }
>{ data.Subreddit }</a>:
Finished Downloading
<a href={ templ.SafeURL(data.PostURL) }>{ truncateTitle(data.PostTitle) }</a>
</span>
</div>
}
type ImageDownloadErrorNotificationData struct {
ID string
Subreddit string
PostURL string
PostName string
PostTitle string
Error error
AutoRemoveDuration time.Duration
}
templ ImageDownloadErrorNotification(data ImageDownloadErrorNotificationData) {
<div
id={ data.ID }
if data.AutoRemoveDuration > 0 {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
}
onclick="this.remove()"
class="alert alert-error hover:bg-error-content transition-all"
>
<span>
<a
target="_blank"
href={ templ.SafeURL(fmt.Sprintf("https://www.reddit.com/r/%s", data.Subreddit)) }
>{ data.Subreddit }</a>:
{ data.Error.Error() }
<a href={ templ.SafeURL(data.PostURL) }>{ truncateTitle(data.PostTitle) }</a>
</span>
</div>
}
type ImageDownloadProgressNotificationData struct {
ID string
Subreddit string
PostURL string
PostName string
PostTitle string
ContentLength int64
Downloaded int64
AutoRemoveDuration time.Duration
}
func (i ImageDownloadProgressNotificationData) GetProgress() float64 {
return float64(i.Downloaded) / float64(i.ContentLength)
}
templ ImageDownloadProgressNotification(data ImageDownloadProgressNotificationData) {
<div
id={ data.ID }
if data.AutoRemoveDuration > 0 {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
}
onclick="this.remove()"
class="alert alert-info hover:bg-info-content transition-all"
>
<span>
<a
target="_blank"
href={ templ.SafeURL(fmt.Sprintf("https://www.reddit.com/r/%s", data.Subreddit)) }
>{ data.Subreddit }</a>:
Progress: { fmt.Sprintf("%.2f%%", data.GetProgress()*100) }
<a href={ templ.SafeURL(data.PostURL) }>{ truncateTitle(data.PostTitle) }</a>
</span>
</div>
}

View file

@ -34,7 +34,6 @@ templ ChevronBoldLeft(class ...string) {
class={ strings.Join(class, " ") }
}
>
<title>chevron-left</title>
<desc>Created with Sketch Beta.</desc>
<defs></defs>
<g id="Page-1" stroke="currentColor" stroke-width="1" fill="currentColor" fill-rule="evenodd" sketch:type="MSPage">

View file

@ -2,24 +2,21 @@ package schedulehistories
import "github.com/tigorlazuardi/redmage/views"
import "github.com/tigorlazuardi/redmage/views/components"
import "github.com/tigorlazuardi/redmage/models"
import "github.com/tigorlazuardi/redmage/api"
import "fmt"
import "time"
import "github.com/tigorlazuardi/redmage/views/icons"
import "github.com/tigorlazuardi/redmage/models"
type Data struct {
ScheduleHistories models.ScheduleHistorySlice
Params api.ScheduleHistoryListByDateParams
ScheduleHistories api.ScheduleHistoryListResult
Params api.ScheduleHistoryListParams
FirstSchedule *models.ScheduleHistory
LastSchedule *models.ScheduleHistory
IsCurrent bool
Error string
}
func (data Data) isCurrentDay() bool {
now := time.Now()
return now.Format(time.DateOnly) == data.Params.Date.Format(time.DateOnly)
}
templ View(c *views.Context, data Data) {
@components.Doctype() {
@components.Head(c,
@ -42,15 +39,18 @@ templ Content(c *views.Context, data Data) {
<main class="prose min-w-full">
<h1>Schedule History ({ time.Local.String() })</h1>
<div class="divider my-0"></div>
@dateBar(data, true)
if len(data.ScheduleHistories) == 0 {
@dateBar(data)
if len(data.ScheduleHistories.Schedules) == 0 {
<h2>There are no history schedules found for current date.</h2>
}
if len(data.ScheduleHistories) > 0 {
if len(data.ScheduleHistories.Schedules) > 0 {
for _, history := range data.ScheduleHistories.SplitByDay() {
<h1 class="mb-6 mt-8 max-xs:text-xl">{ history.Date.Format("Monday, 02 January 2006") }</h1>
<div class="divider my-2"></div>
<div class="grid sm:grid-cols-[1fr,9fr] gap-x-4 gap-y-2 sm:gap-y-4">
<span class="font-bold max-sm:hidden text-center">Time</span>
<span class="font-bold max-sm:hidden">Event</span>
for i, schedule := range data.ScheduleHistories {
for i, schedule := range history.Schedules {
if i > 0 {
<div class="divider sm:hidden"></div>
}
@ -111,8 +111,9 @@ templ Content(c *views.Context, data Data) {
}
</div>
}
if len(data.ScheduleHistories) > 20 {
@dateBar(data, false)
}
if len(data.ScheduleHistories.Schedules) > 20 {
@dateBar(data)
}
@actionButton(data)
</main>
@ -120,14 +121,25 @@ templ Content(c *views.Context, data Data) {
templ actionButton(data Data) {
<div class="xs:hidden">
@components.ActionButton(
actionButtonNext(data),
actionButtonPrev(data),
)
@components.ActionButton(actionButtonItems(data)...)
</div>
}
templ dateBar(data Data, showDate bool) {
func actionButtonItems(data Data) []templ.Component {
out := make([]templ.Component, 0, 2)
if len(data.ScheduleHistories.Schedules) >= int(data.Params.Limit) || data.Params.Reversed {
out = append(out, actionButtonNext(data))
}
if !data.IsCurrent {
out = append(out, actionButtonPrev(data))
}
if data.IsCurrent {
out = append(out, actionButtonRefresh())
}
return out
}
templ dateBar(data Data) {
<div
class="flex flex-wrap justify-between my-4 items-center"
hx-boost="true"
@ -135,50 +147,59 @@ templ dateBar(data Data, showDate bool) {
hx-swap="outerHTML"
hx-target="#root-content"
>
if data.isCurrentDay() {
if data.IsCurrent {
<div class="tooltip" data-tip="Refresh">
<a
href="/history"
class="btn btn-primary btn-outline btn-square text-base-100"
>
@icons.Refresh("w-6 h-6")
</a>
</div>
} else {
<div class="tooltip" data-tip="Previous">
<a
href={ templ.SafeURL(fmt.Sprintf("/history?date=%s", data.Params.Date.Add(time.Hour*24).Format(time.DateOnly))) }
href={ templ.SafeURL(fmt.Sprintf("/history?time=%d&direction=before", data.ScheduleHistories.GetFirstTime().Unix())) }
class="btn btn-primary btn-outline btn-square text-base-100"
>
@icons.ChevronBoldLeft("w-6 h-6")
</a>
</div>
}
if showDate {
<span class="max-xs:hidden text-primary font-bold sm:text-2xl">{ data.Params.Date.Format("Monday, 02 January 2006") }</span>
<span class="xs:hidden text-primary font-bold">{ data.Params.Date.Format("Mon, 02 Jan") }</span>
}
if len(data.ScheduleHistories.Schedules) >= int(data.Params.Limit) || data.Params.Reversed {
<div class="tooltip" data-tip="Next">
<a
href={ templ.SafeURL(fmt.Sprintf("/history?date=%s", data.Params.Date.Add(time.Hour*-24).Format(time.DateOnly))) }
href={ templ.SafeURL(fmt.Sprintf("/history?time=%d", data.ScheduleHistories.GetLastTime().Unix())) }
class="btn btn-primary btn-outline btn-square text-base-100 no-underline"
>
@icons.ChevronBoldRight("w-6 h-6")
</a>
</div>
}
</div>
}
templ actionButtonNext(data Data) {
<a
href={ templ.SafeURL(fmt.Sprintf("/history?date=%s", data.Params.Date.Add(time.Hour*-24).Format(time.DateOnly))) }
href={ templ.SafeURL(fmt.Sprintf("/history?time=%d", data.ScheduleHistories.GetLastTime().Unix())) }
class="btn btn-ghost btn-sm no-underline m-0"
>Next</a>
}
templ actionButtonPrev(data Data) {
<a
href={ templ.SafeURL(fmt.Sprintf("/history?date=%s", data.Params.Date.Add(time.Hour*24).Format(time.DateOnly))) }
href={ templ.SafeURL(fmt.Sprintf("/history?time=%d&direction=before", data.ScheduleHistories.GetFirstTime().Unix())) }
class="btn btn-ghost btn-sm no-underline m-0"
>Previous</a>
}
templ actionButtonRefresh() {
<a
href="/history"
class="btn btn-ghost btn-sm no-underline m-0"
>Refresh</a>
}
templ subredditLink(subreddit string) {
<a href={ templ.URL(fmt.Sprintf("/subreddits/details/%s", subreddit)) } class="text-primary">{ subreddit }</a>
}