connection.go | 68 +++++++++++++++++++++++++++++++++-------------------- diff --git a/connection.go b/connection.go index 1d70c7a7998de34e146d455adb86cea60ac08b55..ba0d124a8f85077dc1b1b1262c94450c6c49a735 100644 --- a/connection.go +++ b/connection.go @@ -431,41 +431,57 @@ }) c.Interested = interested } -var connectionWriterFlush = expvar.NewInt("connectionWriterFlush") -var connectionWriterWrite = expvar.NewInt("connectionWriterWrite") +var ( + // Track connection writer buffer writes and flushes, to determine its + // efficiency. + connectionWriterFlush = expvar.NewInt("connectionWriterFlush") + connectionWriterWrite = expvar.NewInt("connectionWriterWrite") +) // Writes buffers to the socket from the write channel. func (conn *connection) writer() { // Reduce write syscalls. - buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB - // Receives when buf is not empty. - notEmpty := make(chan struct{}, 1) + buf := bufio.NewWriter(conn.rw) for { - if buf.Buffered() != 0 { - // Make sure it's receivable. + if buf.Buffered() == 0 { + // There's nothing to write, so block until we get something. select { - case notEmpty <- struct{}{}: - default: - } - } - select { - case b, ok := <-conn.writeCh: - if !ok { - return - } - connectionWriterWrite.Add(1) - _, err := buf.Write(b) - if err != nil { - conn.Close() + case b, ok := <-conn.writeCh: + if !ok { + return + } + connectionWriterWrite.Add(1) + _, err := buf.Write(b) + if err != nil { + conn.Close() + return + } + case <-conn.closing: return } - case <-conn.closing: - return - case <-notEmpty: - connectionWriterFlush.Add(1) - err := buf.Flush() - if err != nil { + } else { + // We already have something to write, so flush if there's nothing + // more to write. + select { + case b, ok := <-conn.writeCh: + if !ok { + return + } + connectionWriterWrite.Add(1) + _, err := buf.Write(b) + if err != nil { + conn.Close() + return + } + case <-conn.closing: return + default: + connectionWriterFlush.Add(1) + err := buf.Flush() + if err != nil { + conn.Close() + return + } } } }