From 8e45faba28f52656d9d9f6018d83d8330cc75bc3 Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Mon, 10 Jun 2024 14:52:53 +0700 Subject: [PATCH] wip: update event system --- api/api.go | 7 ++ api/events/image_download.go | 113 ++++++++++++++++++ api/events/image_download_start.go | 66 ---------- api/events/image_download_subreddit.go | 54 +++++++++ server/routes/events/event_htmx.go | 61 ++++++++++ server/routes/events/event_json.go | 62 ++++++++++ .../{download_simple.go => event_simple.go} | 19 ++- server/routes/events/events.go | 13 +- server/routes/events/subrouter.go | 17 +++ server/routes/routes.go | 3 + views/components/image_card.templ | 1 + .../components/progress/image_download.templ | 69 ++++++++++- 12 files changed, 412 insertions(+), 73 deletions(-) create mode 100644 api/events/image_download.go delete mode 100644 api/events/image_download_start.go create mode 100644 api/events/image_download_subreddit.go create mode 100644 server/routes/events/event_htmx.go create mode 100644 server/routes/events/event_json.go rename server/routes/events/{download_simple.go => event_simple.go} (58%) create mode 100644 server/routes/events/subrouter.go diff --git a/api/api.go b/api/api.go index 18f2e9c..cf3d597 100644 --- a/api/api.go +++ b/api/api.go @@ -8,6 +8,7 @@ import ( "github.com/stephenafamo/bob" "github.com/teivah/broadcast" "github.com/tigorlazuardi/redmage/api/bmessage" + "github.com/tigorlazuardi/redmage/api/events" "github.com/tigorlazuardi/redmage/api/reddit" "github.com/tigorlazuardi/redmage/api/scheduler" "github.com/tigorlazuardi/redmage/config" @@ -24,6 +25,7 @@ type API struct { scheduler *scheduler.Scheduler downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage] + eventBroadcast *broadcast.Relay[events.Event] config *config.Config @@ -57,6 +59,7 @@ func New(deps Dependencies) *API { db: bob.New(deps.DB), sqldb: deps.DB, downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](), + eventBroadcast: broadcast.NewRelay[events.Event](), config: deps.Config, imageSemaphore: make(chan struct{}, deps.Config.Int("download.concurrency.images")), reddit: deps.Reddit, @@ -91,3 +94,7 @@ func (api *API) scheduleRun(subreddit string) { log.New(ctx).Err(err).Error("api: failed to start download subreddit", "subreddit", subreddit) } } + +func (api *API) GetEventBroadcaster() *broadcast.Relay[events.Event] { + return api.eventBroadcast +} diff --git a/api/events/image_download.go b/api/events/image_download.go new file mode 100644 index 0000000..893c06d --- /dev/null +++ b/api/events/image_download.go @@ -0,0 +1,113 @@ +package events + +import ( + "context" + "encoding/json" + "fmt" + "io" + "time" + + "github.com/teivah/broadcast" + "github.com/tigorlazuardi/redmage/pkg/errs" + "github.com/tigorlazuardi/redmage/views/components/progress" +) + +type ImageDownloadEvent string + +const ( + ImageDownloadStart ImageDownloadEvent = "image.download.start" + ImageDownloadEnd ImageDownloadEvent = "image.download.end" + ImageDownloadError ImageDownloadEvent = "image.download.error" + ImageDownloadProgress ImageDownloadEvent = "image.download.progress" +) + +type ImageDownload struct { + EventKind ImageDownloadEvent `json:"event,omitempty"` + ImageURL string `json:"image_url,omitempty"` + ImageHeight int32 `json:"image_height,omitempty"` + ImageWidth int32 `json:"image_width,omitempty"` + ContentLength int64 `json:"content_length,omitempty"` + Downloaded int64 `json:"downloaded,omitempty"` + Subreddit string `json:"subreddit,omitempty"` + PostURL string `json:"post_url,omitempty"` + PostName string `json:"post_name,omitempty"` + PostTitle string `json:"post_title,omitempty"` + PostCreated int64 `json:"post_created,omitempty"` + PostAuthor string `json:"post_author,omitempty"` + PostAuthorURL string `json:"post_author_url,omitempty"` + ImageRelativePath string `json:"image_relative_path,omitempty"` + ImageOriginalURL string `json:"image_original_url,omitempty"` + ImageSize int64 `json:"image_size,omitempty"` + ThumbnailRelativePath string `json:"thumbnail_relative_path,omitempty"` + NSFW int32 `json:"nsfw,omitempty"` + Error error `json:"error,omitempty"` + Device string `json:"device,omitempty"` +} + +// Render the template. +func (im ImageDownload) Render(ctx context.Context, w io.Writer) error { + switch im.EventKind { + case ImageDownloadStart: + return progress.ImageDownloadStartNotification(progress.ImageDownloadStartNotificationData{ + ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName), + Subreddit: im.Subreddit, + PostName: im.PostName, + PostTitle: im.PostTitle, + PostURL: im.PostURL, + AutoRemoveDuration: time.Second * 5, + }).Render(ctx, w) + case ImageDownloadEnd: + return progress.ImageDownloadEndNotification(progress.ImageDownloadEndNotificationData{ + ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName), + Subreddit: im.Subreddit, + PostURL: im.PostName, + PostName: im.PostTitle, + PostTitle: im.PostURL, + AutoRemoveDuration: time.Second * 5, + }).Render(ctx, w) + case ImageDownloadError: + return progress.ImageDownloadErrorNotification(progress.ImageDownloadErrorNotificationData{ + ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName), + Subreddit: im.Subreddit, + PostURL: im.PostName, + PostName: im.PostTitle, + PostTitle: im.PostURL, + Error: im.Error, + AutoRemoveDuration: time.Second * 5, + }).Render(ctx, w) + case ImageDownloadProgress: + return progress.ImageDownloadProgressNotification(progress.ImageDownloadProgressNotificationData{ + ID: fmt.Sprintf("notif-image-download-%s-%s", im.Subreddit, im.PostName), + Subreddit: im.Subreddit, + PostURL: im.PostName, + PostName: im.PostTitle, + PostTitle: im.PostURL, + ContentLength: im.ContentLength, + Downloaded: im.Downloaded, + AutoRemoveDuration: time.Second * 5, + }).Render(ctx, w) + default: + return errs.Fail("events.ImageDownload: unknown event kind", "event", im) + } +} + +// Event returns the event name +func (im ImageDownload) Event() string { + return string(im.EventKind) +} + +// SerializeTo writes the event data to the writer. +// +// SerializeTo must not write multiple linebreaks (single linebreak is fine) +// in succession to the writer since it will mess up SSE events. +func (im ImageDownload) SerializeTo(w io.Writer) error { + return json.NewEncoder(w).Encode(im) +} + +func PublishImageDownloadEvent(bc *broadcast.Relay[Event], event ImageDownload) { + bc.Broadcast(event) + bc.Broadcast(ImageDownloadSubreddit{event}) + if event.EventKind == ImageDownloadEnd { + bc.Broadcast(ImageDownloadSubredditCard{event}) + } +} diff --git a/api/events/image_download_start.go b/api/events/image_download_start.go deleted file mode 100644 index 4ca6dd3..0000000 --- a/api/events/image_download_start.go +++ /dev/null @@ -1,66 +0,0 @@ -package events - -import ( - "context" - "encoding/json" - "io" - - "github.com/tigorlazuardi/redmage/pkg/errs" - "github.com/tigorlazuardi/redmage/views/components/progress" -) - -type ImageDownloadEvent string - -const ( - ImageDownloadStart ImageDownloadEvent = "image.download.start" - ImageDownloadEnd ImageDownloadEvent = "image.download.end" - ImageDownloadError ImageDownloadEvent = "image.download.error" - ImageDownloadProgress ImageDownloadEvent = "image.download.progress" -) - -type ImageDownload struct { - EventKind ImageDownloadEvent `json:"event"` - ImageURL string `json:"image_url"` - ImageHeight int64 `json:"image_height"` - ImageWidth int64 `json:"image_width"` - ContentLength int64 `json:"content_length"` - Downloaded int64 `json:"downloaded"` - Subreddit string `json:"subreddit"` - PostURL string `json:"post_url"` - PostName string `json:"post_name"` - PostTitle string `json:"post_title"` - Error error `json:"error"` -} - -// Render the template. -func (im ImageDownload) Render(ctx context.Context, w io.Writer) error { - switch im.EventKind { - case ImageDownloadStart: - return progress.ImageDownloadStartNotification(progress.ImageDownloadStartNotificationData{}).Render(ctx, w) - case ImageDownloadEnd: - return progress.ImageDownloadEndNotification(progress.ImageDownloadEndNotificationData{}).Render(ctx, w) - default: - return errs.Fail("events.ImageDownload: unknown event kind", "event", im) - } -} - -// Event returns the event name -func (im ImageDownload) Event() string { - return "image.download.notification" -} - -// SerializeTo writes the event data to the writer. -// -// SerializeTo must not write multiple linebreaks (single linebreak is fine) -// in succession to the writer since it will mess up SSE events. -func (im ImageDownload) SerializeTo(w io.Writer) error { - return json.NewEncoder(w).Encode(im) -} - -type ImageDownloadSubreddit struct { - ImageDownload -} - -func (im ImageDownloadSubreddit) Event() string { - return string(im.EventKind) + "." + im.Subreddit -} diff --git a/api/events/image_download_subreddit.go b/api/events/image_download_subreddit.go new file mode 100644 index 0000000..9fcd1db --- /dev/null +++ b/api/events/image_download_subreddit.go @@ -0,0 +1,54 @@ +package events + +import ( + "context" + "io" + "time" + + "github.com/tigorlazuardi/redmage/models" + "github.com/tigorlazuardi/redmage/views/components" +) + +type ImageDownloadSubreddit struct { + ImageDownload +} + +func (im ImageDownloadSubreddit) Event() string { + return string(im.EventKind) + "." + im.Subreddit +} + +type ImageDownloadSubredditCard struct { + ImageDownload +} + +func (im ImageDownloadSubredditCard) Event() string { + return string(im.EventKind) + "." + im.Subreddit + ".card" +} + +func (im ImageDownloadSubredditCard) Render(ctx context.Context, w io.Writer) error { + if im.EventKind == ImageDownloadEnd { + now := time.Now().Unix() + data := &models.Image{ + Subreddit: im.Subreddit, + Device: im.Device, + PostTitle: im.PostTitle, + PostName: im.PostName, + PostURL: im.PostURL, + PostCreated: im.PostCreated, + PostAuthor: im.PostAuthor, + PostAuthorURL: im.PostAuthor, + ImageRelativePath: im.ImageRelativePath, + ImageOriginalURL: im.ImageOriginalURL, + ImageHeight: im.ImageHeight, + ImageWidth: im.ImageWidth, + ImageSize: im.ImageSize, + ThumbnailRelativePath: im.ThumbnailRelativePath, + NSFW: im.NSFW, + CreatedAt: now, + UpdatedAt: now, + } + return components.ImageCard(data, 0).Render(ctx, w) + } else { + return im.ImageDownload.Render(ctx, w) + } +} diff --git a/server/routes/events/event_htmx.go b/server/routes/events/event_htmx.go new file mode 100644 index 0000000..1ab5a3f --- /dev/null +++ b/server/routes/events/event_htmx.go @@ -0,0 +1,61 @@ +package events + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/tigorlazuardi/redmage/pkg/log" +) + +func (handler *Handler) HTMXEvents(rw http.ResponseWriter, r *http.Request) { + ctx, span := tracer.Start(r.Context(), "*Routes.HTMXEvents") + defer span.End() + + flush, ok := rw.(http.Flusher) + if !ok { + rw.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(rw).Encode(map[string]string{"error": "response writer does not support streaming"}) + return + } + filters := strings.Split(r.URL.Query().Get("filter"), ",") + + log.New(ctx).Info("new htmx event stream connection", "user_agent", r.UserAgent()) + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + rw.WriteHeader(200) + flush.Flush() + + ev, close := handler.Subscribe() + defer close() + +loop: + for { + select { + case <-r.Context().Done(): + log.New(ctx).Info("simple event stream connection closed", "user_agent", r.UserAgent()) + return + case event := <-ev: + msg := event.Event() + for _, filter := range filters { + if filter != msg { + continue loop + } + } + if _, err := fmt.Fprintf(rw, "event: %s\ndata: ", msg); err != nil { + return + } + if err := event.Render(ctx, rw); err != nil { + log.New(ctx).Err(err).Error("failed to render event", "user_agent", r.UserAgent()) + return + } + if _, err := io.WriteString(rw, "\n\n"); err != nil { + return + } + flush.Flush() + } + } +} diff --git a/server/routes/events/event_json.go b/server/routes/events/event_json.go new file mode 100644 index 0000000..d26978c --- /dev/null +++ b/server/routes/events/event_json.go @@ -0,0 +1,62 @@ +package events + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + + "github.com/tigorlazuardi/redmage/pkg/log" +) + +func (handler *Handler) JSONEvents(rw http.ResponseWriter, r *http.Request) { + ctx, span := tracer.Start(r.Context(), "*Routes.HTMXEvents") + defer span.End() + + flush, ok := rw.(http.Flusher) + if !ok { + rw.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(rw).Encode(map[string]string{"error": "response writer does not support streaming"}) + return + } + filters := strings.Split(r.URL.Query().Get("filter"), ",") + + log.New(ctx).Info("new htmx event stream connection", "user_agent", r.UserAgent()) + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + rw.WriteHeader(200) + flush.Flush() + + ev, close := handler.Subscribe() + defer close() + +loop: + for { + select { + case <-r.Context().Done(): + log.New(ctx).Info("simple event stream connection closed", "user_agent", r.UserAgent()) + return + case event := <-ev: + msg := event.Event() + for _, filter := range filters { + if filter != msg { + continue loop + } + } + if _, err := fmt.Fprintf(rw, "event: %s\ndata: ", msg); err != nil { + return + } + if err := json.NewEncoder(rw).Encode(event); err != nil { + log.New(ctx).Err(err).Error("failed to send json event", "user_agent", r.UserAgent()) + return + } + // single new line because JSON already has new line + if _, err := io.WriteString(rw, "\n"); err != nil { + return + } + flush.Flush() + } + } +} diff --git a/server/routes/events/download_simple.go b/server/routes/events/event_simple.go similarity index 58% rename from server/routes/events/download_simple.go rename to server/routes/events/event_simple.go index d784f3b..728ab58 100644 --- a/server/routes/events/download_simple.go +++ b/server/routes/events/event_simple.go @@ -4,12 +4,19 @@ import ( "encoding/json" "fmt" "net/http" + "strings" "github.com/tigorlazuardi/redmage/pkg/log" ) -func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Request) { - ctx, span := tracer.Start(r.Context(), "*Routes.EventsAPI") +// SimpleEvents is a simple event stream for the purpose of +// notification that something did happen. Not what the content of the event is. +// +// Useful for simple notification whose client just need to know that something +// happened and do something that does not require the content of the event, +// like refreshing the list by calling another http request. +func (handler *Handler) SimpleEvents(rw http.ResponseWriter, r *http.Request) { + ctx, span := tracer.Start(r.Context(), "*Routes.SimpleDownloadEvent") defer span.End() flush, ok := rw.(http.Flusher) @@ -19,6 +26,8 @@ func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Requ return } + filters := strings.Split(r.URL.Query().Get("filter"), ",") + log.New(ctx).Info("new simple event stream connection", "user_agent", r.UserAgent()) rw.Header().Set("Content-Type", "text/event-stream") @@ -30,6 +39,7 @@ func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Requ ev, close := handler.Subscribe() defer close() +loop: for { select { case <-r.Context().Done(): @@ -37,6 +47,11 @@ func (handler *Handler) SimpleDownloadEvent(rw http.ResponseWriter, r *http.Requ return case event := <-ev: msg := event.Event() + for _, filter := range filters { + if filter != msg { + continue loop + } + } if _, err := fmt.Fprintf(rw, "event: %s\ndata: %s\n\n", msg, msg); err != nil { return } diff --git a/server/routes/events/events.go b/server/routes/events/events.go index b37cf10..77502f0 100644 --- a/server/routes/events/events.go +++ b/server/routes/events/events.go @@ -2,16 +2,23 @@ package events import ( "github.com/teivah/broadcast" - "github.com/tigorlazuardi/redmage/api/events" + apievents "github.com/tigorlazuardi/redmage/api/events" "github.com/tigorlazuardi/redmage/config" ) type Handler struct { Config *config.Config - Broadcast *broadcast.Relay[events.Event] + Broadcast *broadcast.Relay[apievents.Event] } -func (handler *Handler) Subscribe() (<-chan events.Event, func()) { +func NewHandler(cfg *config.Config, broadcast *broadcast.Relay[apievents.Event]) *Handler { + return &Handler{ + Config: cfg, + Broadcast: broadcast, + } +} + +func (handler *Handler) Subscribe() (<-chan apievents.Event, func()) { listener := handler.Broadcast.Listener(10) return listener.Ch(), listener.Close } diff --git a/server/routes/events/subrouter.go b/server/routes/events/subrouter.go new file mode 100644 index 0000000..a26538f --- /dev/null +++ b/server/routes/events/subrouter.go @@ -0,0 +1,17 @@ +package events + +import ( + "github.com/go-chi/chi/v5" + "github.com/riandyrn/otelchi" + "github.com/tigorlazuardi/redmage/server/routes/middleware" + + chimiddleware "github.com/go-chi/chi/v5/middleware" +) + +func (handler *Handler) Route(router chi.Router) { + router.Use(otelchi.Middleware("redmage")) + router.Use(chimiddleware.RequestLogger(middleware.ChiLogger{})) + router.Get("/", handler.HTMXEvents) + router.Get("/simple", handler.SimpleEvents) + router.Get("/json", handler.JSONEvents) +} diff --git a/server/routes/routes.go b/server/routes/routes.go index 975f24e..7a54a78 100644 --- a/server/routes/routes.go +++ b/server/routes/routes.go @@ -10,6 +10,7 @@ import ( "github.com/riandyrn/otelchi" "github.com/tigorlazuardi/redmage/api" "github.com/tigorlazuardi/redmage/config" + "github.com/tigorlazuardi/redmage/server/routes/events" "github.com/tigorlazuardi/redmage/server/routes/middleware" ) @@ -30,6 +31,8 @@ func (routes *Routes) Register(router chi.Router) { router.Route("/htmx", routes.registerHTMXRoutes) router.Route("/api/v1", routes.registerV1APIRoutes) + eventHandler := events.NewHandler(routes.Config, routes.API.GetEventBroadcaster()) + router.Route("/events", eventHandler.Route) router.Group(routes.registerWWWRoutes) } diff --git a/views/components/image_card.templ b/views/components/image_card.templ index d434534..b9ff7fe 100644 --- a/views/components/image_card.templ +++ b/views/components/image_card.templ @@ -37,6 +37,7 @@ templ ImageCard(data *models.Image, opts ImageCardOption) { }, }`, data.CreatedAt) } class="not-prose card card-bordered bg-base-100 hover:bg-base-200 shadow-xl min-w-[16rem] max-w-[16rem] rounded-xl top-0 hover:-top-1 hover:drop-shadow-2xl transition-all" + id={ fmt.Sprintf("image-card-%s-%s", data.Subreddit, data.PostName) } >
$el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) } } onclick="this.remove()" - class="alert alert-info hover:bg-success-content transition-all" + class="alert alert-info hover:bg-info-content transition-all" > $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) } } onclick="this.remove()" - class="alert alert-info hover:bg-success-content transition-all" + class="alert alert-success hover:bg-success-content transition-all" > } + +type ImageDownloadErrorNotificationData struct { + ID string + Subreddit string + PostURL string + PostName string + PostTitle string + Error error + AutoRemoveDuration time.Duration +} + +templ ImageDownloadErrorNotification(data ImageDownloadErrorNotificationData) { + +} + +type ImageDownloadProgressNotificationData struct { + ID string + Subreddit string + PostURL string + PostName string + PostTitle string + ContentLength int64 + Downloaded int64 + AutoRemoveDuration time.Duration +} + +func (i ImageDownloadProgressNotificationData) GetProgress() float64 { + return float64(i.Downloaded) / float64(i.ContentLength) +} + +templ ImageDownloadProgressNotification(data ImageDownloadProgressNotificationData) { +
0 { + x-data={ fmt.Sprintf("{ init() { setTimeout(() => $el.remove(), %d) }}", data.AutoRemoveDuration.Milliseconds()) } + } + onclick="this.remove()" + class="alert alert-info hover:bg-info-content transition-all" + > + + { data.Subreddit }: + Progress: { fmt.Sprintf("%.2f%%", data.GetProgress()*100) } + { truncateTitle(data.PostTitle) } + +
+}