]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Update the readahead window on read failure
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "io"
8         "sync"
9
10         "github.com/anacrolix/log"
11         "github.com/anacrolix/missinggo"
12 )
13
14 type Reader interface {
15         io.Reader
16         io.Seeker
17         io.Closer
18         missinggo.ReadContexter
19         SetReadahead(int64)
20         SetResponsive()
21 }
22
23 // Piece range by piece index, [begin, end).
24 type pieceRange struct {
25         begin, end pieceIndex
26 }
27
28 // Accesses Torrent data via a Client. Reads block until the data is
29 // available. Seeks and readahead also drive Client behaviour.
30 type reader struct {
31         t          *Torrent
32         responsive bool
33         // Adjust the read/seek window to handle Readers locked to File extents
34         // and the like.
35         offset, length int64
36         // Ensure operations that change the position are exclusive, like Read()
37         // and Seek().
38         opMu sync.Mutex
39
40         // Required when modifying pos and readahead, or reading them without
41         // opMu.
42         mu        sync.Locker
43         pos       int64
44         readahead int64
45         // The cached piece range this reader wants downloaded. The zero value
46         // corresponds to nothing. We cache this so that changes can be detected,
47         // and bubbled up to the Torrent only as required.
48         pieces pieceRange
49 }
50
51 var _ io.ReadCloser = &reader{}
52
53 // Don't wait for pieces to complete and be verified. Read calls return as
54 // soon as they can when the underlying chunks become available.
55 func (r *reader) SetResponsive() {
56         r.responsive = true
57         r.t.cl.event.Broadcast()
58 }
59
60 // Disable responsive mode. TODO: Remove?
61 func (r *reader) SetNonResponsive() {
62         r.responsive = false
63         r.t.cl.event.Broadcast()
64 }
65
66 // Configure the number of bytes ahead of a read that should also be
67 // prioritized in preparation for further reads.
68 func (r *reader) SetReadahead(readahead int64) {
69         r.mu.Lock()
70         r.readahead = readahead
71         r.mu.Unlock()
72         r.t.cl.lock()
73         defer r.t.cl.unlock()
74         r.posChanged()
75 }
76
77 // How many bytes are available to read. Max is the most we could require.
78 func (r *reader) available(off, max int64) (ret int64) {
79         off += r.offset
80         for max > 0 {
81                 req, ok := r.t.offsetRequest(off)
82                 if !ok {
83                         break
84                 }
85                 if !r.responsive && !r.t.pieceComplete(pieceIndex(req.Index)) {
86                         break
87                 }
88                 if !r.t.haveChunk(req) {
89                         break
90                 }
91                 len1 := int64(req.Length) - (off - r.t.requestOffset(req))
92                 max -= len1
93                 ret += len1
94                 off += len1
95         }
96         // Ensure that ret hasn't exceeded our original max.
97         if max < 0 {
98                 ret += max
99         }
100         return
101 }
102
103 func (r *reader) waitReadable(off int64) {
104         // We may have been sent back here because we were told we could read but
105         // it failed.
106         r.t.cl.event.Wait()
107 }
108
109 // Calculates the pieces this reader wants downloaded, ignoring the cached
110 // value at r.pieces.
111 func (r *reader) piecesUncached() (ret pieceRange) {
112         ra := r.readahead
113         if ra < 1 {
114                 // Needs to be at least 1, because [x, x) means we don't want
115                 // anything.
116                 ra = 1
117         }
118         if ra > r.length-r.pos {
119                 ra = r.length - r.pos
120         }
121         ret.begin, ret.end = r.t.byteRegionPieces(r.torrentOffset(r.pos), ra)
122         return
123 }
124
125 func (r *reader) Read(b []byte) (n int, err error) {
126         return r.ReadContext(context.Background(), b)
127 }
128
129 func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
130         // This is set under the Client lock if the Context is canceled. I think we coordinate on a
131         // separate variable so as to avoid false negatives with race conditions due to Contexts being
132         // synchronized.
133         var ctxErr error
134         if ctx.Done() != nil {
135                 ctx, cancel := context.WithCancel(ctx)
136                 // Abort the goroutine when the function returns.
137                 defer cancel()
138                 go func() {
139                         <-ctx.Done()
140                         r.t.cl.lock()
141                         ctxErr = ctx.Err()
142                         r.t.tickleReaders()
143                         r.t.cl.unlock()
144                 }()
145         }
146         // Hmmm, if a Read gets stuck, this means you can't change position for
147         // other purposes. That seems reasonable, but unusual.
148         r.opMu.Lock()
149         defer r.opMu.Unlock()
150         n, err = r.readOnceAt(b, r.pos, &ctxErr)
151         if n == 0 {
152                 if err == nil {
153                         panic("expected error")
154                 } else {
155                         return
156                 }
157         }
158
159         r.mu.Lock()
160         r.pos += int64(n)
161         r.posChanged()
162         r.mu.Unlock()
163         if r.pos >= r.length {
164                 err = io.EOF
165         } else if err == io.EOF {
166                 err = io.ErrUnexpectedEOF
167         }
168         return
169 }
170
171 // Wait until some data should be available to read. Tickles the client if it
172 // isn't. Returns how much should be readable without blocking.
173 func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) {
174         r.t.cl.lock()
175         defer r.t.cl.unlock()
176         for {
177                 avail = r.available(pos, wanted)
178                 if avail != 0 {
179                         return
180                 }
181                 if r.t.closed.IsSet() {
182                         err = errors.New("torrent closed")
183                         return
184                 }
185                 if *ctxErr != nil {
186                         err = *ctxErr
187                         return
188                 }
189                 if r.t.dataDownloadDisallowed || !r.t.networkingEnabled {
190                         err = errors.New("downloading disabled and data not already available")
191                         return
192                 }
193                 if !wait {
194                         return
195                 }
196                 r.waitReadable(pos)
197         }
198 }
199
200 // Adds the reader's torrent offset to the reader object offset (for example the reader might be
201 // constrainted to a particular file within the torrent).
202 func (r *reader) torrentOffset(readerPos int64) int64 {
203         return r.offset + readerPos
204 }
205
206 // Performs at most one successful read to torrent storage.
207 func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
208         if pos >= r.length {
209                 err = io.EOF
210                 return
211         }
212         for {
213                 var avail int64
214                 avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0)
215                 if avail == 0 {
216                         return
217                 }
218                 firstPieceIndex := pieceIndex(r.torrentOffset(pos) / r.t.info.PieceLength)
219                 firstPieceOffset := r.torrentOffset(pos) % r.t.info.PieceLength
220                 b1 := missinggo.LimitLen(b, avail)
221                 n, err = r.t.readAt(b1, r.torrentOffset(pos))
222                 if n != 0 {
223                         err = nil
224                         return
225                 }
226                 r.t.cl.lock()
227                 // TODO: Just reset pieces in the readahead window. This might help
228                 // prevent thrashing with small caches and file and piece priorities.
229                 r.log(log.Fstr("error reading torrent %s piece %d offset %d, %d bytes: %v",
230                         r.t.infoHash.HexString(), firstPieceIndex, firstPieceOffset, len(b1), err))
231                 if !r.t.updatePieceCompletion(firstPieceIndex) {
232                         r.log(log.Fstr("piece %d completion unchanged", firstPieceIndex))
233                 }
234                 // Update the rest of the piece completions in the readahead window, without alerting to
235                 // changes (since only the first piece, the one above, could have generated the read error
236                 // we're currently handling).
237                 if r.pieces.begin != firstPieceIndex {
238                         panic(fmt.Sprint(r.pieces.begin, firstPieceIndex))
239                 }
240                 for index := r.pieces.begin + 1; index < r.pieces.end; index++ {
241                         r.t.updatePieceCompletion(index)
242                 }
243                 r.t.cl.unlock()
244         }
245 }
246
247 func (r *reader) Close() error {
248         r.t.cl.lock()
249         defer r.t.cl.unlock()
250         r.t.deleteReader(r)
251         return nil
252 }
253
254 func (r *reader) posChanged() {
255         to := r.piecesUncached()
256         from := r.pieces
257         if to == from {
258                 return
259         }
260         r.pieces = to
261         // log.Printf("reader pos changed %v->%v", from, to)
262         r.t.readerPosChanged(from, to)
263 }
264
265 func (r *reader) Seek(off int64, whence int) (ret int64, err error) {
266         r.opMu.Lock()
267         defer r.opMu.Unlock()
268
269         r.mu.Lock()
270         defer r.mu.Unlock()
271         switch whence {
272         case io.SeekStart:
273                 r.pos = off
274         case io.SeekCurrent:
275                 r.pos += off
276         case io.SeekEnd:
277                 r.pos = r.length + off
278         default:
279                 err = errors.New("bad whence")
280         }
281         ret = r.pos
282
283         r.posChanged()
284         return
285 }
286
287 func (r *reader) log(m log.Msg) {
288         r.t.logger.Log(m.Skip(1))
289 }