func (cn *connection) Post(msg pp.Message) {
messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
cn.postedBuffer.Write(msg.MustMarshalBinary())
- cn.writerCond.Broadcast()
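+ // Wake the writer so it flushes the newly posted message.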
+ cn.tickleWriter()
}
func (cn *connection) requestMetadataPiece(index int) {
for r := range cn.requests {
if _, ok := rs[r]; !ok {
sentCancels = true
- delete(cn.requests, r)
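+ // Remove via deleteRequest so request bookkeeping stays in one place.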
+ cn.deleteRequest(r)
// log.Printf("%p: cancelling request: %v", cn, r)
if !msg(pp.Message{
Type: pp.Cancel,
cn.mu().Lock()
defer cn.mu().Unlock()
if time.Since(lastWrite) >= keepAliveTimeout {
- cn.writerCond.Broadcast()
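+ // Wake the writer so it can send a keep-alive.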
+ cn.tickleWriter()
}
keepAliveTimer.Reset(keepAliveTimeout)
})
}
func (cn *connection) updateRequests() {
- cn.writerCond.Broadcast()
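+ // The woken writer recomputes and sends the desired request state.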
+ cn.tickleWriter()
}
func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
})
}
-func (cn *connection) stopRequestingPiece(piece int) {
- if cn.pieceRequestOrder.Remove(piece) {
- cn.writerCond.Broadcast()
- }
+// Callers should call updateRequests when this returns true.
+func (cn *connection) stopRequestingPiece(piece int) bool {
+ return cn.pieceRequestOrder.Remove(piece)
}
// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and
// pseudorandomly varied, so that connections don't all request the same
// pieces and thus waste effort.
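+// Returns true if the piece's position in the request order changed, in
+// which case the caller should call updateRequests.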
-func (cn *connection) updatePiecePriority(piece int) {
+func (cn *connection) updatePiecePriority(piece int) bool {
tpp := cn.t.piecePriority(piece)
if !cn.PeerHasPiece(piece) {
tpp = PiecePriorityNone
}
if tpp == PiecePriorityNone {
- cn.stopRequestingPiece(piece)
- return
+ return cn.stopRequestingPiece(piece)
}
prio := cn.getPieceInclination()[piece]
switch tpp {
panic(tpp)
}
prio += piece / 3
- if cn.pieceRequestOrder.Set(piece, prio) {
- cn.updateRequests()
- }
+ return cn.pieceRequestOrder.Set(piece, prio)
}
func (cn *connection) getPieceInclination() []int {
cn.pieceInclination = nil
}
-func (cn *connection) peerHasPieceChanged(piece int) {
- cn.updatePiecePriority(piece)
-}
-
func (cn *connection) peerPiecesChanged() {
if cn.t.haveInfo() {
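+ // Sweep every piece, then fire updateRequests at most once.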
+ prioritiesChanged := false
for i := range iter.N(cn.t.numPieces()) {
- cn.peerHasPieceChanged(i)
+ if cn.updatePiecePriority(i) {
+ prioritiesChanged = true
+ }
+ }
+ if prioritiesChanged {
+ cn.updateRequests()
}
}
}
}
cn.raisePeerMinPieces(piece + 1)
cn.peerPieces.Set(piece, true)
- cn.peerHasPieceChanged(piece)
+ if cn.updatePiecePriority(piece) {
+ cn.updateRequests()
+ }
return nil
}
// We can then reset our interest.
c.updateRequests()
case pp.Reject:
- c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length))
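+ // Only update requests if the rejected request was actually outstanding.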
+ if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
+ c.updateRequests()
+ }
case pp.Unchoke:
c.PeerChoked = false
- c.writerCond.Broadcast()
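+ // Unchoked: the writer may now have requests to send.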
+ c.tickleWriter()
case pp.Interested:
c.PeerInterested = true
c.upload()
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
// Request has been satisfied.
- if !c.deleteRequest(req) {
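+ // Freeing the request slot may allow new requests to be issued.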
+ if c.deleteRequest(req) {
+ c.updateRequests()
+ } else {
unexpectedChunksReceived.Add(1)
}
return false
}
delete(c.requests, r)
- c.writerCond.Broadcast()
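+ // Callers decide whether to update requests based on the return value.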
return true
}
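+// tickleWriter wakes the writer goroutine so it acts on newly buffered
+// messages and changed request state.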
+func (c *connection) tickleWriter() {
+ c.writerCond.Broadcast()
+}