]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix the download/prioritize piece functions
authorMatt Joiner <anacrolix@gmail.com>
Mon, 18 Jan 2016 14:28:56 +0000 (01:28 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 18 Jan 2016 14:28:56 +0000 (01:28 +1100)
This involves adding a pendingPieces field to torrent.

client.go
example_test.go
file.go
t.go
torrent.go

index de9e4f6dcd221d7f4e92f08ce7f0be22cbb05c28..32134381f9aead38f579d47510ed0a716b285f97 100644 (file)
--- a/client.go
+++ b/client.go
@@ -10,7 +10,6 @@ import (
        "expvar"
        "fmt"
        "io"
-       "io/ioutil"
        "log"
        "math/big"
        mathRand "math/rand"
@@ -29,6 +28,7 @@ import (
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/sync"
        "github.com/anacrolix/utp"
+       "github.com/bradfitz/iter"
        "github.com/edsrzf/mmap-go"
 
        "github.com/anacrolix/torrent/bencode"
@@ -1963,13 +1963,9 @@ func (t Torrent) AddPeers(pp []Peer) error {
 func (t Torrent) DownloadAll() {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
-       for i := range iter.N(t.torrent.numPieces()) {
-               t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal)
+       for i := range iter.N(t.torrent.Info.NumPieces()) {
+               t.torrent.pendPiece(i, t.cl)
        }
-       // Nice to have the first and last pieces sooner for various interactive
-       // purposes.
-       t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead)
-       t.cl.raisePiecePriority(t.torrent, t.torrent.numPieces()-1, PiecePriorityReadahead)
 }
 
 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
@@ -2366,59 +2362,11 @@ func (me *Client) WaitAll() bool {
        return true
 }
 
-func (me *Client) connAddRequest(c *connection, req request) (more bool) {
-       if len(c.Requests) >= 64 {
-               return false
-       }
-       more = c.Request(req)
-       return
-}
-
-func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) {
-       for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
-               req := request{pp.Integer(piece), cs}
-               if !me.connAddRequest(c, req) {
-                       return false
-               }
-       }
-       return true
-}
-
-func (me *Client) fillRequests(t *torrent, c *connection) {
-       if c.Interested {
-               if c.PeerChoked {
-                       return
-               }
-               if len(c.Requests) > c.requestsLowWater {
-                       return
-               }
-       }
-       if !t.forUrgentPieces(func(piece int) (again bool) {
-               if !c.PeerHasPiece(piece) {
-                       return true
-               }
-               return me.connRequestPiecePendingChunks(c, t, piece)
-       }) {
-               return
-       }
-       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
-               for i := begin + 1; i < end; i++ {
-                       if !c.PeerHasPiece(i) {
-                               continue
-                       }
-                       if !me.connRequestPiecePendingChunks(c, t, i) {
-                               return false
-                       }
-               }
-               return true
-       })
-}
-
 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
        if !t.haveInfo() {
                return
        }
