]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Give each connection its own piece priority order
authorMatt Joiner <anacrolix@gmail.com>
Mon, 1 Dec 2014 09:36:25 +0000 (03:36 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 1 Dec 2014 09:36:25 +0000 (03:36 -0600)
client.go
connection.go
download_strategies.go

index c886a0ef6e97b835b87424bd216e62b3bedf0a83..741416272435286f44e8b4ec3a09a1423da1dbf9 100644 (file)
--- a/client.go
+++ b/client.go
@@ -836,6 +836,9 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                        Port: uint16(addr.Port),
                })
        }
+       if torrent.haveInfo() {
+               conn.initPieceOrder(torrent.NumPieces())
+       }
        err = me.connectionLoop(torrent, conn)
        if err != nil {
                err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
@@ -1758,36 +1761,11 @@ func (me *Client) WaitAll() bool {
        return true
 }
 
-func (cl *Client) assertRequestHeat() {
-       dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
-       if !ok {
-               return
-       }
-       for _, t := range cl.torrents {
-               m := make(map[request]int, 3000)
-               for _, cn := range t.Conns {
-                       for r := range cn.Requests {
-                               m[r]++
-                       }
-               }
-               for r, h := range dds.heat[t] {
-                       if m[r] != h {
-                               panic(fmt.Sprintln(m[r], h))
-                       }
-               }
-       }
-}
-
 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
        if !t.haveInfo() {
                return
        }
-       for _, p := range me.downloadStrategy.FillRequests(t, c) {
-               // Make sure the state of pieces that would have been requested is
-               // known.
-               me.queueFirstHash(t, p)
-       }
-       //me.assertRequestHeat()
+       me.downloadStrategy.FillRequests(t, c)
        if len(c.Requests) == 0 && !c.PeerChoked {
                c.SetInterested(false)
        }
@@ -1916,11 +1894,6 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                                }
                        }
                }
