]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Error in connection.writer goroutine wasn't killing the connection
authorMatt Joiner <anacrolix@gmail.com>
Thu, 17 Jul 2014 16:37:33 +0000 (02:37 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 17 Jul 2014 16:37:33 +0000 (02:37 +1000)
client.go
connection.go
connection_test.go

index 8514483fdb976a8e619e06d639ff223016d6a59d..8a7fe33dac480f4ebe041438591e954868d0b6cb 100644 (file)
--- a/client.go
+++ b/client.go
@@ -326,9 +326,10 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                Socket:          sock,
                Choked:          true,
                PeerChoked:      true,
-               write:           make(chan []byte),
+               writeCh:         make(chan []byte),
                PeerMaxRequests: 250, // Default in libtorrent is 250.
        }
+       go conn.writer()
        defer func() {
                // There's a lock and deferred unlock later in this function. The
                // client will not be locked when this deferred is invoked.
@@ -336,13 +337,12 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                defer me.mu.Unlock()
                conn.Close()
        }()
-       go conn.writer()
        // go conn.writeOptimizer()
-       conn.write <- pp.Bytes(pp.Protocol)
-       conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00")
+       conn.write(pp.Bytes(pp.Protocol))
+       conn.write(pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00"))
        if torrent != nil {
-               conn.write <- pp.Bytes(torrent.InfoHash[:])
-               conn.write <- pp.Bytes(me.PeerId[:])
+               conn.write(pp.Bytes(torrent.InfoHash[:]))
+               conn.write(pp.Bytes(me.PeerId[:]))
        }
        var b [28]byte
        _, err = io.ReadFull(conn.Socket, b[:])
@@ -375,8 +375,8 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
                if torrent == nil {
                        return
                }
-               conn.write <- pp.Bytes(torrent.InfoHash[:])
-               conn.write <- pp.Bytes(me.PeerId[:])
+               conn.write(pp.Bytes(torrent.InfoHash[:]))
+               conn.write(pp.Bytes(me.PeerId[:]))
        }
        me.mu.Lock()
        defer me.mu.Unlock()
index 2b47191024551685ae26359c1a80649e2f6016d1..f966774fbd60d1bbe9cc6d044d6a728ab6a3e12b 100644 (file)
@@ -29,7 +29,7 @@ type connection struct {
        closed    bool
        mu        sync.Mutex // Only for closing.
        post      chan peer_protocol.Message
-       write     chan []byte
+       writeCh   chan []byte
 
        // Stuff controlled by the local peer.
        Interested bool
@@ -50,6 +50,12 @@ type connection struct {
        PeerClientName   string
 }
 
+func (cn *connection) write(b []byte) {
+       cn.mu.Lock()
+       cn.writeCh <- b
+       cn.mu.Unlock()
+}
+
 func (cn *connection) completedString() string {
        if cn.PeerPieces == nil {
                return "?"
@@ -140,7 +146,7 @@ func (c *connection) Close() {
        if c.post == nil {
                // writeOptimizer isn't running, so we need to signal the writer to
                // stop.
-               close(c.write)
+               close(c.writeCh)
        } else {
                // This will kill the writeOptimizer, and it kills the writer.
                close(c.post)
@@ -274,7 +280,7 @@ var (
 
 // Writes buffers to the socket from the write channel.
 func (conn *connection) writer() {
-       for b := range conn.write {
+       for b := range conn.writeCh {
                _, err := conn.Socket.Write(b)
                // log.Printf("wrote %q to %s", b, conn.Socket.RemoteAddr())
                if err != nil {
@@ -284,17 +290,18 @@ func (conn *connection) writer() {
                        break
                }
        }
+       conn.Close()
 }
 
 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
-       defer close(conn.write) // Responsible for notifying downstream routines.
-       pending := list.New()   // Message queue.
-       var nextWrite []byte    // Set to nil if we need to need to marshal the next message.
+       defer close(conn.writeCh) // Responsible for notifying downstream routines.
+       pending := list.New()     // Message queue.
+       var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
        timer := time.NewTimer(keepAliveDelay)
        defer timer.Stop()
        lastWrite := time.Now()
        for {
-               write := conn.write // Set to nil if there's nothing to write.
+               write := conn.writeCh // Set to nil if there's nothing to write.
                if pending.Len() == 0 {
                        write = nil
                } else if nextWrite == nil {
index 280afe9f2ceead4e0a4b351434d8c3e5645ff003..0b7f8e28940fcedf0b7654567aa2013a027ca2cd 100644 (file)
@@ -11,7 +11,7 @@ func TestCancelRequestOptimized(t *testing.T) {
                PeerMaxRequests: 1,
                PeerPieces:      []bool{false, true},
                post:            make(chan peer_protocol.Message),
-               write:           make(chan []byte),
+               writeCh:         make(chan []byte),
        }
        if len(c.Requests) != 0 {
                t.FailNow()
@@ -33,14 +33,14 @@ func TestCancelRequestOptimized(t *testing.T) {
                // Let a keep-alive through to verify there were no pending messages.
                "\x00\x00\x00\x00",
        } {
-               bb := string(<-c.write)
+               bb := string(<-c.writeCh)
                if b != bb {
                        t.Fatalf("received message %q is not expected: %q", bb, b)
                }
        }
        close(c.post)
        // Drain the write channel until it closes.
-       for b := range c.write {
+       for b := range c.writeCh {
                bs := string(b)
                if bs != "\x00\x00\x00\x00" {
                        t.Fatal("got unexpected non-keepalive")