// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
- var (
- lastWrite time.Time = time.Now()
- keepAliveTimer *time.Timer
- )
- cn.mu.Lock()
- defer cn.mu.Unlock()
- keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
- cn.mu.Lock()
- defer cn.mu.Unlock()
- if time.Since(lastWrite) >= keepAliveTimeout {
- cn.writeCond.Broadcast()
- }
- keepAliveTimer.Reset(keepAliveTimeout)
- })
- defer keepAliveTimer.Stop()
+ lastWrite := time.Now()
frontBuf := new(bytes.Buffer)
for {
if cn.closed.IsSet() {
return
}
- keepAlive := false
- if cn.writeBuffer.Len() == 0 {
- func() {
- cn.mu.Unlock()
- defer cn.mu.Lock()
- cn.fillWriteBuffer()
- keepAlive = cn.keepAlive()
- }()
- }
+ cn.fillWriteBuffer()
+ keepAlive := cn.keepAlive()
+ cn.mu.Lock()
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
torrent.Add("written keepalives", 1)
select {
case <-cn.closed.Done():
case <-writeCond:
+ case <-time.After(time.Until(lastWrite.Add(keepAliveTimeout))):
}
- cn.mu.Lock()
continue
}
// Flip the buffers.
frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
cn.mu.Unlock()
n, err := cn.w.Write(frontBuf.Bytes())
- cn.mu.Lock()
if n != 0 {
lastWrite = time.Now()
- keepAliveTimer.Reset(keepAliveTimeout)
}
if err != nil {
cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)