-               // Do this even if the piece is correct because new first-hashings may
-               // need to be scheduled.
-               if conn.PeerHasPiece(piece) {
-                       me.replenishConnRequests(t, conn)
-               }
        }
        if t.haveAllPieces() && me.noUpload {
                t.CeaseNetworking()
index 1a75e17a6f0d15129aef0580d70420fc82ec103c..464f14be3dea2c7fd8c25a8a17fde3f06dac608f 100644 (file)
@@ -8,6 +8,7 @@ import (
        "expvar"
        "fmt"
        "io"
+       "math/rand"
        "net"
        "sync"
        "time"
@@ -27,13 +28,14 @@ const (
 
 // Maintains the state of a connection with a peer.
 type connection struct {
-       Socket    net.Conn
-       Discovery peerSource
-       uTP       bool
-       closing   chan struct{}
-       mu        sync.Mutex // Only for closing.
-       post      chan pp.Message
-       writeCh   chan []byte
+       Socket     net.Conn
+       Discovery  peerSource
+       uTP        bool
+       closing    chan struct{}
+       mu         sync.Mutex // Only for closing.
+       post       chan pp.Message
+       writeCh    chan []byte
+       pieceOrder []int
 
        UnwantedChunksReceived int
        UsefulChunksReceived   int
@@ -109,6 +111,7 @@ func (cn *connection) piecesPeerHasCount() (count int) {
 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
 // messages.
 func (cn *connection) setNumPieces(num int) error {
+       cn.initPieceOrder(num)
        if cn.PeerPieces == nil {
                return nil
        }
@@ -131,6 +134,15 @@ func (cn *connection) setNumPieces(num int) error {
        return nil
 }
 
+func (cn *connection) initPieceOrder(numPieces int) {
+       if cn.pieceOrder == nil {
+               cn.pieceOrder = rand.Perm(numPieces)
+       }
+       if len(cn.pieceOrder) != numPieces {
+               panic("piece order initialized with wrong length")
+       }
+}
+
 func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
index 5317cbf9cbac6dd2bf9b3fbde859405ad73d6f6f..0b85d190a5d4a78c7523c56c0de329d5761c324a 100644 (file)
@@ -10,10 +10,8 @@ import (
 )
 
 type DownloadStrategy interface {
-       // Tops up the outgoing pending requests. Returns the indices of pieces
-       // that would be requested. This is used to determine if pieces require
-       // hashing so the completed state is known.
-       FillRequests(*torrent, *connection) (pieces []int)
+       // Tops up the outgoing pending requests.
+       FillRequests(*torrent, *connection)
        TorrentStarted(*torrent)
        TorrentStopped(*torrent)
        DeleteRequest(*torrent, request)
@@ -25,97 +23,53 @@ type DownloadStrategy interface {
        PendingData(*torrent) bool
 }
 
-type DefaultDownloadStrategy struct {
-       heat map[*torrent]map[request]int
-}
+type DefaultDownloadStrategy struct{}
 
 func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool {
        return !t.haveAllPieces()
 }
 
-func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {
-       if me.heat[t][r] != 0 {
-               panic("outstanding requests break invariant")
-       }
-}
+func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
 
 func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
 
-func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) {
+func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
        if c.Interested {
                if c.PeerChoked {
                        return
                }
-               if len(c.Requests) >= (c.PeerMaxRequests+1)/2 {
+               if len(c.Requests) != 0 {
                        return
                }
        }
-       th := s.heat[t]
        addRequest := func(req request) (again bool) {
-               piece := t.Pieces[req.Index]
-               if piece.Hashing || piece.QueuedForHash {
-                       // We can't be sure we want this.
-                       return true
-               }
-               if piece.Complete() {
-                       // We already have this.
-                       return true
-               }
-               if c.RequestPending(req) {
-                       return true
+               return c.Request(req)
+       }
+       for i := range t.Pieces {
+               pieceIndex := c.pieceOrder[i]
+               if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
+                       continue
                }
-               again = c.Request(req)
-               if c.RequestPending(req) {
-                       th[req]++
+               if !t.wantPiece(pieceIndex) {
+                       continue
                }
-               return
-       }
-       // Then finish off incomplete pieces in order of bytes remaining.
-       for _, heatThreshold := range []int{1, 4, 15, 60} {
-               for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
-                       pieceIndex := pp.Integer(e.Value.(int))
-                       piece := t.Pieces[pieceIndex]
-                       if !piece.EverHashed {
-                               pieces = append(pieces, int(pieceIndex))
-                               continue
-                       }
-                       for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
-                               r := request{pieceIndex, chunkSpec}
-                               if th[r] >= heatThreshold {
-                                       continue
-                               }
-                               if !addRequest(r) {
-                                       return
-                               }
+               piece := t.Pieces[pieceIndex]
+               for _, cs := range piece.shuffledPendingChunkSpecs() {
+                       r := request{pp.Integer(pieceIndex), cs}
+                       if !addRequest(r) {
+                               return
                        }
                }
        }
        return
 }
 
-func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
-       if s.heat[t] != nil {
-               panic("torrent already started")
-       }
-       if s.heat == nil {
-               s.heat = make(map[*torrent]map[request]int, 10)
-       }
-       s.heat[t] = make(map[request]int, t.ChunkCount())
-}
+func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {}
 
 func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
-       if _, ok := s.heat[t]; !ok {
-               panic("torrent not yet started")
-       }
-       delete(s.heat, t)
 }
 
 func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
-       m := s.heat[t]
-       if m[r] <= 0 {
-               panic("no pending requests")
-       }
-       m[r]--
 }
 
 func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request)      {}
@@ -176,18 +130,10 @@ type requestFiller struct {
        c *connection
        t *torrent
        s *responsiveDownloadStrategy
-
-       // The set of pieces that were considered for requesting.
-       pieces map[int]struct{}
 }
 
 // Wrapper around connection.request that tracks request heat.
 func (me *requestFiller) request(req request) bool {
-       if me.pieces == nil {
-               me.pieces = make(map[int]struct{})
-       }
-       // log.Print(req)
-       me.pieces[int(req.Index)] = struct{}{}
        if me.c.RequestPending(req) {
                return true
        }
@@ -340,12 +286,9 @@ func (me *requestFiller) readahead() bool {
        return true
 }
 
-func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) {
+func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
        rf := requestFiller{c: c, t: t, s: me}
        rf.Run()
-       for p := range rf.pieces {
-               pieces = append(pieces, p)
-       }
        return
 }
 
@@ -385,5 +328,13 @@ func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) {
 }
 
 func (me *responsiveDownloadStrategy) PendingData(t *torrent) bool {
-       return len(me.FillRequests(t, me.dummyConn)) != 0
+       if len(me.priorities[t]) != 0 {
+               return true
+       }
+       for index := range t.Pieces {
+               if t.wantPiece(index) {
+                       return true
+               }
+       }
+       return false
 }