]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Fix races in Reader
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "os"
7         "sync"
8 )
9
10 // Accesses torrent data via a client.
11 type Reader struct {
12         t          *Torrent
13         responsive bool
14         // Ensure operations that change the position are exclusive, like Read()
15         // and Seek().
16         opMu sync.Mutex
17
18         // Required when modifying pos and readahead, or reading them without
19         // opMu.
20         mu        sync.Mutex
21         pos       int64
22         readahead int64
23 }
24
25 var _ io.ReadCloser = &Reader{}
26
27 // Don't wait for pieces to complete and be verified. Read calls return as
28 // soon as they can when the underlying chunks become available.
29 func (r *Reader) SetResponsive() {
30         r.responsive = true
31 }
32
33 // Configure the number of bytes ahead of a read that should also be
34 // prioritized in preparation for further reads.
35 func (r *Reader) SetReadahead(readahead int64) {
36         r.mu.Lock()
37         defer r.mu.Unlock()
38         r.readahead = readahead
39 }
40
41 func (r *Reader) readable(off int64) (ret bool) {
42         if r.torrentClosed() {
43                 return true
44         }
45         req, ok := r.t.torrent.offsetRequest(off)
46         if !ok {
47                 panic(off)
48         }
49         if r.responsive {
50                 return r.t.torrent.haveChunk(req)
51         }
52         return r.t.torrent.pieceComplete(int(req.Index))
53 }
54
55 // How many bytes are available to read. Max is the most we could require.
56 func (r *Reader) available(off, max int64) (ret int64) {
57         for max > 0 {
58                 req, ok := r.t.torrent.offsetRequest(off)
59                 if !ok {
60                         break
61                 }
62                 if !r.t.torrent.haveChunk(req) {
63                         break
64                 }
65                 len1 := int64(req.Length) - (off - r.t.torrent.requestOffset(req))
66                 max -= len1
67                 ret += len1
68                 off += len1
69         }
70         // Ensure that ret hasn't exceeded our original max.
71         if max < 0 {
72                 ret += max
73         }
74         return
75 }
76
77 func (r *Reader) tickleClient() {
78         r.t.torrent.readersChanged()
79 }
80
81 func (r *Reader) waitReadable(off int64) {
82         // We may have been sent back here because we were told we could read but
83         // it failed.
84         r.tickleClient()
85         r.t.cl.event.Wait()
86 }
87
88 func (r *Reader) Read(b []byte) (n int, err error) {
89         r.opMu.Lock()
90         defer r.opMu.Unlock()
91         for len(b) != 0 {
92                 var n1 int
93                 n1, err = r.readOnceAt(b, r.pos)
94                 if n1 == 0 {
95                         if err == nil {
96                                 panic("expected error")
97                         }
98                         break
99                 }
100                 b = b[n1:]
101                 n += n1
102                 r.mu.Lock()
103                 r.pos += int64(n1)
104                 r.mu.Unlock()
105         }
106         if r.pos >= r.t.torrent.length {
107                 err = io.EOF
108         } else if err == io.EOF {
109                 err = io.ErrUnexpectedEOF
110         }
111         return
112 }
113
114 // Safe to call with or without client lock.
115 func (r *Reader) torrentClosed() bool {
116         return r.t.torrent.isClosed()
117 }
118
119 // Wait until some data should be available to read. Tickles the client if it
120 // isn't. Returns how much should be readable without blocking.
121 func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
122         r.t.cl.mu.Lock()
123         defer r.t.cl.mu.Unlock()
124         for !r.readable(pos) {
125                 r.waitReadable(pos)
126         }
127         return r.available(pos, wanted)
128 }
129
130 // Performs at most one successful read to torrent storage.
131 func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
132         if pos >= r.t.torrent.length {
133                 err = io.EOF
134                 return
135         }
136         for {
137                 avail := r.waitAvailable(pos, int64(len(b)))
138                 if avail == 0 {
139                         if r.torrentClosed() {
140                                 err = errors.New("torrent closed")
141                                 return
142                         }
143                 }
144                 b1 := b[:avail]
145                 pi := int(pos / r.t.Info().PieceLength)
146                 ip := r.t.Info().Piece(pi)
147                 po := pos % ip.Length()
148                 if int64(len(b1)) > ip.Length()-po {
149                         b1 = b1[:ip.Length()-po]
150                 }
151                 n, err = r.t.torrent.readAt(b1, pos)
152                 if n != 0 {
153                         return
154                 }
155                 // log.Printf("%s: error reading from torrent storage pos=%d: %s", r.t, pos, err)
156                 r.t.cl.mu.Lock()
157                 r.t.torrent.updateAllPieceCompletions()
158                 r.t.torrent.updatePiecePriorities()
159                 r.t.cl.mu.Unlock()
160         }
161 }
162
163 func (r *Reader) Close() error {
164         r.t.deleteReader(r)
165         r.t = nil
166         return nil
167 }
168
169 func (r *Reader) posChanged() {
170         r.t.cl.mu.Lock()
171         defer r.t.cl.mu.Unlock()
172         r.t.torrent.readersChanged()
173 }
174
175 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
176         r.opMu.Lock()
177         defer r.opMu.Unlock()
178
179         r.mu.Lock()
180         switch whence {
181         case os.SEEK_SET:
182                 r.pos = off
183         case os.SEEK_CUR:
184                 r.pos += off
185         case os.SEEK_END:
186                 r.pos = r.t.torrent.Info.TotalLength() + off
187         default:
188                 err = errors.New("bad whence")
189         }
190         ret = r.pos
191         r.mu.Unlock()
192
193         r.posChanged()
194         return
195 }