]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Suppress piece read errors when data is obtained
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "os"
7         "sync"
8
9         "github.com/anacrolix/missinggo"
10 )
11
12 // Accesses torrent data via a client.
13 type Reader struct {
14         t          *Torrent
15         responsive bool
16         // Ensure operations that change the position are exclusive, like Read()
17         // and Seek().
18         opMu sync.Mutex
19
20         // Required when modifying pos and readahead, or reading them without
21         // opMu.
22         mu        sync.Mutex
23         pos       int64
24         readahead int64
25 }
26
27 var _ io.ReadCloser = &Reader{}
28
29 // Don't wait for pieces to complete and be verified. Read calls return as
30 // soon as they can when the underlying chunks become available.
31 func (r *Reader) SetResponsive() {
32         r.responsive = true
33 }
34
35 // Configure the number of bytes ahead of a read that should also be
36 // prioritized in preparation for further reads.
37 func (r *Reader) SetReadahead(readahead int64) {
38         r.mu.Lock()
39         r.readahead = readahead
40         r.mu.Unlock()
41         r.t.cl.mu.Lock()
42         defer r.t.cl.mu.Unlock()
43         r.tickleClient()
44 }
45
46 func (r *Reader) readable(off int64) (ret bool) {
47         if r.torrentClosed() {
48                 return true
49         }
50         req, ok := r.t.torrent.offsetRequest(off)
51         if !ok {
52                 panic(off)
53         }
54         if r.responsive {
55                 return r.t.torrent.haveChunk(req)
56         }
57         return r.t.torrent.pieceComplete(int(req.Index))
58 }
59
60 // How many bytes are available to read. Max is the most we could require.
61 func (r *Reader) available(off, max int64) (ret int64) {
62         for max > 0 {
63                 req, ok := r.t.torrent.offsetRequest(off)
64                 if !ok {
65                         break
66                 }
67                 if !r.t.torrent.haveChunk(req) {
68                         break
69                 }
70                 len1 := int64(req.Length) - (off - r.t.torrent.requestOffset(req))
71                 max -= len1
72                 ret += len1
73                 off += len1
74         }
75         // Ensure that ret hasn't exceeded our original max.
76         if max < 0 {
77                 ret += max
78         }
79         return
80 }
81
82 func (r *Reader) tickleClient() {
83         r.t.torrent.readersChanged()
84 }
85
86 func (r *Reader) waitReadable(off int64) {
87         // We may have been sent back here because we were told we could read but
88         // it failed.
89         r.tickleClient()
90         r.t.cl.event.Wait()
91 }
92
93 func (r *Reader) Read(b []byte) (n int, err error) {
94         r.opMu.Lock()
95         defer r.opMu.Unlock()
96         for len(b) != 0 {
97                 var n1 int
98                 n1, err = r.readOnceAt(b, r.pos)
99                 if n1 == 0 {
100                         if err == nil {
101                                 panic("expected error")
102                         }
103                         break
104                 }
105                 b = b[n1:]
106                 n += n1
107                 r.mu.Lock()
108                 r.pos += int64(n1)
109                 r.mu.Unlock()
110         }
111         if r.pos >= r.t.torrent.length {
112                 err = io.EOF
113         } else if err == io.EOF {
114                 err = io.ErrUnexpectedEOF
115         }
116         return
117 }
118
119 // Safe to call with or without client lock.
120 func (r *Reader) torrentClosed() bool {
121         return r.t.torrent.isClosed()
122 }
123
124 // Wait until some data should be available to read. Tickles the client if it
125 // isn't. Returns how much should be readable without blocking.
126 func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
127         r.t.cl.mu.Lock()
128         defer r.t.cl.mu.Unlock()
129         for !r.readable(pos) {
130                 r.waitReadable(pos)
131         }
132         return r.available(pos, wanted)
133 }
134
135 // Performs at most one successful read to torrent storage.
136 func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
137         if pos >= r.t.torrent.length {
138                 err = io.EOF
139                 return
140         }
141         for {
142                 avail := r.waitAvailable(pos, int64(len(b)))
143                 if avail == 0 {
144                         if r.torrentClosed() {
145                                 err = errors.New("torrent closed")
146                                 return
147                         }
148                 }
149                 b1 := b[:avail]
150                 pi := int(pos / r.t.Info().PieceLength)
151                 ip := r.t.Info().Piece(pi)
152                 po := pos % ip.Length()
153                 missinggo.LimitLen(&b1, ip.Length()-po)
154                 n, err = r.t.torrent.readAt(b1, pos)
155                 if n != 0 {
156                         err = nil
157                         return
158                 }
159                 // log.Printf("%s: error reading from torrent storage pos=%d: %s", r.t, pos, err)
160                 r.t.cl.mu.Lock()
161                 r.t.torrent.updateAllPieceCompletions()
162                 r.t.torrent.updatePiecePriorities()
163                 r.t.cl.mu.Unlock()
164         }
165 }
166
167 func (r *Reader) Close() error {
168         r.t.deleteReader(r)
169         r.t = nil
170         return nil
171 }
172
173 func (r *Reader) posChanged() {
174         r.t.cl.mu.Lock()
175         defer r.t.cl.mu.Unlock()
176         r.t.torrent.readersChanged()
177 }
178
179 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
180         r.opMu.Lock()
181         defer r.opMu.Unlock()
182
183         r.mu.Lock()
184         switch whence {
185         case os.SEEK_SET:
186                 r.pos = off
187         case os.SEEK_CUR:
188                 r.pos += off
189         case os.SEEK_END:
190                 r.pos = r.t.torrent.Info.TotalLength() + off
191         default:
192                 err = errors.New("bad whence")
193         }
194         ret = r.pos
195         r.mu.Unlock()
196
197         r.posChanged()
198         return
199 }
200
201 func (r *Reader) Torrent() *Torrent {
202         return r.t
203 }