]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Merge commit 'c8dffdb'
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "log"
7         "os"
8         "sync"
9
10         "github.com/anacrolix/missinggo"
11         "golang.org/x/net/context"
12 )
13
14 // Piece range by piece index, [begin, end).
15 type pieceRange struct {
16         begin, end int
17 }
18
19 // Accesses Torrent data via a Client. Reads block until the data is
20 // available. Seeks and readahead also drive Client behaviour.
21 type Reader struct {
22         t          *Torrent
23         responsive bool
24         // Ensure operations that change the position are exclusive, like Read()
25         // and Seek().
26         opMu sync.Mutex
27
28         // Required when modifying pos and readahead, or reading them without
29         // opMu.
30         mu        sync.Locker
31         pos       int64
32         readahead int64
33         // The cached piece range this reader wants downloaded. The zero value
34         // corresponds to nothing. We cache this so that changes can be detected,
35         // and bubbled up to the Torrent only as required.
36         pieces pieceRange
37 }
38
39 var _ io.ReadCloser = &Reader{}
40
41 // Don't wait for pieces to complete and be verified. Read calls return as
42 // soon as they can when the underlying chunks become available.
43 func (r *Reader) SetResponsive() {
44         r.responsive = true
45 }
46
47 // Configure the number of bytes ahead of a read that should also be
48 // prioritized in preparation for further reads.
49 func (r *Reader) SetReadahead(readahead int64) {
50         r.mu.Lock()
51         r.readahead = readahead
52         r.mu.Unlock()
53         r.t.cl.mu.Lock()
54         defer r.t.cl.mu.Unlock()
55         r.posChanged()
56 }
57
58 func (r *Reader) readable(off int64) (ret bool) {
59         if r.t.closed.IsSet() {
60                 return true
61         }
62         req, ok := r.t.offsetRequest(off)
63         if !ok {
64                 panic(off)
65         }
66         if r.responsive {
67                 return r.t.haveChunk(req)
68         }
69         return r.t.pieceComplete(int(req.Index))
70 }
71
72 // How many bytes are available to read. Max is the most we could require.
73 func (r *Reader) available(off, max int64) (ret int64) {
74         for max > 0 {
75                 req, ok := r.t.offsetRequest(off)
76                 if !ok {
77                         break
78                 }
79                 if !r.t.haveChunk(req) {
80                         break
81                 }
82                 len1 := int64(req.Length) - (off - r.t.requestOffset(req))
83                 max -= len1
84                 ret += len1
85                 off += len1
86         }
87         // Ensure that ret hasn't exceeded our original max.
88         if max < 0 {
89                 ret += max
90         }
91         return
92 }
93
94 func (r *Reader) tickleClient() {
95         r.t.readersChanged()
96 }
97
98 func (r *Reader) waitReadable(off int64) {
99         // We may have been sent back here because we were told we could read but
100         // it failed.
101         r.tickleClient()
102         r.t.cl.event.Wait()
103 }
104
105 // Calculates the pieces this reader wants downloaded, ignoring the cached
106 // value at r.pieces.
107 func (r *Reader) piecesUncached() (ret pieceRange) {
108         ra := r.readahead
109         if ra < 1 {
110                 ra = 1
111         }
112         ret.begin, ret.end = r.t.byteRegionPieces(r.pos, ra)
113         return
114 }
115
116 func (r *Reader) Read(b []byte) (n int, err error) {
117         return r.ReadContext(b, context.Background())
118 }
119
120 func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
121         // This is set under the Client lock if the Context is canceled.
122         var ctxErr error
123         if ctx.Done() != nil {
124                 ctx, cancel := context.WithCancel(ctx)
125                 // Abort the goroutine when the function returns.
126                 defer cancel()
127                 go func() {
128                         <-ctx.Done()
129                         r.t.cl.mu.Lock()
130                         ctxErr = ctx.Err()
131                         r.t.cl.event.Broadcast()
132                         r.t.cl.mu.Unlock()
133                 }()
134         }
135         // Hmmm, if a Read gets stuck, this means you can't change position for
136         // other purposes. That seems reasonable, but unusual.
137         r.opMu.Lock()
138         defer r.opMu.Unlock()
139         for len(b) != 0 {
140                 var n1 int
141                 n1, err = r.readOnceAt(b, r.pos, &ctxErr)
142                 if n1 == 0 {
143                         if err == nil {
144                                 panic("expected error")
145                         }
146                         break
147                 }
148                 b = b[n1:]
149                 n += n1
150                 r.mu.Lock()
151                 r.pos += int64(n1)
152                 r.posChanged()
153                 r.mu.Unlock()
154         }
155         if r.pos >= r.t.length {
156                 err = io.EOF
157         } else if err == io.EOF {
158                 err = io.ErrUnexpectedEOF
159         }
160         return
161 }
162
163 // Wait until some data should be available to read. Tickles the client if it
164 // isn't. Returns how much should be readable without blocking.
165 func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
166         r.t.cl.mu.Lock()
167         defer r.t.cl.mu.Unlock()
168         for !r.readable(pos) && *ctxErr == nil {
169                 r.waitReadable(pos)
170         }
171         return r.available(pos, wanted)
172 }
173
174 // Performs at most one successful read to torrent storage.
175 func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
176         if pos >= r.t.length {
177                 err = io.EOF
178                 return
179         }
180         for {
181                 avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
182                 if avail == 0 {
183                         if r.t.closed.IsSet() {
184                                 err = errors.New("torrent closed")
185                                 return
186                         }
187                         if *ctxErr != nil {
188                                 err = *ctxErr
189                                 return
190                         }
191                 }
192                 b1 := b[:avail]
193                 pi := int(pos / r.t.Info().PieceLength)
194                 ip := r.t.Info().Piece(pi)
195                 po := pos % r.t.Info().PieceLength
196                 missinggo.LimitLen(&b1, ip.Length()-po)
197                 n, err = r.t.readAt(b1, pos)
198                 if n != 0 {
199                         err = nil
200                         return
201                 }
202                 log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
203                 r.t.cl.mu.Lock()
204                 r.t.updateAllPieceCompletions()
205                 r.t.updateAllPiecePriorities()
206                 r.t.cl.mu.Unlock()
207         }
208 }
209
210 func (r *Reader) Close() error {
211         r.t.cl.mu.Lock()
212         defer r.t.cl.mu.Unlock()
213         r.t.deleteReader(r)
214         return nil
215 }
216
217 func (r *Reader) posChanged() {
218         to := r.piecesUncached()
219         from := r.pieces
220         if to == from {
221                 return
222         }
223         r.pieces = to
224         r.t.readerPosChanged(from, to)
225 }
226
227 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
228         r.opMu.Lock()
229         defer r.opMu.Unlock()
230
231         r.mu.Lock()
232         defer r.mu.Unlock()
233         switch whence {
234         case os.SEEK_SET:
235                 r.pos = off
236         case os.SEEK_CUR:
237                 r.pos += off
238         case os.SEEK_END:
239                 r.pos = r.t.info.TotalLength() + off
240         default:
241                 err = errors.New("bad whence")
242         }
243         ret = r.pos
244
245         r.posChanged()
246         return
247 }
248
249 func (r *Reader) Torrent() *Torrent {
250         return r.t
251 }