wip: update event system

This commit is contained in:
Tigor Hutasuhut 2024-06-10 14:52:53 +07:00
parent d902a2b8c1
commit 8e45faba28
12 changed files with 412 additions and 73 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/stephenafamo/bob" "github.com/stephenafamo/bob"
"github.com/teivah/broadcast" "github.com/teivah/broadcast"
"github.com/tigorlazuardi/redmage/api/bmessage" "github.com/tigorlazuardi/redmage/api/bmessage"
"github.com/tigorlazuardi/redmage/api/events"
"github.com/tigorlazuardi/redmage/api/reddit" "github.com/tigorlazuardi/redmage/api/reddit"
"github.com/tigorlazuardi/redmage/api/scheduler" "github.com/tigorlazuardi/redmage/api/scheduler"
"github.com/tigorlazuardi/redmage/config" "github.com/tigorlazuardi/redmage/config"
@ -24,6 +25,7 @@ type API struct {
scheduler *scheduler.Scheduler scheduler *scheduler.Scheduler
downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage] downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage]
eventBroadcast *broadcast.Relay[events.Event]
config *config.Config config *config.Config
@ -57,6 +59,7 @@ func New(deps Dependencies) *API {
db: bob.New(deps.DB), db: bob.New(deps.DB),
sqldb: deps.DB, sqldb: deps.DB,
downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](), downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](),
eventBroadcast: broadcast.NewRelay[events.Event](),
config: deps.Config, config: deps.Config,
imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")), imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")),
reddit: deps.Reddit, reddit: deps.Reddit,
@ -91,3 +94,7 @@ func (api *API) scheduleRun(subreddit string) {
log.New(ctx).Err(err).Error("api: failed to start download subreddit", "subreddit", subreddit) log.New(ctx).Err(err).Error("api: failed to start download subreddit", "subreddit", subreddit)
} }
} }
func (api *API) GetEventBroadcaster() *broadcast.Relay[events.Event] {
return api.eventBroadcast
}

View file

