events: added events api route
This commit is contained in:
parent
0b04bed992
commit
6b35321e1e
10
api/events.go
Normal file
10
api/events.go
Normal file
|
@ -0,0 +1,10 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"github.com/tigorlazuardi/redmage/api/bmessage"
|
||||
)
|
||||
|
||||
func (api *API) SubscribeImageDownloadEvent() (<-chan bmessage.ImageDownloadMessage, func()) {
|
||||
listener := api.downloadBroadcast.Listener(10)
|
||||
return listener.Ch(), listener.Close
|
||||
}
|
57
server/routes/events.go
Normal file
57
server/routes/events.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package routes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/tigorlazuardi/redmage/pkg/log"
|
||||
)
|
||||
|
||||
func (routes *Routes) EventsAPI(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx, span := tracer.Start(r.Context(), "*Routes.EventsAPI")
|
||||
defer span.End()
|
||||
|
||||
flush, ok := rw.(http.Flusher)
|
||||
if !ok {
|
||||
rw.WriteHeader(http.StatusInternalServerError)
|
||||
_ = json.NewEncoder(rw).Encode(map[string]string{"error": "server does not support streaming"})
|
||||
return
|
||||
}
|
||||
|
||||
log.New(ctx).Info("new event stream connection", "user_agent", r.UserAgent())
|
||||
|
||||
rw.Header().Set("Content-Type", "text/event-stream")
|
||||
rw.Header().Set("Cache-Control", "no-cache")
|
||||
rw.Header().Set("Connection", "keep-alive")
|
||||
rw.WriteHeader(200)
|
||||
flush.Flush()
|
||||
|
||||
downloadEvent, closeDownloadEvent := routes.API.SubscribeImageDownloadEvent()
|
||||
defer closeDownloadEvent()
|
||||
|
||||
enc := json.NewEncoder(rw)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
log.New(ctx).Info("event stream connection closed", "user_agent", r.UserAgent())
|
||||
return
|
||||
case event := <-downloadEvent:
|
||||
if _, err := io.WriteString(rw, "event: image_download\n"); err != nil {
|
||||
return
|
||||
}
|
||||
if _, err := io.WriteString(rw, "data: "); err != nil {
|
||||
return
|
||||
}
|
||||
if err := enc.Encode(event); err != nil {
|
||||
return
|
||||
}
|
||||
// Single '\n' because enc.Encode already append '\n'
|
||||
if _, err := io.WriteString(rw, "\n"); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
flush.Flush()
|
||||
}
|
||||
}
|
|
@ -44,6 +44,8 @@ func (routes *Routes) registerV1APIRoutes(router chi.Router) {
|
|||
router.Get("/devices", routes.APIDeviceList)
|
||||
router.Post("/devices", routes.APIDeviceCreate)
|
||||
router.Patch("/devices/{id}", routes.APIDeviceUpdate)
|
||||
|
||||
router.Get("/events", routes.EventsAPI)
|
||||
}
|
||||
|
||||
func (routes *Routes) registerWWWRoutes(router chi.Router) {
|
||||
|
|
Loading…
Reference in a new issue