From: Matt Joiner Date: Sat, 3 Feb 2018 03:40:03 +0000 (+1100) Subject: Share the post write buffer with the synchronous one in the connection writer X-Git-Tag: v1.0.0~213 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=86aabb081c6f0e46ef188349a82d82aa9415190b;p=btrtrc.git Share the post write buffer with the synchronous one in the connection writer This should prevent messages posted while generating synchronous messages don't get out of order. --- diff --git a/connection.go b/connection.go index 9e025497..73b1f4ce 100644 --- a/connection.go +++ b/connection.go @@ -93,9 +93,9 @@ type connection struct { pieceInclination []int pieceRequestOrder prioritybitmap.PriorityBitmap - postedBuffer bytes.Buffer - uploadTimer *time.Timer - writerCond sync.Cond + writeBuffer bytes.Buffer + uploadTimer *time.Timer + writerCond sync.Cond } func (cn *connection) peerHasAllPieces() (all bool, known bool) { @@ -245,7 +245,7 @@ func (cn *connection) PeerHasPiece(piece int) bool { func (cn *connection) Post(msg pp.Message) { messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1) - cn.postedBuffer.Write(msg.MustMarshalBinary()) + cn.writeBuffer.Write(msg.MustMarshalBinary()) cn.tickleWriter() } @@ -404,7 +404,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { // connection is writable. func (cn *connection) writer(keepAliveTimeout time.Duration) { var ( - buf bytes.Buffer + // buf bytes.Buffer lastWrite time.Time = time.Now() ) var keepAliveTimer *time.Timer @@ -424,24 +424,25 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { if cn.closed.IsSet() { return } - buf.Write(cn.postedBuffer.Bytes()) - cn.postedBuffer.Reset() - if buf.Len() == 0 { + if cn.writeBuffer.Len() == 0 { cn.fillWriteBuffer(func(msg pp.Message) bool { cn.wroteMsg(&msg) - buf.Write(msg.MustMarshalBinary()) - return buf.Len() < 1<<16 + cn.writeBuffer.Write(msg.MustMarshalBinary()) + return cn.writeBuffer.Len() < 1<<16 }) } - if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout { - buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) + if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout { + cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) postedKeepalives.Add(1) } - if buf.Len() == 0 { + if cn.writeBuffer.Len() == 0 { // TODO: Minimize wakeups.... cn.writerCond.Wait() continue } + var buf bytes.Buffer + buf.Write(cn.writeBuffer.Bytes()) + cn.writeBuffer.Reset() cn.mu().Unlock() // log.Printf("writing %d bytes", buf.Len()) n, err := cn.w.Write(buf.Bytes())