-       me.fillRequests(t, c)
+       t.fillRequests(c)
        if len(c.Requests) == 0 && !c.PeerChoked {
                // So we're not choked, but we don't want anything right now. We may
                // have completed readahead, and the readahead window has not rolled
@@ -2547,32 +2495,44 @@ func (me *Client) pieceHashed(t *torrent, piece int, correct bool) {
        me.pieceChanged(t, int(piece))
 }
 
-func (me *Client) pieceChanged(t *torrent, piece int) {
-       correct := t.pieceComplete(piece)
-       defer t.publishPieceChange(piece)
-       defer me.event.Broadcast()
-       if !correct {
-               if t.pieceAllDirty(piece) {
-                       t.pendAllChunkSpecs(piece)
-               }
-               if t.wantPiece(piece) {
-                       me.openNewConns(t)
+func (me *Client) onCompletedPiece(t *torrent, piece int) {
+       for _, conn := range t.Conns {
+               conn.Have(piece)
+               for r := range conn.Requests {
+                       if int(r.Index) == piece {
+                               conn.Cancel(r)
+                       }
                }
+               // Could check here if peer doesn't have piece, but due to caching
+               // some peers may have said they have a piece but they don't.
+               me.upload(t, conn)
+       }
+}
+
+func (me *Client) onFailedPiece(t *torrent, piece int) {
+       if t.pieceAllDirty(piece) {
+               t.pendAllChunkSpecs(piece)
        }
+       if !t.wantPiece(piece) {
+               return
+       }
+       me.openNewConns(t)
        for _, conn := range t.Conns {
-               if correct {
-                       conn.Have(piece)
-                       for r := range conn.Requests {
-                               if int(r.Index) == piece {
-                                       conn.Cancel(r)
-                               }
-                       }
-                       me.upload(t, conn)
-               } else if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
+               if conn.PeerHasPiece(piece) {
                        me.replenishConnRequests(t, conn)
                }
        }
-       me.event.Broadcast()
+}
+
+func (me *Client) pieceChanged(t *torrent, piece int) {
+       correct := t.pieceComplete(piece)
+       defer t.publishPieceChange(piece)
+       defer me.event.Broadcast()
+       if correct {
+               me.onCompletedPiece(t, piece)
+       } else {
+               me.onFailedPiece(t, piece)
+       }
 }
 
 func (cl *Client) verifyPiece(t *torrent, piece int) {
index 9dc7502c314afaf5e0c7c9b75c9dec64aa00c51d..8a4377af3b7836f6219482c8325ba211dfbddc7d 100644 (file)
@@ -1,9 +1,10 @@
 package torrent_test
 
 import (
-       "io"
        "log"
 
+       "github.com/anacrolix/missinggo"
+
        "github.com/anacrolix/torrent"
 )
 
@@ -26,5 +27,5 @@ func Example_fileReader() {
        defer r.Close()
        // Access the parts of the torrent pertaining to f. Data will be
        // downloaded as required, per the configuration of the torrent.Reader.
-       _ = io.NewSectionReader(r, f.Offset(), f.Length())
+       _ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length())
 }
diff --git a/file.go b/file.go
index 06858113a163ec1d222f23ad4c5e5bff77f95890..8e28781b9405cf434875df7c1ec73acee33482ec 100644 (file)
--- a/file.go
+++ b/file.go
@@ -75,15 +75,6 @@ func (f *File) State() (ret []FilePieceState) {
        return
 }
 
-// Marks pieces in the region of the file for download. This is a helper
-// wrapping Torrent.SetRegionPriority.
-func (f *File) PrioritizeRegion(off, len int64) {
-       if off < 0 || off >= f.length {
-               return
-       }
-       if off+len > f.length {
-               len = f.length - off
-       }
-       off += f.offset
-       f.t.SetRegionPriority(off, len)
+func (f *File) Download() {
+       f.t.DownloadPieces(f.t.torrent.byteRegionPieces(f.offset, f.length))
 }
diff --git a/t.go b/t.go
index bf2b9e2245b797529951eabe340cf332a48b422d..8fac08e7c869d03bc6862e53cc9cb48736627dc8 100644 (file)
--- a/t.go
+++ b/t.go
@@ -137,3 +137,11 @@ func (t Torrent) deleteReader(r *Reader) {
        delete(t.torrent.readers, r)
        t.torrent.readersChanged(t.cl)
 }
+
+func (t Torrent) DownloadPieces(begin, end int) {
+       t.cl.mu.Lock()
+       defer t.cl.mu.Unlock()
+       for i := begin; i < end; i++ {
+               t.torrent.pendPiece(i, t.cl)
+       }
+}
index 2933c809aec794ac13639b25d401ae7a62e91e74..cb1c3fa87435b90f61519e8ace78c8b13e9819df 100644 (file)
@@ -96,6 +96,8 @@ type torrent struct {
        gotMetainfo chan struct{}
 
        readers map[*Reader]struct{}
+
+       pendingPieces map[int]struct{}
 }
 
 var (
@@ -860,6 +862,9 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) {
        if t.pieceComplete(piece) {
                return
        }
+       if _, ok := t.pendingPieces[piece]; ok {
+               ret = PiecePriorityNormal
+       }
        raiseRet := func(prio piecePriority) {
                if prio > ret {
                        ret = prio
@@ -876,3 +881,64 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) {
        })
        return
 }
+
+func (t *torrent) pendPiece(piece int, cl *Client) {
+       if t.pendingPieces == nil {
+               t.pendingPieces = make(map[int]struct{}, t.Info.NumPieces())
+       }
+       if _, ok := t.pendingPieces[piece]; ok {
+               return
+       }
+       if t.havePiece(piece) {
+               return
+       }
+       t.pendingPieces[piece] = struct{}{}
+       for _, c := range t.Conns {
+               if !c.PeerHasPiece(piece) {
+                       continue
+               }
+
+       }
+}
+
+func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
+       if !c.PeerHasPiece(piece) {
+               return true
+       }
+       for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
+               req := request{pp.Integer(piece), cs}
+               if !c.Request(req) {
+                       return false
+               }
+       }
+       return true
+}
+
+func (t *torrent) fillRequests(c *connection) {
+       if c.Interested {
+               if c.PeerChoked {
+                       return
+               }
+               if len(c.Requests) > c.requestsLowWater {
+                       return
+               }
+       }
+       if !t.forUrgentPieces(func(piece int) (again bool) {
+               return t.connRequestPiecePendingChunks(c, piece)
+       }) {
+               return
+       }
+       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               for i := begin + 1; i < end; i++ {
+                       if !t.connRequestPiecePendingChunks(c, i) {
+                               return false
+                       }
+               }
+               return true
+       })
+       for i := range t.pendingPieces {
+               if !t.connRequestPiecePendingChunks(c, i) {
+                       return
+               }
+       }
+}