]> Sergey Matveev's repositories - btrtrc.git/commitdiff
missinggo.Event changed, connection.writeOptimizer changes
authorMatt Joiner <anacrolix@gmail.com>
Sat, 7 May 2016 08:56:44 +0000 (18:56 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 7 May 2016 08:56:44 +0000 (18:56 +1000)
client.go
client_test.go
connection.go
connection_test.go
dht/server.go

index 20371325516e0105f1067ad2f8558e3d19de21e7..33f5b29333ff8e47ad44a6cb71f72e801be2b98b 100644 (file)
--- a/client.go
+++ b/client.go
@@ -881,8 +881,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
                return
        }
        defer cl.dropConnection(t, c)
-       go c.writer()
-       go c.writeOptimizer(time.Minute)
+       go c.writer(time.Minute)
        cl.sendInitialMessages(c, t)
        err = cl.connectionLoop(t, c)
        if err != nil {
index f85f42c9cce398877c8fc39c78eeb484bde64579..94ea00da07d8b380b9d71f62c30789189b4db2a0 100644 (file)
@@ -93,7 +93,9 @@ func TestTorrentInitialState(t *testing.T) {
        require.NoError(t, err)
        require.Len(t, tor.pieces, 3)
        tor.pendAllChunkSpecs(0)
+       tor.cl.mu.Lock()
        assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
+       tor.cl.mu.Unlock()
        assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
 }
 
index 8d3c45ad5d2be57603ea141536c3d15f12a46ca3..e7b6ffe72eb34c2ce04453f582f5658aa7ef67c2 100644 (file)
@@ -4,13 +4,13 @@ import (
        "bufio"
        "bytes"
        "container/list"
-       "encoding"
        "errors"
        "expvar"
        "fmt"
        "io"
        "net"
        "strconv"
+       "sync"
        "time"
 
        "github.com/anacrolix/missinggo"
@@ -42,8 +42,6 @@ type connection struct {
        Discovery peerSource
        uTP       bool
        closed    missinggo.Event
-       post      chan pp.Message
-       writeCh   chan []byte
 
        UnwantedChunksReceived int
        UsefulChunksReceived   int
@@ -88,6 +86,13 @@ type connection struct {
 
        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap
+
+       outgoingUnbufferedMessages         *list.List
+       outgoingUnbufferedMessagesNotEmpty missinggo.Event
+}
+
+func (cn *connection) mu() sync.Locker {
+       return &cn.t.cl.mu
 }
 
 func newConnection() (c *connection) {
@@ -95,9 +100,6 @@ func newConnection() (c *connection) {
                Choked:          true,
                PeerChoked:      true,
                PeerMaxRequests: 250,
-
-               writeCh: make(chan []byte),
-               post:    make(chan pp.Message),
        }
        return
 }
@@ -221,11 +223,23 @@ func (cn *connection) PeerHasPiece(piece int) bool {
 }
 
 func (cn *connection) Post(msg pp.Message) {
-       select {
-       case cn.post <- msg:
-               postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
-       case <-cn.closed.C():
+       switch msg.Type {
+       case pp.Cancel:
+               for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
+                       elemMsg := e.Value.(pp.Message)
+                       if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
+                               cn.outgoingUnbufferedMessages.Remove(e)
+                               optimizedCancels.Add(1)
+                               return
+                       }
+               }
+       }
+       if cn.outgoingUnbufferedMessages == nil {
+               cn.outgoingUnbufferedMessages = list.New()
        }
+       cn.outgoingUnbufferedMessages.PushBack(msg)
+       cn.outgoingUnbufferedMessagesNotEmpty.Set()
+       postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
 }
 
 func (cn *connection) RequestPending(r request) bool {
@@ -305,10 +319,7 @@ func (cn *connection) Request(chunk request) bool {
 
 // Returns true if an unsatisfied request was canceled.
 func (cn *connection) Cancel(r request) bool {
-       if cn.Requests == nil {
-               return false
-       }
-       if _, ok := cn.Requests[r]; !ok {
+       if !cn.RequestPending(r) {
                return false
        }
        delete(cn.Requests, r)
@@ -378,111 +389,53 @@ var (
 )
 
 // Writes buffers to the socket from the write channel.
-func (cn *connection) writer() {
+func (cn *connection) writer(keepAliveTimeout time.Duration) {
        defer func() {
-               cn.t.cl.mu.Lock()
-               defer cn.t.cl.mu.Unlock()
+               cn.mu().Lock()
+               defer cn.mu().Unlock()
                cn.Close()
        }()
        // Reduce write syscalls.
        buf := bufio.NewWriter(cn.rw)
+       keepAliveTimer := time.NewTimer(keepAliveTimeout)
        for {
-               if buf.Buffered() == 0 {
-                       // There's nothing to write, so block until we get something.
-                       select {
-                       case b, ok := <-cn.writeCh:
-                               if !ok {
-                                       return
-                               }
-                               connectionWriterWrite.Add(1)
-                               _, err := buf.Write(b)
-                               if err != nil {
-                                       return
-                               }
-                       case <-cn.closed.C():
-                               return
-                       }
-               } else {
-                       // We already have something to write, so flush if there's nothing
-                       // more to write.
-                       select {
-                       case b, ok := <-cn.writeCh:
-                               if !ok {
-                                       return
-                               }
-                               connectionWriterWrite.Add(1)
-                               _, err := buf.Write(b)
-                               if err != nil {
-                                       return
-                               }
-                       case <-cn.closed.C():
-                               return
-                       default:
-                               connectionWriterFlush.Add(1)
-                               err := buf.Flush()
-                               if err != nil {
-                                       return
-                               }
-                       }
-               }
-       }
-}
-
-func (cn *connection) writeOptimizer(keepAliveDelay time.Duration) {
-       defer close(cn.writeCh) // Responsible for notifying downstream routines.
-       pending := list.New()   // Message queue.
-       var nextWrite []byte    // Set to nil if we need to need to marshal the next message.
-       timer := time.NewTimer(keepAliveDelay)
-       defer timer.Stop()
-       lastWrite := time.Now()
-       for {
-               write := cn.writeCh // Set to nil if there's nothing to write.
-               if pending.Len() == 0 {
-                       write = nil
-               } else if nextWrite == nil {
-                       var err error
-                       nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
+               cn.mu().Lock()
+               for cn.outgoingUnbufferedMessages.Len() != 0 {
+                       msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
+                       cn.mu().Unlock()
+                       b, err := msg.MarshalBinary()
                        if err != nil {
                                panic(err)
                        }
-               }
-       event:
-               select {
-               case <-timer.C:
-                       if pending.Len() != 0 {
-                               break
-                       }
-                       keepAliveTime := lastWrite.Add(keepAliveDelay)
-                       if time.Now().Before(keepAliveTime) {
-                               timer.Reset(keepAliveTime.Sub(time.Now()))
-                               break
-                       }
-                       pending.PushBack(pp.Message{Keepalive: true})
-                       postedKeepalives.Add(1)
-               case msg, ok := <-cn.post:
-                       if !ok {
+                       connectionWriterWrite.Add(1)
+                       n, err := buf.Write(b)
+                       if err != nil {
                                return
                        }
-                       if msg.Type == pp.Cancel {
-                               for e := pending.Back(); e != nil; e = e.Prev() {
-                                       elemMsg := e.Value.(pp.Message)
-                                       if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
-                                               pending.Remove(e)
-                                               optimizedCancels.Add(1)
-                                               break event
-                                       }
-                               }
+                       keepAliveTimer.Reset(keepAliveTimeout)
+                       if n != len(b) {
+                               panic("short write")
                        }
-                       pending.PushBack(msg)
-               case write <- nextWrite:
-                       pending.Remove(pending.Front())
-                       nextWrite = nil
-                       lastWrite = time.Now()
-                       if pending.Len() == 0 {
-                               timer.Reset(keepAliveDelay)
+                       cn.mu().Lock()
+               }
+               cn.outgoingUnbufferedMessagesNotEmpty.Clear()
+               cn.mu().Unlock()
+               connectionWriterFlush.Add(1)
+               if buf.Buffered() != 0 {
+                       if buf.Flush() != nil {
+                               return
                        }
-               case <-cn.closed.C():
+                       keepAliveTimer.Reset(keepAliveTimeout)
+               }
+               select {
+               case <-cn.closed.LockedChan(cn.mu()):
                        return
+               case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
+               case <-keepAliveTimer.C:
+                       cn.mu().Lock()
+                       cn.Post(pp.Message{Keepalive: true})
+                       cn.mu().Unlock()
+                       postedKeepalives.Add(1)
                }
        }
 }
index dc225ad5936b722a84e3bf91019a46648282a3c9..29d032accc57dde33801baa6fb707465fc599a82 100644 (file)
@@ -1,16 +1,19 @@
 package torrent
 
 import (
+       "io"
+       "io/ioutil"
+       "net"
        "testing"
        "time"
 
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/stretchr/testify/assert"
-
-       "github.com/anacrolix/torrent/peer_protocol"
+       "github.com/stretchr/testify/require"
 )
 
 func TestCancelRequestOptimized(t *testing.T) {
+       r, w := io.Pipe()
        c := &connection{
                PeerMaxRequests: 1,
                peerPieces: func() bitmap.Bitmap {
@@ -18,38 +21,39 @@ func TestCancelRequestOptimized(t *testing.T) {
                        bm.Set(1, true)
                        return bm
                }(),
-               post:    make(chan peer_protocol.Message),
-               writeCh: make(chan []byte),
+               rw: struct {
+                       io.Reader
+                       io.Writer
+               }{
+                       Writer: w,
+               },
+               conn: new(net.TCPConn),
+               // For the locks
+               t: &Torrent{cl: &Client{}},
        }
        assert.Len(t, c.Requests, 0)
-       // Keepalive timeout of 0 works because I'm just that good.
-       go c.writeOptimizer(0 * time.Millisecond)
        c.Request(newRequest(1, 2, 3))
-       if len(c.Requests) != 1 {
-               t.Fatal("request was not posted")
-       }
+       require.Len(t, c.Requests, 1)
        // Posting this message should removing the pending Request.
-       if !c.Cancel(newRequest(1, 2, 3)) {
-               t.Fatal("request was not found")
-       }
-       // Check that the write optimization has filtered out the Request message.
-       for _, b := range []string{
-               // The initial request triggers an Interested message.
-               "\x00\x00\x00\x01\x02",
-               // Let a keep-alive through to verify there were no pending messages.
-               "\x00\x00\x00\x00",
-       } {
-               bb := string(<-c.writeCh)
-               if b != bb {
-                       t.Fatalf("received message %q is not expected: %q", bb, b)
-               }
-       }
-       close(c.post)
-       // Drain the write channel until it closes.
-       for b := range c.writeCh {
-               bs := string(b)
-               if bs != "\x00\x00\x00\x00" {
-                       t.Fatal("got unexpected non-keepalive")
-               }
-       }
+       require.True(t, c.Cancel(newRequest(1, 2, 3)))
+       assert.Len(t, c.Requests, 0)
+       // Check that write optimization filters out the Request, due to the
+       // Cancel. We should have received an Interested, due to the initial
+       // request, and then keep-alives until we close the connection.
+       go c.writer(0)
+       b := make([]byte, 9)
+       n, err := io.ReadFull(r, b)
+       require.NoError(t, err)
+       require.EqualValues(t, len(b), n)
+       require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b))
+       time.Sleep(time.Millisecond)
+       c.mu().Lock()
+       c.Close()
+       c.mu().Unlock()
+       w.Close()
+       b, err = ioutil.ReadAll(r)
+       require.NoError(t, err)
+       // A single keep-alive will have gone through, as writer would be stuck
+       // trying to flush it, and then promptly close.
+       require.EqualValues(t, "\x00\x00\x00\x00", string(b))
 }
index 411875377b07fb1cb3ba914aa83c4375b39aa0e7..630fc421f6f883e78c805f42862f26de610cab41 100644 (file)
@@ -585,7 +585,7 @@ func (s *Server) bootstrap() (err error) {
                }()
                s.mu.Unlock()
                select {
-               case <-s.closed.C():
+               case <-s.closed.LockedChan(&s.mu):
                        s.mu.Lock()
                        return
                case <-time.After(15 * time.Second):