From b0d52be920f3861d169711b8b7228b9fb699883e Mon Sep 17 00:00:00 2001 From: Tigor Hutasuhut Date: Mon, 26 Aug 2024 19:42:19 +0700 Subject: [PATCH] zoptions: added TeeNamedReadCloser to copy reader as it's consumed --- core/zerr/error.go | 11 ++++ core/zoptions/notify.go | 94 +++++++++++++++++++++++++------ internal/bufferpool/bufferpool.go | 86 ++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 17 deletions(-) create mode 100644 internal/bufferpool/bufferpool.go diff --git a/core/zerr/error.go b/core/zerr/error.go index 88c75f0..56984ed 100644 --- a/core/zerr/error.go +++ b/core/zerr/error.go @@ -70,7 +70,18 @@ type Error interface { // .Sequence().Iter() on this error. Join(errs ...error) Error + // Log logs the error to stderr. + // + // Format will be pretty if current runtime is run in interactive + // environment (TTY is detected), e.g. running inside terminal via `go run` command. + // + // Format will be JSON if current runtime does not have TTY, e.g. + // running inside Docker container, Kubernetes pod, or any non-interactive environment + // like VSCode debugger. Log(ctx context.Context) Error + + // Notify sends the error to the zen Server + // which will then handle the error and create reports. Notify(ctx context.Context, opts ...zoptions.NotifyOption) Error // Sequence returns the state of the error sequence. diff --git a/core/zoptions/notify.go b/core/zoptions/notify.go index e8d3f14..2b824bd 100644 --- a/core/zoptions/notify.go +++ b/core/zoptions/notify.go @@ -5,6 +5,8 @@ import ( "io" "os" "time" + + "gitlab.bareksa.com/backend/zen/internal/bufferpool" ) type NotifyParameters struct { @@ -61,7 +63,7 @@ type NotifyParameters struct { // // Attachments are guaranteed to be closed wether the notification is sent successfully or not // to avoid resource leaks. - Attachments []NamedReader + Attachments []NamedReadCloser } type NotifyOption interface { @@ -115,7 +117,7 @@ func (no NotifiyOptionBuilder) MaxBackoff(maxBackoff time.Duration) NotifiyOptio // // Attachments are guaranteed to be closed wether the notification is sent successfully or not // to avoid resource leaks. -func (no NotifiyOptionBuilder) Attachments(attachments ...NamedReader) NotifiyOptionBuilder { +func (no NotifiyOptionBuilder) Attachments(attachments ...NamedReadCloser) NotifiyOptionBuilder { return append(no, NotifyOptionFunc(func(parameters *NotifyParameters) { parameters.Attachments = append(parameters.Attachments, attachments...) })) @@ -162,12 +164,12 @@ func Notify() NotifiyOptionBuilder { return NotifiyOptionBuilder{} } -// NamedReader is an interface that extends io.ReadCloser with +// NamedReadCloser is an interface that extends io.ReadCloser with // Filename and MimeType methods. // // This is used to pass a file with its metadata to the notification // system. -type NamedReader interface { +type NamedReadCloser interface { io.ReadCloser // Filename returns the name of the file. // @@ -205,34 +207,40 @@ type NamedReader interface { MimeType() string } -// NewNamedReader creates a new NamedReader from the given filename, mimeType and reader. +// NewNamedReadCloser creates a new NamedReader from the given filename, mimeType and reader. // -// If reader implements io.Closer, it will be closed when the NamedReader is closed. +// If reader implements io.Closer, it will be closed when the NamedReadCloser is closed. // // Otherwise, the reader will be wrapped with io.NopCloser. -func NewNamedReader(filename, mimeType string, reader io.Reader) NamedReader { +// +// NewNamedReadCloser claims ownership of the reader and the caller should not use the reader +// after passing it to NewNamedReadCloser. +// +// To create a NamedReadCloser that can copy the reader as it's consumed, e.g. the reader +// is used for HTTP.Request.Body for example, use TeeNamedReader. +func NewNamedReadCloser(filename, mimeType string, reader io.Reader) NamedReadCloser { var rc io.ReadCloser if c, ok := reader.(io.ReadCloser); ok { rc = c } else { rc = io.NopCloser(reader) } - return &namedReader{ + return &namedReadCloser{ filename: filename, mimeType: mimeType, reader: rc, } } -// NewNamedReaderFromFile same as NewNamedReader but reads the file from the filesystem. +// NewNamedReadCloserFromFile same as NewNamedReader but reads the file from the filesystem. // // If the file does not exist or there is an error in permissions on reading the file, // the error will be logged but otherwise ignored. // // Example: // -// NewNamedReaderFromFile("path/to/file.txt", "text/plain") -func NewNamedReaderFromFile(path string, mimetype string) NamedReader { +// NewNamedReadCloserFromFile("path/to/file.txt", "text/plain") +func NewNamedReadCloserFromFile(path string, mimetype string) NamedReadCloser { var rc io.ReadCloser f, err := os.Open(path) if err != nil { @@ -245,32 +253,84 @@ func NewNamedReaderFromFile(path string, mimetype string) NamedReader { rc = f } } - return &namedReader{ + return &namedReadCloser{ filename: path, mimeType: mimetype, reader: rc, } } -type namedReader struct { +type readCloser struct { + tee io.Reader + close func() error +} + +// Read implements io.Reader. +func (re *readCloser) Read(p []byte) (n int, err error) { + return re.tee.Read(p) +} + +// Close implements io.Closer. +func (re *readCloser) Close() error { + return re.close() +} + +// TeeNamedReadCloser copies everything read from returned rc to namedReadCloser as it's consumed. +// +// Useful for e.g. HTTP Request Body. +// +// if reader is io.ReadCloser, it will be closed when the returned rc is closed (NOT namedReadCloser!). +// Otherwise, the reader will be wrapped with io.NopCloser. +// +// Example HTTP Request Body: +// +// // tee will forward the close method if the original reader is a ReadCloser. +// tee, named := zoptions.TeeNamedReadCloser("file.txt", "text/plain", body) +// req, _ := http.NewRequest("POST", "http://example.com", tee) // use tee as the body. +// +// resp, err := http.DefaultClient.Do(req) +// // named now contains copy of values of the reader +// // and can be used to send the file as attachment. +func TeeNamedReadCloser(name, mimeType string, reader io.Reader) (rc io.ReadCloser, namedReadCloser NamedReadCloser) { + buf := bufferpool.Get() + if e, ok := reader.(interface{ Len() int }); ok { + buf.Grow(e.Len()) + } + var closeFunc func() error + if c, ok := reader.(io.ReadCloser); ok { + closeFunc = c.Close + } else { + closeFunc = func() error { return nil } + } + + tee := io.TeeReader(reader, buf) + teeReader := &readCloser{ + tee: tee, + close: closeFunc, + } + nrc := NewNamedReadCloser(name, mimeType, buf) + return teeReader, nrc +} + +type namedReadCloser struct { filename string mimeType string reader io.ReadCloser } -func (na *namedReader) Read(p []byte) (n int, err error) { +func (na *namedReadCloser) Read(p []byte) (n int, err error) { return na.reader.Read(p) } -func (na *namedReader) Close() error { +func (na *namedReadCloser) Close() error { return na.reader.Close() } -func (na *namedReader) Filename() string { +func (na *namedReadCloser) Filename() string { return na.filename } -func (na *namedReader) MimeType() string { +func (na *namedReadCloser) MimeType() string { return na.mimeType } diff --git a/internal/bufferpool/bufferpool.go b/internal/bufferpool/bufferpool.go new file mode 100644 index 0000000..a4d774f --- /dev/null +++ b/internal/bufferpool/bufferpool.go @@ -0,0 +1,86 @@ +// bufferpool is a simple buffer pool for bytes.Buffer. +// +// It is useful for reducing memory allocations when working with bytes.Buffer. +// +// Maximum buffer size is 4MB. Anything bigger will be discarded to be garbage collected +// and avoid huge memory usage. + +package bufferpool + +import ( + "bytes" + "sync" + "sync/atomic" +) + +type BufferPool struct { + pool *sync.Pool + maxSharedCapacity uint64 + currentCapacity atomic.Uint64 + maxBufferCapacity int +} + +// Buffer is a wrapper around bytes.Buffer +// that contains a reference to the pool. +// +// When Buffer Close method is called, it will be put back to the pool. +// +// Close never returns an error. The signature is to implement io.Closer. +type Buffer struct { + *bytes.Buffer + pool *BufferPool +} + +// Close puts the buffer back to the pool. +// It never returns an error. +func (buf *Buffer) Close() error { + buf.pool.Put(buf) + return nil +} + +func (b *BufferPool) Get() *Buffer { + buf := b.pool.Get().(*Buffer) + b.currentCapacity.Add(-uint64(buf.Cap())) + return buf +} + +func (b *BufferPool) Put(buf *Buffer) { + bufCap := uint64(buf.Cap()) + overloaded := b.currentCapacity.Add(bufCap) > b.maxSharedCapacity + if buf.Cap() < b.maxBufferCapacity && !overloaded { + buf.Reset() + b.pool.Put(buf) + } else { + b.currentCapacity.Add(-bufCap) + } +} + +func New(sharedCapacity uint64, bufferCapacity int) *BufferPool { + b := &BufferPool{ + maxSharedCapacity: sharedCapacity, + maxBufferCapacity: bufferCapacity, + } + b.pool.New = func() any { + return &Buffer{&bytes.Buffer{}, b} + } + return b +} + +const ( + sharedCap uint64 = 64 * 1024 * 1024 // 64MB + bufCap int = 8 * 1024 * 1024 // 8MB +) + +var pool = New(sharedCap, bufCap) + +// Get returns a bytes.Buffer from the global pool. +func Get() *Buffer { + return pool.Get() +} + +// Put puts the bytes.Buffer back to the the pool it origins from. +// +// Same method as calling Close on the Buffer. +func Put(buf *Buffer) { + buf.pool.Put(buf) +}