zoptions: added TeeNamedReadCloser to copy reader as it's consumed

This commit is contained in:
Tigor Hutasuhut 2024-08-26 19:42:19 +07:00
parent 34e7ed06cb
commit b0d52be920
3 changed files with 174 additions and 17 deletions

View file

@ -70,7 +70,18 @@ type Error interface {
// .Sequence().Iter() on this error. // .Sequence().Iter() on this error.
Join(errs ...error) Error Join(errs ...error) Error
// Log logs the error to stderr.
//
// Format will be pretty if current runtime is run in interactive
// environment (TTY is detected), e.g. running inside terminal via `go run` command.
//
// Format will be JSON if current runtime does not have TTY, e.g.
// running inside Docker container, Kubernetes pod, or any non-interactive environment
// like VSCode debugger.
Log(ctx context.Context) Error Log(ctx context.Context) Error
// Notify sends the error to the zen Server
// which will then handle the error and create reports.
Notify(ctx context.Context, opts ...zoptions.NotifyOption) Error Notify(ctx context.Context, opts ...zoptions.NotifyOption) Error
// Sequence returns the state of the error sequence. // Sequence returns the state of the error sequence.

View file

@ -5,6 +5,8 @@ import (
"io" "io"
"os" "os"
"time" "time"
"gitlab.bareksa.com/backend/zen/internal/bufferpool"
) )
type NotifyParameters struct { type NotifyParameters struct {
@ -61,7 +63,7 @@ type NotifyParameters struct {
// //
// Attachments are guaranteed to be closed wether the notification is sent successfully or not // Attachments are guaranteed to be closed wether the notification is sent successfully or not
// to avoid resource leaks. // to avoid resource leaks.
Attachments []NamedReader Attachments []NamedReadCloser
} }
type NotifyOption interface { type NotifyOption interface {
@ -115,7 +117,7 @@ func (no NotifiyOptionBuilder) MaxBackoff(maxBackoff time.Duration) NotifiyOptio
// //
// Attachments are guaranteed to be closed wether the notification is sent successfully or not // Attachments are guaranteed to be closed wether the notification is sent successfully or not
// to avoid resource leaks. // to avoid resource leaks.
func (no NotifiyOptionBuilder) Attachments(attachments ...NamedReader) NotifiyOptionBuilder { func (no NotifiyOptionBuilder) Attachments(attachments ...NamedReadCloser) NotifiyOptionBuilder {
return append(no, NotifyOptionFunc(func(parameters *NotifyParameters) { return append(no, NotifyOptionFunc(func(parameters *NotifyParameters) {
parameters.Attachments = append(parameters.Attachments, attachments...) parameters.Attachments = append(parameters.Attachments, attachments...)
})) }))
@ -162,12 +164,12 @@ func Notify() NotifiyOptionBuilder {
return NotifiyOptionBuilder{} return NotifiyOptionBuilder{}
} }
// NamedReader is an interface that extends io.ReadCloser with // NamedReadCloser is an interface that extends io.ReadCloser with
// Filename and MimeType methods. // Filename and MimeType methods.
// //
// This is used to pass a file with its metadata to the notification // This is used to pass a file with its metadata to the notification
// system. // system.
type NamedReader interface { type NamedReadCloser interface {
io.ReadCloser io.ReadCloser
// Filename returns the name of the file. // Filename returns the name of the file.
// //
@ -205,34 +207,40 @@ type NamedReader interface {
MimeType() string MimeType() string
} }
// NewNamedReader creates a new NamedReader from the given filename, mimeType and reader. // NewNamedReadCloser creates a new NamedReader from the given filename, mimeType and reader.
// //
// If reader implements io.Closer, it will be closed when the NamedReader is closed. // If reader implements io.Closer, it will be closed when the NamedReadCloser is closed.
// //
// Otherwise, the reader will be wrapped with io.NopCloser. // Otherwise, the reader will be wrapped with io.NopCloser.
func NewNamedReader(filename, mimeType string, reader io.Reader) NamedReader { //
// NewNamedReadCloser claims ownership of the reader and the caller should not use the reader
// after passing it to NewNamedReadCloser.
//
// To create a NamedReadCloser that can copy the reader as it's consumed, e.g. the reader
// is used for HTTP.Request.Body for example, use TeeNamedReader.
func NewNamedReadCloser(filename, mimeType string, reader io.Reader) NamedReadCloser {
var rc io.ReadCloser var rc io.ReadCloser
if c, ok := reader.(io.ReadCloser); ok { if c, ok := reader.(io.ReadCloser); ok {
rc = c rc = c
} else { } else {
rc = io.NopCloser(reader) rc = io.NopCloser(reader)
} }
return &namedReader{ return &namedReadCloser{
filename: filename, filename: filename,
mimeType: mimeType, mimeType: mimeType,
reader: rc, reader: rc,
} }
} }
// NewNamedReaderFromFile same as NewNamedReader but reads the file from the filesystem. // NewNamedReadCloserFromFile same as NewNamedReader but reads the file from the filesystem.
// //
// If the file does not exist or there is an error in permissions on reading the file, // If the file does not exist or there is an error in permissions on reading the file,
// the error will be logged but otherwise ignored. // the error will be logged but otherwise ignored.
// //
// Example: // Example:
// //
// NewNamedReaderFromFile("path/to/file.txt", "text/plain") // NewNamedReadCloserFromFile("path/to/file.txt", "text/plain")
func NewNamedReaderFromFile(path string, mimetype string) NamedReader { func NewNamedReadCloserFromFile(path string, mimetype string) NamedReadCloser {
var rc io.ReadCloser var rc io.ReadCloser
f, err := os.Open(path) f, err := os.Open(path)
if err != nil { if err != nil {
@ -245,32 +253,84 @@ func NewNamedReaderFromFile(path string, mimetype string) NamedReader {
rc = f rc = f
} }
} }
return &namedReader{ return &namedReadCloser{
filename: path, filename: path,
mimeType: mimetype, mimeType: mimetype,
reader: rc, reader: rc,
} }
} }
type namedReader struct { type readCloser struct {
tee io.Reader
close func() error
}
// Read implements io.Reader.
func (re *readCloser) Read(p []byte) (n int, err error) {
return re.tee.Read(p)
}
// Close implements io.Closer.
func (re *readCloser) Close() error {
return re.close()
}
// TeeNamedReadCloser copies everything read from returned rc to namedReadCloser as it's consumed.
//
// Useful for e.g. HTTP Request Body.
//
// if reader is io.ReadCloser, it will be closed when the returned rc is closed (NOT namedReadCloser!).
// Otherwise, the reader will be wrapped with io.NopCloser.
//
// Example HTTP Request Body:
//
// // tee will forward the close method if the original reader is a ReadCloser.
// tee, named := zoptions.TeeNamedReadCloser("file.txt", "text/plain", body)
// req, _ := http.NewRequest("POST", "http://example.com", tee) // use tee as the body.
//
// resp, err := http.DefaultClient.Do(req)
// // named now contains copy of values of the reader
// // and can be used to send the file as attachment.
func TeeNamedReadCloser(name, mimeType string, reader io.Reader) (rc io.ReadCloser, namedReadCloser NamedReadCloser) {
buf := bufferpool.Get()
if e, ok := reader.(interface{ Len() int }); ok {
buf.Grow(e.Len())
}
var closeFunc func() error
if c, ok := reader.(io.ReadCloser); ok {
closeFunc = c.Close
} else {
closeFunc = func() error { return nil }
}
tee := io.TeeReader(reader, buf)
teeReader := &readCloser{
tee: tee,
close: closeFunc,
}
nrc := NewNamedReadCloser(name, mimeType, buf)
return teeReader, nrc
}
type namedReadCloser struct {
filename string filename string
mimeType string mimeType string
reader io.ReadCloser reader io.ReadCloser
} }
func (na *namedReader) Read(p []byte) (n int, err error) { func (na *namedReadCloser) Read(p []byte) (n int, err error) {
return na.reader.Read(p) return na.reader.Read(p)
} }
func (na *namedReader) Close() error { func (na *namedReadCloser) Close() error {
return na.reader.Close() return na.reader.Close()
} }
func (na *namedReader) Filename() string { func (na *namedReadCloser) Filename() string {
return na.filename return na.filename
} }
func (na *namedReader) MimeType() string { func (na *namedReadCloser) MimeType() string {
return na.mimeType return na.mimeType
} }

View file

@ -0,0 +1,86 @@
// bufferpool is a simple buffer pool for bytes.Buffer.
//
// It is useful for reducing memory allocations when working with bytes.Buffer.
//
// Maximum buffer size is 4MB. Anything bigger will be discarded to be garbage collected
// and avoid huge memory usage.
package bufferpool
import (
"bytes"
"sync"
"sync/atomic"
)
type BufferPool struct {
pool *sync.Pool
maxSharedCapacity uint64
currentCapacity atomic.Uint64
maxBufferCapacity int
}
// Buffer is a wrapper around bytes.Buffer
// that contains a reference to the pool.
//
// When Buffer Close method is called, it will be put back to the pool.
//
// Close never returns an error. The signature is to implement io.Closer.
type Buffer struct {
*bytes.Buffer
pool *BufferPool
}
// Close puts the buffer back to the pool.
// It never returns an error.
func (buf *Buffer) Close() error {
buf.pool.Put(buf)
return nil
}
func (b *BufferPool) Get() *Buffer {
buf := b.pool.Get().(*Buffer)
b.currentCapacity.Add(-uint64(buf.Cap()))
return buf
}
func (b *BufferPool) Put(buf *Buffer) {
bufCap := uint64(buf.Cap())
overloaded := b.currentCapacity.Add(bufCap) > b.maxSharedCapacity
if buf.Cap() < b.maxBufferCapacity && !overloaded {
buf.Reset()
b.pool.Put(buf)
} else {
b.currentCapacity.Add(-bufCap)
}
}
func New(sharedCapacity uint64, bufferCapacity int) *BufferPool {
b := &BufferPool{
maxSharedCapacity: sharedCapacity,
maxBufferCapacity: bufferCapacity,
}
b.pool.New = func() any {
return &Buffer{&bytes.Buffer{}, b}
}
return b
}
const (
sharedCap uint64 = 64 * 1024 * 1024 // 64MB
bufCap int = 8 * 1024 * 1024 // 8MB
)
var pool = New(sharedCap, bufCap)
// Get returns a bytes.Buffer from the global pool.
func Get() *Buffer {
return pool.Get()
}
// Put puts the bytes.Buffer back to the the pool it origins from.
//
// Same method as calling Close on the Buffer.
func Put(buf *Buffer) {
buf.pool.Put(buf)
}