]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Improve the log message when piece reads fail
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "log"
7         "os"
8         "sync"
9
10         "github.com/anacrolix/missinggo"
11         "golang.org/x/net/context"
12 )
13
14 // Accesses torrent data via a client.
15 type Reader struct {
16         t          *Torrent
17         responsive bool
18         // Ensure operations that change the position are exclusive, like Read()
19         // and Seek().
20         opMu sync.Mutex
21
22         // Required when modifying pos and readahead, or reading them without
23         // opMu.
24         mu        sync.Mutex
25         pos       int64
26         readahead int64
27 }
28
29 var _ io.ReadCloser = &Reader{}
30
31 // Don't wait for pieces to complete and be verified. Read calls return as
32 // soon as they can when the underlying chunks become available.
33 func (r *Reader) SetResponsive() {
34         r.responsive = true
35 }
36
37 // Configure the number of bytes ahead of a read that should also be
38 // prioritized in preparation for further reads.
39 func (r *Reader) SetReadahead(readahead int64) {
40         r.mu.Lock()
41         r.readahead = readahead
42         r.mu.Unlock()
43         r.t.cl.mu.Lock()
44         defer r.t.cl.mu.Unlock()
45         r.tickleClient()
46 }
47
48 func (r *Reader) readable(off int64) (ret bool) {
49         if r.t.closed.IsSet() {
50                 return true
51         }
52         req, ok := r.t.offsetRequest(off)
53         if !ok {
54                 panic(off)
55         }
56         if r.responsive {
57                 return r.t.haveChunk(req)
58         }
59         return r.t.pieceComplete(int(req.Index))
60 }
61
62 // How many bytes are available to read. Max is the most we could require.
63 func (r *Reader) available(off, max int64) (ret int64) {
64         for max > 0 {
65                 req, ok := r.t.offsetRequest(off)
66                 if !ok {
67                         break
68                 }
69                 if !r.t.haveChunk(req) {
70                         break
71                 }
72                 len1 := int64(req.Length) - (off - r.t.requestOffset(req))
73                 max -= len1
74                 ret += len1
75                 off += len1
76         }
77         // Ensure that ret hasn't exceeded our original max.
78         if max < 0 {
79                 ret += max
80         }
81         return
82 }
83
84 func (r *Reader) tickleClient() {
85         r.t.readersChanged()
86 }
87
88 func (r *Reader) waitReadable(off int64) {
89         // We may have been sent back here because we were told we could read but
90         // it failed.
91         r.tickleClient()
92         r.t.cl.event.Wait()
93 }
94
95 func (r *Reader) Read(b []byte) (n int, err error) {
96         return r.ReadContext(b, context.Background())
97 }
98
99 func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
100         // This is set under the Client lock if the Context is canceled.
101         var ctxErr error
102         if ctx.Done() != nil {
103                 ctx, cancel := context.WithCancel(ctx)
104                 // Abort the goroutine when the function returns.
105                 defer cancel()
106                 go func() {
107                         <-ctx.Done()
108                         r.t.cl.mu.Lock()
109                         ctxErr = ctx.Err()
110                         r.t.cl.event.Broadcast()
111                         r.t.cl.mu.Unlock()
112                 }()
113         }
114         // Hmmm, if a Read gets stuck, this means you can't change position for
115         // other purposes. That seems reasonable, but unusual.
116         r.opMu.Lock()
117         defer r.opMu.Unlock()
118         for len(b) != 0 {
119                 var n1 int
120                 n1, err = r.readOnceAt(b, r.pos, &ctxErr)
121                 if n1 == 0 {
122                         if err == nil {
123                                 panic("expected error")
124                         }
125                         break
126                 }
127                 b = b[n1:]
128                 n += n1
129                 r.mu.Lock()
130                 r.pos += int64(n1)
131                 r.mu.Unlock()
132         }
133         if r.pos >= r.t.length {
134                 err = io.EOF
135         } else if err == io.EOF {
136                 err = io.ErrUnexpectedEOF
137         }
138         return
139 }
140
141 // Wait until some data should be available to read. Tickles the client if it
142 // isn't. Returns how much should be readable without blocking.
143 func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
144         r.t.cl.mu.Lock()
145         defer r.t.cl.mu.Unlock()
146         for !r.readable(pos) && *ctxErr == nil {
147                 r.waitReadable(pos)
148         }
149         return r.available(pos, wanted)
150 }
151
152 // Performs at most one successful read to torrent storage.
153 func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
154         if pos >= r.t.length {
155                 err = io.EOF
156                 return
157         }
158         for {
159                 avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
160                 if avail == 0 {
161                         if r.t.closed.IsSet() {
162                                 err = errors.New("torrent closed")
163                                 return
164                         }
165                         if *ctxErr != nil {
166                                 err = *ctxErr
167                                 return
168                         }
169                 }
170                 b1 := b[:avail]
171                 pi := int(pos / r.t.Info().PieceLength)
172                 ip := r.t.Info().Piece(pi)
173                 po := pos % r.t.Info().PieceLength
174                 missinggo.LimitLen(&b1, ip.Length()-po)
175                 n, err = r.t.readAt(b1, pos)
176                 if n != 0 {
177                         err = nil
178                         return
179                 }
180                 log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
181                 r.t.cl.mu.Lock()
182                 r.t.updateAllPieceCompletions()
183                 r.t.updatePiecePriorities()
184                 r.t.cl.mu.Unlock()
185         }
186 }
187
188 func (r *Reader) Close() error {
189         r.t.deleteReader(r)
190         r.t = nil
191         return nil
192 }
193
194 func (r *Reader) posChanged() {
195         r.t.cl.mu.Lock()
196         defer r.t.cl.mu.Unlock()
197         r.t.readersChanged()
198 }
199
200 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
201         r.opMu.Lock()
202         defer r.opMu.Unlock()
203
204         r.mu.Lock()
205         switch whence {
206         case os.SEEK_SET:
207                 r.pos = off
208         case os.SEEK_CUR:
209                 r.pos += off
210         case os.SEEK_END:
211                 r.pos = r.t.info.TotalLength() + off
212         default:
213                 err = errors.New("bad whence")
214         }
215         ret = r.pos
216         r.mu.Unlock()
217
218         r.posChanged()
219         return
220 }
221
222 func (r *Reader) Torrent() *Torrent {
223         return r.t
224 }