Skip to content

Commit

Permalink
[passthru] Add package with pass-thru reader and writer
Browse files Browse the repository at this point in the history
  • Loading branch information
andyone committed Jan 22, 2024
1 parent ea24996 commit 0073e60
Show file tree
Hide file tree
Showing 7 changed files with 621 additions and 2 deletions.
1 change: 1 addition & 0 deletions .scripts/packages.list
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
L + netutil
* + options
* + pager
* + passthru
* + passwd
* + path
* + pid
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
## Changelog

### 12.96.2
### 12.97.0

* `[passthru]` Added package with pass-thru reader and writer
* `[fmtutil/table]` Improved borders and separators rendering
* `[usage]` Improved environment info output
* `[spinner]` Improved message rendering
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go get -u github.com/essentialkaos/ek/v12
* [`netutil`](https://kaos.sh/g/ek.v12/netutil) — Package provides methods for working with network
* [`options`](https://kaos.sh/g/ek.v12/options) — Package provides methods for working with command-line options
* [`pager`](https://kaos.sh/g/ek.v12/pager) — Package provides methods for pager setup (more/less)
* [`passthru`](https://kaos.sh/g/ek.v12/passthru) — Package provides Reader and Writer with information about the amount of data being passed
* [`passwd`](https://kaos.sh/g/ek.v12/passwd) — Package contains methods for working with passwords
* [`path`](https://kaos.sh/g/ek.v12/path) — Package for working with paths (fully compatible with base path package)
* [`pid`](https://kaos.sh/g/ek.v12/pid) — Package for working with PID files
Expand Down
2 changes: 1 addition & 1 deletion ek.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// ////////////////////////////////////////////////////////////////////////////////// //

// VERSION is current ek package version
const VERSION = "12.96.2"
const VERSION = "12.97.0"

// ////////////////////////////////////////////////////////////////////////////////// //

Expand Down
331 changes: 331 additions & 0 deletions passthru/passthru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
// Package passthru provides Reader and Writer with information about the amount
// of data being passed.
package passthru

// ////////////////////////////////////////////////////////////////////////////////// //
// //
// Copyright (c) 2024 ESSENTIAL KAOS //
// Apache License, Version 2.0 <https://www.apache.org/licenses/LICENSE-2.0> //
// //
// ////////////////////////////////////////////////////////////////////////////////// //

import (
"errors"
"io"
"math"
"sync"
"sync/atomic"
"time"
)

// ////////////////////////////////////////////////////////////////////////////////// //

// Reader is pass-thru Reader
type Reader struct {
Calculator *Calculator
Update func(n int)

r io.ReadCloser
current int64
total int64
isClosed *atomic.Bool
}

// Writer is pass-thru Writer
type Writer struct {
Calculator *Calculator
Update func(n int)

w io.WriteCloser
current int64
total int64
isClosed *atomic.Bool
}

// Calculator calcluates pass-thru speed and remaining time

Check warning on line 45 in passthru/passthru.go

View workflow job for this annotation

GitHub Actions / Typos

"calcluates" should be "calculates".

Check warning on line 45 in passthru/passthru.go

View workflow job for this annotation

GitHub Actions / Typos

"calcluates" should be "calculates".
type Calculator struct {
total float64
prev float64
speed float64
decay float64
remaining time.Duration
lastUpdate time.Time

mu sync.Mutex
}

// ////////////////////////////////////////////////////////////////////////////////// //

var (
ErrNilReader = errors.New("Reader is nil")
ErrNilWriter = errors.New("Writer is nil")
)

// ////////////////////////////////////////////////////////////////////////////////// //

// NewReader creates new passthru reader
func NewReader(reader io.ReadCloser, total int64) *Reader {
return &Reader{
r: reader,
total: total,
isClosed: &atomic.Bool{},
}
}

// NewWriter creates new passthru writer
func NewWriter(writer io.WriteCloser, total int64) *Writer {
return &Writer{
w: writer,
total: total,
isClosed: &atomic.Bool{},
}
}

// NewCalculator creates new Calculator struct
func NewCalculator(total int64, winSizeSec float64) *Calculator {
return &Calculator{
total: float64(total),
decay: 2 / (winSizeSec + 1),
mu: sync.Mutex{},
}
}

// ////////////////////////////////////////////////////////////////////////////////// //

// Read implements the standard Read interface
func (r *Reader) Read(p []byte) (int, error) {
if r == nil || r.r == nil {
return -1, ErrNilReader
}

n, err := r.r.Read(p)

if err != nil {
return n, err
}

atomic.AddInt64(&r.current, int64(n))

if r.Update != nil {
r.Update(n)
}

return n, nil
}

// Current returns read amount of data
func (r *Reader) Current() int64 {
if r == nil {
return 0
}

return atomic.LoadInt64(&r.current)
}

// Total returns total amount of data
func (r *Reader) Total() int64 {
if r == nil {
return 0
}

return atomic.LoadInt64(&r.total)
}

// Progress returns percentage of data read
func (r *Reader) Progress() float64 {
if r == nil {
return 0
}

return (float64(r.Current()) / float64(r.Total())) * 100.0
}

// SetTotal sets total amount of data
func (r *Reader) SetTotal(total int64) {
if r == nil {
return
}

atomic.StoreInt64(&r.total, total)
}

// Speed calculates passthru speed and time remaining to process all data.
// If Calculator was not set on the first call, a default calculator with
// a 10 second window is created.
func (r *Reader) Speed() (float64, time.Duration) {
if r == nil || r.Total() <= 0 {
return 0, 0
}

if r.Calculator == nil {
r.Calculator = NewCalculator(r.Total(), 10.0)
}

return r.Calculator.Calculate(atomic.LoadInt64(&r.current))
}

// Close closes the reader
func (r *Reader) Close() error {
if r == nil {
return ErrNilReader
}

atomic.StoreInt64(&r.current, 0)
r.isClosed.Store(true)

return r.r.Close()
}

// IsClosed returns true if reader is closed
func (r *Reader) IsClosed() bool {
if r == nil {
return true
}

return r.isClosed.Load()
}

// ////////////////////////////////////////////////////////////////////////////////// //

// Write implements the standard Write interface
func (w *Writer) Write(p []byte) (int, error) {
if w == nil || w.w == nil {
return -1, ErrNilWriter
}

n, err := w.w.Write(p)

if err != nil {
return n, err
}

atomic.AddInt64(&w.current, int64(n))

if w.Update != nil {
w.Update(n)
}

return n, nil
}

// Current returns written amount of data
func (w *Writer) Current() int64 {
if w == nil {
return 0
}

return atomic.LoadInt64(&w.current)
}

// Total returns total amount of data
func (w *Writer) Total() int64 {
if w == nil {
return 0
}

return atomic.LoadInt64(&w.total)
}

// Progress returns percentage of data written
func (w *Writer) Progress() float64 {
if w == nil {
return 0
}

return (float64(w.Current()) / float64(w.Total())) * 100.0
}

// SetTotal sets total amount of data
func (w *Writer) SetTotal(total int64) {
if w == nil {
return
}

atomic.StoreInt64(&w.total, total)
}

// Speed calculates passthru speed and time remaining to process all data.
// If Calculator was not set on the first call, a default calculator with
// a 10 second window is created.
func (w *Writer) Speed() (float64, time.Duration) {
if w == nil || w.Total() <= 0 {
return 0, 0
}

if w.Calculator == nil {
w.Calculator = NewCalculator(w.Total(), 10.0)
}

return w.Calculator.Calculate(atomic.LoadInt64(&w.current))
}

// Close closes the writer
func (w *Writer) Close() error {
if w == nil {
return ErrNilWriter
}

atomic.StoreInt64(&w.current, 0)
w.isClosed.Store(true)

return w.w.Close()
}

// IsClosed returns true if writer is closed
func (w *Writer) IsClosed() bool {
if w == nil {
return true
}

return w.isClosed.Load()
}

// ////////////////////////////////////////////////////////////////////////////////// //

// Calculate calculates speed and remaining time
func (c *Calculator) Calculate(current int64) (float64, time.Duration) {
if c == nil {
return 0, 0
}

c.mu.Lock()
defer c.mu.Unlock()

cur := float64(current)
now := time.Now()

if c.total == 0 {
return 0, 0
}

if !c.lastUpdate.IsZero() && now.Sub(c.lastUpdate) < time.Second/2 {
return c.speed, c.remaining
}

speed := math.Abs(cur-c.prev) / time.Since(c.lastUpdate).Seconds()
c.speed = (speed * c.decay) + (c.speed * (1.0 - c.decay))

if c.prev != 0 && c.speed > 0 {
c.remaining = time.Duration((c.total-cur)/c.speed) * time.Second
}

if c.remaining > time.Hour {
c.remaining = time.Hour
}

c.prev = cur
c.lastUpdate = now

return c.speed, c.remaining
}

// SetTotal sets total number of objects
func (c *Calculator) SetTotal(total int64) {
if c == nil {
return
}

c.mu.Lock()
c.total = float64(total)
c.mu.Unlock()
}
Loading

0 comments on commit 0073e60

Please sign in to comment.