events: implemented in download section

This commit is contained in:
Tigor Hutasuhut 2024-06-11 15:13:56 +07:00
parent f13b54d549
commit ca96d4a805
5 changed files with 66 additions and 45 deletions

View file

@ -139,7 +139,7 @@ func (api *API) downloadSubredditImage(ctx context.Context, post reddit.Post, su
ctx, span := tracer.Start(ctx, "*API.downloadSubredditImage") ctx, span := tracer.Start(ctx, "*API.downloadSubredditImage")
defer span.End() defer span.End()
imageHandler, err := api.reddit.DownloadImage(ctx, post, api.downloadBroadcast) imageHandler, err := api.reddit.DownloadImage(ctx, post, api.eventBroadcast)
if err != nil { if err != nil {
return errs.Wrapw(err, "failed to download image") return errs.Wrapw(err, "failed to download image")
} }

View file

@ -44,6 +44,10 @@ type ImageDownload struct {
Device string `json:"device,omitempty"` Device string `json:"device,omitempty"`
} }
func (id ImageDownload) Clone() ImageDownload {
return id
}
// Render the template. // Render the template.
func (im ImageDownload) Render(ctx context.Context, w io.Writer) error { func (im ImageDownload) Render(ctx context.Context, w io.Writer) error {
switch im.EventKind { switch im.EventKind {

View file

@ -7,7 +7,9 @@ import (
"net/http" "net/http"
"github.com/alecthomas/units" "github.com/alecthomas/units"
"github.com/teivah/broadcast"
"github.com/tigorlazuardi/redmage/api/bmessage" "github.com/tigorlazuardi/redmage/api/bmessage"
"github.com/tigorlazuardi/redmage/api/events"
"github.com/tigorlazuardi/redmage/pkg/errs" "github.com/tigorlazuardi/redmage/pkg/errs"
) )
@ -35,7 +37,7 @@ func (po *PostImage) Close() error {
// DownloadImage downloades the image. // DownloadImage downloades the image.
// //
// If downloading image or thumbnail fails // If downloading image or thumbnail fails
func (reddit *Reddit) DownloadImage(ctx context.Context, post Post, broadcaster DownloadStatusBroadcaster) (image PostImage, err error) { func (reddit *Reddit) DownloadImage(ctx context.Context, post Post, broadcaster *broadcast.Relay[events.Event]) (image PostImage, err error) {
ctx, span := tracer.Start(ctx, "*Reddit.DownloadImage") ctx, span := tracer.Start(ctx, "*Reddit.DownloadImage")
defer span.End() defer span.End()
imageUrl := post.GetImageURL() imageUrl := post.GetImageURL()
@ -45,7 +47,7 @@ func (reddit *Reddit) DownloadImage(ctx context.Context, post Post, broadcaster
return image, err return image, err
} }
func (reddit *Reddit) DownloadThumbnail(ctx context.Context, post Post, broadcaster DownloadStatusBroadcaster) (image PostImage, err error) { func (reddit *Reddit) DownloadThumbnail(ctx context.Context, post Post, broadcaster *broadcast.Relay[events.Event]) (image PostImage, err error) {
ctx, span := tracer.Start(ctx, "*Reddit.DownloadThumbnail") ctx, span := tracer.Start(ctx, "*Reddit.DownloadThumbnail")
defer span.End() defer span.End()
imageUrl := post.GetThumbnailURL() imageUrl := post.GetThumbnailURL()
@ -55,7 +57,7 @@ func (reddit *Reddit) DownloadThumbnail(ctx context.Context, post Post, broadcas
return image, err return image, err
} }
func (reddit *Reddit) downloadImage(ctx context.Context, post Post, kind bmessage.ImageKind, broadcaster DownloadStatusBroadcaster) (io.ReadCloser, error) { func (reddit *Reddit) downloadImage(ctx context.Context, post Post, kind bmessage.ImageKind, broadcaster *broadcast.Relay[events.Event]) (io.ReadCloser, error) {
var ( var (
url string url string
height int64 height int64
@ -87,45 +89,46 @@ func (reddit *Reddit) downloadImage(ctx context.Context, post Post, kind bmessag
if metricSpeed == 0 { if metricSpeed == 0 {
metricSpeed = 10 * units.KB metricSpeed = 10 * units.KB
} }
metadata := bmessage.ImageMetadata{ eventData := events.ImageDownload{
URL: url, ImageURL: post.GetImageURL(),
Height: height, ImageHeight: int32(height),
Width: width, ImageWidth: int32(width),
Kind: kind, Subreddit: post.GetSubreddit(),
PostURL: post.GetPostURL(),
PostName: post.GetPostName(),
PostTitle: post.GetPostTitle(),
PostCreated: post.GetPostCreated(),
PostAuthor: post.GetAuthor(),
PostAuthorURL: post.GetAuthorURL(),
ImageOriginalURL: post.GetImageURL(),
NSFW: int32(post.IsNSFWInt()),
} }
idr := &ImageDownloadReader{ idr := &ImageDownloadReader{
OnProgress: func(downloaded int64, contentLength int64, err error) { OnProgress: func(downloaded int64, contentLength int64, err error) {
var event bmessage.DownloadEvent var kind events.ImageDownloadEvent
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
err = nil err = nil
} }
if err != nil { if err != nil {
event = bmessage.DownloadError kind = events.ImageDownloadError
} else { } else {
event = bmessage.DownloadProgress kind = events.ImageDownloadProgress
} }
broadcaster.Broadcast(bmessage.ImageDownloadMessage{ ev := eventData.Clone()
Event: event, ev.EventKind = kind
Metadata: metadata, ev.Downloaded = downloaded
ContentLength: units.MetricBytes(resp.ContentLength), ev.ImageSize = contentLength
Downloaded: units.MetricBytes(downloaded), ev.ContentLength = contentLength
Subreddit: post.GetSubreddit(), events.PublishImageDownloadEvent(broadcaster, ev)
PostURL: post.GetPermalink(),
PostID: post.GetID(),
Error: err,
})
}, },
OnClose: func(downloaded, contentLength int64, closeErr error) { OnClose: func(downloaded, contentLength int64, closeErr error) {
broadcaster.Broadcast(bmessage.ImageDownloadMessage{ ev := eventData.Clone()
Event: bmessage.DownloadEnd, ev.EventKind = events.ImageDownloadEnd
Metadata: metadata, ev.Downloaded = downloaded
ContentLength: units.MetricBytes(resp.ContentLength), ev.ImageSize = contentLength
Downloaded: units.MetricBytes(downloaded), ev.ContentLength = contentLength
Subreddit: post.GetSubreddit(), ev.Error = closeErr
PostURL: post.GetPostURL(), events.PublishImageDownloadEvent(broadcaster, ev)
PostID: post.GetID(),
Error: closeErr,
})
}, },
IdleTimeout: reddit.Config.Duration("download.timeout.idle"), IdleTimeout: reddit.Config.Duration("download.timeout.idle"),
IdleSpeedThreshold: metricSpeed, IdleSpeedThreshold: metricSpeed,
@ -135,18 +138,11 @@ func (reddit *Reddit) downloadImage(ctx context.Context, post Post, kind bmessag
reader, writer := io.Pipe() reader, writer := io.Pipe()
go func() { go func() {
defer resp.Body.Close() defer resp.Body.Close()
broadcaster.Broadcast(bmessage.ImageDownloadMessage{ ev := eventData.Clone()
Event: bmessage.DownloadStart, ev.EventKind = events.ImageDownloadStart
Metadata: bmessage.ImageMetadata{ ev.ImageSize = resp.ContentLength
URL: url, ev.ContentLength = resp.ContentLength
Height: height, events.PublishImageDownloadEvent(broadcaster, ev)
Width: width,
Kind: kind,
},
Subreddit: post.GetSubreddit(),
PostURL: post.GetPostURL(),
PostID: post.GetID(),
})
_, err := io.Copy(writer, resp.Body) _, err := io.Copy(writer, resp.Body)
_ = writer.CloseWithError(err) _ = writer.CloseWithError(err)
}() }()

View file

@ -225,6 +225,13 @@ func (post *Post) IsNSFW() bool {
return post.Data.Over18 return post.Data.Over18
} }
func (post *Post) IsNSFWInt() int {
if post.IsNSFW() {
return 1
}
return 0
}
func (post *Post) IsImagePost() bool { func (post *Post) IsImagePost() bool {
return post.Data.PostHint == "image" return post.Data.PostHint == "image"
} }
@ -375,6 +382,18 @@ func (post *Post) GetPostURL() string {
return fmt.Sprintf("https://reddit.com%s", post.Data.Permalink) return fmt.Sprintf("https://reddit.com%s", post.Data.Permalink)
} }
func (post *Post) GetPostName() string {
return post.Data.Name
}
func (post *Post) GetPostTitle() string {
return post.Data.Title
}
func (post *Post) GetPostCreated() int64 {
return int64(post.Data.Created)
}
func (post *Post) GetID() string { func (post *Post) GetID() string {
return post.Data.ID return post.Data.ID
} }

View file

@ -1,6 +1,8 @@
package reddit package reddit
import "github.com/tigorlazuardi/redmage/config" import (
"github.com/tigorlazuardi/redmage/config"
)
type Reddit struct { type Reddit struct {
Client Client Client Client