@ -0,0 +1,113 @@
package events
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/teivah/broadcast"
"github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/views/components/progress"
)
type ImageDownloadEvent string
const (
ImageDownloadStart ImageDownloadEvent = "image.download.start"
ImageDownloadEnd ImageDownloadEvent = "image.download.end"
ImageDownloadError ImageDownloadEvent = "image.download.error"
ImageDownloadProgress ImageDownloadEvent = "image.download.progress"
)
type ImageDownload struct {
EventKind ImageDownloadEvent `json:"event,omitempty"`
ImageURL string `json:"image_url,omitempty"`
ImageHeight int32 `json:"image_height,omitempty"`
ImageWidth int32 `json:"image_width,omitempty"`
ContentLength int64 `json:"content_length,omitempty"`
Downloaded int64 `json:"downloaded,omitempty"`
Subreddit string `json:"subreddit,omitempty"`
PostURL string `json:"post_url,omitempty"`
PostName string `json:"post_name,omitempty"`
PostTitle string `json:"post_title,omitempty"`
PostCreated int64 `json:"post_created,omitempty"`
PostAuthor string `json:"post_author,omitempty"`
PostAuthorURL string `json:"post_author_url,omitempty"`
ImageRelativePath string `json:"image_relative_path,omitempty"`
ImageOriginalURL string `json:"image_original_url,omitempty"`
ImageSize int64 `json:"image_size,omitempty"`
ThumbnailRelativePath string `json:"thumbnail_relative_path,omitempty"`
NSFW int32 `json:"nsfw,omitempty"`
Error error `json:"error,omitempty"`
Device string `json:"device,omitempty"`
}
// Render the template.
func (im ImageDownload) Render(ctx context.Context, w io.Writer) error {
switch im.EventKind {
case ImageDownloadStart:
return progress.ImageDownloadStartNotification(progress.ImageDownloadStartNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostName: im.PostName,
PostTitle: im.PostTitle,
PostURL: im.PostURL,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
case ImageDownloadEnd:
return progress.ImageDownloadEndNotification(progress.ImageDownloadEndNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostURL: im.PostName,
PostName: im.PostTitle,
PostTitle: im.PostURL,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
case ImageDownloadError:
return progress.ImageDownloadErrorNotification(progress.ImageDownloadErrorNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostURL: im.PostName,
PostName: im.PostTitle,
PostTitle: im.PostURL,
Error: im.Error,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
case ImageDownloadProgress:
return progress.ImageDownloadProgressNotification(progress.ImageDownloadProgressNotificationData{
ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName),
Subreddit: im.Subreddit,
PostURL: im.PostName,
PostName: im.PostTitle,
PostTitle: im.PostURL,
ContentLength: im.ContentLength,
Downloaded: im.Downloaded,
AutoRemoveDuration: time.Second * 5,
}).Render(ctx, w)
default:
return errs.Fail("events.ImageDownload: unknown event kind", "event", im)
}
}
// Event returns the event name
func (im ImageDownload) Event() string {
return string(im.EventKind)
}
// SerializeTo writes the event data to the writer.
//
// SerializeTo must not write multiple linebreaks (single linebreak is fine)
// in succession to the writer since it will mess up SSE events.
func (im ImageDownload) SerializeTo(w io.Writer) error {
return json.NewEncoder(w).Encode(im)
}
func PublishImageDownloadEvent(bc *broadcast.Relay[Event], event ImageDownload) {
bc.Broadcast(event)
bc.Broadcast(ImageDownloadSubreddit{event})
if event.EventKind == ImageDownloadEnd {
bc.Broadcast(ImageDownloadSubredditCard{event})
}
}

View file

@ -1,66 +0,0 @@
package events
import (
"context"
"encoding/json"
"io"
"github.com/tigorlazuardi/redmage/pkg/errs"
"github.com/tigorlazuardi/redmage/views/components/progress"
)
type ImageDownloadEvent string
const (
ImageDownloadStart ImageDownloadEvent = "image.download.start"
ImageDownloadEnd ImageDownloadEvent = "image.download.end"
ImageDownloadError ImageDownloadEvent = "image.download.error"
ImageDownloadProgress ImageDownloadEvent = "image.download.progress"
)
type ImageDownload struct {
EventKind ImageDownloadEvent `json:"event"`
ImageURL string `json:"image_url"`
ImageHeight int64 `json:"image_height"`
ImageWidth int64 `json:"image_width"`
ContentLength int64 `json:"content_length"`
Downloaded int64 `json:"downloaded"`
Subreddit string `json:"subreddit"`
PostURL string `json:"post_url"`
PostName string `json:"post_name"`
PostTitle string `json:"post_title"`
Error error `json:"error"`
}
// Render the template.
func (im ImageDownload) Render(ctx context.Context, w io.Writer) error {
switch im.EventKind {
case ImageDownloadStart:
return progress.ImageDownloadStartNotification(progress.ImageDownloadStartNotificationData{}).Render(ctx, w)
case ImageDownloadEnd:
return progress.ImageDownloadEndNotification(progress.ImageDownloadEndNotificationData{}).Render(ctx, w)
default:
return errs.Fail("events.ImageDownload: unknown event kind", "event", im)
}
}
// Event returns the event name
func (im ImageDownload) Event() string {
return "image.download.notification"
}
// SerializeTo writes the event data to the writer.
//
// SerializeTo must not write multiple linebreaks (single linebreak is fine)
// in succession to the writer since it will mess up SSE events.
func (im ImageDownload) SerializeTo(w io.Writer) error {
return json.NewEncoder(w).Encode(im)
}
type ImageDownloadSubreddit struct {
ImageDownload
}
func (im ImageDownloadSubreddit) Event() string {
return string(im.EventKind) + "." + im.Subreddit
}

View file

@ -0,0 +1,54 @@
package events
import (
"context"
"io"
"time"
"github.com/tigorlazuardi/redmage/models"
"github.com/tigorlazuardi/redmage/views/components"
)
type ImageDownloadSubreddit struct {
ImageDownload
}
func (im ImageDownloadSubreddit) Event() string {
return string(im.EventKind) + "." + im.Subreddit
}
type ImageDownloadSubredditCard struct {
ImageDownload
}
func (im ImageDownloadSubredditCard) Event() string {
return string(im.EventKind) + "." + im.Subreddit + ".card"
}
func (im ImageDownloadSubredditCard) Render(ctx context.Context, w io.Writer) error {
if im.EventKind == ImageDownloadEnd {
now := time.Now().Unix()
data := &models.Image{
Subreddit: im.Subreddit,
Device: im.Device,
PostTitle: im.PostTitle,
PostName: im.PostName,
PostURL: im.PostURL,
PostCreated: im.PostCreated,
PostAuthor: im.PostAuthor,
PostAuthorURL: im.PostAuthor,
ImageRelativePath: im.ImageRelativePath,
ImageOriginalURL: im.ImageOriginalURL,
ImageHeight: im.ImageHeight,
ImageWidth: im.ImageWidth,
ImageSize: im.ImageSize,
ThumbnailRelativePath: im.ThumbnailRelativePath,
NSFW: im.NSFW,
CreatedAt: now,
UpdatedAt: now,
}
return components.ImageCard(data, 0).Render(ctx, w)
} else {
return im.ImageDownload.Render(ctx, w)
}
}

View file

@ -0,0 +1,61 @@
package events
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/tigorlazuardi/redmage/pkg/log"
)
func (handler *Handler) HTMXEvents(rw http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "*Routes.HTMXEvents")
defer span.End()
flush, ok := rw.(http.Flusher)
if !ok {
rw.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(rw).Encode(map[string]string{"error": "response writer does not support streaming"})
return
}
filters := strings.Split(r.URL.Query().Get("filter"), ",")
log.New(ctx).Info("new htmx event stream connection", "user_agent", r.UserAgent())
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.WriteHeader(200)
flush.Flush()
ev, close := handler.Subscribe()
defer close()
loop:
for {
select {
case <-r.Context().Done():
log.New(ctx).Info("simple event stream connection closed", "user_agent", r.UserAgent())
return
case event := <-ev:
msg := event.Event()
for _, filter := range filters {
if filter != msg {
continue loop
}
}
if _, err := fmt.Fprintf(rw, "event: %s\ndata: ", msg); err != nil {
return
}
if err := event.Render(ctx, rw); err != nil {
log.New(ctx).Err(err).Error("failed to render event", "user_agent", r.UserAgent())
return
}
if _, err := io.WriteString(rw, "\n\n"); err != nil {
return
}
flush.Flush()
}
}
}

