}
c.PeerPieces[piece] = true
}
- if t.wantPiece(piece) {
- c.updateRequests()
- }
+ c.updatePiecePriority(piece)
return nil
}
func (me *Client) dropConnection(t *torrent, c *connection) {
me.event.Broadcast()
c.Close()
+
if me.deleteConnection(t, c) {
me.openNewConns(t)
}
}
// TODO: Tidy this up?
t = newTorrent(spec.InfoHash)
+ t.cl = cl
if spec.ChunkSize != 0 {
t.chunkSize = pp.Integer(spec.ChunkSize)
}
"time"
"github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/itertools"
+ "github.com/anacrolix/missinggo/prioritybitmap"
+
"github.com/anacrolix/torrent/bencode"
pp "github.com/anacrolix/torrent/peer_protocol"
)
PeerMaxRequests int // Maximum pending requests the peer allows.
PeerExtensionIDs map[string]byte
PeerClientName string
+
+ pieceInclination []int
+ pieceRequestOrder prioritybitmap.PriorityBitmap
}
func newConnection() (c *connection) {
func (c *connection) Close() {
c.closed.Set()
+ c.discardPieceInclination()
// TODO: This call blocks sometimes, why?
go c.conn.Close()
}
}
func (c *connection) fillRequests() {
- if !c.t.forUrgentPieces(func(piece int) (again bool) {
- return c.t.connRequestPiecePendingChunks(c, piece)
- }) {
+ itertools.ForIterable(&c.pieceRequestOrder, func(_piece interface{}) (more bool) {
+ return c.requestPiecePendingChunks(_piece.(int))
+ })
+}
+
+func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
+ return c.t.connRequestPiecePendingChunks(c, piece)
+}
+
+func (c *connection) stopRequestingPiece(piece int) {
+ c.pieceRequestOrder.Remove(piece)
+}
+
+func (c *connection) updatePiecePriority(piece int) {
+ if !c.PeerHasPiece(piece) {
return
}
- c.t.forReaderOffsetPieces(func(begin, end int) (again bool) {
- for i := begin + 1; i < end; i++ {
- if !c.t.connRequestPiecePendingChunks(c, i) {
- return false
- }
- }
- return true
- })
- for it := c.t.pendingPieces.Iter(); it.Next(); {
- i := it.Value()
- if !c.t.wantPiece(i) {
- continue
- }
- if !c.t.connRequestPiecePendingChunks(c, i) {
- return
- }
+ tpp := c.t.piecePriority(piece)
+ if tpp == PiecePriorityNone {
+ c.stopRequestingPiece(piece)
+ return
+ }
+ prio := c.getPieceInclination()[piece]
+ switch tpp {
+ case PiecePriorityNormal:
+ case PiecePriorityReadahead:
+ prio -= c.t.numPieces()
+ case PiecePriorityNext, PiecePriorityNow:
+ prio -= 2 * c.t.numPieces()
+ default:
+ panic(tpp)
+ }
+ c.pieceRequestOrder.Set(piece, prio)
+ c.updateRequests()
+}
+
+func (c *connection) getPieceInclination() []int {
+ if c.pieceInclination == nil {
+ c.pieceInclination = c.t.getConnPieceInclination()
+ }
+ return c.pieceInclination
+}
+
+func (c *connection) discardPieceInclination() {
+ if c.pieceInclination == nil {
+ return
}
+ c.t.putPieceInclination(c.pieceInclination)
+ c.pieceInclination = nil
}
QueuedForHash bool
EverHashed bool
PublicPieceState PieceState
+ priority piecePriority
pendingWritesMutex sync.Mutex
pendingWrites int
}
func (r *Reader) tickleClient() {
- r.t.torrent.readersChanged(r.t.cl)
+ r.t.torrent.readersChanged()
}
func (r *Reader) waitReadable(off int64) {
func (r *Reader) posChanged() {
r.t.cl.mu.Lock()
defer r.t.cl.mu.Unlock()
- r.t.torrent.readersChanged(r.t.cl)
+ r.t.torrent.readersChanged()
}
func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
t.torrent.readers = make(map[*Reader]struct{})
}
t.torrent.readers[r] = struct{}{}
- t.torrent.readersChanged(t.cl)
+ t.torrent.readersChanged()
}
func (t Torrent) deleteReader(r *Reader) {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
delete(t.torrent.readers, r)
- t.torrent.readersChanged(t.cl)
+ t.torrent.readersChanged()
}
func (t Torrent) DownloadPieces(begin, end int) {
"fmt"
"io"
"log"
+ "math/rand"
"net"
"sort"
"sync"
// Is not aware of Client. Maintains state of torrent for with-in a Client.
type torrent struct {
+ cl *Client
+
stateMu sync.Mutex
closing chan struct{}
readers map[*Reader]struct{}
- pendingPieces *bitmap.Bitmap
+ pendingPieces bitmap.Bitmap
+
+ connPieceInclinationPool sync.Pool
}
var (
- piecePrioritiesReused = expvar.NewInt("piecePrioritiesReused")
- piecePrioritiesNew = expvar.NewInt("piecePrioritiesNew")
+ pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
+ pieceInclinationsNew = expvar.NewInt("pieceInclinationsNew")
+ pieceInclinationsPut = expvar.NewInt("pieceInclinationsPut")
)
func (t *torrent) setDisplayName(dn string) {
})
}
-func (t *torrent) readersChanged(cl *Client) {
- // Accept new connections.
- cl.event.Broadcast()
+func (t *torrent) readersChanged() {
+ t.updatePiecePriorities()
+}
+
+func (t *torrent) maybeNewConns() {
+ // Tickle the accept routine.
+ t.cl.event.Broadcast()
+ t.openNewConns()
+}
+
+func (t *torrent) piecePriorityChanged(piece int) {
+ for _, c := range t.Conns {
+ c.updatePiecePriority(piece)
+ }
+ t.maybeNewConns()
+}
+
+func (t *torrent) updatePiecePriority(piece int) bool {
+ p := &t.Pieces[piece]
+ newPrio := t.piecePriorityUncached(piece)
+ if newPrio == p.priority {
+ return false
+ }
+ p.priority = newPrio
+ return true
+}
+
+func (t *torrent) updatePiecePriorities() {
+ for i := range t.Pieces {
+ if t.updatePiecePriority(i) {
+ t.piecePriorityChanged(i)
+ }
+ }
for _, c := range t.Conns {
c.updateRequests()
}
- cl.openNewConns(t)
+ t.maybeNewConns()
}
func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
return true
}
-func (t *torrent) piecePriority(piece int) (ret piecePriority) {
+func (t *torrent) piecePriority(piece int) piecePriority {
+ if !t.haveInfo() {
+ return PiecePriorityNone
+ }
+ return t.Pieces[piece].priority
+}
+
+func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) {
ret = PiecePriorityNone
if t.pieceComplete(piece) {
return
}
func (t *torrent) pendPiece(piece int, cl *Client) {
- if t.pendingPieces == nil {
- t.pendingPieces = bitmap.New()
- }
if t.pendingPieces.Contains(piece) {
return
}
return
}
t.pendingPieces.Add(piece)
- for _, c := range t.Conns {
- if !c.PeerHasPiece(piece) {
- continue
- }
- c.updateRequests()
+ if !t.updatePiecePriority(piece) {
+ return
}
- cl.openNewConns(t)
- cl.pieceChanged(t, piece)
+ t.piecePriorityChanged(piece)
}
func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
ci := chunkIndex(req.chunkSpec, t.chunkSize)
t.Pieces[req.Index].pendChunkIndex(ci)
}
+
+func (t *torrent) pieceChanged(piece int) {
+ t.cl.pieceChanged(t, piece)
+}
+
+func (t *torrent) openNewConns() {
+ t.cl.openNewConns(t)
+}
+
+func (t *torrent) getConnPieceInclination() []int {
+ _ret := t.connPieceInclinationPool.Get()
+ if _ret == nil {
+ pieceInclinationsNew.Add(1)
+ return rand.Perm(t.numPieces())
+ }
+ pieceInclinationsReused.Add(1)
+ return _ret.([]int)
+}
+
+func (t *torrent) putPieceInclination(pi []int) {
+ t.connPieceInclinationPool.Put(pi)
+ pieceInclinationsPut.Add(1)
+}