1//
2// Written by Maxim Khitrov (November 2012)
3//
4// XXX modified to disable blocking, time.Sleep().
5
6// Package flow provides the tools for monitoring and limiting the flow rate
7// of an arbitrary data stream.
8package flow
9
10import (
11 "math"
12 // "sync"
13 "time"
14)
15
// Monitor holds the bookkeeping used to measure and limit the transfer rate
// of a data stream. The mutex that originally guarded these fields is
// commented out, so none of the methods perform any locking.
type Monitor struct {
	// mu sync.Mutex // (disabled) mutex guarding access to all internal fields

	// Overall transfer state.
	active  bool          // true while the transfer is in progress
	start   time.Duration // clock() value captured when the transfer started
	bytes   int64         // running total of transferred bytes
	samples int64         // number of samples recorded so far

	// Rate statistics.
	rSample float64 // rate of the most recent sample, in bytes per second
	rEMA    float64 // exponential moving average of rSample
	rPeak   float64 // highest rSample observed
	rWindow float64 // length of the rEMA window, in seconds

	// State of the sample currently being accumulated.
	sBytes int64         // bytes transferred since sLast
	sLast  time.Duration // time of the most recent sample (stop time when inactive)
	sRate  time.Duration // sampling interval

	// Progress of the transfer as a whole.
	tBytes int64         // number of bytes expected in the current transfer
	tLast  time.Duration // time of the most recent transfer of at least 1 byte
}
36
37// New creates a new flow control monitor. Instantaneous transfer rate is
38// measured and updated for each sampleRate interval. windowSize determines the
39// weight of each sample in the exponential moving average (EMA) calculation.
40// The exact formulas are:
41//
42// sampleTime = currentTime - prevSampleTime
43// sampleRate = byteCount / sampleTime
44// weight = 1 - exp(-sampleTime/windowSize)
45// newRate = weight*sampleRate + (1-weight)*oldRate
46//
47// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
48// respectively.
49func New(sampleRate, windowSize time.Duration) *Monitor {
50 if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
51 sampleRate = 5 * clockRate
52 }
53 if windowSize <= 0 {
54 windowSize = 1 * time.Second
55 }
56 now := clock()
57 return &Monitor{
58 active: true,
59 start: now,
60 rWindow: windowSize.Seconds(),
61 sLast: now,
62 sRate: sampleRate,
63 tLast: now,
64 }
65}
66
67// Update records the transfer of n bytes and returns n. It should be called
68// after each Read/Write operation, even if n is 0.
69func (m *Monitor) Update(n int) int {
70 // m.mu.Lock()
71 m.update(n)
72 // m.mu.Unlock()
73 return n
74}
75
76// Hack to set the current rEMA.
77func (m *Monitor) SetREMA(rEMA float64) {
78 // m.mu.Lock()
79 m.rEMA = rEMA
80 m.samples++
81 // m.mu.Unlock()
82}
83
84// IO is a convenience method intended to wrap io.Reader and io.Writer method
85// execution. It calls m.Update(n) and then returns (n, err) unmodified.
86func (m *Monitor) IO(n int, err error) (int, error) {
87 return m.Update(n), err
88}
89
90// Done marks the transfer as finished and prevents any further updates or
91// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
92// Limit methods become NOOPs. It returns the total number of bytes transferred.
93func (m *Monitor) Done() int64 {
94 // m.mu.Lock()
95 if now := m.update(0); m.sBytes > 0 {
96 m.reset(now)
97 }
98 m.active = false
99 m.tLast = 0
100 n := m.bytes
101 // m.mu.Unlock()
102 return n
103}
104
// timeRemLimit is the largest value Status.TimeRem may take: 999h59m59s,
// i.e. one second short of 1000 hours.
const timeRemLimit = 1000*time.Hour - time.Second
107
108// Status represents the current Monitor status. All transfer rates are in bytes
109// per second rounded to the nearest byte.
110type Status struct {
111 Active bool // Flag indicating an active transfer
112 Start time.Time // Transfer start time
113 Duration time.Duration // Time period covered by the statistics
114 Idle time.Duration // Time since the last transfer of at least 1 byte
115 Bytes int64 // Total number of bytes transferred
116 Samples int64 // Total number of samples taken
117 InstRate int64 // Instantaneous transfer rate
118 CurRate int64 // Current transfer rate (EMA of InstRate)
119 AvgRate int64 // Average transfer rate (Bytes / Duration)
120 PeakRate int64 // Maximum instantaneous transfer rate
121 BytesRem int64 // Number of bytes remaining in the transfer
122 TimeRem time.Duration // Estimated time to completion
123 Progress Percent // Overall transfer progress
124}
125
126func (s Status) String() string {
127 return "STATUS{}"
128}
129
130// Status returns current transfer status information. The returned value
131// becomes static after a call to Done.
132func (m *Monitor) Status() Status {
133 // m.mu.Lock()
134 now := m.update(0)
135 s := Status{
136 Active: m.active,
137 Start: clockToTime(m.start),
138 Duration: m.sLast - m.start,
139 Idle: now - m.tLast,
140 Bytes: m.bytes,
141 Samples: m.samples,
142 PeakRate: round(m.rPeak),
143 BytesRem: m.tBytes - m.bytes,
144 Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
145 }
146 if s.BytesRem < 0 {
147 s.BytesRem = 0
148 }
149 if s.Duration > 0 {
150 rAvg := float64(s.Bytes) / s.Duration.Seconds()
151 s.AvgRate = round(rAvg)
152 if s.Active {
153 s.InstRate = round(m.rSample)
154 s.CurRate = round(m.rEMA)
155 if s.BytesRem > 0 {
156 if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
157 ns := float64(s.BytesRem) / tRate * 1e9
158 if ns > float64(timeRemLimit) {
159 ns = float64(timeRemLimit)
160 }
161 s.TimeRem = clockRound(time.Duration(ns))
162 }
163 }
164 }
165 }
166 // m.mu.Unlock()
167 return s
168}
169
170// Limit restricts the instantaneous (per-sample) data flow to rate bytes per
171// second. It returns the maximum number of bytes (0 <= n <= want) that may be
172// transferred immediately without exceeding the limit. If block == true, the
173// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
174// or the transfer is inactive (after a call to Done).
175//
176// At least one byte is always allowed to be transferred in any given sampling
177// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
178// is 10 bytes per second.
179//
180// For usage examples, see the implementation of Reader and Writer in io.go.
181func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
182 if block {
183 panic("blocking not yet supported")
184 }
185 if want < 1 || rate < 1 {
186 return want
187 }
188 // m.mu.Lock()
189
190 // Determine the maximum number of bytes that can be sent in one sample
191 limit := round(float64(rate) * m.sRate.Seconds())
192 if limit <= 0 {
193 limit = 1
194 }
195
196 _ = m.update(0)
197 /* XXX
198 // If block == true, wait until m.sBytes < limit
199 if now := m.update(0); block {
200 for m.sBytes >= limit && m.active {
201 now = m.waitNextSample(now)
202 }
203 }
204 */
205
206 // Make limit <= want (unlimited if the transfer is no longer active)
207 if limit -= m.sBytes; limit > int64(want) || !m.active {
208 limit = int64(want)
209 }
210 // m.mu.Unlock()
211
212 if limit < 0 {
213 limit = 0
214 }
215 return int(limit)
216}
217
218// SetTransferSize specifies the total size of the data transfer, which allows
219// the Monitor to calculate the overall progress and time to completion.
220func (m *Monitor) SetTransferSize(bytes int64) {
221 if bytes < 0 {
222 bytes = 0
223 }
224 // m.mu.Lock()
225 m.tBytes = bytes
226 // m.mu.Unlock()
227}
228
229// update accumulates the transferred byte count for the current sample until
230// clock() - m.sLast >= m.sRate. The monitor status is updated once the current
231// sample is done.
232func (m *Monitor) update(n int) (now time.Duration) {
233 if !m.active {
234 return
235 }
236 if now = clock(); n > 0 {
237 m.tLast = now
238 }
239 m.sBytes += int64(n)
240 if sTime := now - m.sLast; sTime >= m.sRate {
241 t := sTime.Seconds()
242 if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
243 m.rPeak = m.rSample
244 }
245
246 // Exponential moving average using a method similar to *nix load
247 // average calculation. Longer sampling periods carry greater weight.
248 if m.samples > 0 {
249 w := math.Exp(-t / m.rWindow)
250 m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
251 } else {
252 m.rEMA = m.rSample
253 }
254 m.reset(now)
255 }
256 return
257}
258
259// reset clears the current sample state in preparation for the next sample.
260func (m *Monitor) reset(sampleTime time.Duration) {
261 m.bytes += m.sBytes
262 m.samples++
263 m.sBytes = 0
264 m.sLast = sampleTime
265}
266
267/*
268// waitNextSample sleeps for the remainder of the current sample. The lock is
269// released and reacquired during the actual sleep period, so it's possible for
270// the transfer to be inactive when this method returns.
271func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
272 const minWait = 5 * time.Millisecond
273 current := m.sLast
274
275 // sleep until the last sample time changes (ideally, just one iteration)
276 for m.sLast == current && m.active {
277 d := current + m.sRate - now
278 // m.mu.Unlock()
279 if d < minWait {
280 d = minWait
281 }
282 time.Sleep(d)
283 // m.mu.Lock()
284 now = m.update(0)
285 }
286 return now
287}
288*/
flow.gno
8.37 Kb · 288 lines