client.go | 112 +++++++++++++++++------------------------------------ example_test.go | 5 +++-- file.go | 13 ++----------- t.go | 8 ++++++++ torrent.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++ diff --git a/client.go b/client.go index de9e4f6dcd221d7f4e92f08ce7f0be22cbb05c28..32134381f9aead38f579d47510ed0a716b285f97 100644 --- a/client.go +++ b/client.go @@ -10,7 +10,6 @@ "errors" "expvar" "fmt" "io" - "io/ioutil" "log" "math/big" mathRand "math/rand" @@ -29,6 +28,7 @@ "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/sync" "github.com/anacrolix/utp" + "github.com/bradfitz/iter" "github.com/edsrzf/mmap-go" "github.com/anacrolix/torrent/bencode" @@ -1963,13 +1963,9 @@ // GotInfo. func (t Torrent) DownloadAll() { t.cl.mu.Lock() defer t.cl.mu.Unlock() - for i := range iter.N(t.torrent.numPieces()) { - t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal) + for i := range iter.N(t.torrent.Info.NumPieces()) { + t.torrent.pendPiece(i, t.cl) } - // Nice to have the first and last pieces sooner for various interactive - // purposes. - t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead) - t.cl.raisePiecePriority(t.torrent, t.torrent.numPieces()-1, PiecePriorityReadahead) } // Returns nil metainfo if it isn't in the cache. Checks that the retrieved @@ -2366,59 +2362,11 @@ } return true } -func (me *Client) connAddRequest(c *connection, req request) (more bool) { - if len(c.Requests) >= 64 { - return false - } - more = c.Request(req) - return -} - -func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) { - for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) { - req := request{pp.Integer(piece), cs} - if !me.connAddRequest(c, req) { - return false - } - } - return true -} - -func (me *Client) fillRequests(t *torrent, c *connection) { - if c.Interested { - if c.PeerChoked { - return - } - if len(c.Requests) > c.requestsLowWater { - return - } - } - if !t.forUrgentPieces(func(piece int) (again bool) { - if !c.PeerHasPiece(piece) { - return true - } - return me.connRequestPiecePendingChunks(c, t, piece) - }) { - return - } - t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { - for i := begin + 1; i < end; i++ { - if !c.PeerHasPiece(i) { - continue - } - if !me.connRequestPiecePendingChunks(c, t, i) { - return false - } - } - return true - }) -} - func (me *Client) replenishConnRequests(t *torrent, c *connection) { if !t.haveInfo() { return } - me.fillRequests(t, c) + t.fillRequests(c) if len(c.Requests) == 0 && !c.PeerChoked { // So we're not choked, but we don't want anything right now. We may // have completed readahead, and the readahead window has not rolled @@ -2547,32 +2495,44 @@ } me.pieceChanged(t, int(piece)) } -func (me *Client) pieceChanged(t *torrent, piece int) { - correct := t.pieceComplete(piece) - defer t.publishPieceChange(piece) - defer me.event.Broadcast() - if !correct { - if t.pieceAllDirty(piece) { - t.pendAllChunkSpecs(piece) - } - if t.wantPiece(piece) { - me.openNewConns(t) +func (me *Client) onCompletedPiece(t *torrent, piece int) { + for _, conn := range t.Conns { + conn.Have(piece) + for r := range conn.Requests { + if int(r.Index) == piece { + conn.Cancel(r) + } } + // Could check here if peer doesn't have piece, but due to caching + // some peers may have said they have a piece but they don't. + me.upload(t, conn) + } +} + +func (me *Client) onFailedPiece(t *torrent, piece int) { + if t.pieceAllDirty(piece) { + t.pendAllChunkSpecs(piece) } + if !t.wantPiece(piece) { + return + } + me.openNewConns(t) for _, conn := range t.Conns { - if correct { - conn.Have(piece) - for r := range conn.Requests { - if int(r.Index) == piece { - conn.Cancel(r) - } - } - me.upload(t, conn) - } else if t.wantPiece(piece) && conn.PeerHasPiece(piece) { + if conn.PeerHasPiece(piece) { me.replenishConnRequests(t, conn) } } - me.event.Broadcast() +} + +func (me *Client) pieceChanged(t *torrent, piece int) { + correct := t.pieceComplete(piece) + defer t.publishPieceChange(piece) + defer me.event.Broadcast() + if correct { + me.onCompletedPiece(t, piece) + } else { + me.onFailedPiece(t, piece) + } } func (cl *Client) verifyPiece(t *torrent, piece int) { diff --git a/example_test.go b/example_test.go index 9dc7502c314afaf5e0c7c9b75c9dec64aa00c51d..8a4377af3b7836f6219482c8325ba211dfbddc7d 100644 --- a/example_test.go +++ b/example_test.go @@ -1,8 +1,9 @@ package torrent_test import ( - "io" "log" + + "github.com/anacrolix/missinggo" "github.com/anacrolix/torrent" ) @@ -26,5 +27,5 @@ r := t.NewReader() defer r.Close() // Access the parts of the torrent pertaining to f. Data will be // downloaded as required, per the configuration of the torrent.Reader. - _ = io.NewSectionReader(r, f.Offset(), f.Length()) + _ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length()) } diff --git a/file.go b/file.go index 06858113a163ec1d222f23ad4c5e5bff77f95890..8e28781b9405cf434875df7c1ec73acee33482ec 100644 --- a/file.go +++ b/file.go @@ -75,15 +75,6 @@ } return } -// Marks pieces in the region of the file for download. This is a helper -// wrapping Torrent.SetRegionPriority. -func (f *File) PrioritizeRegion(off, len int64) { - if off < 0 || off >= f.length { - return - } - if off+len > f.length { - len = f.length - off - } - off += f.offset - f.t.SetRegionPriority(off, len) +func (f *File) Download() { + f.t.DownloadPieces(f.t.torrent.byteRegionPieces(f.offset, f.length)) } diff --git a/t.go b/t.go index bf2b9e2245b797529951eabe340cf332a48b422d..8fac08e7c869d03bc6862e53cc9cb48736627dc8 100644 --- a/t.go +++ b/t.go @@ -137,3 +137,11 @@ defer t.cl.mu.Unlock() delete(t.torrent.readers, r) t.torrent.readersChanged(t.cl) } + +func (t Torrent) DownloadPieces(begin, end int) { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() + for i := begin; i < end; i++ { + t.torrent.pendPiece(i, t.cl) + } +} diff --git a/torrent.go b/torrent.go index 2933c809aec794ac13639b25d401ae7a62e91e74..cb1c3fa87435b90f61519e8ace78c8b13e9819df 100644 --- a/torrent.go +++ b/torrent.go @@ -96,6 +96,8 @@ // Closed when .Info is set. gotMetainfo chan struct{} readers map[*Reader]struct{} + + pendingPieces map[int]struct{} } var ( @@ -860,6 +862,9 @@ ret = PiecePriorityNone if t.pieceComplete(piece) { return } + if _, ok := t.pendingPieces[piece]; ok { + ret = PiecePriorityNormal + } raiseRet := func(prio piecePriority) { if prio > ret { ret = prio @@ -876,3 +881,64 @@ return true }) return } + +func (t *torrent) pendPiece(piece int, cl *Client) { + if t.pendingPieces == nil { + t.pendingPieces = make(map[int]struct{}, t.Info.NumPieces()) + } + if _, ok := t.pendingPieces[piece]; ok { + return + } + if t.havePiece(piece) { + return + } + t.pendingPieces[piece] = struct{}{} + for _, c := range t.Conns { + if !c.PeerHasPiece(piece) { + continue + } + + } +} + +func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) { + if !c.PeerHasPiece(piece) { + return true + } + for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) { + req := request{pp.Integer(piece), cs} + if !c.Request(req) { + return false + } + } + return true +} + +func (t *torrent) fillRequests(c *connection) { + if c.Interested { + if c.PeerChoked { + return + } + if len(c.Requests) > c.requestsLowWater { + return + } + } + if !t.forUrgentPieces(func(piece int) (again bool) { + return t.connRequestPiecePendingChunks(c, piece) + }) { + return + } + t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + for i := begin + 1; i < end; i++ { + if !t.connRequestPiecePendingChunks(c, i) { + return false + } + } + return true + }) + for i := range t.pendingPieces { + if !t.connRequestPiecePendingChunks(c, i) { + return + } + } +}