From 6b35321e1ec344538e79cffb92b8c144388c5acb Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Fri, 26 Apr 2024 15:13:29 +0700 Subject: [PATCH] events: added events api route --- api/events.go | 10 ++++++++ server/routes/events.go | 57 +++++++++++++++++++++++++++++++++++++++++ server/routes/routes.go | 2 ++ 3 files changed, 69 insertions(+) create mode 100644 api/events.go create mode 100644 server/routes/events.go diff --git a/api/events.go b/api/events.go new file mode 100644 index 0000000..f524511 --- /dev/null +++ b/api/events.go @@ -0,0 +1,10 @@ +package api + +import ( + "github.com/tigorlazuardi/redmage/api/bmessage" +) + +func (api *API) SubscribeImageDownloadEvent() (<-chan bmessage.ImageDownloadMessage, func()) { + listener := api.downloadBroadcast.Listener(10) + return listener.Ch(), listener.Close +} diff --git a/server/routes/events.go b/server/routes/events.go new file mode 100644 index 0000000..654730d --- /dev/null +++ b/server/routes/events.go @@ -0,0 +1,57 @@ +package routes + +import ( + "encoding/json" + "io" + "net/http" + + "github.com/tigorlazuardi/redmage/pkg/log" +) + +func (routes *Routes) EventsAPI(rw http.ResponseWriter, r *http.Request) { + ctx, span := tracer.Start(r.Context(), "*Routes.EventsAPI") + defer span.End() + + flush, ok := rw.(http.Flusher) + if !ok { + rw.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(rw).Encode(map[string]string{"error": "server does not support streaming"}) + return + } + + log.New(ctx).Info("new event stream connection", "user_agent", r.UserAgent()) + + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + rw.WriteHeader(200) + flush.Flush() + + downloadEvent, closeDownloadEvent := routes.API.SubscribeImageDownloadEvent() + defer closeDownloadEvent() + + enc := json.NewEncoder(rw) + + for { + select { + case <-r.Context().Done(): + log.New(ctx).Info("event stream connection closed", "user_agent", r.UserAgent()) + return + case event := <-downloadEvent: + if _, err := io.WriteString(rw, "event: image_download\n"); err != nil { + return + } + if _, err := io.WriteString(rw, "data: "); err != nil { + return + } + if err := enc.Encode(event); err != nil { + return + } + // Single '\n' because enc.Encode already append '\n' + if _, err := io.WriteString(rw, "\n"); err != nil { + return + } + } + flush.Flush() + } +} diff --git a/server/routes/routes.go b/server/routes/routes.go index 0afe43e..371c8b5 100644 --- a/server/routes/routes.go +++ b/server/routes/routes.go @@ -44,6 +44,8 @@ func (routes *Routes) registerV1APIRoutes(router chi.Router) { router.Get("/devices", routes.APIDeviceList) router.Post("/devices", routes.APIDeviceCreate) router.Patch("/devices/{id}", routes.APIDeviceUpdate) + + router.Get("/events", routes.EventsAPI) } func (routes *Routes) registerWWWRoutes(router chi.Router) {