9 "github.com/anacrolix/missinggo"
12 // Accesses torrent data via a client.
16 // Ensure operations that change the position are exclusive, like Read()
20 // Required when modifying pos and readahead, or reading them without
27 var _ io.ReadCloser = &Reader{}
29 // Don't wait for pieces to complete and be verified. Read calls return as
30 // soon as they can when the underlying chunks become available.
31 func (r *Reader) SetResponsive() {
35 // Configure the number of bytes ahead of a read that should also be
36 // prioritized in preparation for further reads.
37 func (r *Reader) SetReadahead(readahead int64) {
39 r.readahead = readahead
42 defer r.t.cl.mu.Unlock()
46 func (r *Reader) readable(off int64) (ret bool) {
47 if r.torrentClosed() {
50 req, ok := r.t.torrent.offsetRequest(off)
55 return r.t.torrent.haveChunk(req)
57 return r.t.torrent.pieceComplete(int(req.Index))
60 // How many bytes are available to read. Max is the most we could require.
61 func (r *Reader) available(off, max int64) (ret int64) {
63 req, ok := r.t.torrent.offsetRequest(off)
67 if !r.t.torrent.haveChunk(req) {
70 len1 := int64(req.Length) - (off - r.t.torrent.requestOffset(req))
75 // Ensure that ret hasn't exceeded our original max.
82 func (r *Reader) tickleClient() {
83 r.t.torrent.readersChanged()
86 func (r *Reader) waitReadable(off int64) {
87 // We may have been sent back here because we were told we could read but
93 func (r *Reader) Read(b []byte) (n int, err error) {
98 n1, err = r.readOnceAt(b, r.pos)
101 panic("expected error")
111 if r.pos >= r.t.torrent.length {
113 } else if err == io.EOF {
114 err = io.ErrUnexpectedEOF
119 // Safe to call with or without client lock.
120 func (r *Reader) torrentClosed() bool {
121 return r.t.torrent.isClosed()
124 // Wait until some data should be available to read. Tickles the client if it
125 // isn't. Returns how much should be readable without blocking.
126 func (r *Reader) waitAvailable(pos, wanted int64) (avail int64) {
128 defer r.t.cl.mu.Unlock()
129 for !r.readable(pos) {
132 return r.available(pos, wanted)
135 // Performs at most one successful read to torrent storage.
136 func (r *Reader) readOnceAt(b []byte, pos int64) (n int, err error) {
137 if pos >= r.t.torrent.length {
142 avail := r.waitAvailable(pos, int64(len(b)))
144 if r.torrentClosed() {
145 err = errors.New("torrent closed")
150 pi := int(pos / r.t.Info().PieceLength)
151 ip := r.t.Info().Piece(pi)
152 po := pos % ip.Length()
153 missinggo.LimitLen(&b1, ip.Length()-po)
154 n, err = r.t.torrent.readAt(b1, pos)
159 // log.Printf("%s: error reading from torrent storage pos=%d: %s", r.t, pos, err)
161 r.t.torrent.updateAllPieceCompletions()
162 r.t.torrent.updatePiecePriorities()
167 func (r *Reader) Close() error {
173 func (r *Reader) posChanged() {
175 defer r.t.cl.mu.Unlock()
176 r.t.torrent.readersChanged()
179 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
181 defer r.opMu.Unlock()
190 r.pos = r.t.torrent.Info.TotalLength() + off
192 err = errors.New("bad whence")
201 func (r *Reader) Torrent() *Torrent {