"expvar"
"fmt"
"io"
- "io/ioutil"
"log"
"math/big"
mathRand "math/rand"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/sync"
"github.com/anacrolix/utp"
+ "github.com/bradfitz/iter"
"github.com/edsrzf/mmap-go"
"github.com/anacrolix/torrent/bencode"
func (t Torrent) DownloadAll() {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
- for i := range iter.N(t.torrent.numPieces()) {
- t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal)
+ for i := range iter.N(t.torrent.Info.NumPieces()) {
+ t.torrent.pendPiece(i, t.cl)
}
- // Nice to have the first and last pieces sooner for various interactive
- // purposes.
- t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead)
- t.cl.raisePiecePriority(t.torrent, t.torrent.numPieces()-1, PiecePriorityReadahead)
}
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
return true
}
-func (me *Client) connAddRequest(c *connection, req request) (more bool) {
- if len(c.Requests) >= 64 {
- return false
- }
- more = c.Request(req)
- return
-}
-
-func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) {
- for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
- req := request{pp.Integer(piece), cs}
- if !me.connAddRequest(c, req) {
- return false
- }
- }
- return true
-}
-
-func (me *Client) fillRequests(t *torrent, c *connection) {
- if c.Interested {
- if c.PeerChoked {
- return
- }
- if len(c.Requests) > c.requestsLowWater {
- return
- }
- }
- if !t.forUrgentPieces(func(piece int) (again bool) {
- if !c.PeerHasPiece(piece) {
- return true
- }
- return me.connRequestPiecePendingChunks(c, t, piece)
- }) {
- return
- }
- t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
- for i := begin + 1; i < end; i++ {
- if !c.PeerHasPiece(i) {
- continue
- }
- if !me.connRequestPiecePendingChunks(c, t, i) {
- return false
- }
- }
- return true
- })
-}
-
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
- me.fillRequests(t, c)
+ t.fillRequests(c)
if len(c.Requests) == 0 && !c.PeerChoked {
// So we're not choked, but we don't want anything right now. We may
// have completed readahead, and the readahead window has not rolled
me.pieceChanged(t, int(piece))
}
-func (me *Client) pieceChanged(t *torrent, piece int) {
- correct := t.pieceComplete(piece)
- defer t.publishPieceChange(piece)
- defer me.event.Broadcast()
- if !correct {
- if t.pieceAllDirty(piece) {
- t.pendAllChunkSpecs(piece)
- }
- if t.wantPiece(piece) {
- me.openNewConns(t)
+func (me *Client) onCompletedPiece(t *torrent, piece int) {
+ for _, conn := range t.Conns {
+ conn.Have(piece)
+ for r := range conn.Requests {
+ if int(r.Index) == piece {
+ conn.Cancel(r)
+ }
}
+ // Could check here if peer doesn't have piece, but due to caching
+ // some peers may have said they have a piece but they don't.
+ me.upload(t, conn)
+ }
+}
+
+func (me *Client) onFailedPiece(t *torrent, piece int) {
+ if t.pieceAllDirty(piece) {
+ t.pendAllChunkSpecs(piece)
}
+ if !t.wantPiece(piece) {
+ return
+ }
+ me.openNewConns(t)
for _, conn := range t.Conns {
- if correct {
- conn.Have(piece)
- for r := range conn.Requests {
- if int(r.Index) == piece {
- conn.Cancel(r)
- }
- }
- me.upload(t, conn)
- } else if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
+ if conn.PeerHasPiece(piece) {
me.replenishConnRequests(t, conn)
}
}
- me.event.Broadcast()
+}
+
+func (me *Client) pieceChanged(t *torrent, piece int) {
+ correct := t.pieceComplete(piece)
+ defer t.publishPieceChange(piece)
+ defer me.event.Broadcast()
+ if correct {
+ me.onCompletedPiece(t, piece)
+ } else {
+ me.onFailedPiece(t, piece)
+ }
}
func (cl *Client) verifyPiece(t *torrent, piece int) {
gotMetainfo chan struct{}
readers map[*Reader]struct{}
+
+ pendingPieces map[int]struct{}
}
var (
if t.pieceComplete(piece) {
return
}
+ if _, ok := t.pendingPieces[piece]; ok {
+ ret = PiecePriorityNormal
+ }
raiseRet := func(prio piecePriority) {
if prio > ret {
ret = prio
})
return
}
+
+func (t *torrent) pendPiece(piece int, cl *Client) {
+ if t.pendingPieces == nil {
+ t.pendingPieces = make(map[int]struct{}, t.Info.NumPieces())
+ }
+ if _, ok := t.pendingPieces[piece]; ok {
+ return
+ }
+ if t.havePiece(piece) {
+ return
+ }
+ t.pendingPieces[piece] = struct{}{}
+ for _, c := range t.Conns {
+ if !c.PeerHasPiece(piece) {
+ continue
+ }
+
+ }
+}
+
+func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
+ if !c.PeerHasPiece(piece) {
+ return true
+ }
+ for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
+ req := request{pp.Integer(piece), cs}
+ if !c.Request(req) {
+ return false
+ }
+ }
+ return true
+}
+
+func (t *torrent) fillRequests(c *connection) {
+ if c.Interested {
+ if c.PeerChoked {
+ return
+ }
+ if len(c.Requests) > c.requestsLowWater {
+ return
+ }
+ }
+ if !t.forUrgentPieces(func(piece int) (again bool) {
+ return t.connRequestPiecePendingChunks(c, piece)
+ }) {
+ return
+ }
+ t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ for i := begin + 1; i < end; i++ {
+ if !t.connRequestPiecePendingChunks(c, i) {
+ return false
+ }
+ }
+ return true
+ })
+ for i := range t.pendingPieces {
+ if !t.connRequestPiecePendingChunks(c, i) {
+ return
+ }
+ }
+}