reddit: added image download api

This commit is contained in:
Tigor Hutasuhut 2024-04-11 16:04:13 +07:00
parent 63f17437b7
commit 7b98cebf7b
8 changed files with 233 additions and 71 deletions

View file

@ -21,7 +21,7 @@ type API struct {
scheduler *cron.Cron scheduler *cron.Cron
scheduleMap map[cron.EntryID]queries.Subreddit scheduleMap map[cron.EntryID]queries.Subreddit
downloadBroadcast *broadcast.Relay[bmessage.DownloadStatusMessage] downloadBroadcast *broadcast.Relay[bmessage.ImageDownloadMessage]
config *config.Config config *config.Config
} }
@ -32,7 +32,7 @@ func New(q *queries.Queries, db *sql.DB, cfg *config.Config) *API {
db: db, db: db,
scheduler: cron.New(), scheduler: cron.New(),
scheduleMap: make(map[cron.EntryID]queries.Subreddit, 8), scheduleMap: make(map[cron.EntryID]queries.Subreddit, 8),
downloadBroadcast: broadcast.NewRelay[bmessage.DownloadStatusMessage](), downloadBroadcast: broadcast.NewRelay[bmessage.ImageDownloadMessage](),
config: cfg, config: cfg,
} }
} }

View file

@ -5,17 +5,22 @@ import (
) )
type ImageMetadata struct { type ImageMetadata struct {
URL string Kind ImageKind
Height int URL string
Width int Height int
ThumbnailURL string Width int
ThumbnailHeight int
ThumbnailWidth int
} }
type DownloadStatusMessage struct { type ImageKind int
const (
KindImage ImageKind = iota
KindThumbnail
)
type ImageDownloadMessage struct {
Metadata ImageMetadata Metadata ImageMetadata
ContantLength units.MetricBytes ContentLength units.MetricBytes
Downloaded units.MetricBytes Downloaded units.MetricBytes
Subreddit string Subreddit string
PostURL string PostURL string

View file

@ -1,9 +0,0 @@
package api
import "github.com/tigorlazuardi/redmage/db/queries"
type DownloadStatusMessage struct {
Data queries.Image
Progress float64
Subreddit string
}

View file

@ -3,17 +3,21 @@ package reddit
import ( import (
"context" "context"
"io" "io"
"net/http"
"github.com/alecthomas/units"
"github.com/tigorlazuardi/redmage/api/bmessage" "github.com/tigorlazuardi/redmage/api/bmessage"
"github.com/tigorlazuardi/redmage/pkg/errs"
"golang.org/x/sync/errgroup"
) )
type DownloadStatusBroadcaster interface { type DownloadStatusBroadcaster interface {
Broadcast(bmessage.DownloadStatusMessage) Broadcast(bmessage.ImageDownloadMessage)
} }
type NullDownloadStatusBroadcaster struct{} type NullDownloadStatusBroadcaster struct{}
func (NullDownloadStatusBroadcaster) Broadcast(bmessage.DownloadStatusMessage) {} func (NullDownloadStatusBroadcaster) Broadcast(bmessage.ImageDownloadMessage) {}
type PostImage struct { type PostImage struct {
ImageURL string ImageURL string
@ -22,6 +26,82 @@ type PostImage struct {
ThumbnailFile io.Reader ThumbnailFile io.Reader
} }
func (reddit *Reddit) DownloadImage(ctx context.Context, post Post, broadcaster DownloadStatusBroadcaster) (images PostImage, err error) { func (reddit *Reddit) DownloadImage(ctx context.Context, post Post, broadcaster DownloadStatusBroadcaster) (image PostImage, err error) {
return images, err imageUrl, thumbnailUrl := post.GetImageURL(), post.GetThumbnailURL()
image.ImageURL = imageUrl
image.ThumbnailURL = thumbnailUrl
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
var err error
image.ImageFile, err = reddit.downloadImage(groupCtx, post, bmessage.KindImage, broadcaster)
return err
})
group.Go(func() error {
var err error
image.ThumbnailFile, err = reddit.downloadImage(groupCtx, post, bmessage.KindThumbnail, broadcaster)
return err
})
err = group.Wait()
return image, err
}
func (reddit *Reddit) downloadImage(ctx context.Context, post Post, kind bmessage.ImageKind, broadcaster DownloadStatusBroadcaster) (io.Reader, error) {
var (
url string
height int
width int
)
if kind == bmessage.KindImage {
url = post.GetImageURL()
width, height = post.GetImageSize()
} else {
url = post.GetThumbnailURL()
width, height = post.GetThumbnailSize()
}
ctx, cancel := context.WithTimeout(ctx, reddit.Config.Duration("download.timeout.headers"))
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, errs.Wrapw(err, "reddit: failed to create request", "url", url)
}
resp, err := reddit.Client.Do(req)
if err != nil {
return nil, errs.Wrapw(err, "reddit: failed to execute request", "url", url)
}
idleSpeedStr := reddit.Config.String("download.timeout.idlespeed")
metricSpeed, _ := units.ParseMetricBytes(idleSpeedStr)
if metricSpeed == 0 {
metricSpeed = 10 * units.KB
}
idr := &ImageDownloadReader{
OnProgress: func(downloaded int64, contentLength int64, err error) {
broadcaster.Broadcast(bmessage.ImageDownloadMessage{
Metadata: bmessage.ImageMetadata{
URL: url,
Height: height,
Width: width,
Kind: kind,
},
ContentLength: units.MetricBytes(resp.ContentLength),
Downloaded: units.MetricBytes(downloaded),
Subreddit: post.GetSubreddit(),
PostURL: post.GetPermalink(),
PostID: post.GetID(),
Error: err,
})
},
IdleTimeout: reddit.Config.Duration("download.timeout.idle"),
IdleSpeedThreshold: metricSpeed,
}
resp = idr.WrapHTTPResponse(resp)
reader, writer := io.Pipe()
go func() {
defer resp.Body.Close()
_, err := io.Copy(writer, resp.Body)
_ = writer.CloseWithError(err)
}()
return reader, nil
} }

