10 // Accesses torrent data via a client.
14 // Ensure operations that change the position are exclusive, like Read()
18 // Required when modifying pos and readahead, or reading them without
25 var _ io.ReadCloser = &Reader{}
27 // Don't wait for pieces to complete and be verified. Read calls return as
28 // soon as they can when the underlying chunks become available.
29 func (r *Reader) SetResponsive() {
33 // Configure the number of bytes ahead of a read that should also be
34 // prioritized in preparation for further reads.
35 func (r *Reader) SetReadahead(readahead int64) {
38 r.readahead = readahead
41 func (r *Reader) readable(off int64) (ret bool) {
42 if r.torrentClosed() {
45 req, ok := r.t.torrent.offsetRequest(off)
50 return r.t.torrent.haveChunk(req)
52 return r.t.torrent.pieceComplete(int(req.Index))
55 // Returns how many bytes are available to read at off. max is the most we could require.
56 func (r *Reader) available(off, max int64) (ret int64) {
58 req, ok := r.t.torrent.offsetRequest(off)
62 if !r.t.torrent.haveChunk(req) {
65 len1 := int64(req.Length) - (off - r.t.torrent.requestOffset(req))
70 // Ensure that ret hasn't exceeded our original max.
77 func (r *Reader) tickleClient() {
78 r.t.torrent.readersChanged()
81 func (r *Reader) waitReadable(off int64) {
82 // We may have been sent back here because we were told we could read but
88 func (r *Reader) Read(b []byte) (n int, err error) {
93 n1, err = r.readOnceAt(b, r.pos)
96 panic("expected error")
106 if r.pos >= r.t.torrent.length {
108 } else if err == io.EOF {
109 err = io.ErrUnexpectedEOF
114 // Safe to call with or without client lock.
115 func (r *Reader) torrentClosed() bool {
116 return r.t.torrent.isClosed()
119 // Wait until some data should be available to read. Tickles the client if it
120 // isn't. Returns how much should be readable without blocking.
121 func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
123 defer r.t.cl.mu.Unlock()
124 for !r.readable(pos) {
127 return r.available(pos, wanted)
130 // Performs at most one successful read from torrent storage.
131 func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
132 if pos >= r.t.torrent.length {
137 avail := r.waitAvailable(pos, int64(len(b)))
139 if r.torrentClosed() {
140 err = errors.New("torrent closed")
145 pi := int(pos / r.t.Info().PieceLength)
146 ip := r.t.Info().Piece(pi)
147 po := pos % ip.Length()
148 if int64(len(b1)) > ip.Length()-po {
149 b1 = b1[:ip.Length()-po]
151 n, err = r.t.torrent.readAt(b1, pos)
155 // log.Printf("%s: error reading from torrent storage pos=%d: %s", r.t, pos, err)
157 r.t.torrent.updateAllPieceCompletions()
158 r.t.torrent.updatePiecePriorities()
163 func (r *Reader) Close() error {
169 func (r *Reader) posChanged() {
171 defer r.t.cl.mu.Unlock()
172 r.t.torrent.readersChanged()
175 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
177 defer r.opMu.Unlock()
186 r.pos = r.t.torrent.Info.TotalLength() + off
188 err = errors.New("bad whence")
197 func (r *Reader) Torrent() *Torrent {