]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add piece ordering per-connection piece request priority
authorMatt Joiner <anacrolix@gmail.com>
Wed, 3 Dec 2014 00:22:38 +0000 (18:22 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 3 Dec 2014 00:22:38 +0000 (18:22 -0600)
client.go
connection.go
download_strategies.go
internal/pieceordering/pieceordering.go [new file with mode: 0644]
internal/pieceordering/pieceordering_test.go [new file with mode: 0644]

index 5625f017ee96cbc3cc74a2fb87a640265ce112c5..f1164e5b5d782dc5c7bd68484fe1201ce88f30b4 100644 (file)
--- a/client.go
+++ b/client.go
@@ -37,6 +37,8 @@ import (
        "syscall"
        "time"
 
+       "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
+
        "github.com/h2so5/utp"
 
        "github.com/anacrolix/libtorgo/bencode"
@@ -890,7 +892,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                })
        }
        if torrent.haveInfo() {
-               conn.initPieceOrder(torrent.NumPieces())
+               me.initRequestOrdering(torrent, conn)
        }
        err = me.connectionLoop(torrent, conn)
        if err != nil {
@@ -900,12 +902,30 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
        return
 }
 
+func (cl *Client) initRequestOrdering(t *torrent, c *connection) {
+       if c.pieceRequestOrder != nil || c.piecePriorities != nil {
+               panic("double init of request ordering")
+       }
+       c.piecePriorities = mathRand.Perm(t.NumPieces())
+       c.pieceRequestOrder = pieceordering.New()
+       for i := 0; i < t.NumPieces(); i++ {
+               if !c.PeerHasPiece(pp.Integer(i)) {
+                       continue
+               }
+               if !t.wantPiece(i) {
+                       continue
+               }
+               c.pieceRequestOrder.SetPiece(i, c.piecePriorities[i])
+       }
+}
+
 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
        for piece >= len(c.PeerPieces) {
                c.PeerPieces = append(c.PeerPieces, false)
        }
        c.PeerPieces[piece] = true
        if t.wantPiece(piece) {
+               c.pieceRequestOrder.SetPiece(piece, c.piecePriorities[piece])
                me.replenishConnRequests(t, c)
        }
 }
@@ -1924,6 +1944,9 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
        me.dataReady(t, req)
        if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
+               for _, c := range t.Conns {
+                       c.pieceRequestOrder.RemovePiece(int(req.Index))
+               }
                me.queuePieceCheck(t, req.Index)
        }
        t.PieceBytesLeftChanged(int(req.Index))
@@ -2015,6 +2038,10 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
                                        panic("wat")
                                }
                        }
