]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Reintroduce connection piece inclinations, and begin caching piece priorities
authorMatt Joiner <anacrolix@gmail.com>
Mon, 1 Feb 2016 10:11:41 +0000 (21:11 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 1 Feb 2016 10:11:41 +0000 (21:11 +1100)
client.go
connection.go
piece.go
reader.go
t.go
torrent.go

index 2daff28265c416fb5cf8360e635c052cfbfe5bfd..2def4f902b94b60b6e75a2690f2f70f2c2b27331 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1184,9 +1184,7 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
                }
                c.PeerPieces[piece] = true
        }
-       if t.wantPiece(piece) {
-               c.updateRequests()
-       }
+       c.updatePiecePriority(piece)
        return nil
 }
 
@@ -1625,6 +1623,7 @@ func (me *Client) deleteConnection(t *torrent, c *connection) bool {
 func (me *Client) dropConnection(t *torrent, c *connection) {
        me.event.Broadcast()
        c.Close()
+
        if me.deleteConnection(t, c) {
                me.openNewConns(t)
        }
@@ -2055,6 +2054,7 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er
                }
                // TODO: Tidy this up?
                t = newTorrent(spec.InfoHash)
+               t.cl = cl
                if spec.ChunkSize != 0 {
                        t.chunkSize = pp.Integer(spec.ChunkSize)
                }
index fd28df3c850fdac9b197848a768896c1c072be50..419f6a9fd850312e3f5c24748b9fb6a6a4ba1a55 100644 (file)
@@ -13,6 +13,9 @@ import (
        "time"
 
        "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/itertools"
+       "github.com/anacrolix/missinggo/prioritybitmap"
+
        "github.com/anacrolix/torrent/bencode"
        pp "github.com/anacrolix/torrent/peer_protocol"
 )
@@ -75,6 +78,9 @@ type connection struct {
        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
        PeerClientName   string
+
+       pieceInclination  []int
+       pieceRequestOrder prioritybitmap.PriorityBitmap
 }
 
 func newConnection() (c *connection) {
@@ -234,6 +240,7 @@ func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
 
 func (c *connection) Close() {
        c.closed.Set()
+       c.discardPieceInclination()
        // TODO: This call blocks sometimes, why?
        go c.conn.Close()
 }
@@ -550,26 +557,53 @@ func (c *connection) updateRequests() {
 }
 
 func (c *connection) fillRequests() {
-       if !c.t.forUrgentPieces(func(piece int) (again bool) {
-               return c.t.connRequestPiecePendingChunks(c, piece)
-       }) {
+       itertools.ForIterable(&c.pieceRequestOrder, func(_piece interface{}) (more bool) {
+               return c.requestPiecePendingChunks(_piece.(int))
+       })
+}
+
+func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
+       return c.t.connRequestPiecePendingChunks(c, piece)
+}
+
+func (c *connection) stopRequestingPiece(piece int) {
+       c.pieceRequestOrder.Remove(piece)
+}
+
+func (c *connection) updatePiecePriority(piece int) {
+       if !c.PeerHasPiece(piece) {
                return
        }
-       c.t.forReaderOffsetPieces(func(begin, end int) (again bool) {
-               for i := begin + 1; i < end; i++ {
-                       if !c.t.connRequestPiecePendingChunks(c, i) {
-                               return false
-                       }
-               }
-               return true
-       })
-       for it := c.t.pendingPieces.Iter(); it.Next(); {
-               i := it.Value()
-               if !c.t.wantPiece(i) {
-                       continue
-               }
-               if !c.t.connRequestPiecePendingChunks(c, i) {
-                       return
-               }
+       tpp := c.t.piecePriority(piece)
+       if tpp == PiecePriorityNone {
+               c.stopRequestingPiece(piece)
+               return
+       }
+       prio := c.getPieceInclination()[piece]
+       switch tpp {
+       case PiecePriorityNormal:
+       case PiecePriorityReadahead:
+               prio -= c.t.numPieces()
+       case PiecePriorityNext, PiecePriorityNow:
+               prio -= 2 * c.t.numPieces()
+       default:
+               panic(tpp)
+       }
+       c.pieceRequestOrder.Set(piece, prio)
+       c.updateRequests()
+}
+
+func (c *connection) getPieceInclination() []int {
+       if c.pieceInclination == nil {
+               c.pieceInclination = c.t.getConnPieceInclination()
+       }
+       return c.pieceInclination
+}
+
+func (c *connection) discardPieceInclination() {
+       if c.pieceInclination == nil {
+               return
        }
+       c.t.putPieceInclination(c.pieceInclination)
+       c.pieceInclination = nil
 }
index 0643878c2e73fe29f50eee208133337208b8776c..843ddb9e797f12e4643679469d96701697c73f89 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -31,6 +31,7 @@ type piece struct {
        QueuedForHash    bool
        EverHashed       bool
        PublicPieceState PieceState
+       priority         piecePriority
 
        pendingWritesMutex sync.Mutex
        pendingWrites      int
index acc481cf2d9e1e8fe2449d6d5c49ca4c0c70037e..9d30781de8ebff7ff5979165fd3cd73faa01c5a7 100644 (file)
--- a/reader.go
+++ b/reader.go
@@ -74,7 +74,7 @@ func (r *Reader) available(off, max int64) (ret int64) {
 }
 
 func (r *Reader) tickleClient() {
-       r.t.torrent.readersChanged(r.t.cl)
+       r.t.torrent.readersChanged()
 }
 
 func (r *Reader) waitReadable(off int64) {
@@ -152,7 +152,7 @@ func (r *Reader) Close() error {
 func (r *Reader) posChanged() {
        r.t.cl.mu.Lock()
        defer r.t.cl.mu.Unlock()
-       r.t.torrent.readersChanged(r.t.cl)
+       r.t.torrent.readersChanged()
 }
 
 func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
diff --git a/t.go b/t.go
index 123cb22549df4de156ecee24cab6ba6572368d73..ffb64dd3ac221289a31d512cdffd4e680089a85c 100644 (file)
--- a/t.go
+++ b/t.go
@@ -127,14 +127,14 @@ func (t Torrent) addReader(r *Reader) {
                t.torrent.readers = make(map[*Reader]struct{})
        }
        t.torrent.readers[r] = struct{}{}
-       t.torrent.readersChanged(t.cl)
+       t.torrent.readersChanged()
 }
 
 func (t Torrent) deleteReader(r *Reader) {
        t.cl.mu.Lock()
        defer t.cl.mu.Unlock()
        delete(t.torrent.readers, r)
-       t.torrent.readersChanged(t.cl)
+       t.torrent.readersChanged()
 }
 
 func (t Torrent) DownloadPieces(begin, end int) {
index 214fc1af4ebd4209610f705460c2f8427119d454..b50db665b08c43fa3daf469ead4cbcfa4f41dcab 100644 (file)
@@ -6,6 +6,7 @@ import (
        "fmt"
        "io"
        "log"
+       "math/rand"
        "net"
        "sort"
        "sync"
@@ -51,6 +52,8 @@ type peersKey struct {
 
 // Is not aware of Client. Maintains state of torrent for with-in a Client.
 type torrent struct {
+       cl *Client
+
        stateMu sync.Mutex
        closing chan struct{}
 
@@ -99,12 +102,15 @@ type torrent struct {
 
        readers map[*Reader]struct{}
 
-       pendingPieces *bitmap.Bitmap
+       pendingPieces bitmap.Bitmap
+
+       connPieceInclinationPool sync.Pool
 }
 
 var (
-       piecePrioritiesReused = expvar.NewInt("piecePrioritiesReused")
-       piecePrioritiesNew    = expvar.NewInt("piecePrioritiesNew")
+       pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
+       pieceInclinationsNew    = expvar.NewInt("pieceInclinationsNew")
+       pieceInclinationsPut    = expvar.NewInt("pieceInclinationsPut")
 )
 
 func (t *torrent) setDisplayName(dn string) {
@@ -836,13 +842,43 @@ func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
        })
 }
 
-func (t *torrent) readersChanged(cl *Client) {
-       // Accept new connections.
-       cl.event.Broadcast()
+func (t *torrent) readersChanged() {
+       t.updatePiecePriorities()
+}
+
+func (t *torrent) maybeNewConns() {
+       // Tickle the accept routine.
+       t.cl.event.Broadcast()
+       t.openNewConns()
+}
+
+func (t *torrent) piecePriorityChanged(piece int) {
+       for _, c := range t.Conns {
+               c.updatePiecePriority(piece)
+       }
+       t.maybeNewConns()
+}
+
+func (t *torrent) updatePiecePriority(piece int) bool {
+       p := &t.Pieces[piece]
+       newPrio := t.piecePriorityUncached(piece)
+       if newPrio == p.priority {
+               return false
+       }
+       p.priority = newPrio
+       return true
+}
+
+func (t *torrent) updatePiecePriorities() {
+       for i := range t.Pieces {
+               if t.updatePiecePriority(i) {
+                       t.piecePriorityChanged(i)
+               }
+       }
        for _, c := range t.Conns {
                c.updateRequests()
        }
-       cl.openNewConns(t)
+       t.maybeNewConns()
 }
 
 func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
@@ -884,7 +920,14 @@ func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all
        return true
 }
 
-func (t *torrent) piecePriority(piece int) (ret piecePriority) {
+func (t *torrent) piecePriority(piece int) piecePriority {
+       if !t.haveInfo() {
+               return PiecePriorityNone
+       }
+       return t.Pieces[piece].priority
+}
+
+func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) {
        ret = PiecePriorityNone
        if t.pieceComplete(piece) {
                return
@@ -910,9 +953,6 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) {
 }
 
 func (t *torrent) pendPiece(piece int, cl *Client) {
-       if t.pendingPieces == nil {
-               t.pendingPieces = bitmap.New()
-       }
        if t.pendingPieces.Contains(piece) {
                return
        }
@@ -920,14 +960,10 @@ func (t *torrent) pendPiece(piece int, cl *Client) {
                return
        }
        t.pendingPieces.Add(piece)
-       for _, c := range t.Conns {
-               if !c.PeerHasPiece(piece) {
-                       continue
-               }
-               c.updateRequests()
+       if !t.updatePiecePriority(piece) {
+               return
        }
-       cl.openNewConns(t)
-       cl.pieceChanged(t, piece)
+       t.piecePriorityChanged(piece)
 }
 
 func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
@@ -947,3 +983,26 @@ func (t *torrent) pendRequest(req request) {
        ci := chunkIndex(req.chunkSpec, t.chunkSize)
        t.Pieces[req.Index].pendChunkIndex(ci)
 }
+
+func (t *torrent) pieceChanged(piece int) {
+       t.cl.pieceChanged(t, piece)
+}
+
+func (t *torrent) openNewConns() {
+       t.cl.openNewConns(t)
+}
+
+func (t *torrent) getConnPieceInclination() []int {
+       _ret := t.connPieceInclinationPool.Get()
+       if _ret == nil {
+               pieceInclinationsNew.Add(1)
+               return rand.Perm(t.numPieces())
+       }
+       pieceInclinationsReused.Add(1)
+       return _ret.([]int)
+}
+
+func (t *torrent) putPieceInclination(pi []int) {
+       t.connPieceInclinationPool.Put(pi)
+       pieceInclinationsPut.Add(1)
+}