From 986b9093a00d487915da8fdfc36a3a53981abfa9 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 31 Aug 2017 16:26:45 +1000 Subject: [PATCH] Make connection.writer pull from a buffer --- client.go | 1 + connection.go | 96 ++++++++++++++++++---------------------------- connection_test.go | 3 +- 3 files changed, 40 insertions(+), 60 deletions(-) diff --git a/client.go b/client.go index d81697b8..6c999a98 100644 --- a/client.go +++ b/client.go @@ -1386,6 +1386,7 @@ func (cl *Client) newConnection(nc net.Conn) (c *connection) { PeerChoked: true, PeerMaxRequests: 250, } + c.writerCond.L = &cl.mu c.setRW(connStatsReadWriter{nc, &cl.mu, c}) c.r = rateLimitedReader{cl.downloadLimit, c.r} return diff --git a/connection.go b/connection.go index 8ec21010..11075edf 100644 --- a/connection.go +++ b/connection.go @@ -3,7 +3,6 @@ package torrent import ( "bufio" "bytes" - "container/list" "errors" "expvar" "fmt" @@ -99,8 +98,8 @@ type connection struct { pieceInclination []int pieceRequestOrder prioritybitmap.PriorityBitmap - outgoingUnbufferedMessages *list.List - outgoingUnbufferedMessagesNotEmpty missinggo.Event + postedBuffer bytes.Buffer + writerCond sync.Cond } func (cn *connection) mu() sync.Locker { @@ -238,23 +237,9 @@ func (cn *connection) PeerHasPiece(piece int) bool { } func (cn *connection) Post(msg pp.Message) { - switch msg.Type { - case pp.Cancel: - for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() { - elemMsg := e.Value.(pp.Message) - if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length { - cn.outgoingUnbufferedMessages.Remove(e) - optimizedCancels.Add(1) - return - } - } - } - if cn.outgoingUnbufferedMessages == nil { - cn.outgoingUnbufferedMessages = list.New() - } - cn.outgoingUnbufferedMessages.PushBack(msg) - cn.outgoingUnbufferedMessagesNotEmpty.Set() postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1) + cn.postedBuffer.Write(msg.MustMarshalBinary()) + cn.writerCond.Broadcast() } func (cn *connection) RequestPending(r request) bool { @@ -405,54 +390,49 @@ var ( // Writes buffers to the socket from the write channel. func (cn *connection) writer(keepAliveTimeout time.Duration) { - defer func() { + var ( + buf bytes.Buffer + lastWrite time.Time = time.Now() + ) + var keepAliveTimer *time.Timer + keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() { cn.mu().Lock() defer cn.mu().Unlock() - cn.Close() - }() - // Reduce write syscalls. - buf := bufio.NewWriter(cn.w) - keepAliveTimer := time.NewTimer(keepAliveTimeout) + if time.Since(lastWrite) >= keepAliveTimeout { + cn.writerCond.Broadcast() + } + keepAliveTimer.Reset(keepAliveTimeout) + }) + cn.mu().Lock() + defer cn.mu().Unlock() + defer cn.Close() + defer keepAliveTimer.Stop() for { - cn.mu().Lock() - for cn.outgoingUnbufferedMessages != nil && cn.outgoingUnbufferedMessages.Len() != 0 { - msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message) - cn.mu().Unlock() - b, err := msg.MarshalBinary() - if err != nil { - panic(err) - } - connectionWriterWrite.Add(1) - n, err := buf.Write(b) - if err != nil { - return - } - keepAliveTimer.Reset(keepAliveTimeout) - if n != len(b) { - panic("short write") - } - cn.mu().Lock() - cn.wroteMsg(&msg) + buf.Write(cn.postedBuffer.Bytes()) + cn.postedBuffer.Reset() + if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout { + buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) + postedKeepalives.Add(1) + } + if buf.Len() == 0 { + cn.writerCond.Wait() + continue } - cn.outgoingUnbufferedMessagesNotEmpty.Clear() cn.mu().Unlock() - connectionWriterFlush.Add(1) - if buf.Buffered() != 0 { - if buf.Flush() != nil { - return - } + // log.Printf("writing %d bytes", buf.Len()) + n, err := cn.w.Write(buf.Bytes()) + cn.mu().Lock() + if n != 0 { + lastWrite = time.Now() keepAliveTimer.Reset(keepAliveTimeout) } - select { - case <-cn.closed.LockedChan(cn.mu()): + if err != nil { return - case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()): - case <-keepAliveTimer.C: - cn.mu().Lock() - cn.Post(pp.Message{Keepalive: true}) - cn.mu().Unlock() - postedKeepalives.Add(1) } + if n != buf.Len() { + panic("short write") + } + buf.Reset() } } diff --git a/connection_test.go b/connection_test.go index 151fb57c..9fd4066d 100644 --- a/connection_test.go +++ b/connection_test.go @@ -1,7 +1,6 @@ package torrent import ( - "container/list" "io" "io/ioutil" "net" @@ -73,8 +72,8 @@ func TestSendBitfieldThenHave(t *testing.T) { }, r: r, w: w, - outgoingUnbufferedMessages: list.New(), } + c.writerCond.L = &c.t.cl.mu go c.writer(time.Minute) c.mu().Lock() c.Bitfield([]bool{false, true, false}) -- 2.48.1