View file

@ -1,39 +0,0 @@
package reddit
import (
"io"
"net/http"
)
type ProgressReader struct {
OnProgress func(downloaded int64, contentLength int64, err error)
OnClose func(closeErr error)
reader io.ReadCloser
contentLength int64
downloaded int64
}
func (progressReader *ProgressReader) WrapHTTPResponse(resp *http.Response) *http.Response {
progressReader.reader = resp.Body
progressReader.contentLength = resp.ContentLength
resp.Body = progressReader
return resp
}
func (progressReader *ProgressReader) Read(p []byte) (n int, err error) {
n, err = progressReader.reader.Read(p)
progressReader.downloaded += int64(n)
if progressReader.OnProgress != nil {
progressReader.OnProgress(progressReader.downloaded, progressReader.contentLength, err)
}
return n, err
}
func (progressReader *ProgressReader) Close() error {
err := progressReader.reader.Close()
if progressReader.OnClose != nil {
progressReader.OnClose(err)
}
return err
}

View file

@ -0,0 +1,113 @@
package reddit
import (
"errors"
"io"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/alecthomas/units"
)
var ErrIdleTimeoutReached = errors.New("download idle timeout reached")
type ImageDownloadReader struct {
OnProgress func(downloaded int64, contentLength int64, err error)
OnClose func(closeErr error)
IdleTimeout time.Duration
IdleSpeedThreshold units.MetricBytes
errCancel error
cancelDebounce *time.Timer
reader io.ReadCloser
contentLength int64
downloaded atomic.Int64
deltastart time.Time
deltavalue atomic.Int64
end time.Time
exit chan struct{}
mu sync.Mutex
}
func (idr *ImageDownloadReader) WrapHTTPResponse(resp *http.Response) *http.Response {
idr.reader = resp.Body
idr.contentLength = resp.ContentLength
idr.exit = make(chan struct{}, 1)
resp.Body = idr
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-idr.exit:
return
case <-ticker.C:
idr.checkSpeed()
}
}
}()
return resp
}
func (idr *ImageDownloadReader) checkSpeed() {
now := time.Now()
if idr.deltastart.IsZero() {
idr.deltastart = now
}
if idr.cancelDebounce == nil {
idr.cancelDebounce = time.AfterFunc(idr.IdleTimeout, func() {
idr.mu.Lock()
defer idr.mu.Unlock()
idr.errCancel = ErrIdleTimeoutReached
})
}
if now.Sub(idr.deltastart) < time.Second {
return
}
idr.deltastart = now
delta := idr.deltavalue.Load()
if delta >= idr.IdleSpeedThreshold {
idr.deltavalue.Store(0)
idr.cancelDebounce.Stop()
idr.cancelDebounce = nil
}
}
func (idr *ImageDownloadReader) Read(p []byte) (n int, err error) {
n, err = idr.reader.Read(p)
idr.deltavalue.Add(int64(n))
newd := idr.downloaded.Add(int64(n))
if idr.OnProgress != nil {
idr.OnProgress(newd, idr.contentLength, err)
}
idr.mu.Lock()
if idr.errCancel != nil {
idr.mu.Unlock()
idr.OnProgress(newd, idr.contentLength, idr.errCancel)
return n, idr.errCancel
}
idr.mu.Unlock()
return n, err
}
func (idr *ImageDownloadReader) Close() error {
idr.exit <- struct{}{}
err := idr.reader.Close()
if idr.OnClose != nil {
idr.OnClose(err)
}
return err
}

View file

@ -197,20 +197,32 @@ func (post *Post) GetImageURL() string {
return post.Data.URL return post.Data.URL
} }
func (post *Post) GetImageSize() (height, width int) { func (post *Post) GetImageSize() (width, height int) {
if len(post.Data.Preview.Images) == 0 { if len(post.Data.Preview.Images) == 0 {
return 0, 0 return 0, 0
} }
source := post.Data.Preview.Images[0].Source source := post.Data.Preview.Images[0].Source
return source.Height, source.Width return source.Width, source.Height
} }
func (post *Post) GetThumbnailURL() string { func (post *Post) GetThumbnailURL() string {
return post.Data.Thumbnail return post.Data.Thumbnail
} }
func (post *Post) GetThumbnailSize() (height, width int) { func (post *Post) GetThumbnailSize() (width, height int) {
return post.Data.ThumbnailHeight, post.Data.ThumbnailWidth return post.Data.ThumbnailWidth, post.Data.ThumbnailHeight
}
func (post *Post) GetSubreddit() string {
return post.Data.Subreddit
}
func (post *Post) GetPermalink() string {
return post.Data.Permalink
}
func (post *Post) GetID() string {
return post.Data.ID
} }
type Data struct { type Data struct {

View file

@ -15,11 +15,11 @@ var DefaultConfig = map[string]any{
"db.string": "data.db", "db.string": "data.db",
"db.automigrate": true, "db.automigrate": true,
"download.concurrency": 5, "download.concurrency": 5,
"download.directory": "", "download.directory": "",
"download.timeout.firstbyte": "30s", "download.timeout.headers": "10s",
"download.timeout.idleconnection": "5s", "download.timeout.idle": "5s",
"download.timeout.idlespeed": 10 * 1024, // 10KB "download.timeout.idlespeed": "10KB",
"http.port": "8080", "http.port": "8080",
"http.host": "0.0.0.0", "http.host": "0.0.0.0",