From: Matt Joiner Date: Mon, 1 Feb 2016 10:11:41 +0000 (+1100) Subject: Reintroduce connection piece inclinations, and begin caching piece priorities X-Git-Tag: v1.0.0~928 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=afa264e6c52bf53079467ce904fb78445a18b12c;p=btrtrc.git Reintroduce connection piece inclinations, and begin caching piece priorities --- diff --git a/client.go b/client.go index 2daff282..2def4f90 100644 --- a/client.go +++ b/client.go @@ -1184,9 +1184,7 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error { } c.PeerPieces[piece] = true } - if t.wantPiece(piece) { - c.updateRequests() - } + c.updatePiecePriority(piece) return nil } @@ -1625,6 +1623,7 @@ func (me *Client) deleteConnection(t *torrent, c *connection) bool { func (me *Client) dropConnection(t *torrent, c *connection) { me.event.Broadcast() c.Close() + if me.deleteConnection(t, c) { me.openNewConns(t) } @@ -2055,6 +2054,7 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er } // TODO: Tidy this up? t = newTorrent(spec.InfoHash) + t.cl = cl if spec.ChunkSize != 0 { t.chunkSize = pp.Integer(spec.ChunkSize) } diff --git a/connection.go b/connection.go index fd28df3c..419f6a9f 100644 --- a/connection.go +++ b/connection.go @@ -13,6 +13,9 @@ import ( "time" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/itertools" + "github.com/anacrolix/missinggo/prioritybitmap" + "github.com/anacrolix/torrent/bencode" pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -75,6 +78,9 @@ type connection struct { PeerMaxRequests int // Maximum pending requests the peer allows. PeerExtensionIDs map[string]byte PeerClientName string + + pieceInclination []int + pieceRequestOrder prioritybitmap.PriorityBitmap } func newConnection() (c *connection) { @@ -234,6 +240,7 @@ func (cn *connection) WriteStatus(w io.Writer, t *torrent) { func (c *connection) Close() { c.closed.Set() + c.discardPieceInclination() // TODO: This call blocks sometimes, why? go c.conn.Close() } @@ -550,26 +557,53 @@ func (c *connection) updateRequests() { } func (c *connection) fillRequests() { - if !c.t.forUrgentPieces(func(piece int) (again bool) { - return c.t.connRequestPiecePendingChunks(c, piece) - }) { + itertools.ForIterable(&c.pieceRequestOrder, func(_piece interface{}) (more bool) { + return c.requestPiecePendingChunks(_piece.(int)) + }) +} + +func (c *connection) requestPiecePendingChunks(piece int) (again bool) { + return c.t.connRequestPiecePendingChunks(c, piece) +} + +func (c *connection) stopRequestingPiece(piece int) { + c.pieceRequestOrder.Remove(piece) +} + +func (c *connection) updatePiecePriority(piece int) { + if !c.PeerHasPiece(piece) { return } - c.t.forReaderOffsetPieces(func(begin, end int) (again bool) { - for i := begin + 1; i < end; i++ { - if !c.t.connRequestPiecePendingChunks(c, i) { - return false - } - } - return true - }) - for it := c.t.pendingPieces.Iter(); it.Next(); { - i := it.Value() - if !c.t.wantPiece(i) { - continue - } - if !c.t.connRequestPiecePendingChunks(c, i) { - return - } + tpp := c.t.piecePriority(piece) + if tpp == PiecePriorityNone { + c.stopRequestingPiece(piece) + return + } + prio := c.getPieceInclination()[piece] + switch tpp { + case PiecePriorityNormal: + case PiecePriorityReadahead: + prio -= c.t.numPieces() + case PiecePriorityNext, PiecePriorityNow: + prio -= 2 * c.t.numPieces() + default: + panic(tpp) + } + c.pieceRequestOrder.Set(piece, prio) + c.updateRequests() +} + +func (c *connection) getPieceInclination() []int { + if c.pieceInclination == nil { + c.pieceInclination = c.t.getConnPieceInclination() + } + return c.pieceInclination +} + +func (c *connection) discardPieceInclination() { + if c.pieceInclination == nil { + return } + c.t.putPieceInclination(c.pieceInclination) + c.pieceInclination = nil } diff --git a/piece.go b/piece.go index 0643878c..843ddb9e 100644 --- a/piece.go +++ b/piece.go @@ -31,6 +31,7 @@ type piece struct { QueuedForHash bool EverHashed bool PublicPieceState PieceState + priority piecePriority pendingWritesMutex sync.Mutex pendingWrites int diff --git a/reader.go b/reader.go index acc481cf..9d30781d 100644 --- a/reader.go +++ b/reader.go @@ -74,7 +74,7 @@ func (r *Reader) available(off, max int64) (ret int64) { } func (r *Reader) tickleClient() { - r.t.torrent.readersChanged(r.t.cl) + r.t.torrent.readersChanged() } func (r *Reader) waitReadable(off int64) { @@ -152,7 +152,7 @@ func (r *Reader) Close() error { func (r *Reader) posChanged() { r.t.cl.mu.Lock() defer r.t.cl.mu.Unlock() - r.t.torrent.readersChanged(r.t.cl) + r.t.torrent.readersChanged() } func (r *Reader) Seek(off int64, whence int) (ret int64, err error) { diff --git a/t.go b/t.go index 123cb225..ffb64dd3 100644 --- a/t.go +++ b/t.go @@ -127,14 +127,14 @@ func (t Torrent) addReader(r *Reader) { t.torrent.readers = make(map[*Reader]struct{}) } t.torrent.readers[r] = struct{}{} - t.torrent.readersChanged(t.cl) + t.torrent.readersChanged() } func (t Torrent) deleteReader(r *Reader) { t.cl.mu.Lock() defer t.cl.mu.Unlock() delete(t.torrent.readers, r) - t.torrent.readersChanged(t.cl) + t.torrent.readersChanged() } func (t Torrent) DownloadPieces(begin, end int) { diff --git a/torrent.go b/torrent.go index 214fc1af..b50db665 100644 --- a/torrent.go +++ b/torrent.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "math/rand" "net" "sort" "sync" @@ -51,6 +52,8 @@ type peersKey struct { // Is not aware of Client. Maintains state of torrent for with-in a Client. type torrent struct { + cl *Client + stateMu sync.Mutex closing chan struct{} @@ -99,12 +102,15 @@ type torrent struct { readers map[*Reader]struct{} - pendingPieces *bitmap.Bitmap + pendingPieces bitmap.Bitmap + + connPieceInclinationPool sync.Pool } var ( - piecePrioritiesReused = expvar.NewInt("piecePrioritiesReused") - piecePrioritiesNew = expvar.NewInt("piecePrioritiesNew") + pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused") + pieceInclinationsNew = expvar.NewInt("pieceInclinationsNew") + pieceInclinationsPut = expvar.NewInt("pieceInclinationsPut") ) func (t *torrent) setDisplayName(dn string) { @@ -836,13 +842,43 @@ func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) { }) } -func (t *torrent) readersChanged(cl *Client) { - // Accept new connections. - cl.event.Broadcast() +func (t *torrent) readersChanged() { + t.updatePiecePriorities() +} + +func (t *torrent) maybeNewConns() { + // Tickle the accept routine. + t.cl.event.Broadcast() + t.openNewConns() +} + +func (t *torrent) piecePriorityChanged(piece int) { + for _, c := range t.Conns { + c.updatePiecePriority(piece) + } + t.maybeNewConns() +} + +func (t *torrent) updatePiecePriority(piece int) bool { + p := &t.Pieces[piece] + newPrio := t.piecePriorityUncached(piece) + if newPrio == p.priority { + return false + } + p.priority = newPrio + return true +} + +func (t *torrent) updatePiecePriorities() { + for i := range t.Pieces { + if t.updatePiecePriority(i) { + t.piecePriorityChanged(i) + } + } for _, c := range t.Conns { c.updateRequests() } - cl.openNewConns(t) + t.maybeNewConns() } func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) { @@ -884,7 +920,14 @@ func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all return true } -func (t *torrent) piecePriority(piece int) (ret piecePriority) { +func (t *torrent) piecePriority(piece int) piecePriority { + if !t.haveInfo() { + return PiecePriorityNone + } + return t.Pieces[piece].priority +} + +func (t *torrent) piecePriorityUncached(piece int) (ret piecePriority) { ret = PiecePriorityNone if t.pieceComplete(piece) { return @@ -910,9 +953,6 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) { } func (t *torrent) pendPiece(piece int, cl *Client) { - if t.pendingPieces == nil { - t.pendingPieces = bitmap.New() - } if t.pendingPieces.Contains(piece) { return } @@ -920,14 +960,10 @@ func (t *torrent) pendPiece(piece int, cl *Client) { return } t.pendingPieces.Add(piece) - for _, c := range t.Conns { - if !c.PeerHasPiece(piece) { - continue - } - c.updateRequests() + if !t.updatePiecePriority(piece) { + return } - cl.openNewConns(t) - cl.pieceChanged(t, piece) + t.piecePriorityChanged(piece) } func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) { @@ -947,3 +983,26 @@ func (t *torrent) pendRequest(req request) { ci := chunkIndex(req.chunkSpec, t.chunkSize) t.Pieces[req.Index].pendChunkIndex(ci) } + +func (t *torrent) pieceChanged(piece int) { + t.cl.pieceChanged(t, piece) +} + +func (t *torrent) openNewConns() { + t.cl.openNewConns(t) +} + +func (t *torrent) getConnPieceInclination() []int { + _ret := t.connPieceInclinationPool.Get() + if _ret == nil { + pieceInclinationsNew.Add(1) + return rand.Perm(t.numPieces()) + } + pieceInclinationsReused.Add(1) + return _ret.([]int) +} + +func (t *torrent) putPieceInclination(pi []int) { + t.connPieceInclinationPool.Put(pi) + pieceInclinationsPut.Add(1) +}