From 7f6987b2a46c7b4a5ab791901ce6b883f963041a Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 18 Jul 2014 02:37:33 +1000 Subject: [PATCH] Error in connection.writer goroutine wasn't killing the connection --- client.go | 16 ++++++++-------- connection.go | 21 ++++++++++++++------- connection_test.go | 6 +++--- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/client.go b/client.go index 8514483f..8a7fe33d 100644 --- a/client.go +++ b/client.go @@ -326,9 +326,10 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS Socket: sock, Choked: true, PeerChoked: true, - write: make(chan []byte), + writeCh: make(chan []byte), PeerMaxRequests: 250, // Default in libtorrent is 250. } + go conn.writer() defer func() { // There's a lock and deferred unlock later in this function. The // client will not be locked when this deferred is invoked. @@ -336,13 +337,12 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS defer me.mu.Unlock() conn.Close() }() - go conn.writer() // go conn.writeOptimizer() - conn.write <- pp.Bytes(pp.Protocol) - conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00") + conn.write(pp.Bytes(pp.Protocol)) + conn.write(pp.Bytes("\x00\x00\x00\x00\x00\x10\x00\x00")) if torrent != nil { - conn.write <- pp.Bytes(torrent.InfoHash[:]) - conn.write <- pp.Bytes(me.PeerId[:]) + conn.write(pp.Bytes(torrent.InfoHash[:])) + conn.write(pp.Bytes(me.PeerId[:])) } var b [28]byte _, err = io.ReadFull(conn.Socket, b[:]) @@ -375,8 +375,8 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS if torrent == nil { return } - conn.write <- pp.Bytes(torrent.InfoHash[:]) - conn.write <- pp.Bytes(me.PeerId[:]) + conn.write(pp.Bytes(torrent.InfoHash[:])) + conn.write(pp.Bytes(me.PeerId[:])) } me.mu.Lock() defer me.mu.Unlock() diff --git a/connection.go b/connection.go index 2b471910..f966774f 100644 --- a/connection.go +++ b/connection.go @@ -29,7 +29,7 @@ type connection struct { closed bool mu sync.Mutex // Only for closing. post chan peer_protocol.Message - write chan []byte + writeCh chan []byte // Stuff controlled by the local peer. Interested bool @@ -50,6 +50,12 @@ type connection struct { PeerClientName string } +func (cn *connection) write(b []byte) { + cn.mu.Lock() + cn.writeCh <- b + cn.mu.Unlock() +} + func (cn *connection) completedString() string { if cn.PeerPieces == nil { return "?" @@ -140,7 +146,7 @@ func (c *connection) Close() { if c.post == nil { // writeOptimizer isn't running, so we need to signal the writer to // stop. - close(c.write) + close(c.writeCh) } else { // This will kill the writeOptimizer, and it kills the writer. close(c.post) @@ -274,7 +280,7 @@ var ( // Writes buffers to the socket from the write channel. func (conn *connection) writer() { - for b := range conn.write { + for b := range conn.writeCh { _, err := conn.Socket.Write(b) // log.Printf("wrote %q to %s", b, conn.Socket.RemoteAddr()) if err != nil { @@ -284,17 +290,18 @@ func (conn *connection) writer() { break } } + conn.Close() } func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) { - defer close(conn.write) // Responsible for notifying downstream routines. - pending := list.New() // Message queue. - var nextWrite []byte // Set to nil if we need to need to marshal the next message. + defer close(conn.writeCh) // Responsible for notifying downstream routines. + pending := list.New() // Message queue. + var nextWrite []byte // Set to nil if we need to need to marshal the next message. timer := time.NewTimer(keepAliveDelay) defer timer.Stop() lastWrite := time.Now() for { - write := conn.write // Set to nil if there's nothing to write. + write := conn.writeCh // Set to nil if there's nothing to write. if pending.Len() == 0 { write = nil } else if nextWrite == nil { diff --git a/connection_test.go b/connection_test.go index 280afe9f..0b7f8e28 100644 --- a/connection_test.go +++ b/connection_test.go @@ -11,7 +11,7 @@ func TestCancelRequestOptimized(t *testing.T) { PeerMaxRequests: 1, PeerPieces: []bool{false, true}, post: make(chan peer_protocol.Message), - write: make(chan []byte), + writeCh: make(chan []byte), } if len(c.Requests) != 0 { t.FailNow() @@ -33,14 +33,14 @@ func TestCancelRequestOptimized(t *testing.T) { // Let a keep-alive through to verify there were no pending messages. "\x00\x00\x00\x00", } { - bb := string(<-c.write) + bb := string(<-c.writeCh) if b != bb { t.Fatalf("received message %q is not expected: %q", bb, b) } } close(c.post) // Drain the write channel until it closes. - for b := range c.write { + for b := range c.writeCh { bs := string(b) if bs != "\x00\x00\x00\x00" { t.Fatal("got unexpected non-keepalive") -- 2.48.1