]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Apply megacheck to torrent package
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "log"
7         "sync"
8
9         "github.com/anacrolix/missinggo"
10         "golang.org/x/net/context"
11 )
12
13 // Piece range by piece index, [begin, end).
14 type pieceRange struct {
15         begin, end int
16 }
17
18 // Accesses Torrent data via a Client. Reads block until the data is
19 // available. Seeks and readahead also drive Client behaviour.
20 type Reader struct {
21         t          *Torrent
22         responsive bool
23         // Ensure operations that change the position are exclusive, like Read()
24         // and Seek().
25         opMu sync.Mutex
26
27         // Required when modifying pos and readahead, or reading them without
28         // opMu.
29         mu        sync.Locker
30         pos       int64
31         readahead int64
32         // The cached piece range this reader wants downloaded. The zero value
33         // corresponds to nothing. We cache this so that changes can be detected,
34         // and bubbled up to the Torrent only as required.
35         pieces pieceRange
36 }
37
38 var _ io.ReadCloser = &Reader{}
39
40 // Don't wait for pieces to complete and be verified. Read calls return as
41 // soon as they can when the underlying chunks become available.
42 func (r *Reader) SetResponsive() {
43         r.responsive = true
44 }
45
46 // Disable responsive mode.
47 func (r *Reader) SetNonResponsive() {
48         r.responsive = false
49 }
50
51 // Configure the number of bytes ahead of a read that should also be
52 // prioritized in preparation for further reads.
53 func (r *Reader) SetReadahead(readahead int64) {
54         r.mu.Lock()
55         r.readahead = readahead
56         r.mu.Unlock()
57         r.t.cl.mu.Lock()
58         defer r.t.cl.mu.Unlock()
59         r.posChanged()
60 }
61
62 // Return reader's current position.
63 func (r *Reader) CurrentPos() int64 {
64         return r.pos
65 }
66
67 func (r *Reader) readable(off int64) (ret bool) {
68         if r.t.closed.IsSet() {
69                 return true
70         }
71         req, ok := r.t.offsetRequest(off)
72         if !ok {
73                 panic(off)
74         }
75         if r.responsive {
76                 return r.t.haveChunk(req)
77         }
78         return r.t.pieceComplete(int(req.Index))
79 }
80
81 // How many bytes are available to read. Max is the most we could require.
82 func (r *Reader) available(off, max int64) (ret int64) {
83         for max > 0 {
84                 req, ok := r.t.offsetRequest(off)
85                 if !ok {
86                         break
87                 }
88                 if !r.t.haveChunk(req) {
89                         break
90                 }
91                 len1 := int64(req.Length) - (off - r.t.requestOffset(req))
92                 max -= len1
93                 ret += len1
94                 off += len1
95         }
96         // Ensure that ret hasn't exceeded our original max.
97         if max < 0 {
98                 ret += max
99         }
100         return
101 }
102
103 func (r *Reader) waitReadable(off int64) {
104         // We may have been sent back here because we were told we could read but
105         // it failed.
106         r.t.cl.event.Wait()
107 }
108
109 // Calculates the pieces this reader wants downloaded, ignoring the cached
110 // value at r.pieces.
111 func (r *Reader) piecesUncached() (ret pieceRange) {
112         ra := r.readahead
113         if ra < 1 {
114                 ra = 1
115         }
116         ret.begin, ret.end = r.t.byteRegionPieces(r.pos, ra)
117         return
118 }
119
120 func (r *Reader) Read(b []byte) (n int, err error) {
121         return r.ReadContext(context.Background(), b)
122 }
123
124 func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
125         // This is set under the Client lock if the Context is canceled.
126         var ctxErr error
127         if ctx.Done() != nil {
128                 ctx, cancel := context.WithCancel(ctx)
129                 // Abort the goroutine when the function returns.
130                 defer cancel()
131                 go func() {
132                         <-ctx.Done()
133                         r.t.cl.mu.Lock()
134                         ctxErr = ctx.Err()
135                         r.t.cl.event.Broadcast()
136                         r.t.cl.mu.Unlock()
137                 }()
138         }
139         // Hmmm, if a Read gets stuck, this means you can't change position for
140         // other purposes. That seems reasonable, but unusual.
141         r.opMu.Lock()
142         defer r.opMu.Unlock()
143         for len(b) != 0 {
144                 var n1 int
145                 n1, err = r.readOnceAt(b, r.pos, &ctxErr)
146                 if n1 == 0 {
147                         if err == nil {
148                                 panic("expected error")
149                         }
150                         break
151                 }
152                 b = b[n1:]
153                 n += n1
154                 r.mu.Lock()
155                 r.pos += int64(n1)
156                 r.posChanged()
157                 r.mu.Unlock()
158         }
159         if r.pos >= r.t.length {
160                 err = io.EOF
161         } else if err == io.EOF {
162                 err = io.ErrUnexpectedEOF
163         }
164         return
165 }
166
167 // Wait until some data should be available to read. Tickles the client if it
168 // isn't. Returns how much should be readable without blocking.
169 func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
170         r.t.cl.mu.Lock()
171         defer r.t.cl.mu.Unlock()
172         for !r.readable(pos) && *ctxErr == nil {
173                 r.waitReadable(pos)
174         }
175         return r.available(pos, wanted)
176 }
177
178 // Performs at most one successful read to torrent storage.
179 func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
180         if pos >= r.t.length {
181                 err = io.EOF
182                 return
183         }
184         for {
185                 avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
186                 if avail == 0 {
187                         if r.t.closed.IsSet() {
188                                 err = errors.New("torrent closed")
189                                 return
190                         }
191                         if *ctxErr != nil {
192                                 err = *ctxErr
193                                 return
194                         }
195                 }
196                 b1 := b[:avail]
197                 pi := int(pos / r.t.info.PieceLength)
198                 ip := r.t.info.Piece(pi)
199                 po := pos % r.t.info.PieceLength
200                 missinggo.LimitLen(&b1, ip.Length()-po)
201                 n, err = r.t.readAt(b1, pos)
202                 if n != 0 {
203                         err = nil
204                         return
205                 }
206                 r.t.cl.mu.Lock()
207                 log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
208                 r.t.updateAllPieceCompletions()
209                 r.t.updateAllPiecePriorities()
210                 r.t.cl.mu.Unlock()
211         }
212 }
213
214 func (r *Reader) Close() error {
215         r.t.cl.mu.Lock()
216         defer r.t.cl.mu.Unlock()
217         r.t.deleteReader(r)
218         return nil
219 }
220
221 func (r *Reader) posChanged() {
222         to := r.piecesUncached()
223         from := r.pieces
224         if to == from {
225                 return
226         }
227         r.pieces = to
228         r.t.readerPosChanged(from, to)
229 }
230
231 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
232         r.opMu.Lock()
233         defer r.opMu.Unlock()
234
235         r.mu.Lock()
236         defer r.mu.Unlock()
237         switch whence {
238         case io.SeekStart:
239                 r.pos = off
240         case io.SeekCurrent:
241                 r.pos += off
242         case io.SeekEnd:
243                 r.pos = r.t.info.TotalLength() + off
244         default:
245                 err = errors.New("bad whence")
246         }
247         ret = r.pos
248
249         r.posChanged()
250         return
251 }
252
253 func (r *Reader) Torrent() *Torrent {
254         return r.t
255 }