]> Sergey Matveev's repositories - btrtrc.git/blob - reader.go
Update Reader pos between reads
[btrtrc.git] / reader.go
1 package torrent
2
3 import (
4         "errors"
5         "io"
6         "log"
7         "os"
8         "sync"
9 )
10
11 // Accesses torrent data via a client.
12 type Reader struct {
13         t          *Torrent
14         responsive bool
15         opMu       sync.Mutex
16
17         mu        sync.Mutex
18         pos       int64
19         readahead int64
20 }
21
22 var _ io.ReadCloser = &Reader{}
23
24 // Don't wait for pieces to complete and be verified. Read calls return as
25 // soon as they can when the underlying chunks become available.
26 func (r *Reader) SetResponsive() {
27         r.responsive = true
28 }
29
30 // Configure the number of bytes ahead of a read that should also be
31 // prioritized in preparation for further reads.
32 func (r *Reader) SetReadahead(readahead int64) {
33         r.mu.Lock()
34         defer r.mu.Unlock()
35         r.readahead = readahead
36 }
37
38 func (r *Reader) readable(off int64) (ret bool) {
39         if r.torrentClosed() {
40                 return true
41         }
42         req, ok := r.t.torrent.offsetRequest(off)
43         if !ok {
44                 panic(off)
45         }
46         if r.responsive {
47                 return r.t.torrent.haveChunk(req)
48         }
49         return r.t.torrent.pieceComplete(int(req.Index))
50 }
51
52 // How many bytes are available to read. Max is the most we could require.
53 func (r *Reader) available(off, max int64) (ret int64) {
54         for max > 0 {
55                 req, ok := r.t.torrent.offsetRequest(off)
56                 if !ok {
57                         break
58                 }
59                 if !r.t.torrent.haveChunk(req) {
60                         break
61                 }
62                 len1 := int64(req.Length) - (off - r.t.torrent.requestOffset(req))
63                 max -= len1
64                 ret += len1
65                 off += len1
66         }
67         // Ensure that ret hasn't exceeded our original max.
68         if max < 0 {
69                 ret += max
70         }
71         return
72 }
73
74 func (r *Reader) tickleClient() {
75         r.t.torrent.readersChanged()
76 }
77
78 func (r *Reader) waitReadable(off int64) {
79         // We may have been sent back here because we were told we could read but
80         // it failed.
81         r.tickleClient()
82         r.t.cl.event.Wait()
83 }
84
85 func (r *Reader) Read(b []byte) (n int, err error) {
86         r.opMu.Lock()
87         defer r.opMu.Unlock()
88         for len(b) != 0 {
89                 var n1 int
90                 n1, err = r.readOnceAt(b, r.pos)
91                 if n1 == 0 {
92                         if err == nil {
93                                 panic("expected error")
94                         }
95                         break
96                 }
97                 b = b[n1:]
98                 n += n1
99                 r.pos += int64(n1)
100         }
101         if r.pos >= r.t.torrent.length {
102                 err = io.EOF
103         } else if err == io.EOF {
104                 err = io.ErrUnexpectedEOF
105         }
106         return
107 }
108
109 // Safe to call with or without client lock.
110 func (r *Reader) torrentClosed() bool {
111         return r.t.torrent.isClosed()
112 }
113
114 // Wait until some data should be available to read. Tickles the client if it
115 // isn't. Returns how much should be readable without blocking.
116 func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
117         r.t.cl.mu.Lock()
118         defer r.t.cl.mu.Unlock()
119         for !r.readable(pos) {
120                 r.waitReadable(pos)
121         }
122         return r.available(pos, wanted)
123 }
124
125 // Performs at most one successful read to torrent storage.
126 func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
127         if pos >= r.t.torrent.length {
128                 err = io.EOF
129                 return
130         }
131         for {
132                 avail := r.waitAvailable(pos, int64(len(b)))
133                 if avail == 0 {
134                         if r.torrentClosed() {
135                                 err = errors.New("torrent closed")
136                                 return
137                         }
138                 }
139                 b1 := b[:avail]
140                 pi := int(pos / r.t.Info().PieceLength)
141                 ip := r.t.Info().Piece(pi)
142                 po := pos % ip.Length()
143                 if int64(len(b1)) > ip.Length()-po {
144                         b1 = b1[:ip.Length()-po]
145                 }
146                 n, err = r.t.torrent.readAt(b1, pos)
147                 if n != 0 {
148                         return
149                 }
150                 // log.Printf("%s: error reading from torrent storage pos=%d: %s", r.t, pos, err)
151                 r.t.cl.mu.Lock()
152                 r.t.torrent.updateAllPieceCompletions()
153                 r.t.torrent.updatePiecePriorities()
154                 r.t.cl.mu.Unlock()
155         }
156 }
157
158 func (r *Reader) Close() error {
159         r.t.deleteReader(r)
160         r.t = nil
161         return nil
162 }
163
164 func (r *Reader) posChanged() {
165         r.t.cl.mu.Lock()
166         defer r.t.cl.mu.Unlock()
167         r.t.torrent.readersChanged()
168 }
169
170 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
171         r.mu.Lock()
172         switch whence {
173         case os.SEEK_SET:
174                 r.pos = off
175         case os.SEEK_CUR:
176                 r.pos += off
177         case os.SEEK_END:
178                 r.pos = r.t.torrent.Info.TotalLength() + off
179         default:
180                 err = errors.New("bad whence")
181         }
182         ret = r.pos
183         r.mu.Unlock()
184         r.posChanged()
185         return
186 }