From 7b98cebf7b14c799d8a9e31cd089d89c9c33f30d Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Thu, 11 Apr 2024 16:04:13 +0700 Subject: [PATCH] reddit: added image download api --- api/api.go | 4 +- api/bmessage/bmessage.go | 21 ++++-- api/broadcast_download_message.go | 9 --- api/reddit/download_images.go | 88 +++++++++++++++++++++- api/reddit/fancy_reader.go | 39 ---------- api/reddit/image_download_reader.go | 113 ++++++++++++++++++++++++++++ api/reddit/post.go | 20 ++++- config/default.go | 10 +-- 8 files changed, 233 insertions(+), 71 deletions(-) delete mode 100644 api/broadcast_download_message.go delete mode 100644 api/reddit/fancy_reader.go create mode 100644 api/reddit/image_download_reader.go diff --git a/api/api.go b/api/api.go index aa49528..769bccb 100644 --- a/api/api.go +++ b/api/api.go @@ -21,7 +21,7 @@ type API struct { scheduler *cron.Cron scheduleMap map[cron.EntryID]queries.Subreddit - downloadBroadcast *broadcast.Relay[bmessage.DownloadStatusMessage] + downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage] config *config.Config } @@ -32,7 +32,7 @@ func New(q *queries.Queries, db *sql.DB, cfg *config.Config) *API { db: db, scheduler: cron.New(), scheduleMap: make(map[cron.EntryID]queries.Subreddit, 8), - downloadBroadcast: broadcast.NewRelay[bmessage.DownloadStatusMessage](), + downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](), config: cfg, } } diff --git a/api/bmessage/bmessage.go b/api/bmessage/bmessage.go index 26deea8..0c3ea8e 100644 --- a/api/bmessage/bmessage.go +++ b/api/bmessage/bmessage.go @@ -5,17 +5,22 @@ import ( ) type ImageMetadata struct { - URL string - Height int - Width int - ThumbnailURL string - ThumbnailHeight int - ThumbnailWidth int + Kind ImageKind + URL string + Height int + Width int } -type DownloadStatusMessage struct { +type ImageKind int + +const ( + KindImage ImageKind = iota + KindThumbnail +) + +type ImageDownloadMessage struct { Metadata ImageMetadata - ContantLength 
units.MetricBytes
+	ContentLength units.MetricBytes
 	Downloaded    units.MetricBytes
 	Subreddit     string
 	PostURL       string
diff --git a/api/broadcast_download_message.go b/api/broadcast_download_message.go
deleted file mode 100644
index cf273e4..0000000
--- a/api/broadcast_download_message.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package api
-
-import "github.com/tigorlazuardi/redmage/db/queries"
-
-type DownloadStatusMessage struct {
-	Data      queries.Image
-	Progress  float64
-	Subreddit string
-}
diff --git a/api/reddit/download_images.go b/api/reddit/download_images.go
index 21d71a5..0173e32 100644
--- a/api/reddit/download_images.go
+++ b/api/reddit/download_images.go
@@ -2,18 +2,24 @@ package reddit
 
 import (
 	"context"
+	"fmt"
 	"io"
+	"net/http"
+	"time"
 
+	"github.com/alecthomas/units"
 	"github.com/tigorlazuardi/redmage/api/bmessage"
+	"github.com/tigorlazuardi/redmage/pkg/errs"
+	"golang.org/x/sync/errgroup"
 )
 
 type DownloadStatusBroadcaster interface {
-	Broadcast(bmessage.DownloadStatusMessage)
+	Broadcast(bmessage.ImageDownloadMessage)
 }
 
 type NullDownloadStatusBroadcaster struct{}
 
-func (NullDownloadStatusBroadcaster) Broadcast(bmessage.DownloadStatusMessage) {}
+func (NullDownloadStatusBroadcaster) Broadcast(bmessage.ImageDownloadMessage) {}
 
 type PostImage struct {
 	ImageURL      string
@@ -22,6 +28,130 @@ type PostImage struct {
 	ThumbnailFile io.Reader
 }
 
-func (reddit *Reddit) DownloadImage(ctx context.Context, post Post, broadcaster DownloadStatusBroadcaster) (images PostImage, err error) {
-	return images, err
+// DownloadImage starts downloading the post's image and thumbnail concurrently
+// and returns readers that stream the two response bodies. Progress of both
+// downloads is reported through broadcaster.
+//
+// If either download fails to start, the reader of the other one is closed and
+// the error is returned together with an image that carries only the URLs.
+func (reddit *Reddit) DownloadImage(ctx context.Context, post Post, broadcaster DownloadStatusBroadcaster) (image PostImage, err error) {
+	image.ImageURL = post.GetImageURL()
+	image.ThumbnailURL = post.GetThumbnailURL()
+
+	// Use ctx itself rather than a context from errgroup.WithContext: that one
+	// is canceled as soon as Wait returns, which would abort the response
+	// bodies that are still being streamed to the returned readers.
+	var group errgroup.Group
+	group.Go(func() (err error) {
+		image.ImageFile, err = reddit.downloadImage(ctx, post, bmessage.KindImage, broadcaster)
+		return err
+	})
+	group.Go(func() (err error) {
+		image.ThumbnailFile, err = reddit.downloadImage(ctx, post, bmessage.KindThumbnail, broadcaster)
+		return err
+	})
+	if err = group.Wait(); err != nil {
+		// Nobody will read the download that did start; closing its pipe lets
+		// the background copy exit.
+		if closer, ok := image.ImageFile.(io.Closer); ok {
+			_ = closer.Close()
+		}
+		if closer, ok := image.ThumbnailFile.(io.Closer); ok {
+			_ = closer.Close()
+		}
+		image.ImageFile, image.ThumbnailFile = nil, nil
+	}
+	return image, err
+}
+
+// downloadImage requests the post's image or thumbnail, selected by kind, and
+// returns a reader that streams the response body.
+//
+// Waiting for the response headers is limited by "download.timeout.headers".
+// The body is then copied to the returned reader in the background while an
+// ImageDownloadReader reports progress to broadcaster and enforces the idle
+// limits from "download.timeout.idle" and "download.timeout.idlespeed".
+//
+// The caller must drain the returned reader or close it (it is an
+// *io.PipeReader), otherwise the background copy never finishes.
+func (reddit *Reddit) downloadImage(ctx context.Context, post Post, kind bmessage.ImageKind, broadcaster DownloadStatusBroadcaster) (io.Reader, error) {
+	if broadcaster == nil {
+		broadcaster = NullDownloadStatusBroadcaster{}
+	}
+	var (
+		url           string
+		width, height int
+	)
+	if kind == bmessage.KindImage {
+		url = post.GetImageURL()
+		width, height = post.GetImageSize()
+	} else {
+		url = post.GetThumbnailURL()
+		width, height = post.GetThumbnailSize()
+	}
+
+	// Canceling the request context also aborts reads of the response body, so
+	// the headers timeout is a timer that is stopped once the headers arrive
+	// instead of a deferred cancel that would fire when this function returns.
+	reqCtx, cancel := context.WithCancel(ctx)
+	headerTimer := time.AfterFunc(reddit.Config.Duration("download.timeout.headers"), cancel)
+	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, url, nil)
+	if err != nil {
+		headerTimer.Stop()
+		cancel()
+		return nil, errs.Wrapw(err, "reddit: failed to create request", "url", url)
+	}
+	resp, err := reddit.Client.Do(req)
+	if !headerTimer.Stop() && err == nil {
+		// The timer fired just as the headers arrived: reqCtx is already
+		// canceled, so the body cannot be read anymore.
+		_ = resp.Body.Close()
+		err = context.DeadlineExceeded
+	}
+	if err != nil {
+		cancel()
+		return nil, errs.Wrapw(err, "reddit: failed to execute request", "url", url)
+	}
+	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
+		_ = resp.Body.Close()
+		cancel()
+		err = fmt.Errorf("unexpected status %q", resp.Status)
+		return nil, errs.Wrapw(err, "reddit: failed to download image", "url", url)
+	}
+
+	// The parse error is ignored; a zero threshold falls back to 10KB.
+	idleSpeed, _ := units.ParseMetricBytes(reddit.Config.String("download.timeout.idlespeed"))
+	if idleSpeed == 0 {
+		idleSpeed = 10 * units.KB
+	}
+	idr := &ImageDownloadReader{
+		OnProgress: func(downloaded int64, contentLength int64, err error) {
+			broadcaster.Broadcast(bmessage.ImageDownloadMessage{
+				Metadata: bmessage.ImageMetadata{
+					URL:    url,
+					Height: height,
+					Width:  width,
+					Kind:   kind,
+				},
+				ContentLength: units.MetricBytes(contentLength),
+				Downloaded:    units.MetricBytes(downloaded),
+				Subreddit:     post.GetSubreddit(),
+				PostURL:       post.GetPermalink(),
+				PostID:        post.GetID(),
+				Error:         err,
+			})
+		},
+		IdleTimeout:        reddit.Config.Duration("download.timeout.idle"),
+		IdleSpeedThreshold: idleSpeed,
+	}
+	resp = idr.WrapHTTPResponse(resp)
+
+	reader, writer := io.Pipe()
+	go func() {
+		defer cancel()
+		defer resp.Body.Close()
+		_, err := io.Copy(writer, resp.Body)
+		_ = writer.CloseWithError(err)
+	}()
+	return reader, nil
 }
diff --git 
a/api/reddit/fancy_reader.go b/api/reddit/fancy_reader.go
deleted file mode 100644
index 01a4350..0000000
--- a/api/reddit/fancy_reader.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package reddit
-
-import (
-	"io"
-	"net/http"
-)
-
-type ProgressReader struct {
-	OnProgress func(downloaded int64, contentLength int64, err error)
-	OnClose    func(closeErr error)
-
-	reader        io.ReadCloser
-	contentLength int64
-	downloaded    int64
-}
-
-func (progressReader *ProgressReader) WrapHTTPResponse(resp *http.Response) *http.Response {
-	progressReader.reader = resp.Body
-	progressReader.contentLength = resp.ContentLength
-	resp.Body = progressReader
-	return resp
-}
-
-func (progressReader *ProgressReader) Read(p []byte) (n int, err error) {
-	n, err = progressReader.reader.Read(p)
-	progressReader.downloaded += int64(n)
-	if progressReader.OnProgress != nil {
-		progressReader.OnProgress(progressReader.downloaded, progressReader.contentLength, err)
-	}
-	return n, err
-}
-
-func (progressReader *ProgressReader) Close() error {
-	err := progressReader.reader.Close()
-	if progressReader.OnClose != nil {
-		progressReader.OnClose(err)
-	}
-	return err
-}
diff --git a/api/reddit/image_download_reader.go b/api/reddit/image_download_reader.go
new file mode 100644
index 0000000..eb44a08
--- /dev/null
+++ b/api/reddit/image_download_reader.go
@@ -0,0 +1,148 @@
+package reddit
+
+import (
+	"errors"
+	"io"
+	"net/http"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/alecthomas/units"
+)
+
+// ErrIdleTimeoutReached is returned by ImageDownloadReader.Read once the
+// download has not reached IdleSpeedThreshold within IdleTimeout.
+var ErrIdleTimeoutReached = errors.New("download idle timeout reached")
+
+// ImageDownloadReader wraps an HTTP response body. It reports progress on every
+// read and fails reads with ErrIdleTimeoutReached when too little data arrives
+// for too long.
+//
+// Set the exported fields first, then call WrapHTTPResponse exactly once.
+type ImageDownloadReader struct {
+	// OnProgress, if set, is called after every Read with the total number of
+	// bytes read so far, the response's ContentLength and the error Read returns.
+	OnProgress func(downloaded int64, contentLength int64, err error)
+
+	// OnClose, if set, is called by Close with the wrapped body's close error.
+	OnClose func(closeErr error)
+
+	// IdleTimeout is how long the download may stay below IdleSpeedThreshold
+	// before reads fail.
+	IdleTimeout time.Duration
+
+	// IdleSpeedThreshold is the number of bytes that must have been read for
+	// the idle timer to be reset.
+	IdleSpeedThreshold units.MetricBytes
+
+	reader        io.ReadCloser
+	contentLength int64
+
+	// downloaded counts every byte read; deltavalue counts the bytes read since
+	// the idle timer was last reset.
+	downloaded atomic.Int64
+	deltavalue atomic.Int64
+
+	// cancelDebounce is the pending idle timer and deltastart the time of the
+	// last speed check. Only the monitor goroutine touches them.
+	cancelDebounce *time.Timer
+	deltastart     time.Time
+
+	// mu guards errCancel, which the idle timer sets and Read returns.
+	mu        sync.Mutex
+	errCancel error
+
+	// exit is closed by Close to stop the monitor goroutine.
+	exit      chan struct{}
+	closeOnce sync.Once
+}
+
+// WrapHTTPResponse replaces resp.Body with idr and starts a goroutine that
+// checks the download speed once per second until the body is closed.
+func (idr *ImageDownloadReader) WrapHTTPResponse(resp *http.Response) *http.Response {
+	idr.reader = resp.Body
+	idr.contentLength = resp.ContentLength
+	idr.exit = make(chan struct{})
+	resp.Body = idr
+	go func() {
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-idr.exit:
+				// Do not leave an armed idle timer behind.
+				if idr.cancelDebounce != nil {
+					idr.cancelDebounce.Stop()
+				}
+				return
+			case <-ticker.C:
+				idr.checkSpeed()
+			}
+		}
+	}()
+	return resp
+}
+
+// checkSpeed is called by the monitor goroutine on every tick. It arms the
+// idle timer if none is pending, and resets it once at least
+// IdleSpeedThreshold bytes have been read since the previous reset.
+func (idr *ImageDownloadReader) checkSpeed() {
+	now := time.Now()
+	if idr.deltastart.IsZero() {
+		idr.deltastart = now
+	}
+
+	if idr.cancelDebounce == nil {
+		idr.cancelDebounce = time.AfterFunc(idr.IdleTimeout, func() {
+			idr.mu.Lock()
+			defer idr.mu.Unlock()
+			idr.errCancel = ErrIdleTimeoutReached
+		})
+	}
+
+	if now.Sub(idr.deltastart) < time.Second {
+		return
+	}
+	idr.deltastart = now
+
+	// deltavalue is a plain int64; convert it so both operands have the same
+	// type, otherwise the comparison does not compile.
+	if units.MetricBytes(idr.deltavalue.Load()) >= idr.IdleSpeedThreshold {
+		idr.deltavalue.Store(0)
+		idr.cancelDebounce.Stop()
+		idr.cancelDebounce = nil
+	}
+}
+
+// Read reads from the wrapped body and reports the progress to OnProgress.
+// After the idle timer has fired, a read that would otherwise succeed returns
+// ErrIdleTimeoutReached; an error from the body itself, including io.EOF,
+// takes precedence.
+func (idr *ImageDownloadReader) Read(p []byte) (n int, err error) {
+	n, err = idr.reader.Read(p)
+	idr.deltavalue.Add(int64(n))
+	total := idr.downloaded.Add(int64(n))
+
+	if err == nil {
+		idr.mu.Lock()
+		err = idr.errCancel
+		idr.mu.Unlock()
+	}
+	if idr.OnProgress != nil {
+		idr.OnProgress(total, idr.contentLength, err)
+	}
+	return n, err
+}
+
+// Close stops the monitor goroutine, closes the wrapped body and passes the
+// close error to OnClose. It never blocks on the monitor, even when called
+// more than once.
+func (idr *ImageDownloadReader) Close() error {
+	idr.closeOnce.Do(func() { close(idr.exit) })
+	err := idr.reader.Close()
+	if idr.OnClose != nil {
+		idr.OnClose(err)
+	}
+	return err
+}
diff --git a/api/reddit/post.go b/api/reddit/post.go
index b603ca3..d27d494 100644
--- a/api/reddit/post.go
+++ b/api/reddit/post.go
@@ -197,20 +197,32 @@ func (post *Post) GetImageURL() string {
 	return post.Data.URL
 }
 