View file

@ -0,0 +1,62 @@
package events
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/tigorlazuardi/redmage/pkg/log"
)
func (handler *Handler) JSONEvents(rw http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "*Routes.HTMXEvents")
defer span.End()
flush, ok := rw.(http.Flusher)
if !ok {
rw.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(rw).Encode(map[string]string{"error": "response writer does not support streaming"})
return
}
filters := strings.Split(r.URL.Query().Get("filter"), ",")
log.New(ctx).Info("new htmx event stream connection", "user_agent", r.UserAgent())
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.WriteHeader(200)
flush.Flush()
ev, close := handler.Subscribe()
defer close()
loop:
for {
select {
case <-r.Context().Done():
log.New(ctx).Info("simple event stream connection closed", "user_agent", r.UserAgent())
return
case event := <-ev:
msg := event.Event()
for _, filter := range filters {
if filter != msg {
continue loop
}
}
if _, err := fmt.Fprintf(rw, "event: %s\ndata: ", msg); err != nil {
return
}
if err := json.NewEncoder(rw).Encode(event); err != nil {
log.New(ctx).Err(err).Error("failed to send json event", "user_agent", r.UserAgent())
return
}
// single new line because JSON already has new line
if _, err := io.WriteString(rw, "\n"); err != nil {
return
}
flush.Flush()
}
}
}

View file

