1//
2// Written by Maxim Khitrov (November 2012)
3//
4
5package flow
6
7import (
8 "errors"
9 "io"
10)
11
// ErrLimit is the sentinel error returned by Writer.Write when the rate
// limiter grants zero bytes for the next chunk, so the write ends short of
// len(p). Callers receive it together with the count of bytes already written.
var ErrLimit = errors.New("flowrate: flow rate limit exceeded")
15
// Limiter is implemented by the Reader and Writer to provide a consistent
// interface for monitoring and controlling data transfer.
type Limiter interface {
	// Done, Status and SetTransferSize are not defined in this file.
	// NOTE(review): presumably they are promoted from the embedded *Monitor;
	// confirm against the Monitor definition.
	Done() int64
	Status() Status
	SetTransferSize(bytes int64)
	// SetLimit replaces the transfer rate limit (bytes per second) and
	// returns the previous value.
	SetLimit(new int64) (old int64)
	// SetBlocking replaces the blocking flag and returns the previous value.
	SetBlocking(new bool) (old bool)
}
25
// Reader implements io.ReadCloser with a restriction on the rate of data
// transfer. It embeds both the data source and the flow monitor, so the
// methods of each are promoted onto Reader.
type Reader struct {
	io.Reader // Data source
	*Monitor  // Flow control monitor

	// limit and block are handed to the monitor's Limit call on every Read.
	limit int64 // Rate limit in bytes per second (unlimited when <= 0)
	block bool  // What to do when no new bytes can be read due to the limit
}
35
36// NewReader restricts all Read operations on r to limit bytes per second.
37func NewReader(r io.Reader, limit int64) *Reader {
38 return &Reader{r, New(0, 0), limit, false} // XXX default false
39}
40
41// Read reads up to len(p) bytes into p without exceeding the current transfer
42// rate limit. It returns (0, nil) immediately if r is non-blocking and no new
43// bytes can be read at this time.
44func (r *Reader) Read(p []byte) (n int, err error) {
45 p = p[:r.Limit(len(p), r.limit, r.block)]
46 if len(p) > 0 {
47 n, err = r.IO(r.Reader.Read(p))
48 }
49 return
50}
51
52// SetLimit changes the transfer rate limit to new bytes per second and returns
53// the previous setting.
54func (r *Reader) SetLimit(new int64) (old int64) {
55 old, r.limit = r.limit, new
56 return
57}
58
59// SetBlocking changes the blocking behavior and returns the previous setting. A
60// Read call on a non-blocking reader returns immediately if no additional bytes
61// may be read at this time due to the rate limit.
62func (r *Reader) SetBlocking(new bool) (old bool) {
63 if new == true {
64 panic("blocking not yet supported")
65 }
66 old, r.block = r.block, new
67 return
68}
69
70// Close closes the underlying reader if it implements the io.Closer interface.
71func (r *Reader) Close() error {
72 defer r.Done()
73 if c, ok := r.Reader.(io.Closer); ok {
74 return c.Close()
75 }
76 return nil
77}
78
// Writer implements io.WriteCloser with a restriction on the rate of data
// transfer. It embeds both the data destination and the flow monitor, so the
// methods of each are promoted onto Writer.
type Writer struct {
	io.Writer // Data destination
	*Monitor  // Flow control monitor

	// limit and block are handed to the monitor's Limit call for every chunk
	// written by Write.
	limit int64 // Rate limit in bytes per second (unlimited when <= 0)
	block bool  // What to do when no new bytes can be written due to the limit
}
88
89// NewWriter restricts all Write operations on w to limit bytes per second. The
90// transfer rate and the default blocking behavior (true) can be changed
91// directly on the returned *Writer.
92func NewWriter(w io.Writer, limit int64) *Writer {
93 return &Writer{w, New(0, 0), limit, false} // XXX default false
94}
95
96// Write writes len(p) bytes from p to the underlying data stream without
97// exceeding the current transfer rate limit. It returns (n, ErrLimit) if w is
98// non-blocking and no additional bytes can be written at this time.
99func (w *Writer) Write(p []byte) (n int, err error) {
100 var c int
101 for len(p) > 0 && err == nil {
102 s := p[:w.Limit(len(p), w.limit, w.block)]
103 if len(s) > 0 {
104 c, err = w.IO(w.Writer.Write(s))
105 } else {
106 return n, ErrLimit
107 }
108 p = p[c:]
109 n += c
110 }
111 return
112}
113
114// SetLimit changes the transfer rate limit to new bytes per second and returns
115// the previous setting.
116func (w *Writer) SetLimit(new int64) (old int64) {
117 old, w.limit = w.limit, new
118 return
119}
120
121// SetBlocking changes the blocking behavior and returns the previous setting. A
122// Write call on a non-blocking writer returns as soon as no additional bytes
123// may be written at this time due to the rate limit.
124func (w *Writer) SetBlocking(new bool) (old bool) {
125 old, w.block = w.block, new
126 return
127}
128
129// Close closes the underlying writer if it implements the io.Closer interface.
130func (w *Writer) Close() error {
131 defer w.Done()
132 if c, ok := w.Writer.(io.Closer); ok {
133 return c.Close()
134 }
135 return nil
136}
io.gno
3.92 Kb · 136 lines