-func (post *Post) GetImageSize() (height, width int) {
+func (post *Post) GetImageSize() (width, height int) {
 	if len(post.Data.Preview.Images) == 0 {
 		return 0, 0
 	}
 	source := 
post.Data.Preview.Images[0].Source - return source.Height, source.Width + return source.Width, source.Height } func (post *Post) GetThumbnailURL() string { return post.Data.Thumbnail } -func (post *Post) GetThumbnailSize() (height, width int) { - return post.Data.ThumbnailHeight, post.Data.ThumbnailWidth +func (post *Post) GetThumbnailSize() (width, height int) { + return post.Data.ThumbnailWidth, post.Data.ThumbnailHeight +} + +func (post *Post) GetSubreddit() string { + return post.Data.Subreddit +} + +func (post *Post) GetPermalink() string { + return post.Data.Permalink +} + +func (post *Post) GetID() string { + return post.Data.ID } type Data struct { diff --git a/config/default.go b/config/default.go index 4dafb59..c716c03 100644 --- a/config/default.go +++ b/config/default.go @@ -15,11 +15,11 @@ var DefaultConfig = map[string]any{ "db.string": "data.db", "db.automigrate": true, - "download.concurrency": 5, - "download.directory": "", - "download.timeout.firstbyte": "30s", - "download.timeout.idleconnection": "5s", - "download.timeout.idlespeed": 10 * 1024, // 10KB + "download.concurrency": 5, + "download.directory": "", + "download.timeout.headers": "10s", + "download.timeout.idle": "5s", + "download.timeout.idlespeed": "10KB", "http.port": "8080", "http.host": "0.0.0.0",