From: Matt Joiner Date: Fri, 1 Sep 2017 05:26:50 +0000 (+1000) Subject: Wrap writerCond in tickleWriter and be more selective about using it X-Git-Tag: v1.0.0~410 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=bad6f07f5e66238b99d3851b4584ad83feb5d95b;p=btrtrc.git Wrap writerCond in tickleWriter and be more selective about using it --- diff --git a/connection.go b/connection.go index bf418416..b5b167a2 100644 --- a/connection.go +++ b/connection.go @@ -236,7 +236,7 @@ func (cn *connection) PeerHasPiece(piece int) bool { func (cn *connection) Post(msg pp.Message) { messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1) cn.postedBuffer.Write(msg.MustMarshalBinary()) - cn.writerCond.Broadcast() + cn.tickleWriter() } func (cn *connection) requestMetadataPiece(index int) { @@ -340,7 +340,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { for r := range cn.requests { if _, ok := rs[r]; !ok { sentCancels = true - delete(cn.requests, r) + cn.deleteRequest(r) // log.Printf("%p: cancelling request: %v", cn, r) if !msg(pp.Message{ Type: pp.Cancel, @@ -390,7 +390,7 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { cn.mu().Lock() defer cn.mu().Unlock() if time.Since(lastWrite) >= keepAliveTimeout { - cn.writerCond.Broadcast() + cn.tickleWriter() } keepAliveTimer.Reset(keepAliveTimeout) }) @@ -498,7 +498,7 @@ func nextRequestState( } func (cn *connection) updateRequests() { - cn.writerCond.Broadcast() + cn.tickleWriter() } func (cn *connection) desiredRequestState() (map[request]struct{}, bool) { @@ -522,24 +522,22 @@ func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool { }) } -func (cn *connection) stopRequestingPiece(piece int) { - if cn.pieceRequestOrder.Remove(piece) { - cn.writerCond.Broadcast() - } +// check callers updaterequests +func (cn *connection) stopRequestingPiece(piece int) bool { + return cn.pieceRequestOrder.Remove(piece) } // This is distinct from Torrent piece priority, which is the user's // preference. Connection piece priority is specific to a connection, // pseudorandomly avoids connections always requesting the same pieces and // thus wasting effort. -func (cn *connection) updatePiecePriority(piece int) { +func (cn *connection) updatePiecePriority(piece int) bool { tpp := cn.t.piecePriority(piece) if !cn.PeerHasPiece(piece) { tpp = PiecePriorityNone } if tpp == PiecePriorityNone { - cn.stopRequestingPiece(piece) - return + return cn.stopRequestingPiece(piece) } prio := cn.getPieceInclination()[piece] switch tpp { @@ -552,9 +550,7 @@ func (cn *connection) updatePiecePriority(piece int) { panic(tpp) } prio += piece / 3 - if cn.pieceRequestOrder.Set(piece, prio) { - cn.updateRequests() - } + return cn.pieceRequestOrder.Set(piece, prio) } func (cn *connection) getPieceInclination() []int { @@ -572,14 +568,16 @@ func (cn *connection) discardPieceInclination() { cn.pieceInclination = nil } -func (cn *connection) peerHasPieceChanged(piece int) { - cn.updatePiecePriority(piece) -} - func (cn *connection) peerPiecesChanged() { if cn.t.haveInfo() { + prioritiesChanged := false for i := range iter.N(cn.t.numPieces()) { - cn.peerHasPieceChanged(i) + if cn.updatePiecePriority(i) { + prioritiesChanged = true + } + } + if prioritiesChanged { + cn.updateRequests() } } } @@ -599,7 +597,9 @@ func (cn *connection) peerSentHave(piece int) error { } cn.raisePeerMinPieces(piece + 1) cn.peerPieces.Set(piece, true) - cn.peerHasPieceChanged(piece) + if cn.updatePiecePriority(piece) { + cn.updateRequests() + } return nil } @@ -749,10 +749,12 @@ func (c *connection) mainReadLoop() error { // We can then reset our interest. c.updateRequests() case pp.Reject: - c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) + if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) { + c.updateRequests() + } case pp.Unchoke: c.PeerChoked = false - c.writerCond.Broadcast() + c.tickleWriter() case pp.Interested: c.PeerInterested = true c.upload() @@ -950,7 +952,9 @@ func (c *connection) receiveChunk(msg *pp.Message) { req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece))) // Request has been satisfied. - if !c.deleteRequest(req) { + if c.deleteRequest(req) { + c.updateRequests() + } else { unexpectedChunksReceived.Add(1) } @@ -1092,6 +1096,8 @@ func (c *connection) deleteRequest(r request) bool { return false } delete(c.requests, r) - c.writerCond.Broadcast() return true } +func (c *connection) tickleWriter() { + c.writerCond.Broadcast() +} diff --git a/torrent.go b/torrent.go index b741dde3..7f04e5f3 100644 --- a/torrent.go +++ b/torrent.go @@ -798,7 +798,9 @@ func (t *Torrent) maybeNewConns() { func (t *Torrent) piecePriorityChanged(piece int) { for c := range t.conns { - c.updatePiecePriority(piece) + if c.updatePiecePriority(piece) { + c.updateRequests() + } } t.maybeNewConns() t.publishPieceChange(piece) @@ -1424,7 +1426,7 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { func (t *Torrent) cancelRequestsForPiece(piece int) { for cn := range t.conns { - cn.writerCond.Broadcast() + cn.tickleWriter() } }