From 7228e40c955a75ece7d86299da2c34fcd5efe8b2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 2 Dec 2014 18:22:38 -0600 Subject: [PATCH] Add piece ordering per-connection piece request priority --- client.go | 29 +++++++- connection.go | 30 +++----- download_strategies.go | 8 +- internal/pieceordering/pieceordering.go | 78 ++++++++++++++++++++ internal/pieceordering/pieceordering_test.go | 33 +++++++++ 5 files changed, 154 insertions(+), 24 deletions(-) create mode 100644 internal/pieceordering/pieceordering.go create mode 100644 internal/pieceordering/pieceordering_test.go diff --git a/client.go b/client.go index 5625f017..f1164e5b 100644 --- a/client.go +++ b/client.go @@ -37,6 +37,8 @@ import ( "syscall" "time" + "bitbucket.org/anacrolix/go.torrent/internal/pieceordering" + "github.com/h2so5/utp" "github.com/anacrolix/libtorgo/bencode" @@ -890,7 +892,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS }) } if torrent.haveInfo() { - conn.initPieceOrder(torrent.NumPieces()) + me.initRequestOrdering(torrent, conn) } err = me.connectionLoop(torrent, conn) if err != nil { @@ -900,12 +902,30 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS return } +func (cl *Client) initRequestOrdering(t *torrent, c *connection) { + if c.pieceRequestOrder != nil || c.piecePriorities != nil { + panic("double init of request ordering") + } + c.piecePriorities = mathRand.Perm(t.NumPieces()) + c.pieceRequestOrder = pieceordering.New() + for i := 0; i < t.NumPieces(); i++ { + if !c.PeerHasPiece(pp.Integer(i)) { + continue + } + if !t.wantPiece(i) { + continue + } + c.pieceRequestOrder.SetPiece(i, c.piecePriorities[i]) + } +} + func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) { for piece >= len(c.PeerPieces) { c.PeerPieces = append(c.PeerPieces, false) } c.PeerPieces[piece] = true if t.wantPiece(piece) { + c.pieceRequestOrder.SetPiece(piece, c.piecePriorities[piece]) me.replenishConnRequests(t, c) } } @@ -1924,6 +1944,9 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) me.dataReady(t, req) if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { + for _, c := range t.Conns { + c.pieceRequestOrder.RemovePiece(int(req.Index)) + } me.queuePieceCheck(t, req.Index) } t.PieceBytesLeftChanged(int(req.Index)) @@ -2015,6 +2038,10 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { panic("wat") } } + conn.pieceRequestOrder.RemovePiece(int(piece)) + } + if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) { + conn.pieceRequestOrder.SetPiece(int(piece), conn.piecePriorities[piece]) } } if t.haveAllPieces() && me.noUpload { diff --git a/connection.go b/connection.go index 464f14be..16ffedb7 100644 --- a/connection.go +++ b/connection.go @@ -8,11 +8,11 @@ import ( "expvar" "fmt" "io" - "math/rand" "net" "sync" "time" + "bitbucket.org/anacrolix/go.torrent/internal/pieceordering" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" ) @@ -28,14 +28,16 @@ const ( // Maintains the state of a connection with a peer. type connection struct { - Socket net.Conn - Discovery peerSource - uTP bool - closing chan struct{} - mu sync.Mutex // Only for closing. - post chan pp.Message - writeCh chan []byte - pieceOrder []int + Socket net.Conn + Discovery peerSource + uTP bool + closing chan struct{} + mu sync.Mutex // Only for closing. + post chan pp.Message + writeCh chan []byte + + piecePriorities []int + pieceRequestOrder *pieceordering.Instance UnwantedChunksReceived int UsefulChunksReceived int @@ -111,7 +113,6 @@ func (cn *connection) piecesPeerHasCount() (count int) { // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE // messages. func (cn *connection) setNumPieces(num int) error { - cn.initPieceOrder(num) if cn.PeerPieces == nil { return nil } @@ -134,15 +135,6 @@ func (cn *connection) setNumPieces(num int) error { return nil } -func (cn *connection) initPieceOrder(numPieces int) { - if cn.pieceOrder == nil { - cn.pieceOrder = rand.Perm(numPieces) - } - if len(cn.pieceOrder) != numPieces { - panic("piece order initialized with wrong length") - } -} - func eventAgeString(t time.Time) string { if t.IsZero() { return "never" diff --git a/download_strategies.go b/download_strategies.go index 0b85d190..572eadba 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -45,13 +45,13 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { addRequest := func(req request) (again bool) { return c.Request(req) } - for i := range t.Pieces { - pieceIndex := c.pieceOrder[i] + for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() { + pieceIndex := e.Piece() if !c.PeerHasPiece(pp.Integer(pieceIndex)) { - continue + panic("piece in request order but peer doesn't have it") } if !t.wantPiece(pieceIndex) { - continue + panic("unwanted piece in connection request order") } piece := t.Pieces[pieceIndex] for _, cs := range piece.shuffledPendingChunkSpecs() { diff --git a/internal/pieceordering/pieceordering.go b/internal/pieceordering/pieceordering.go new file mode 100644 index 00000000..71816fc6 --- /dev/null +++ b/internal/pieceordering/pieceordering.go @@ -0,0 +1,78 @@ +package pieceordering + +import ( + "github.com/glenn-brown/skiplist" +) + +type Instance struct { + sl *skiplist.T + pieceKeys map[int]int +} + +func New() *Instance { + return &Instance{ + sl: skiplist.New(), + } +} + +// Add the piece with the given key. No other piece can have the same key. If +// the piece is already present, change its key. +func (me *Instance) SetPiece(piece, key int) { + if existingKey, ok := me.pieceKeys[piece]; ok { + if existingKey == key { + return + } + if me.sl.Remove(existingKey).Value.(int) != piece { + panic("piecekeys map lied to us") + } + } + me.sl.Insert(key, piece) + if me.pieceKeys == nil { + me.pieceKeys = make(map[int]int) + } + me.pieceKeys[piece] = key +} + +func (me *Instance) RemovePiece(piece int) { + key, ok := me.pieceKeys[piece] + if !ok { + return + } + el := me.sl.Remove(key) + if el == nil { + panic("element not present but should be") + } + if me.sl.Remove(key) != nil { + panic("duplicate key") + } + delete(me.pieceKeys, piece) +} + +func (me Instance) First() Element { + e := me.sl.Front() + if e == nil { + return nil + } + return element{e} +} + +type Element interface { + Piece() int + Next() Element +} + +type element struct { + sle *skiplist.Element +} + +func (e element) Next() Element { + sle := e.sle.Next() + if sle == nil { + return nil + } + return element{sle} +} + +func (e element) Piece() int { + return e.sle.Value.(int) +} diff --git a/internal/pieceordering/pieceordering_test.go b/internal/pieceordering/pieceordering_test.go new file mode 100644 index 00000000..30c9c01c --- /dev/null +++ b/internal/pieceordering/pieceordering_test.go @@ -0,0 +1,33 @@ +package pieceordering + +import ( + "testing" +) + +func checkOrder(t *testing.T, i *Instance, pp []int) { + e := i.First() + for _, p := range pp { + if p != e.Piece() { + t.FailNow() + } + e = e.Next() + } + if e != nil { + t.FailNow() + } +} + +func TestPieceOrdering(t *testing.T) { + i := New() + i.SetPiece(0, 1) + i.SetPiece(1, 0) + checkOrder(t, i, []int{1, 0}) + i.SetPiece(1, 2) + checkOrder(t, i, []int{0, 1}) + i.RemovePiece(1) + checkOrder(t, i, []int{0}) + i.RemovePiece(2) + i.RemovePiece(1) + checkOrder(t, i, []int{0}) + i.RemovePiece(0) +} -- 2.48.1