]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Fix locks on Reader.SetReadahead
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "os"
7         "sync"
8 )
9
10 // Accesses torrent data via a client.
11 type Reader struct {
12         t          *Torrent
13         responsive bool
14         // Ensure operations that change the position are exclusive, like Read()
15         // and Seek().
16         opMu sync.Mutex
17
18         // Required when modifying pos and readahead, or reading them without
19         // opMu.
20         mu        sync.Mutex
21         pos       int64
22         readahead int64
23 }
24
25 var _ io.ReadCloser = &Reader{}
26
27 // Don't wait for pieces to complete and be verified. Read calls return as
28 // soon as they can when the underlying chunks become available.
29 func (r *Reader) SetResponsive() {
30         r.responsive = true
31 }
32
33 // Configure the number of bytes ahead of a read that should also be
34 // prioritized in preparation for further reads.
35 func (r *Reader) SetReadahead(readahead int64) {
36         r.mu.Lock()
37         r.readahead = readahead
38         r.mu.Unlock()
39         r.t.cl.mu.Lock()
40         defer r.t.cl.mu.Unlock()
41         r.tickleClient()
42 }
43
44 func (r *Reader) readable(off int64) (ret bool) {
45         if r.torrentClosed() {
46                 return true
47         }
48         req, ok := r.t.torrent.offsetRequest(off)
49         if !ok {
50                 panic(off)
51         }
52         if r.responsive {
53                 return r.t.torrent.haveChunk(req)
54         }
55         return r.t.torrent.pieceComplete(int(req.Index))
56 }
57
58 // How many bytes are available to read. Max is the most we could require.
59 func (r *Reader) available(off, max int64) (ret int64) {
60         for max > 0 {
61                 req, ok := r.t.torrent.offsetRequest(off)
62                 if !ok {
63                         break
64                 }
65                 if !r.t.torrent.haveChunk(req) {
66                         break
67                 }
68                 len1 := int64(req.Length) - (off - r.t.torrent.requestOffset(req))
69                 max -= len1
70                 ret += len1
71                 off += len1
72         }
73         // Ensure that ret hasn't exceeded our original max.
74         if max < 0 {
75                 ret += max
76         }
77         return
78 }
79
80 func (r *Reader) tickleClient() {
81         r.t.torrent.readersChanged()
82 }
83
84 func (r *Reader) waitReadable(off int64) {
85         // We may have been sent back here because we were told we could read but
86         // it failed.
87         r.tickleClient()
88         r.t.cl.event.Wait()
89 }
90
91 func (r *Reader) Read(b []byte) (n int, err error) {
92         r.opMu.Lock()
93         defer r.opMu.Unlock()
94         for len(b) != 0 {
95                 var n1 int
96                 n1, err = r.readOnceAt(b, r.pos)
97                 if n1 == 0 {
98                         if err == nil {
99                                 panic("expected error")
100                         }
101                         break
102                 }
103                 b = b[n1:]
104                 n += n1
105                 r.mu.Lock()
106                 r.pos += int64(n1)
107                 r.mu.Unlock()
108         }
109         if r.pos >= r.t.torrent.length {
110                 err = io.EOF
111         } else if err == io.EOF {
112                 err = io.ErrUnexpectedEOF
113         }
114         return
115 }
116
117 // Safe to call with or without client lock.
118 func (r *Reader) torrentClosed() bool {
119         return r.t.torrent.isClosed()
120 }
121
122 // Wait until some data should be available to read. Tickles the client if it
123 // isn't. Returns how much should be readable without blocking.
124 func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
125         r.t.cl.mu.Lock()
126         defer r.t.cl.mu.Unlock()
127         for !r.readable(pos) {
128                 r.waitReadable(pos)
129         }
130         return r.available(pos, wanted)
131 }
132
133 // Performs at most one successful read to torrent storage.
134 func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
135         if pos >= r.t.torrent.length {
136                 err = io.EOF
137                 return
138         }
139         for {
140                 avail := r.waitAvailable(pos, int64(len(b)))
141                 if avail == 0 {
142                         if r.torrentClosed() {
143                                 err = errors.New("torrent closed")
144                                 return
145                         }
146                 }
147                 b1 := b[:avail]
148                 pi := int(pos / r.t.Info().PieceLength)
149                 ip := r.t.Info().Piece(pi)
150                 po := pos % ip.Length()
151                 if int64(len(b1)) > ip.Length()-po {
152                         b1 = b1[:ip.Length()-po]
153                 }
154                 n, err = r.t.torrent.readAt(b1, pos)
155                 if n != 0 {
156                         return
157                 }
158                 // log.Printf("%s: error reading from torrent storage pos=%d: %s", r.t, pos, err)
159                 r.t.cl.mu.Lock()
160                 r.t.torrent.updateAllPieceCompletions()
161                 r.t.torrent.updatePiecePriorities()
162                 r.t.cl.mu.Unlock()
163         }
164 }
165
166 func (r *Reader) Close() error {
167         r.t.deleteReader(r)
168         r.t = nil
169         return nil
170 }
171
172 func (r *Reader) posChanged() {
173         r.t.cl.mu.Lock()
174         defer r.t.cl.mu.Unlock()
175         r.t.torrent.readersChanged()
176 }
177
178 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
179         r.opMu.Lock()
180         defer r.opMu.Unlock()
181
182         r.mu.Lock()
183         switch whence {
184         case os.SEEK_SET:
185                 r.pos = off
186         case os.SEEK_CUR:
187                 r.pos += off
188         case os.SEEK_END:
189                 r.pos = r.t.torrent.Info.TotalLength() + off
190         default:
191                 err = errors.New("bad whence")
192         }
193         ret = r.pos
194         r.mu.Unlock()
195
196         r.posChanged()
197         return
198 }
199
200 func (r *Reader) Torrent() *Torrent {
201         return r.t
202 }