cn.requestsLowWater = len(cn.requests) / 2
}
if cn.pex.IsEnabled() {
- cn.pex.Share(msg) // gated internally
+ if flow := cn.pex.Share(msg); !flow {
+ return
+ }
}
cn.upload(msg)
}
}
// Share is called from the writer goroutine if when it is woken up with the write buffers empty
-func (s *pexConnState) Share(postfn messageWriter) {
+// Returns whether there's more room on the send buffer to write to.
+func (s *pexConnState) Share(postfn messageWriter) bool {
select {
case <-s.gate:
if tx := s.genmsg(); tx != nil {
s.dbg.Print("sending PEX message: ", tx)
- postfn(tx.Message(s.xid))
+ flow := postfn(tx.Message(s.xid))
s.sched(pexInterval)
+ return flow
} else {
// no PEX to send this time - try again shortly
s.sched(pexRetryDelay)
}
default:
- return
}
+ return true
}
// Recv is called from the reader goroutine