]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Wrap writerCond in tickleWriter and be more selective about using it
authorMatt Joiner <anacrolix@gmail.com>
Fri, 1 Sep 2017 05:26:50 +0000 (15:26 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 1 Sep 2017 05:26:50 +0000 (15:26 +1000)
connection.go
torrent.go

index bf418416df87adfb1151680af8fafcae50a09e0e..b5b167a241ed4803737b3926b2bdfc49f181217b 100644 (file)
@@ -236,7 +236,7 @@ func (cn *connection) PeerHasPiece(piece int) bool {
 func (cn *connection) Post(msg pp.Message) {
        messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
        cn.postedBuffer.Write(msg.MustMarshalBinary())
-       cn.writerCond.Broadcast()
+       cn.tickleWriter()
 }
 
 func (cn *connection) requestMetadataPiece(index int) {
@@ -340,7 +340,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        for r := range cn.requests {
                if _, ok := rs[r]; !ok {
                        sentCancels = true
-                       delete(cn.requests, r)
+                       cn.deleteRequest(r)
                        // log.Printf("%p: cancelling request: %v", cn, r)
                        if !msg(pp.Message{
                                Type:   pp.Cancel,
@@ -390,7 +390,7 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
                cn.mu().Lock()
                defer cn.mu().Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
-                       cn.writerCond.Broadcast()
+                       cn.tickleWriter()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
@@ -498,7 +498,7 @@ func nextRequestState(
 }
 
 func (cn *connection) updateRequests() {
-       cn.writerCond.Broadcast()
+       cn.tickleWriter()
 }
 
 func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
@@ -522,24 +522,22 @@ func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        })
 }
 
-func (cn *connection) stopRequestingPiece(piece int) {
-       if cn.pieceRequestOrder.Remove(piece) {
-               cn.writerCond.Broadcast()
-       }
+// check callers updaterequests
+func (cn *connection) stopRequestingPiece(piece int) bool {
+       return cn.pieceRequestOrder.Remove(piece)
 }
 
 // This is distinct from Torrent piece priority, which is the user's
 // preference. Connection piece priority is specific to a connection,
 // pseudorandomly avoids connections always requesting the same pieces and
 // thus wasting effort.
-func (cn *connection) updatePiecePriority(piece int) {
+func (cn *connection) updatePiecePriority(piece int) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
-               cn.stopRequestingPiece(piece)
-               return
+               return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
        switch tpp {
@@ -552,9 +550,7 @@ func (cn *connection) updatePiecePriority(piece int) {
                panic(tpp)
        }
        prio += piece / 3
-       if cn.pieceRequestOrder.Set(piece, prio) {
-               cn.updateRequests()
-       }
+       return cn.pieceRequestOrder.Set(piece, prio)
 }
 
 func (cn *connection) getPieceInclination() []int {
@@ -572,14 +568,16 @@ func (cn *connection) discardPieceInclination() {
        cn.pieceInclination = nil
 }
 
-func (cn *connection) peerHasPieceChanged(piece int) {
-       cn.updatePiecePriority(piece)
-}
-
 func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
+               prioritiesChanged := false
                for i := range iter.N(cn.t.numPieces()) {
-                       cn.peerHasPieceChanged(i)
+                       if cn.updatePiecePriority(i) {
+                               prioritiesChanged = true
+                       }
+               }
+               if prioritiesChanged {
+                       cn.updateRequests()
                }
        }
 }
@@ -599,7 +597,9 @@ func (cn *connection) peerSentHave(piece int) error {
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(piece, true)
-       cn.peerHasPieceChanged(piece)
+       if cn.updatePiecePriority(piece) {
+               cn.updateRequests()
+       }
        return nil
 }
 
@@ -749,10 +749,12 @@ func (c *connection) mainReadLoop() error {
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
-                       c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length))
+                       if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
+                               c.updateRequests()
+                       }
                case pp.Unchoke:
                        c.PeerChoked = false
-                       c.writerCond.Broadcast()
+                       c.tickleWriter()
                case pp.Interested:
                        c.PeerInterested = true
                        c.upload()
@@ -950,7 +952,9 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
 
        // Request has been satisfied.
-       if !c.deleteRequest(req) {
+       if c.deleteRequest(req) {
+               c.updateRequests()
+       } else {
                unexpectedChunksReceived.Add(1)
        }
 
@@ -1092,6 +1096,8 @@ func (c *connection) deleteRequest(r request) bool {
                return false
        }
        delete(c.requests, r)
-       c.writerCond.Broadcast()
        return true
 }
+func (c *connection) tickleWriter() {
+       c.writerCond.Broadcast()
+}
index b741dde32cf15c2d333126dd1be56b8c31441f73..7f04e5f3aae1c62568fef25a2cefc1e4124517f8 100644 (file)
@@ -798,7 +798,9 @@ func (t *Torrent) maybeNewConns() {
 
 func (t *Torrent) piecePriorityChanged(piece int) {
        for c := range t.conns {
-               c.updatePiecePriority(piece)
+               if c.updatePiecePriority(piece) {
+                       c.updateRequests()
+               }
        }
        t.maybeNewConns()
        t.publishPieceChange(piece)
@@ -1424,7 +1426,7 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
 
 func (t *Torrent) cancelRequestsForPiece(piece int) {
        for cn := range t.conns {
-               cn.writerCond.Broadcast()
+               cn.tickleWriter()
        }
 }