@ -4,12 +4,19 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"strings"
"github.com/tigorlazuardi/redmage/pkg/log" "github.com/tigorlazuardi/redmage/pkg/log"
) )
func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Request) { // SimpleEvents is a simple event stream for the purpose of
ctx, span := tracer.Start(r.Context(), "*Routes.EventsAPI") // notification that something did happen. Not what the content of the event is.
//
// Useful for simple notification whose client just need to know that something
// happened and do something that does not require the content of the event,
// like refreshing the list by calling another http request.
func (handler *Handler) SimpleEvents(rw http.ResponseWriter, r *http.Request) {
ctx, span := tracer.Start(r.Context(), "*Routes.SimpleDownloadEvent")
defer span.End() defer span.End()
flush, ok := rw.(http.Flusher) flush, ok := rw.(http.Flusher)
@ -19,6 +26,8 @@ func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Requ
return return
} }
filters := strings.Split(r.URL.Query().Get("filter"), ",")
log.New(ctx).Info("new simple event stream connection", "user_agent", r.UserAgent()) log.New(ctx).Info("new simple event stream connection", "user_agent", r.UserAgent())
rw.Header().Set("Content-Type", "text/event-stream") rw.Header().Set("Content-Type", "text/event-stream")
@ -30,6 +39,7 @@ func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Requ
ev, close := handler.Subscribe() ev, close := handler.Subscribe()
defer close() defer close()
loop:
for { for {
select { select {
case <-r.Context().Done(): case <-r.Context().Done():
@ -37,6 +47,11 @@ func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Requ
return return
case event := <-ev: case event := <-ev:
msg := event.Event() msg := event.Event()
for _, filter := range filters {
if filter != msg {
continue loop
}
}
if _, err := fmt.Fprintf(rw, "event: %s\ndata: %s\n\n", msg, msg); err != nil { if _, err := fmt.Fprintf(rw, "event: %s\ndata: %s\n\n", msg, msg); err != nil {
return return
} }

View file

@ -2,16 +2,23 @@ package events
import ( import (
"github.com/teivah/broadcast" "github.com/teivah/broadcast"
"github.com/tigorlazuardi/redmage/api/events" apievents "github.com/tigorlazuardi/redmage/api/events"
"github.com/tigorlazuardi/redmage/config" "github.com/tigorlazuardi/redmage/config"
) )
type Handler struct { type Handler struct {
Config *config.Config Config *config.Config
Broadcast *broadcast.Relay[events.Event] Broadcast *broadcast.Relay[apievents.Event]
} }
func (handler *Handler) Subscribe() (<-chan events.Event, func()) { func NewHandler(cfg *config.Config, broadcast *broadcast.Relay[apievents.Event]) *Handler {
return &Handler{
Config: cfg,
Broadcast: broadcast,
}
}
func (handler *Handler) Subscribe() (<-chan apievents.Event, func()) {
listener := handler.Broadcast.Listener(10) listener := handler.Broadcast.Listener(10)
return listener.Ch(), listener.Close return listener.Ch(), listener.Close
} }

View file

@ -0,0 +1,17 @@
package events
import (
"github.com/go-chi/chi/v5"
"github.com/riandyrn/otelchi"
"github.com/tigorlazuardi/redmage/server/routes/middleware"
chimiddleware "github.com/go-chi/chi/v5/middleware"
)
func (handler *Handler) Route(router chi.Router) {
router.Use(otelchi.Middleware("redmage"))
router.Use(chimiddleware.RequestLogger(middleware.ChiLogger{}))
router.Get("/", handler.HTMXEvents)
router.Get("/simple", handler.SimpleEvents)
router.Get("/json", handler.JSONEvents)
}

View file

@ -10,6 +10,7 @@ import (
"github.com/riandyrn/otelchi" "github.com/riandyrn/otelchi"
"github.com/tigorlazuardi/redmage/api" "github.com/tigorlazuardi/redmage/api"
"github.com/tigorlazuardi/redmage/config" "github.com/tigorlazuardi/redmage/config"
"github.com/tigorlazuardi/redmage/server/routes/events"
"github.com/tigorlazuardi/redmage/server/routes/middleware" "github.com/tigorlazuardi/redmage/server/routes/middleware"
) )
@ -30,6 +31,8 @@ func (routes *Routes) Register(router chi.Router) {
router.Route("/htmx", routes.registerHTMXRoutes) router.Route("/htmx", routes.registerHTMXRoutes)
router.Route("/api/v1", routes.registerV1APIRoutes) router.Route("/api/v1", routes.registerV1APIRoutes)
eventHandler := events.NewHandler(routes.Config, routes.API.GetEventBroadcaster())
router.Route("/events", eventHandler.Route)
router.Group(routes.registerWWWRoutes) router.Group(routes.registerWWWRoutes)
} }

View file

@ -37,6 +37,7 @@ templ ImageCard(data *models.Image, opts ImageCardOption) {
}, },
}`, data.CreatedAt) } }`, data.CreatedAt) }
class="not-prose card card-bordered bg-base-100 hover:bg-base-200 shadow-xl min-w-[16rem] max-w-[16rem] rounded-xl top-0 hover:-top-1 hover:drop-shadow-2xl transition-all" class="not-prose card card-bordered bg-base-100 hover:bg-base-200 shadow-xl min-w-[16rem] max-w-[16rem] rounded-xl top-0 hover:-top-1 hover:drop-shadow-2xl transition-all"
id={ fmt.Sprintf("image-card-%s-%s", data.Subreddit, data.PostName) }
> >
<figure> <figure>
<a <a