+                       conn.pieceRequestOrder.RemovePiece(int(piece))
+               }
+               if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) {
+                       conn.pieceRequestOrder.SetPiece(int(piece), conn.piecePriorities[piece])
                }
        }
        if t.haveAllPieces() && me.noUpload {
index 464f14be3dea2c7fd8c25a8a17fde3f06dac608f..16ffedb7235853c1a175630975ca399bc33a0c38 100644 (file)
@@ -8,11 +8,11 @@ import (
        "expvar"
        "fmt"
        "io"
-       "math/rand"
        "net"
        "sync"
        "time"
 
+       "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
        pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
 )
 
@@ -28,14 +28,16 @@ const (
 
 // Maintains the state of a connection with a peer.
 type connection struct {
-       Socket     net.Conn
-       Discovery  peerSource
-       uTP        bool
-       closing    chan struct{}
-       mu         sync.Mutex // Only for closing.
-       post       chan pp.Message
-       writeCh    chan []byte
-       pieceOrder []int
+       Socket    net.Conn
+       Discovery peerSource
+       uTP       bool
+       closing   chan struct{}
+       mu        sync.Mutex // Only for closing.
+       post      chan pp.Message
+       writeCh   chan []byte
+
+       piecePriorities   []int
+       pieceRequestOrder *pieceordering.Instance
 
        UnwantedChunksReceived int
        UsefulChunksReceived   int
@@ -111,7 +113,6 @@ func (cn *connection) piecesPeerHasCount() (count int) {
 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
 // messages.
 func (cn *connection) setNumPieces(num int) error {
-       cn.initPieceOrder(num)
        if cn.PeerPieces == nil {
                return nil
        }
@@ -134,15 +135,6 @@ func (cn *connection) setNumPieces(num int) error {
        return nil
 }
 
-func (cn *connection) initPieceOrder(numPieces int) {
-       if cn.pieceOrder == nil {
-               cn.pieceOrder = rand.Perm(numPieces)
-       }
-       if len(cn.pieceOrder) != numPieces {
-               panic("piece order initialized with wrong length")
-       }
-}
-
 func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
index 0b85d190a5d4a78c7523c56c0de329d5761c324a..572eadba3ccbd6ca78a66bde7541e4cd12fa3bb2 100644 (file)
@@ -45,13 +45,13 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
        addRequest := func(req request) (again bool) {
                return c.Request(req)
        }
-       for i := range t.Pieces {
-               pieceIndex := c.pieceOrder[i]
+       for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
+               pieceIndex := e.Piece()
                if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
-                       continue
+                       panic("piece in request order but peer doesn't have it")
                }
                if !t.wantPiece(pieceIndex) {
-                       continue
+                       panic("unwanted piece in connection request order")
                }
                piece := t.Pieces[pieceIndex]
                for _, cs := range piece.shuffledPendingChunkSpecs() {
diff --git a/internal/pieceordering/pieceordering.go b/internal/pieceordering/pieceordering.go
new file mode 100644 (file)
index 0000000..71816fc
--- /dev/null
@@ -0,0 +1,78 @@
+package pieceordering
+
+import (
+       "github.com/glenn-brown/skiplist"
+)
+
+type Instance struct {
+       sl        *skiplist.T
+       pieceKeys map[int]int
+}
+
+func New() *Instance {
+       return &Instance{
+               sl: skiplist.New(),
+       }
+}
+
+// Add the piece with the given key. No other piece can have the same key. If
+// the piece is already present, change its key.
+func (me *Instance) SetPiece(piece, key int) {
+       if existingKey, ok := me.pieceKeys[piece]; ok {
+               if existingKey == key {
+                       return
+               }
+               if me.sl.Remove(existingKey).Value.(int) != piece {
+                       panic("piecekeys map lied to us")
+               }
+       }
+       me.sl.Insert(key, piece)
+       if me.pieceKeys == nil {
+               me.pieceKeys = make(map[int]int)
+       }
+       me.pieceKeys[piece] = key
+}
+
+func (me *Instance) RemovePiece(piece int) {
+       key, ok := me.pieceKeys[piece]
+       if !ok {
+               return
+       }
+       el := me.sl.Remove(key)
+       if el == nil {
+               panic("element not present but should be")
+       }
+       if me.sl.Remove(key) != nil {
+               panic("duplicate key")
+       }
+       delete(me.pieceKeys, piece)
+}
+
+func (me Instance) First() Element {
+       e := me.sl.Front()
+       if e == nil {
+               return nil
+       }
+       return element{e}
+}
+
+type Element interface {
+       Piece() int
+       Next() Element
+}
+
+type element struct {
+       sle *skiplist.Element
+}
+
+func (e element) Next() Element {
+       sle := e.sle.Next()
+       if sle == nil {
+               return nil
+       }
+       return element{sle}
+}
+
+func (e element) Piece() int {
+       return e.sle.Value.(int)
+}
diff --git a/internal/pieceordering/pieceordering_test.go b/internal/pieceordering/pieceordering_test.go
new file mode 100644 (file)
index 0000000..30c9c01
--- /dev/null
@@ -0,0 +1,33 @@
+package pieceordering
+
+import (
+       "testing"
+)
+
+func checkOrder(t *testing.T, i *Instance, pp []int) {
+       e := i.First()
+       for _, p := range pp {
+               if p != e.Piece() {
+                       t.FailNow()
+               }
+               e = e.Next()
+       }
+       if e != nil {
+               t.FailNow()
+       }
+}
+
+func TestPieceOrdering(t *testing.T) {
+       i := New()
+       i.SetPiece(0, 1)
+       i.SetPiece(1, 0)
+       checkOrder(t, i, []int{1, 0})
+       i.SetPiece(1, 2)
+       checkOrder(t, i, []int{0, 1})
+       i.RemovePiece(1)
+       checkOrder(t, i, []int{0})
+       i.RemovePiece(2)
+       i.RemovePiece(1)
+       checkOrder(t, i, []int{0})
+       i.RemovePiece(0)
+}