"syscall"
"time"
+ "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
+
"github.com/h2so5/utp"
"github.com/anacrolix/libtorgo/bencode"
})
}
if torrent.haveInfo() {
- conn.initPieceOrder(torrent.NumPieces())
+ me.initRequestOrdering(torrent, conn)
}
err = me.connectionLoop(torrent, conn)
if err != nil {
return
}
+func (cl *Client) initRequestOrdering(t *torrent, c *connection) {
+ if c.pieceRequestOrder != nil || c.piecePriorities != nil {
+ panic("double init of request ordering")
+ }
+ c.piecePriorities = mathRand.Perm(t.NumPieces())
+ c.pieceRequestOrder = pieceordering.New()
+ for i := 0; i < t.NumPieces(); i++ {
+ if !c.PeerHasPiece(pp.Integer(i)) {
+ continue
+ }
+ if !t.wantPiece(i) {
+ continue
+ }
+ c.pieceRequestOrder.SetPiece(i, c.piecePriorities[i])
+ }
+}
+
func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
for piece >= len(c.PeerPieces) {
c.PeerPieces = append(c.PeerPieces, false)
}
c.PeerPieces[piece] = true
if t.wantPiece(piece) {
+ c.pieceRequestOrder.SetPiece(piece, c.piecePriorities[piece])
me.replenishConnRequests(t, c)
}
}
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
me.dataReady(t, req)
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
+ for _, c := range t.Conns {
+ c.pieceRequestOrder.RemovePiece(int(req.Index))
+ }
me.queuePieceCheck(t, req.Index)
}
t.PieceBytesLeftChanged(int(req.Index))
panic("wat")
}
}
+ conn.pieceRequestOrder.RemovePiece(int(piece))
+ }
+ if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) {
+ conn.pieceRequestOrder.SetPiece(int(piece), conn.piecePriorities[piece])
}
}
if t.haveAllPieces() && me.noUpload {
"expvar"
"fmt"
"io"
- "math/rand"
"net"
"sync"
"time"
+ "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
)
// Maintains the state of a connection with a peer.
type connection struct {
- Socket net.Conn
- Discovery peerSource
- uTP bool
- closing chan struct{}
- mu sync.Mutex // Only for closing.
- post chan pp.Message
- writeCh chan []byte
- pieceOrder []int
+ Socket net.Conn
+ Discovery peerSource
+ uTP bool
+ closing chan struct{}
+ mu sync.Mutex // Only for closing.
+ post chan pp.Message
+ writeCh chan []byte
+
+ piecePriorities []int
+ pieceRequestOrder *pieceordering.Instance
UnwantedChunksReceived int
UsefulChunksReceived int
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
// messages.
func (cn *connection) setNumPieces(num int) error {
- cn.initPieceOrder(num)
if cn.PeerPieces == nil {
return nil
}
return nil
}
-func (cn *connection) initPieceOrder(numPieces int) {
- if cn.pieceOrder == nil {
- cn.pieceOrder = rand.Perm(numPieces)
- }
- if len(cn.pieceOrder) != numPieces {
- panic("piece order initialized with wrong length")
- }
-}
-
func eventAgeString(t time.Time) string {
if t.IsZero() {
return "never"
addRequest := func(req request) (again bool) {
return c.Request(req)
}
- for i := range t.Pieces {
- pieceIndex := c.pieceOrder[i]
+ for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
+ pieceIndex := e.Piece()
if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
- continue
+ panic("piece in request order but peer doesn't have it")
}
if !t.wantPiece(pieceIndex) {
- continue
+ panic("unwanted piece in connection request order")
}
piece := t.Pieces[pieceIndex]
for _, cs := range piece.shuffledPendingChunkSpecs() {
--- /dev/null
+package pieceordering
+
+import (
+ "github.com/glenn-brown/skiplist"
+)
+
+type Instance struct {
+ sl *skiplist.T
+ pieceKeys map[int]int
+}
+
+func New() *Instance {
+ return &Instance{
+ sl: skiplist.New(),
+ }
+}
+
+// Add the piece with the given key. No other piece can have the same key. If
+// the piece is already present, change its key.
+func (me *Instance) SetPiece(piece, key int) {
+ if existingKey, ok := me.pieceKeys[piece]; ok {
+ if existingKey == key {
+ return
+ }
+ if me.sl.Remove(existingKey).Value.(int) != piece {
+ panic("piecekeys map lied to us")
+ }
+ }
+ me.sl.Insert(key, piece)
+ if me.pieceKeys == nil {
+ me.pieceKeys = make(map[int]int)
+ }
+ me.pieceKeys[piece] = key
+}
+
+func (me *Instance) RemovePiece(piece int) {
+ key, ok := me.pieceKeys[piece]
+ if !ok {
+ return
+ }
+ el := me.sl.Remove(key)
+ if el == nil {
+ panic("element not present but should be")
+ }
+ if me.sl.Remove(key) != nil {
+ panic("duplicate key")
+ }
+ delete(me.pieceKeys, piece)
+}
+
+func (me Instance) First() Element {
+ e := me.sl.Front()
+ if e == nil {
+ return nil
+ }
+ return element{e}
+}
+
+type Element interface {
+ Piece() int
+ Next() Element
+}
+
+type element struct {
+ sle *skiplist.Element
+}
+
+func (e element) Next() Element {
+ sle := e.sle.Next()
+ if sle == nil {
+ return nil
+ }
+ return element{sle}
+}
+
+func (e element) Piece() int {
+ return e.sle.Value.(int)
+}
--- /dev/null
+package pieceordering
+
+import (
+ "testing"
+)
+
+func checkOrder(t *testing.T, i *Instance, pp []int) {
+ e := i.First()
+ for _, p := range pp {
+ if p != e.Piece() {
+ t.FailNow()
+ }
+ e = e.Next()
+ }
+ if e != nil {
+ t.FailNow()
+ }
+}
+
+func TestPieceOrdering(t *testing.T) {
+ i := New()
+ i.SetPiece(0, 1)
+ i.SetPiece(1, 0)
+ checkOrder(t, i, []int{1, 0})
+ i.SetPiece(1, 2)
+ checkOrder(t, i, []int{0, 1})
+ i.RemovePiece(1)
+ checkOrder(t, i, []int{0})
+ i.RemovePiece(2)
+ i.RemovePiece(1)
+ checkOrder(t, i, []int{0})
+ i.RemovePiece(0)
+}