]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Rework Reader position changes affecting piece priorities
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "log"
7         "os"
8         "sync"
9
10         "github.com/anacrolix/missinggo"
11         "golang.org/x/net/context"
12 )
13
14 // Accesses torrent data via a client.
15 type Reader struct {
16         t          *Torrent
17         responsive bool
18         // Ensure operations that change the position are exclusive, like Read()
19         // and Seek().
20         opMu sync.Mutex
21
22         // Required when modifying pos and readahead, or reading them without
23         // opMu.
24         mu        sync.Locker
25         pos       int64
26         readahead int64
27 }
28
29 var _ io.ReadCloser = &Reader{}
30
31 // Don't wait for pieces to complete and be verified. Read calls return as
32 // soon as they can when the underlying chunks become available.
33 func (r *Reader) SetResponsive() {
34         r.responsive = true
35 }
36
37 // Configure the number of bytes ahead of a read that should also be
38 // prioritized in preparation for further reads.
39 func (r *Reader) SetReadahead(readahead int64) {
40         r.mu.Lock()
41         r.readahead = readahead
42         r.mu.Unlock()
43         r.t.cl.mu.Lock()
44         defer r.t.cl.mu.Unlock()
45         r.tickleClient()
46 }
47
48 func (r *Reader) readable(off int64) (ret bool) {
49         if r.t.closed.IsSet() {
50                 return true
51         }
52         req, ok := r.t.offsetRequest(off)
53         if !ok {
54                 panic(off)
55         }
56         if r.responsive {
57                 return r.t.haveChunk(req)
58         }
59         return r.t.pieceComplete(int(req.Index))
60 }
61
62 // How many bytes are available to read. Max is the most we could require.
63 func (r *Reader) available(off, max int64) (ret int64) {
64         for max > 0 {
65                 req, ok := r.t.offsetRequest(off)
66                 if !ok {
67                         break
68                 }
69                 if !r.t.haveChunk(req) {
70                         break
71                 }
72                 len1 := int64(req.Length) - (off - r.t.requestOffset(req))
73                 max -= len1
74                 ret += len1
75                 off += len1
76         }
77         // Ensure that ret hasn't exceeded our original max.
78         if max < 0 {
79                 ret += max
80         }
81         return
82 }
83
84 func (r *Reader) tickleClient() {
85         r.t.readersChanged()
86 }
87
88 func (r *Reader) waitReadable(off int64) {
89         // We may have been sent back here because we were told we could read but
90         // it failed.
91         r.tickleClient()
92         r.t.cl.event.Wait()
93 }
94
95 func (r *Reader) Read(b []byte) (n int, err error) {
96         return r.ReadContext(b, context.Background())
97 }
98
99 func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
100         // This is set under the Client lock if the Context is canceled.
101         var ctxErr error
102         if ctx.Done() != nil {
103                 ctx, cancel := context.WithCancel(ctx)
104                 // Abort the goroutine when the function returns.
105                 defer cancel()
106                 go func() {
107                         <-ctx.Done()
108                         r.t.cl.mu.Lock()
109                         ctxErr = ctx.Err()
110                         r.t.cl.event.Broadcast()
111                         r.t.cl.mu.Unlock()
112                 }()
113         }
114         // Hmmm, if a Read gets stuck, this means you can't change position for
115         // other purposes. That seems reasonable, but unusual.
116         r.opMu.Lock()
117         defer r.opMu.Unlock()
118         for len(b) != 0 {
119                 var n1 int
120                 n1, err = r.readOnceAt(b, r.pos, &ctxErr)
121                 if n1 == 0 {
122                         if err == nil {
123                                 panic("expected error")
124                         }
125                         break
126                 }
127                 b = b[n1:]
128                 n += n1
129                 r.mu.Lock()
130                 r.pos += int64(n1)
131                 r.posChanged()
132                 r.mu.Unlock()
133         }
134         if r.pos >= r.t.length {
135                 err = io.EOF
136         } else if err == io.EOF {
137                 err = io.ErrUnexpectedEOF
138         }
139         return
140 }
141
142 // Wait until some data should be available to read. Tickles the client if it
143 // isn't. Returns how much should be readable without blocking.
144 func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
145         r.t.cl.mu.Lock()
146         defer r.t.cl.mu.Unlock()
147         for !r.readable(pos) && *ctxErr == nil {
148                 r.waitReadable(pos)
149         }
150         return r.available(pos, wanted)
151 }
152
153 // Performs at most one successful read to torrent storage.
154 func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
155         if pos >= r.t.length {
156                 err = io.EOF
157                 return
158         }
159         for {
160                 avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
161                 if avail == 0 {
162                         if r.t.closed.IsSet() {
163                                 err = errors.New("torrent closed")
164                                 return
165                         }
166                         if *ctxErr != nil {
167                                 err = *ctxErr
168                                 return
169                         }
170                 }
171                 b1 := b[:avail]
172                 pi := int(pos / r.t.Info().PieceLength)
173                 ip := r.t.Info().Piece(pi)
174                 po := pos % r.t.Info().PieceLength
175                 missinggo.LimitLen(&b1, ip.Length()-po)
176                 n, err = r.t.readAt(b1, pos)
177                 if n != 0 {
178                         err = nil
179                         return
180                 }
181                 log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
182                 r.t.cl.mu.Lock()
183                 r.t.updateAllPieceCompletions()
184                 r.t.updatePiecePriorities()
185                 r.t.cl.mu.Unlock()
186         }
187 }
188
189 func (r *Reader) Close() error {
190         r.t.deleteReader(r)
191         r.t = nil
192         return nil
193 }
194
195 func (r *Reader) posChanged() {
196         r.t.readersChanged()
197 }
198
199 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
200         r.opMu.Lock()
201         defer r.opMu.Unlock()
202
203         r.mu.Lock()
204         defer r.mu.Unlock()
205         switch whence {
206         case os.SEEK_SET:
207                 r.pos = off
208         case os.SEEK_CUR:
209                 r.pos += off
210         case os.SEEK_END:
211                 r.pos = r.t.info.TotalLength() + off
212         default:
213                 err = errors.New("bad whence")
214         }
215         ret = r.pos
216
217         r.posChanged()
218         return
219 }
220
221 func (r *Reader) Torrent() *Torrent {
222         return r.t
223 }