Had several weeks of testing. Removes a lot of the "helper" reading methods, but this was necessary to allow per-Torrent reading options.
* Track upload and download data.
- * Emulate a UDP server in the UDP tracker tests.
- * Make use of sparse file regions in download data for faster hashing.
- * If we're choked and interested, we never send not-interested if there's nothing we want?
- * Don't announce torrents that don't need active peers. It spams UDP, fills memory, and publicizes what we have loaded.
- * Randomize triedAddrs bloom filter to allow different Addr sets on each Announce.
- * When lots of good connections, it'll do a huge readahead, then refuse to trickle new pieces because we sent not interested to them all, thereby reducing the number of unchoked connections.
\ No newline at end of file
+ * Emulate a UDP server in the UDP tracker tests rather than communicating with the Internet.
+ * Make use of sparse file regions in download data for faster hashing. This is available as whence 3 and 4 on some OS?
+ * When we're choked and interested, are we not interested if there's no longer anything that we want?
+ * dht: Randomize triedAddrs bloom filter to allow different Addr sets on each Announce.
+ * dht: Verify that the triedAddrs bloom filter is working well, github's willf made a bunch of changes.
+ * Rearrange the local-peer choked/interested status flags to be more natural to read.
+ * Check that pruning is working correctly. worstConns sorting might need an adjustment to how it factors in the good/unwanted chunks ratio.
+ * data/blob: Deleting incomplete data triggers io.ErrUnexpectedEOF that isn't recovered from.
+ * Responsive reader needs to apply some readahead.
\ No newline at end of file
}
}
-// Read torrent data at the given offset. Will block until it is available.
-func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
- index := int(off / int64(t.usualPieceSize()))
- // Reading outside the bounds of a file is an error.
- if index < 0 {
- err = os.ErrInvalid
- return
- }
- if int(index) >= len(t.Pieces) {
- err = io.EOF
- return
- }
- pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
- pieceLeft := int(t.pieceLength(index) - pieceOff)
- if pieceLeft <= 0 {
- err = io.EOF
- return
- }
- if len(p) > pieceLeft {
- p = p[:pieceLeft]
- }
- if len(p) == 0 {
- panic(len(p))
- }
- // TODO: ReadAt should always try to fill the buffer.
- for {
- avail := cl.prepareRead(t, off)
- if avail < int64(len(p)) {
- p = p[:avail]
- }
- n, err = dataReadAt(t.data, p, off)
- if n != 0 || err != io.ErrUnexpectedEOF {
- break
- }
- // If we reach here, the data we thought was ready, isn't. So we
- // prepare it again, and retry.
- }
- return
-}
-
-// Sets priorities to download from the given offset. Returns when the piece
-// at the given offset can be read. Returns the number of bytes that are
-// immediately available from the offset.
-func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
- index := int(off / int64(t.usualPieceSize()))
- // Reading outside the bounds of a file is an error.
- if index < 0 || index >= t.numPieces() {
- return
- }
- piece := t.Pieces[index]
- cl.readRaisePiecePriorities(t, off)
- for !t.pieceComplete(index) && !t.isClosed() {
- // This is to prevent being starved if a piece is dropped before we
- // can read it.
- cl.readRaisePiecePriorities(t, off)
- piece.Event.Wait()
- }
- return t.Info.Piece(index).Length() - off%t.Info.PieceLength
-}
-
-func (T Torrent) prepareRead(off int64) (avail int64) {
- T.cl.mu.Lock()
- defer T.cl.mu.Unlock()
- return T.cl.prepareRead(T.torrent, off)
-}
-
-// Data implements a streaming interface that's more efficient than ReadAt.
+// A Data that implements this has a streaming interface that should be
+// preferred over ReadAt. For example, the data is stored in blocks on the
+// network and have a fixed cost to open.
type SectionOpener interface {
+ // Open a ReadCloser at the given offset into torrent data. n is how many
+ // bytes we intend to read.
OpenSection(off, n int64) (io.ReadCloser, error)
}
func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
+ // defer func() {
+ // if err == io.ErrUnexpectedEOF && n != 0 {
+ // err = nil
+ // }
+ // }()
+ // log.Println("data read at", len(b), off)
again:
if ra, ok := d.(io.ReaderAt); ok {
return ra.ReadAt(b, off)
return int((readahead+pieceLength-1)/pieceLength - 1)
}
-func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
+func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
index := int(off / int64(t.usualPieceSize()))
cl.raisePiecePriority(t, index, piecePriorityNow)
index++
return
}
cl.raisePiecePriority(t, index, piecePriorityNext)
- for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
+ for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
index++
if index >= t.numPieces() {
break
}
}
+func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
+ for n > 0 {
+ req, ok := t.offsetRequest(off)
+ if !ok {
+ break
+ }
+ if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
+ if t.urgent == nil {
+ t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
+ }
+ t.urgent[req] = struct{}{}
+ cl.event.Broadcast() // Why?
+ index := int(req.Index)
+ cl.queueFirstHash(t, index)
+ cl.pieceChanged(t, index)
+ }
+ reqOff := t.requestOffset(req)
+ n1 := req.Length - pp.Integer(off-reqOff)
+ off += int64(n1)
+ n -= int(n1)
+ }
+ // log.Print(t.urgent)
+}
+
func (cl *Client) configDir() string {
if cl._configDir == "" {
return filepath.Join(os.Getenv("HOME"), ".config/torrent")
dhtCfg.Conn = cl.utpSock.PacketConn()
}
cl.dHT, err = dht.NewServer(dhtCfg)
- if cl.ipBlockList != nil {
- cl.dHT.SetIPBlockList(cl.ipBlockList)
- }
if err != nil {
return
}
+ if cl.ipBlockList != nil {
+ cl.dHT.SetIPBlockList(cl.ipBlockList)
+ }
}
return
if err != nil {
return
}
+ for index := range iter.N(t.numPieces()) {
+ cl.pieceChanged(t, index)
+ }
cl.startTorrent(t)
return
}
t.Trackers = newTrackers
}
-// A handle to a live torrent within a Client.
-type Torrent struct {
- cl *Client
- *torrent
-}
-
// Don't call this before the info is available.
func (t *torrent) BytesCompleted() int64 {
if !t.haveInfo() {
t.cl.mu.Unlock()
}
-// Provides access to regions of torrent data that correspond to its files.
-type File struct {
- t Torrent
- path string
- offset int64
- length int64
- fi metainfo.FileInfo
-}
-
-func (f File) FileInfo() metainfo.FileInfo {
- return f.fi
-}
-
-func (f File) Path() string {
- return f.path
-}
-
// A file-like handle to some torrent data resource.
type Handle interface {
io.Reader
io.ReaderAt
}
-// Implements a Handle within a subsection of another Handle.
-type sectionHandle struct {
- h Handle
- off, n, cur int64
-}
-
-func (me *sectionHandle) Seek(offset int64, whence int) (ret int64, err error) {
- if whence == 0 {
- offset += me.off
- } else if whence == 2 {
- whence = 0
- offset += me.off + me.n
- }
- ret, err = me.h.Seek(offset, whence)
- me.cur = ret
- ret -= me.off
- return
-}
-
-func (me *sectionHandle) Close() error {
- return me.h.Close()
-}
-
-func (me *sectionHandle) Read(b []byte) (n int, err error) {
- max := me.off + me.n - me.cur
- if int64(len(b)) > max {
- b = b[:max]
- }
- n, err = me.h.Read(b)
- me.cur += int64(n)
- if err != nil {
- return
- }
- if me.cur == me.off+me.n {
- err = io.EOF
- }
- return
-}
-
-func (me *sectionHandle) ReadAt(b []byte, off int64) (n int, err error) {
- if off >= me.n {
- err = io.EOF
- return
- }
- if int64(len(b)) >= me.n-off {
- b = b[:me.n-off]
- }
- return me.h.ReadAt(b, me.off+off)
-}
-
-func (f File) Open() (h Handle, err error) {
- h = f.t.NewReadHandle()
- _, err = h.Seek(f.offset, os.SEEK_SET)
- if err != nil {
- h.Close()
- return
- }
- h = §ionHandle{h, f.offset, f.Length(), f.offset}
- return
-}
-
-func (f File) ReadAt(p []byte, off int64) (n int, err error) {
- maxLen := f.length - off
- if int64(len(p)) > maxLen {
- p = p[:maxLen]
- }
- return f.t.ReadAt(p, off+f.offset)
-}
-
-func (f *File) Length() int64 {
- return f.length
-}
-
-type FilePieceState struct {
- Length int64
- State byte
-}
-
-func (f *File) Progress() (ret []FilePieceState) {
- pieceSize := int64(f.t.usualPieceSize())
- off := f.offset % pieceSize
- remaining := f.length
- for i := int(f.offset / pieceSize); ; i++ {
- if remaining == 0 {
- break
- }
- len1 := pieceSize - off
- if len1 > remaining {
- len1 = remaining
- }
- ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
- off = 0
- remaining -= len1
- }
- return
-}
-
-func (f *File) PrioritizeRegion(off, len int64) {
- if off < 0 || off >= f.length {
- return
- }
- if off+len > f.length {
- len = f.length - off
- }
- off += f.offset
- f.t.SetRegionPriority(off, len)
-}
-
// Returns handles to the files in the torrent. This requires the metainfo is
// available first.
func (t Torrent) Files() (ret []File) {
t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead)
}
-func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
- return me.cl.torrentReadAt(me.torrent, off, p)
-}
-
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
// metainfo has the correct infohash.
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
}
}
addRequest := func(req request) (again bool) {
+ // TODO: Couldn't this check also be done *after* the request?
if len(c.Requests) >= 64 {
return false
}
return c.Request(req)
}
+ for req := range t.urgent {
+ if !addRequest(req) {
+ return
+ }
+ }
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
pieceIndex := e.Piece()
if !c.PeerHasPiece(pieceIndex) {
piece := t.Pieces[req.Index]
// Do we actually want this chunk?
- if _, ok := piece.PendingChunkSpecs[req.chunkSpec]; !ok || piece.Priority == piecePriorityNone {
+ if !t.wantChunk(req) {
unusedDownloadedChunksCount.Add(1)
c.UnwantedChunksReceived++
return nil
return fmt.Errorf("error writing chunk: %s", err)
}
+ // log.Println("got chunk", req)
+ piece.Event.Broadcast()
// Record that we have the chunk.
delete(piece.PendingChunkSpecs, req.chunkSpec)
+ delete(t.urgent, req)
if len(piece.PendingChunkSpecs) == 0 {
for _, c := range t.Conns {
c.pieceRequestOrder.DeletePiece(int(req.Index))
me.pieceChanged(t, int(piece))
}
+// TODO: Check this isn't called more than once for each piece being correct.
func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece)
p := t.Pieces[piece]
if correct {
p.Priority = piecePriorityNone
p.PendingChunkSpecs = nil
+ for req := range t.urgent {
+ if int(req.Index) == piece {
+ delete(t.urgent, req)
+ }
+ }
p.Event.Broadcast()
} else {
if len(p.PendingChunkSpecs) == 0 {
t.pendAllChunkSpecs(int(piece))
}
- if p.Priority != piecePriorityNone {
+ if t.wantPiece(piece) {
me.openNewConns(t)
}
}
import (
"encoding/binary"
"fmt"
- "io"
"io/ioutil"
"log"
"net"
Port: util.AddrPort(seeder.ListenAddr()),
},
})
- _greeting, err := ioutil.ReadAll(io.NewSectionReader(leecherGreeting, 0, leecherGreeting.Length()))
+ r := leecherGreeting.NewReader()
+ defer r.Close()
+ _greeting, err := ioutil.ReadAll(r)
if err != nil {
- t.Fatal(err)
+ t.Fatalf("%q %s", string(_greeting), err)
}
greeting := string(_greeting)
if greeting != testutil.GreetingFileContents {
return
}
pp := cn.piecePriorities[piece]
- // Priority goes to Now, then Next in connection order. Then Readahead in
- // by piece index. Then normal again by connection order.
+ // Priority regions not to scale. Within each region, piece is randomized
+ // according to connection.
+
+ // [ Now ]
+ // [ Next ]
+ // [ Readahead ]
+ // [ Normal ]
key := func() int {
switch priority {
case piecePriorityNow:
import (
"encoding/hex"
"io"
+ "log"
"github.com/anacrolix/libtorgo/metainfo"
)
func (me *data) Close() {}
func (me *data) ReadAt(b []byte, off int64) (n int, err error) {
- p := me.info.Piece(int(off / me.info.PieceLength))
- f := me.store.pieceRead(p)
- if f == nil {
- err = io.ErrUnexpectedEOF
- return
- }
- defer f.Close()
- n, err = f.ReadAt(b, off%me.info.PieceLength)
- if err == io.EOF {
- err = io.ErrUnexpectedEOF
+ for len(b) != 0 {
+ if off >= me.info.TotalLength() {
+ err = io.EOF
+ break
+ }
+ p := me.info.Piece(int(off / me.info.PieceLength))
+ f := me.store.pieceRead(p)
+ if f == nil {
+ log.Println("piece not found", p)
+ err = io.ErrUnexpectedEOF
+ break
+ }
+ b1 := b
+ maxN1 := int(p.Length() - off%me.info.PieceLength)
+ if len(b1) > maxN1 {
+ b1 = b1[:maxN1]
+ }
+ var n1 int
+ n1, err = f.ReadAt(b1, off%me.info.PieceLength)
+ f.Close()
+ n += n1
+ off += int64(n1)
+ b = b[n1:]
+ if err == io.EOF {
+ err = nil
+ break
+ }
+ if err != nil {
+ break
+ }
}
return
}
--- /dev/null
+package torrent
+
+import "github.com/anacrolix/libtorgo/metainfo"
+
+// Provides access to regions of torrent data that correspond to its files.
+type File struct {
+ t Torrent
+ path string
+ offset int64
+ length int64
+ fi metainfo.FileInfo
+}
+
+// Data for this file begins this far into the torrent.
+func (f *File) Offset() int64 {
+ return f.offset
+}
+
+func (f File) FileInfo() metainfo.FileInfo {
+ return f.fi
+}
+
+func (f File) Path() string {
+ return f.path
+}
+
+func (f *File) Length() int64 {
+ return f.length
+}
+
+type FilePieceState struct {
+ Length int64
+ State byte
+}
+
+func (f *File) Progress() (ret []FilePieceState) {
+ pieceSize := int64(f.t.usualPieceSize())
+ off := f.offset % pieceSize
+ remaining := f.length
+ for i := int(f.offset / pieceSize); ; i++ {
+ if remaining == 0 {
+ break
+ }
+ len1 := pieceSize - off
+ if len1 > remaining {
+ len1 = remaining
+ }
+ ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
+ off = 0
+ remaining -= len1
+ }
+ return
+}
+
+func (f *File) PrioritizeRegion(off, len int64) {
+ if off < 0 || off >= f.length {
+ return
+ }
+ if off+len > f.length {
+ len = f.length - off
+ }
+ off += f.offset
+ f.t.SetRegionPriority(off, len)
+}
)
readDone := make(chan struct{})
go func() {
- _n, _err = t.ReadAt(p, off)
+ r := t.NewReader()
+ defer r.Close()
+ _n, _err = r.ReadAt(p, off)
+ log.Println(_n, p)
close(readDone)
}()
select {
--- /dev/null
+package torrent
+
+import (
+ "errors"
+ "io"
+ "os"
+)
+
+// Accesses torrent data via a client.
+type Reader struct {
+ t *Torrent
+ pos int64
+ responsive bool
+ readahead int64
+}
+
+var _ io.ReadCloser = &Reader{}
+
+// Don't wait for pieces to complete and be verified. Read calls return as
+// soon as they can when the underlying chunks become available.
+func (r *Reader) SetResponsive() {
+ r.responsive = true
+}
+
+func (r *Reader) SetReadahead(readahead int64) {
+ r.readahead = readahead
+}
+
+func (r *Reader) raisePriorities(off int64, n int) {
+ if r.responsive {
+ r.t.cl.addUrgentRequests(r.t.torrent, off, n)
+ }
+ r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead)
+}
+
+func (r *Reader) readable(off int64) (ret bool) {
+ // log.Println("readable", off)
+ // defer func() {
+ // log.Println("readable", ret)
+ // }()
+ req, ok := r.t.offsetRequest(off)
+ if !ok {
+ panic(off)
+ }
+ if r.responsive {
+ return r.t.haveChunk(req)
+ }
+ return r.t.pieceComplete(int(req.Index))
+}
+
+// How many bytes are available to read. Max is the most we could require.
+func (r *Reader) available(off, max int64) (ret int64) {
+ for max > 0 {
+ req, ok := r.t.offsetRequest(off)
+ if !ok {
+ break
+ }
+ if !r.t.haveChunk(req) {
+ break
+ }
+ len1 := int64(req.Length) - (off - r.t.requestOffset(req))
+ max -= len1
+ ret += len1
+ off += len1
+ }
+ return
+}
+
+func (r *Reader) waitReadable(off int64) {
+ r.t.Pieces[off/int64(r.t.usualPieceSize())].Event.Wait()
+}
+
+func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) {
+ return r.readAt(b, off)
+}
+
+func (r *Reader) Read(b []byte) (n int, err error) {
+ n, err = r.readAt(b, r.pos)
+ r.pos += int64(n)
+ if n != 0 && err == io.ErrUnexpectedEOF {
+ err = nil
+ }
+ return
+}
+
+func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
+ // defer func() {
+ // log.Println(pos, n, err)
+ // }()
+ r.t.cl.mu.Lock()
+ defer r.t.cl.mu.Unlock()
+ maxLen := r.t.Info.TotalLength() - pos
+ if maxLen <= 0 {
+ err = io.EOF
+ return
+ }
+ if int64(len(b)) > maxLen {
+ b = b[:maxLen]
+ }
+ r.raisePriorities(pos, len(b))
+ for !r.readable(pos) {
+ r.raisePriorities(pos, len(b))
+ r.waitReadable(pos)
+ }
+ avail := r.available(pos, int64(len(b)))
+ // log.Println("available", avail)
+ if int64(len(b)) > avail {
+ b = b[:avail]
+ }
+ n, err = dataReadAt(r.t.data, b, pos)
+ return
+}
+
+func (r *Reader) Close() error {
+ r.t = nil
+ return nil
+}
+
+func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
+ switch whence {
+ case os.SEEK_SET:
+ r.pos = off
+ case os.SEEK_CUR:
+ r.pos += off
+ case os.SEEK_END:
+ r.pos = r.t.Info.TotalLength() + off
+ default:
+ err = errors.New("bad whence")
+ }
+ ret = r.pos
+ return
+}
--- /dev/null
+package torrent
+
+// The public interface for a torrent within a Client.
+
+// A handle to a live torrent within a Client.
+type Torrent struct {
+ cl *Client
+ *torrent
+}
+
+func (t *Torrent) NewReader() (ret *Reader) {
+ ret = &Reader{
+ t: t,
+ readahead: 5 * 1024 * 1024,
+ }
+ return
+}
import (
"container/heap"
- "errors"
"fmt"
"io"
"log"
"net"
- "os"
"sort"
"sync"
"time"
InfoHash InfoHash
Pieces []*piece
+ // Chunks that are wanted before all others. This is for
+ // responsive/streaming readers that want to unblock ASAP.
+ urgent map[request]struct{}
// Total length of the torrent in bytes. Stored because it's not O(1) to
// get this from the info dict.
length int64
return t.data != nil && t.data.PieceComplete(piece)
}
-// A file-like handle to torrent data that implements SectionOpener. Opened
-// sections will be reused so long as Reads and ReadAt's are contiguous.
-type handle struct {
- rc io.ReadCloser
- rcOff int64
- curOff int64
- so SectionOpener
- size int64
- t Torrent
-}
-
-func (h *handle) Close() error {
- if h.rc != nil {
- return h.rc.Close()
- }
- return nil
-}
-
-func (h *handle) ReadAt(b []byte, off int64) (n int, err error) {
- return h.readAt(b, off)
-}
-
-func (h *handle) readAt(b []byte, off int64) (n int, err error) {
- avail := h.t.prepareRead(off)
- if int64(len(b)) > avail {
- b = b[:avail]
- }
- if int64(len(b)) > h.size-off {
- b = b[:h.size-off]
- }
- if h.rcOff != off && h.rc != nil {
- h.rc.Close()
- h.rc = nil
- }
- if h.rc == nil {
- h.rc, err = h.so.OpenSection(off, h.size-off)
- if err != nil {
- return
- }
- h.rcOff = off
- }
- n, err = h.rc.Read(b)
- h.rcOff += int64(n)
- return
-}
-
-func (h *handle) Read(b []byte) (n int, err error) {
- n, err = h.readAt(b, h.curOff)
- h.curOff = h.rcOff
- return
-}
-
-func (h *handle) Seek(off int64, whence int) (newOff int64, err error) {
- switch whence {
- case os.SEEK_SET:
- h.curOff = off
- case os.SEEK_CUR:
- h.curOff += off
- case os.SEEK_END:
- h.curOff = h.size + off
- default:
- err = errors.New("bad whence")
- }
- newOff = h.curOff
- return
-}
-
-// Implements Handle on top of an io.SectionReader.
-type sectionReaderHandle struct {
- *io.SectionReader
-}
-
-func (sectionReaderHandle) Close() error { return nil }
-
-func (T Torrent) NewReadHandle() Handle {
- if so, ok := T.data.(SectionOpener); ok {
- return &handle{
- so: so,
- size: T.Length(),
- t: T,
- }
- }
- return sectionReaderHandle{io.NewSectionReader(T, 0, T.Length())}
-}
-
func (t *torrent) numConnsUnchoked() (num int) {
for _, c := range t.Conns {
if !c.PeerChoked {
for _, c := range t.Conns {
c.Close()
}
- t.pruneTimer.Stop()
+ if t.pruneTimer != nil {
+ t.pruneTimer.Stop()
+ }
}
func (t *torrent) addPeer(p Peer) {
}
fmt.Fprintln(w)
}
+ fmt.Fprintf(w, "Urgent:")
+ for req := range t.urgent {
+ fmt.Fprintf(w, " %s", req)
+ }
+ fmt.Fprintln(w)
fmt.Fprintf(w, "Trackers: ")
for _, tier := range t.Trackers {
for _, tr := range tier {
func (t *torrent) bitfield() (bf []bool) {
for _, p := range t.Pieces {
+ // TODO: Check this logic.
bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
}
return
}
func (t *torrent) haveChunk(r request) bool {
- p := t.Pieces[r.Index]
- if !p.EverHashed {
+ if !t.haveInfo() {
return false
}
- _, ok := p.PendingChunkSpecs[r.chunkSpec]
+ piece := t.Pieces[r.Index]
+ _, ok := piece.PendingChunkSpecs[r.chunkSpec]
+ // log.Println("have chunk", r, !ok)
return !ok
}
return false
}
_, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
+ if ok {
+ return true
+ }
+ _, ok = t.urgent[r]
return ok
}
+func (t *torrent) urgentChunkInPiece(piece int) bool {
+ for req := range t.urgent {
+ if int(req.Index) == piece {
+ return true
+ }
+ }
+ return false
+}
+
func (t *torrent) wantPiece(index int) bool {
if !t.haveInfo() {
return false
}
p := t.Pieces[index]
- // Put piece complete check last, since it's the slowest!
- return p.Priority != piecePriorityNone && !p.QueuedForHash && !p.Hashing && !t.pieceComplete(index)
+ if p.QueuedForHash {
+ return false
+ }
+ if p.Hashing {
+ return false
+ }
+ if p.Priority == piecePriorityNone {
+ if !t.urgentChunkInPiece(index) {
+ return false
+ }
+ }
+ // Put piece complete check last, since it's the slowest as it can involve
+ // calling out into external data stores.
+ return !t.pieceComplete(index)
}
func (t *torrent) connHasWantedPieces(c *connection) bool {