Search Apps Documentation Source Content File Folder Download Copy

flow.gno

8.37 Kb · 288 lines
  1//
  2// Written by Maxim Khitrov (November 2012)
  3//
  4// XXX modified to disable blocking, time.Sleep().
  5
  6// Package flow provides the tools for monitoring and limiting the flow rate
  7// of an arbitrary data stream.
  8package flow
  9
 10import (
 11	"math"
 12	// "sync"
 13	"time"
 14)
 15
 // Monitor tracks the transfer rate of a data stream and can cap it.
//
// All timestamps are stored as time.Duration values obtained from clock().
// No locking is performed in this version: the mutex that would guard these
// fields is commented out.
type Monitor struct {
	// mu sync.Mutex // disabled: would guard access to all internal fields

	// active is true while a transfer is in progress.
	active bool
	// start is the clock() value at which the transfer began.
	start time.Duration
	// bytes is the total number of bytes folded in from finished samples.
	bytes int64
	// samples is the total number of samples taken so far.
	samples int64

	// rSample is the most recent transfer rate sample, in bytes per second.
	rSample float64
	// rEMA is the exponential moving average of rSample.
	rEMA float64
	// rPeak is the largest rSample observed.
	rPeak float64
	// rWindow is the rEMA window, in seconds.
	rWindow float64

	// sBytes is the number of bytes transferred since sLast.
	sBytes int64
	// sLast is the time of the most recent sample (stop time when inactive).
	sLast time.Duration
	// sRate is the sampling interval.
	sRate time.Duration

	// tBytes is the number of bytes expected in the current transfer.
	tBytes int64
	// tLast is the time of the most recent transfer of at least 1 byte.
	tLast time.Duration
}
 36
 37// New creates a new flow control monitor. Instantaneous transfer rate is
 38// measured and updated for each sampleRate interval. windowSize determines the
 39// weight of each sample in the exponential moving average (EMA) calculation.
 40// The exact formulas are:
 41//
 42//	sampleTime = currentTime - prevSampleTime
 43//	sampleRate = byteCount / sampleTime
 44//	weight     = 1 - exp(-sampleTime/windowSize)
 45//	newRate    = weight*sampleRate + (1-weight)*oldRate
 46//
 47// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
 48// respectively.
 49func New(sampleRate, windowSize time.Duration) *Monitor {
 50	if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
 51		sampleRate = 5 * clockRate
 52	}
 53	if windowSize <= 0 {
 54		windowSize = 1 * time.Second
 55	}
 56	now := clock()
 57	return &Monitor{
 58		active:  true,
 59		start:   now,
 60		rWindow: windowSize.Seconds(),
 61		sLast:   now,
 62		sRate:   sampleRate,
 63		tLast:   now,
 64	}
 65}
 66
 67// Update records the transfer of n bytes and returns n. It should be called
 68// after each Read/Write operation, even if n is 0.
 69func (m *Monitor) Update(n int) int {
 70	// m.mu.Lock()
 71	m.update(n)
 72	// m.mu.Unlock()
 73	return n
 74}
 75
 76// Hack to set the current rEMA.
 77func (m *Monitor) SetREMA(rEMA float64) {
 78	// m.mu.Lock()
 79	m.rEMA = rEMA
 80	m.samples++
 81	// m.mu.Unlock()
 82}
 83
 84// IO is a convenience method intended to wrap io.Reader and io.Writer method
 85// execution. It calls m.Update(n) and then returns (n, err) unmodified.
 86func (m *Monitor) IO(n int, err error) (int, error) {
 87	return m.Update(n), err
 88}
 89
 90// Done marks the transfer as finished and prevents any further updates or
 91// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
 92// Limit methods become NOOPs. It returns the total number of bytes transferred.
 93func (m *Monitor) Done() int64 {
 94	// m.mu.Lock()
 95	if now := m.update(0); m.sBytes > 0 {
 96		m.reset(now)
 97	}
 98	m.active = false
 99	m.tLast = 0
100	n := m.bytes
101	// m.mu.Unlock()
102	return n
103}
104
// timeRemLimit caps Status.TimeRem at 999h59m59s, i.e. one second short of
// 1000 hours.
const timeRemLimit = 1000*time.Hour - time.Second
107
108// Status represents the current Monitor status. All transfer rates are in bytes
109// per second rounded to the nearest byte.
110type Status struct {
111	Active   bool          // Flag indicating an active transfer
112	Start    time.Time     // Transfer start time
113	Duration time.Duration // Time period covered by the statistics
114	Idle     time.Duration // Time since the last transfer of at least 1 byte
115	Bytes    int64         // Total number of bytes transferred
116	Samples  int64         // Total number of samples taken
117	InstRate int64         // Instantaneous transfer rate
118	CurRate  int64         // Current transfer rate (EMA of InstRate)
119	AvgRate  int64         // Average transfer rate (Bytes / Duration)
120	PeakRate int64         // Maximum instantaneous transfer rate
121	BytesRem int64         // Number of bytes remaining in the transfer
122	TimeRem  time.Duration // Estimated time to completion
123	Progress Percent       // Overall transfer progress
124}
125
126func (s Status) String() string {
127	return "STATUS{}"
128}
129
130// Status returns current transfer status information. The returned value
131// becomes static after a call to Done.
132func (m *Monitor) Status() Status {
133	// m.mu.Lock()
134	now := m.update(0)
135	s := Status{
136		Active:   m.active,
137		Start:    clockToTime(m.start),
138		Duration: m.sLast - m.start,
139		Idle:     now - m.tLast,
140		Bytes:    m.bytes,
141		Samples:  m.samples,
142		PeakRate: round(m.rPeak),
143		BytesRem: m.tBytes - m.bytes,
144		Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
145	}
146	if s.BytesRem < 0 {
147		s.BytesRem = 0
148	}
149	if s.Duration > 0 {
150		rAvg := float64(s.Bytes) / s.Duration.Seconds()
151		s.AvgRate = round(rAvg)
152		if s.Active {
153			s.InstRate = round(m.rSample)
154			s.CurRate = round(m.rEMA)
155			if s.BytesRem > 0 {
156				if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
157					ns := float64(s.BytesRem) / tRate * 1e9
158					if ns > float64(timeRemLimit) {
159						ns = float64(timeRemLimit)
160					}
161					s.TimeRem = clockRound(time.Duration(ns))
162				}
163			}
164		}
165	}
166	// m.mu.Unlock()
167	return s
168}
169
170// Limit restricts the instantaneous (per-sample) data flow to rate bytes per
171// second. It returns the maximum number of bytes (0 <= n <= want) that may be
172// transferred immediately without exceeding the limit. If block == true, the
173// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
174// or the transfer is inactive (after a call to Done).
175//
176// At least one byte is always allowed to be transferred in any given sampling
177// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
178// is 10 bytes per second.
179//
180// For usage examples, see the implementation of Reader and Writer in io.go.
181func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
182	if block {
183		panic("blocking not yet supported")
184	}
185	if want < 1 || rate < 1 {
186		return want
187	}
188	// m.mu.Lock()
189
190	// Determine the maximum number of bytes that can be sent in one sample
191	limit := round(float64(rate) * m.sRate.Seconds())
192	if limit <= 0 {
193		limit = 1
194	}
195
196	_ = m.update(0)
197	/* XXX
198	// If block == true, wait until m.sBytes < limit
199	if now := m.update(0); block {
200		for m.sBytes >= limit && m.active {
201			now = m.waitNextSample(now)
202		}
203	}
204	*/
205
206	// Make limit <= want (unlimited if the transfer is no longer active)
207	if limit -= m.sBytes; limit > int64(want) || !m.active {
208		limit = int64(want)
209	}
210	// m.mu.Unlock()
211
212	if limit < 0 {
213		limit = 0
214	}
215	return int(limit)
216}
217
218// SetTransferSize specifies the total size of the data transfer, which allows
219// the Monitor to calculate the overall progress and time to completion.
220func (m *Monitor) SetTransferSize(bytes int64) {
221	if bytes < 0 {
222		bytes = 0
223	}
224	// m.mu.Lock()
225	m.tBytes = bytes
226	// m.mu.Unlock()
227}
228
229// update accumulates the transferred byte count for the current sample until
230// clock() - m.sLast >= m.sRate. The monitor status is updated once the current
231// sample is done.
232func (m *Monitor) update(n int) (now time.Duration) {
233	if !m.active {
234		return
235	}
236	if now = clock(); n > 0 {
237		m.tLast = now
238	}
239	m.sBytes += int64(n)
240	if sTime := now - m.sLast; sTime >= m.sRate {
241		t := sTime.Seconds()
242		if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
243			m.rPeak = m.rSample
244		}
245
246		// Exponential moving average using a method similar to *nix load
247		// average calculation. Longer sampling periods carry greater weight.
248		if m.samples > 0 {
249			w := math.Exp(-t / m.rWindow)
250			m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
251		} else {
252			m.rEMA = m.rSample
253		}
254		m.reset(now)
255	}
256	return
257}
258
259// reset clears the current sample state in preparation for the next sample.
260func (m *Monitor) reset(sampleTime time.Duration) {
261	m.bytes += m.sBytes
262	m.samples++
263	m.sBytes = 0
264	m.sLast = sampleTime
265}
266
267/*
268// waitNextSample sleeps for the remainder of the current sample. The lock is
269// released and reacquired during the actual sleep period, so it's possible for
270// the transfer to be inactive when this method returns.
271func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
272	const minWait = 5 * time.Millisecond
273	current := m.sLast
274
275	// sleep until the last sample time changes (ideally, just one iteration)
276	for m.sLast == current && m.active {
277		d := current + m.sRate - now
278		// m.mu.Unlock()
279		if d < minWait {
280			d = minWait
281		}
282		time.Sleep(d)
283		// m.mu.Lock()
284		now = m.update(0)
285	}
286	return now
287}
288*/