client.go | 6 +++--- connection.go | 72 +++++++++++++++++++++++++++++++++++++++-------------- piece.go | 1 + reader.go | 4 ++-- t.go | 4 ++-- torrent.go | 95 +++++++++++++++++++++++++++++++++++++++++++---------- diff --git a/client.go b/client.go index 2daff28265c416fb5cf8360e635c052cfbfe5bfd..2def4f902b94b60b6e75a2690f2f70f2c2b27331 100644 --- a/client.go +++ b/client.go @@ -1184,9 +1184,7 @@ return errors.New("peer got out of range piece index") } c.PeerPieces[piece] = true } - if t.wantPiece(piece) { - c.updateRequests() - } + c.updatePiecePriority(piece) return nil } @@ -1625,6 +1623,7 @@ func (me *Client) dropConnection(t *torrent, c *connection) { me.event.Broadcast() c.Close() + if me.deleteConnection(t, c) { me.openNewConns(t) } @@ -2055,6 +2054,7 @@ return } // TODO: Tidy this up? t = newTorrent(spec.InfoHash) + t.cl = cl if spec.ChunkSize != 0 { t.chunkSize = pp.Integer(spec.ChunkSize) } diff --git a/connection.go b/connection.go index fd28df3c850fdac9b197848a768896c1c072be50..419f6a9fd850312e3f5c24748b9fb6a6a4ba1a55 100644 --- a/connection.go +++ b/connection.go @@ -13,6 +13,9 @@ "net" "time" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/itertools" + "github.com/anacrolix/missinggo/prioritybitmap" + "github.com/anacrolix/torrent/bencode" pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -75,6 +78,9 @@ PeerMaxRequests int // Maximum pending requests the peer allows. PeerExtensionIDs map[string]byte PeerClientName string + + pieceInclination []int + pieceRequestOrder prioritybitmap.PriorityBitmap } func newConnection() (c *connection) { @@ -234,6 +240,7 @@ } func (c *connection) Close() { c.closed.Set() + c.discardPieceInclination() // TODO: This call blocks sometimes, why? go c.conn.Close() } @@ -550,26 +557,53 @@ } } func (c *connection) fillRequests() { - if !c.t.forUrgentPieces(func(piece int) (again bool) { - return c.t.connRequestPiecePendingChunks(c, piece) - }) { + itertools.ForIterable(&c.pieceRequestOrder, func(_piece interface{}) (more bool) { + return c.requestPiecePendingChunks(_piece.(int)) + }) +} + +func (c *connection) requestPiecePendingChunks(piece int) (again bool) { + return c.t.connRequestPiecePendingChunks(c, piece) +} + +func (c *connection) stopRequestingPiece(piece int) { + c.pieceRequestOrder.Remove(piece) +} + +func (c *connection) updatePiecePriority(piece int) { + if !c.PeerHasPiece(piece) { + return + } + tpp := c.t.piecePriority(piece) + if tpp == PiecePriorityNone { + c.stopRequestingPiece(piece) return } - c.t.forReaderOffsetPieces(func(begin, end int) (again bool) { - for i := begin + 1; i < end; i++ { - if !c.t.connRequestPiecePendingChunks(c, i) { - return false - } - } - return true - }) - for it := c.t.pendingPieces.Iter(); it.Next(); { - i := it.Value() - if !c.t.wantPiece(i) { - continue - } - if !c.t.connRequestPiecePendingChunks(c, i) { - return - } + prio := c.getPieceInclination()[piece] + switch tpp { + case PiecePriorityNormal: + case PiecePriorityReadahead: + prio -= c.t.numPieces() + case PiecePriorityNext, PiecePriorityNow: + prio -= 2 * c.t.numPieces() + default: + panic(tpp) + } + c.pieceRequestOrder.Set(piece, prio) + c.updateRequests() +} + +func (c *connection) getPieceInclination() []int { + if c.pieceInclination == nil { + c.pieceInclination = c.t.getConnPieceInclination() } + return c.pieceInclination +} + +func (c *connection) discardPieceInclination() { + if c.pieceInclination == nil { + return + } + c.t.putPieceInclination(c.pieceInclination) + c.pieceInclination = nil } diff --git a/piece.go b/piece.go index 0643878c2e73fe29f50eee208133337208b8776c..843ddb9e797f12e4643679469d96701697c73f89 100644 --- a/piece.go +++ b/piece.go @@ -31,6 +31,7 @@ Hashing bool QueuedForHash bool EverHashed bool PublicPieceState PieceState + priority piecePriority pendingWritesMutex sync.Mutex pendingWrites int diff --git a/reader.go b/reader.go index acc481cf2d9e1e8fe2449d6d5c49ca4c0c70037e..9d30781de8ebff7ff5979165fd3cd73faa01c5a7 100644 --- a/reader.go +++ b/reader.go @@ -74,7 +74,7 @@ return } func (r *Reader) tickleClient() { - r.t.torrent.readersChanged(r.t.cl) + r.t.torrent.readersChanged() } func (r *Reader) waitReadable(off int64) { @@ -152,7 +152,7 @@ func (r *Reader) posChanged() { r.t.cl.mu.Lock() defer r.t.cl.mu.Unlock() - r.t.torrent.readersChanged(r.t.cl) + r.t.torrent.readersChanged() } func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { diff --git a/t.go b/t.go index 123cb22549df4de156ecee24cab6ba6572368d73..ffb64dd3ac221289a31d512cdffd4e680089a85c 100644 --- a/t.go +++ b/t.go @@ -127,14 +127,14 @@ if t.torrent.readers == nil { t.torrent.readers = make(map[*Reader]struct{}) } t.torrent.readers[r] = struct{}{} - t.torrent.readersChanged(t.cl) + t.torrent.readersChanged() } func (t Torrent) deleteReader(r *Reader) { t.cl.mu.Lock() defer t.cl.mu.Unlock() delete(t.torrent.readers, r) - t.torrent.readersChanged(t.cl) + t.torrent.readersChanged() } func (t Torrent) DownloadPieces(begin, end int) { diff --git a/torrent.go b/torrent.go index 214fc1af4ebd4209610f705460c2f8427119d454..b50db665b08c43fa3daf469ead4cbcfa4f41dcab 100644 --- a/torrent.go +++ b/torrent.go @@ -6,6 +6,7 @@ "expvar" "fmt" "io" "log" + "math/rand" "net" "sort" "sync" @@ -51,6 +52,8 @@ } // Is not aware of Client. Maintains state of torrent for with-in a Client. type torrent struct { + cl *Client + stateMu sync.Mutex closing chan struct{} @@ -99,12 +102,15 @@ gotMetainfo chan struct{} readers map[*Reader]struct{} - pendingPieces *bitmap.Bitmap + pendingPieces bitmap.Bitmap + + connPieceInclinationPool sync.Pool } var ( - piecePrioritiesReused = expvar.NewInt("piecePrioritiesReused") - piecePrioritiesNew = expvar.NewInt("piecePrioritiesNew") + pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused") + pieceInclinationsNew = expvar.NewInt("pieceInclinationsNew") + pieceInclinationsPut = expvar.NewInt("pieceInclinationsPut") ) func (t *torrent) setDisplayName(dn string) { @@ -836,13 +842,43 @@ return true }) } -func (t *torrent) readersChanged(cl *Client) { - // Accept new connections. - cl.event.Broadcast() +func (t *torrent) readersChanged() { + t.updatePiecePriorities() +} + +func (t *torrent) maybeNewConns() { + // Tickle the accept routine. + t.cl.event.Broadcast() + t.openNewConns() +} + +func (t *torrent) piecePriorityChanged(piece int) { + for _, c := range t.Conns { + c.updatePiecePriority(piece) + } + t.maybeNewConns() +} + +func (t *torrent) updatePiecePriority(piece int) bool { + p := &t.Pieces[piece] + newPrio := t.piecePriorityUncached(piece) + if newPrio == p.priority { + return false + } + p.priority = newPrio + return true +} + +func (t *torrent) updatePiecePriorities() { + for i := range t.Pieces { + if t.updatePiecePriority(i) { + t.piecePriorityChanged(i) + } + } for _, c := range t.Conns { c.updateRequests() } - cl.openNewConns(t) + t.maybeNewConns() } func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) { @@ -884,7 +920,14 @@ } return true } -func (t *torrent) piecePriority(piece int) (ret piecePriority) { +func (t *torrent) piecePriority(piece int) piecePriority { + if !t.haveInfo() { + return PiecePriorityNone + } + return t.Pieces[piece].priority +} + +func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) { ret = PiecePriorityNone if t.pieceComplete(piece) { return @@ -910,9 +953,6 @@ return } func (t *torrent) pendPiece(piece int, cl *Client) { - if t.pendingPieces == nil { - t.pendingPieces = bitmap.New() - } if t.pendingPieces.Contains(piece) { return } @@ -920,14 +960,10 @@ if t.havePiece(piece) { return } t.pendingPieces.Add(piece) - for _, c := range t.Conns { - if !c.PeerHasPiece(piece) { - continue - } - c.updateRequests() + if !t.updatePiecePriority(piece) { + return } - cl.openNewConns(t) - cl.pieceChanged(t, piece) + t.piecePriorityChanged(piece) } func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) { @@ -947,3 +983,26 @@ func (t *torrent) pendRequest(req request) { ci := chunkIndex(req.chunkSpec, t.chunkSize) t.Pieces[req.Index].pendChunkIndex(ci) } + +func (t *torrent) pieceChanged(piece int) { + t.cl.pieceChanged(t, piece) +} + +func (t *torrent) openNewConns() { + t.cl.openNewConns(t) +} + +func (t *torrent) getConnPieceInclination() []int { + _ret := t.connPieceInclinationPool.Get() + if _ret == nil { + pieceInclinationsNew.Add(1) + return rand.Perm(t.numPieces()) + } + pieceInclinationsReused.Add(1) + return _ret.([]int) +} + +func (t *torrent) putPieceInclination(pi []int) { + t.connPieceInclinationPool.Put(pi) + pieceInclinationsPut.Add(1) +}