]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Massively reduce CPU when Readers are blocked on unavailable data
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "log"
7         "os"
8         "sync"
9
10         "github.com/anacrolix/missinggo"
11         "golang.org/x/net/context"
12 )
13
14 // Piece range by piece index, [begin, end).
15 type pieceRange struct {
16         begin, end int
17 }
18
19 // Accesses Torrent data via a Client. Reads block until the data is
20 // available. Seeks and readahead also drive Client behaviour.
21 type Reader struct {
22         t          *Torrent
23         responsive bool
24         // Ensure operations that change the position are exclusive, like Read()
25         // and Seek().
26         opMu sync.Mutex
27
28         // Required when modifying pos and readahead, or reading them without
29         // opMu.
30         mu        sync.Locker
31         pos       int64
32         readahead int64
33         // The cached piece range this reader wants downloaded. The zero value
34         // corresponds to nothing. We cache this so that changes can be detected,
35         // and bubbled up to the Torrent only as required.
36         pieces pieceRange
37 }
38
39 var _ io.ReadCloser = &Reader{}
40
41 // Don't wait for pieces to complete and be verified. Read calls return as
42 // soon as they can when the underlying chunks become available.
43 func (r *Reader) SetResponsive() {
44         r.responsive = true
45 }
46
47 // Configure the number of bytes ahead of a read that should also be
48 // prioritized in preparation for further reads.
49 func (r *Reader) SetReadahead(readahead int64) {
50         r.mu.Lock()
51         r.readahead = readahead
52         r.mu.Unlock()
53         r.t.cl.mu.Lock()
54         defer r.t.cl.mu.Unlock()
55         r.posChanged()
56 }
57
58 func (r *Reader) readable(off int64) (ret bool) {
59         if r.t.closed.IsSet() {
60                 return true
61         }
62         req, ok := r.t.offsetRequest(off)
63         if !ok {
64                 panic(off)
65         }
66         if r.responsive {
67                 return r.t.haveChunk(req)
68         }
69         return r.t.pieceComplete(int(req.Index))
70 }
71
72 // How many bytes are available to read. Max is the most we could require.
73 func (r *Reader) available(off, max int64) (ret int64) {
74         for max > 0 {
75                 req, ok := r.t.offsetRequest(off)
76                 if !ok {
77                         break
78                 }
79                 if !r.t.haveChunk(req) {
80                         break
81                 }
82                 len1 := int64(req.Length) - (off - r.t.requestOffset(req))
83                 max -= len1
84                 ret += len1
85                 off += len1
86         }
87         // Ensure that ret hasn't exceeded our original max.
88         if max < 0 {
89                 ret += max
90         }
91         return
92 }
93
94 func (r *Reader) tickleClient() {
95         r.t.readersChanged()
96 }
97
98 func (r *Reader) waitReadable(off int64) {
99         // We may have been sent back here because we were told we could read but
100         // it failed.
101         r.t.cl.event.Wait()
102 }
103
104 // Calculates the pieces this reader wants downloaded, ignoring the cached
105 // value at r.pieces.
106 func (r *Reader) piecesUncached() (ret pieceRange) {
107         ra := r.readahead
108         if ra < 1 {
109                 ra = 1
110         }
111         ret.begin, ret.end = r.t.byteRegionPieces(r.pos, ra)
112         return
113 }
114
115 func (r *Reader) Read(b []byte) (n int, err error) {
116         return r.ReadContext(b, context.Background())
117 }
118
119 func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
120         // This is set under the Client lock if the Context is canceled.
121         var ctxErr error
122         if ctx.Done() != nil {
123                 ctx, cancel := context.WithCancel(ctx)
124                 // Abort the goroutine when the function returns.
125                 defer cancel()
126                 go func() {
127                         <-ctx.Done()
128                         r.t.cl.mu.Lock()
129                         ctxErr = ctx.Err()
130                         r.t.cl.event.Broadcast()
131                         r.t.cl.mu.Unlock()
132                 }()
133         }
134         // Hmmm, if a Read gets stuck, this means you can't change position for
135         // other purposes. That seems reasonable, but unusual.
136         r.opMu.Lock()
137         defer r.opMu.Unlock()
138         for len(b) != 0 {
139                 var n1 int
140                 n1, err = r.readOnceAt(b, r.pos, &ctxErr)
141                 if n1 == 0 {
142                         if err == nil {
143                                 panic("expected error")
144                         }
145                         break
146                 }
147                 b = b[n1:]
148                 n += n1
149                 r.mu.Lock()
150                 r.pos += int64(n1)
151                 r.posChanged()
152                 r.mu.Unlock()
153         }
154         if r.pos >= r.t.length {
155                 err = io.EOF
156         } else if err == io.EOF {
157                 err = io.ErrUnexpectedEOF
158         }
159         return
160 }
161
162 // Wait until some data should be available to read. Tickles the client if it
163 // isn't. Returns how much should be readable without blocking.
164 func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
165         r.t.cl.mu.Lock()
166         defer r.t.cl.mu.Unlock()
167         for !r.readable(pos) && *ctxErr == nil {
168                 r.waitReadable(pos)
169         }
170         return r.available(pos, wanted)
171 }
172
173 // Performs at most one successful read to torrent storage.
174 func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
175         if pos >= r.t.length {
176                 err = io.EOF
177                 return
178         }
179         for {
180                 avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
181                 if avail == 0 {
182                         if r.t.closed.IsSet() {
183                                 err = errors.New("torrent closed")
184                                 return
185                         }
186                         if *ctxErr != nil {
187                                 err = *ctxErr
188                                 return
189                         }
190                 }
191                 b1 := b[:avail]
192                 pi := int(pos / r.t.Info().PieceLength)
193                 ip := r.t.Info().Piece(pi)
194                 po := pos % r.t.Info().PieceLength
195                 missinggo.LimitLen(&b1, ip.Length()-po)
196                 n, err = r.t.readAt(b1, pos)
197                 if n != 0 {
198                         err = nil
199                         return
200                 }
201                 log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
202                 r.t.cl.mu.Lock()
203                 r.t.updateAllPieceCompletions()
204                 r.t.updateAllPiecePriorities()
205                 r.t.cl.mu.Unlock()
206         }
207 }
208
209 func (r *Reader) Close() error {
210         r.t.cl.mu.Lock()
211         defer r.t.cl.mu.Unlock()
212         r.t.deleteReader(r)
213         return nil
214 }
215
216 func (r *Reader) posChanged() {
217         to := r.piecesUncached()
218         from := r.pieces
219         if to == from {
220                 return
221         }
222         r.pieces = to
223         r.t.readerPosChanged(from, to)
224 }
225
226 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
227         r.opMu.Lock()
228         defer r.opMu.Unlock()
229
230         r.mu.Lock()
231         defer r.mu.Unlock()
232         switch whence {
233         case os.SEEK_SET:
234                 r.pos = off
235         case os.SEEK_CUR:
236                 r.pos += off
237         case os.SEEK_END:
238                 r.pos = r.t.info.TotalLength() + off
239         default:
240                 err = errors.New("bad whence")
241         }
242         ret = r.pos
243
244         r.posChanged()
245         return
246 }
247
248 func (r *Reader) Torrent() *Torrent {
249         return r.t
250 }