View file

@ -19,7 +19,7 @@ templ ImageDownloadStartNotification(data ImageDownloadStartNotificationData) {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) } x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
} }
onclick="this.remove()" onclick="this.remove()"
class="alert alert-info hover:bg-success-content transition-all" class="alert alert-info hover:bg-info-content transition-all"
> >
<span> <span>
<a <a
@ -55,7 +55,7 @@ templ ImageDownloadEndNotification(data ImageDownloadEndNotificationData) {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) } x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
} }
onclick="this.remove()" onclick="this.remove()"
class="alert alert-info hover:bg-success-content transition-all" class="alert alert-success hover:bg-success-content transition-all"
> >
<span> <span>
<a <a
@ -67,3 +67,68 @@ templ ImageDownloadEndNotification(data ImageDownloadEndNotificationData) {
</span> </span>
</div> </div>
} }
type ImageDownloadErrorNotificationData struct {
ID string
Subreddit string
PostURL string
PostName string
PostTitle string
Error error
AutoRemoveDuration time.Duration
}
templ ImageDownloadErrorNotification(data ImageDownloadErrorNotificationData) {
<div
id={ data.ID }
if data.AutoRemoveDuration > 0 {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
}
onclick="this.remove()"
class="alert alert-error hover:bg-error-content transition-all"
>
<span>
<a
target="_blank"
href={ templ.SafeURL(fmt.Sprintf("https://www.reddit.com/r/%s", data.Subreddit)) }
>{ data.Subreddit }</a>:
{ data.Error.Error() }
<a href={ templ.SafeURL(data.PostURL) }>{ truncateTitle(data.PostTitle) }</a>
</span>
</div>
}
type ImageDownloadProgressNotificationData struct {
ID string
Subreddit string
PostURL string
PostName string
PostTitle string
ContentLength int64
Downloaded int64
AutoRemoveDuration time.Duration
}
func (i ImageDownloadProgressNotificationData) GetProgress() float64 {
return float64(i.Downloaded) / float64(i.ContentLength)
}
templ ImageDownloadProgressNotification(data ImageDownloadProgressNotificationData) {
<div
id={ data.ID }
if data.AutoRemoveDuration > 0 {
x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) }
}
onclick="this.remove()"
class="alert alert-info hover:bg-info-content transition-all"
>
<span>
<a
target="_blank"
href={ templ.SafeURL(fmt.Sprintf("https://www.reddit.com/r/%s", data.Subreddit)) }
>{ data.Subreddit }</a>:
Progress: { fmt.Sprintf("%.2f%%", data.GetProgress()*100) }
<a href={ templ.SafeURL(data.PostURL) }>{ truncateTitle(data.PostTitle) }</a>
</span>
</div>
}