]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Make connection.writer pull from a buffer
authorMatt Joiner <anacrolix@gmail.com>
Thu, 31 Aug 2017 06:26:45 +0000 (16:26 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 31 Aug 2017 06:26:45 +0000 (16:26 +1000)
client.go
connection.go
connection_test.go

index d81697b85423c2bd186c7f6177c7f1653ba3f346..6c999a98e2d260c3a24e940f861396ea5253b993 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1386,6 +1386,7 @@ func (cl *Client) newConnection(nc net.Conn) (c *connection) {
                PeerChoked:      true,
                PeerMaxRequests: 250,
        }
+       c.writerCond.L = &cl.mu
        c.setRW(connStatsReadWriter{nc, &cl.mu, c})
        c.r = rateLimitedReader{cl.downloadLimit, c.r}
        return
index 8ec21010d89c4103bcab6b1a70244b57f5a3cac3..11075edf4c0898024f022791e4457511c8d4f73c 100644 (file)
@@ -3,7 +3,6 @@ package torrent
 import (
        "bufio"
        "bytes"
-       "container/list"
        "errors"
        "expvar"
        "fmt"
@@ -99,8 +98,8 @@ type connection struct {
        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap
 
-       outgoingUnbufferedMessages         *list.List
-       outgoingUnbufferedMessagesNotEmpty missinggo.Event
+       postedBuffer bytes.Buffer
+       writerCond   sync.Cond
 }
 
 func (cn *connection) mu() sync.Locker {
@@ -238,23 +237,9 @@ func (cn *connection) PeerHasPiece(piece int) bool {
 }
 
 func (cn *connection) Post(msg pp.Message) {
-       switch msg.Type {
-       case pp.Cancel:
-               for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
-                       elemMsg := e.Value.(pp.Message)
-                       if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
-                               cn.outgoingUnbufferedMessages.Remove(e)
-                               optimizedCancels.Add(1)
-                               return
-                       }
-               }
-       }
-       if cn.outgoingUnbufferedMessages == nil {
-               cn.outgoingUnbufferedMessages = list.New()
-       }
-       cn.outgoingUnbufferedMessages.PushBack(msg)
-       cn.outgoingUnbufferedMessagesNotEmpty.Set()
        postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+       cn.postedBuffer.Write(msg.MustMarshalBinary())
+       cn.writerCond.Broadcast()
 }
 
 func (cn *connection) RequestPending(r request) bool {
@@ -405,54 +390,49 @@ var (
 
 // Writes buffers to the socket from the write channel.
 func (cn *connection) writer(keepAliveTimeout time.Duration) {
-       defer func() {
+       var (
+               buf       bytes.Buffer
+               lastWrite time.Time = time.Now()
+       )
+       var keepAliveTimer *time.Timer
+       keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
-               cn.Close()
-       }()
-       // Reduce write syscalls.
-       buf := bufio.NewWriter(cn.w)
-       keepAliveTimer := time.NewTimer(keepAliveTimeout)
+               if time.Since(lastWrite) >= keepAliveTimeout {
+                       cn.writerCond.Broadcast()
+               }
+               keepAliveTimer.Reset(keepAliveTimeout)
+       })
+       cn.mu().Lock()
+       defer cn.mu().Unlock()
+       defer cn.Close()
+       defer keepAliveTimer.Stop()
        for {
-               cn.mu().Lock()
-               for cn.outgoingUnbufferedMessages != nil && cn.outgoingUnbufferedMessages.Len() != 0 {
-                       msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
-                       cn.mu().Unlock()
-                       b, err := msg.MarshalBinary()
-                       if err != nil {
-                               panic(err)
-                       }
-                       connectionWriterWrite.Add(1)
-                       n, err := buf.Write(b)
-                       if err != nil {
-                               return
-                       }
-                       keepAliveTimer.Reset(keepAliveTimeout)
-                       if n != len(b) {
-                               panic("short write")
-                       }
-                       cn.mu().Lock()
-                       cn.wroteMsg(&msg)
+               buf.Write(cn.postedBuffer.Bytes())
+               cn.postedBuffer.Reset()
+               if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
+                       buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
+                       postedKeepalives.Add(1)
+               }
+               if buf.Len() == 0 {
+                       cn.writerCond.Wait()
+                       continue
                }
-               cn.outgoingUnbufferedMessagesNotEmpty.Clear()
                cn.mu().Unlock()
-               connectionWriterFlush.Add(1)
-               if buf.Buffered() != 0 {
-                       if buf.Flush() != nil {
-                               return
-                       }
+               // log.Printf("writing %d bytes", buf.Len())
+               n, err := cn.w.Write(buf.Bytes())
+               cn.mu().Lock()
+               if n != 0 {
+                       lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
-               select {
-               case <-cn.closed.LockedChan(cn.mu()):
+               if err != nil {
                        return
-               case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
-               case <-keepAliveTimer.C:
-                       cn.mu().Lock()
-                       cn.Post(pp.Message{Keepalive: true})
-                       cn.mu().Unlock()
-                       postedKeepalives.Add(1)
                }
+               if n != buf.Len() {
+                       panic("short write")
+               }
+               buf.Reset()
        }
 }
 
index 151fb57c00899fb833b10b3f24b60f43a55127ee..9fd4066da60b163fd16cd3358173a601fd69d45a 100644 (file)
@@ -1,7 +1,6 @@
 package torrent
 
 import (
-       "container/list"
        "io"
        "io/ioutil"
        "net"
@@ -73,8 +72,8 @@ func TestSendBitfieldThenHave(t *testing.T) {
                },
                r: r,
                w: w,
-               outgoingUnbufferedMessages: list.New(),
        }
+       c.writerCond.L = &c.t.cl.mu
        go c.writer(time.Minute)
        c.mu().Lock()
        c.Bitfield([]bool{false, true, false})