From: Matt Joiner Date: Sat, 7 May 2016 08:56:44 +0000 (+1000) Subject: missinggo.Event changed, connection.writeOptimizer changes X-Git-Tag: v1.0.0~747 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=0fd73396fd9ac3cf9c47bf50841f4ba8dd6132f0;p=btrtrc.git missinggo.Event changed, connection.writeOptimizer changes --- diff --git a/client.go b/client.go index 20371325..33f5b293 100644 --- a/client.go +++ b/client.go @@ -881,8 +881,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) { return } defer cl.dropConnection(t, c) - go c.writer() - go c.writeOptimizer(time.Minute) + go c.writer(time.Minute) cl.sendInitialMessages(c, t) err = cl.connectionLoop(t, c) if err != nil { diff --git a/client_test.go b/client_test.go index f85f42c9..94ea00da 100644 --- a/client_test.go +++ b/client_test.go @@ -93,7 +93,9 @@ func TestTorrentInitialState(t *testing.T) { require.NoError(t, err) require.Len(t, tor.pieces, 3) tor.pendAllChunkSpecs(0) + tor.cl.mu.Lock() assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0)) + tor.cl.mu.Unlock() assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) } diff --git a/connection.go b/connection.go index 8d3c45ad..e7b6ffe7 100644 --- a/connection.go +++ b/connection.go @@ -4,13 +4,13 @@ import ( "bufio" "bytes" "container/list" - "encoding" "errors" "expvar" "fmt" "io" "net" "strconv" + "sync" "time" "github.com/anacrolix/missinggo" @@ -42,8 +42,6 @@ type connection struct { Discovery peerSource uTP bool closed missinggo.Event - post chan pp.Message - writeCh chan []byte UnwantedChunksReceived int UsefulChunksReceived int @@ -88,6 +86,13 @@ type connection struct { pieceInclination []int pieceRequestOrder prioritybitmap.PriorityBitmap + + outgoingUnbufferedMessages *list.List + outgoingUnbufferedMessagesNotEmpty missinggo.Event +} + +func (cn *connection) mu() sync.Locker { + return &cn.t.cl.mu } func newConnection() (c *connection) { @@ -95,9 +100,6 @@ func newConnection() (c *connection) { Choked: true, PeerChoked: true, PeerMaxRequests: 250, - - writeCh: make(chan []byte), - post: make(chan pp.Message), } return } @@ -221,11 +223,23 @@ func (cn *connection) PeerHasPiece(piece int) bool { } func (cn *connection) Post(msg pp.Message) { - select { - case cn.post <- msg: - postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1) - case <-cn.closed.C(): + switch msg.Type { + case pp.Cancel: + for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() { + elemMsg := e.Value.(pp.Message) + if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length { + cn.outgoingUnbufferedMessages.Remove(e) + optimizedCancels.Add(1) + return + } + } + } + if cn.outgoingUnbufferedMessages == nil { + cn.outgoingUnbufferedMessages = list.New() } + cn.outgoingUnbufferedMessages.PushBack(msg) + cn.outgoingUnbufferedMessagesNotEmpty.Set() + postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1) } func (cn *connection) RequestPending(r request) bool { @@ -305,10 +319,7 @@ func (cn *connection) Request(chunk request) bool { // Returns true if an unsatisfied request was canceled. func (cn *connection) Cancel(r request) bool { - if cn.Requests == nil { - return false - } - if _, ok := cn.Requests[r]; !ok { + if !cn.RequestPending(r) { return false } delete(cn.Requests, r) @@ -378,111 +389,53 @@ var ( ) // Writes buffers to the socket from the write channel. -func (cn *connection) writer() { +func (cn *connection) writer(keepAliveTimeout time.Duration) { defer func() { - cn.t.cl.mu.Lock() - defer cn.t.cl.mu.Unlock() + cn.mu().Lock() + defer cn.mu().Unlock() cn.Close() }() // Reduce write syscalls. buf := bufio.NewWriter(cn.rw) + keepAliveTimer := time.NewTimer(keepAliveTimeout) for { - if buf.Buffered() == 0 { - // There's nothing to write, so block until we get something. - select { - case b, ok := <-cn.writeCh: - if !ok { - return - } - connectionWriterWrite.Add(1) - _, err := buf.Write(b) - if err != nil { - return - } - case <-cn.closed.C(): - return - } - } else { - // We already have something to write, so flush if there's nothing - // more to write. - select { - case b, ok := <-cn.writeCh: - if !ok { - return - } - connectionWriterWrite.Add(1) - _, err := buf.Write(b) - if err != nil { - return - } - case <-cn.closed.C(): - return - default: - connectionWriterFlush.Add(1) - err := buf.Flush() - if err != nil { - return - } - } - } - } -} - -func (cn *connection) writeOptimizer(keepAliveDelay time.Duration) { - defer close(cn.writeCh) // Responsible for notifying downstream routines. - pending := list.New() // Message queue. - var nextWrite []byte // Set to nil if we need to need to marshal the next message. - timer := time.NewTimer(keepAliveDelay) - defer timer.Stop() - lastWrite := time.Now() - for { - write := cn.writeCh // Set to nil if there's nothing to write. - if pending.Len() == 0 { - write = nil - } else if nextWrite == nil { - var err error - nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary() + cn.mu().Lock() + for cn.outgoingUnbufferedMessages.Len() != 0 { + msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message) + cn.mu().Unlock() + b, err := msg.MarshalBinary() if err != nil { panic(err) } - } - event: - select { - case <-timer.C: - if pending.Len() != 0 { - break - } - keepAliveTime := lastWrite.Add(keepAliveDelay) - if time.Now().Before(keepAliveTime) { - timer.Reset(keepAliveTime.Sub(time.Now())) - break - } - pending.PushBack(pp.Message{Keepalive: true}) - postedKeepalives.Add(1) - case msg, ok := <-cn.post: - if !ok { + connectionWriterWrite.Add(1) + n, err := buf.Write(b) + if err != nil { return } - if msg.Type == pp.Cancel { - for e := pending.Back(); e != nil; e = e.Prev() { - elemMsg := e.Value.(pp.Message) - if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length { - pending.Remove(e) - optimizedCancels.Add(1) - break event - } - } + keepAliveTimer.Reset(keepAliveTimeout) + if n != len(b) { + panic("short write") } - pending.PushBack(msg) - case write <- nextWrite: - pending.Remove(pending.Front()) - nextWrite = nil - lastWrite = time.Now() - if pending.Len() == 0 { - timer.Reset(keepAliveDelay) + cn.mu().Lock() + } + cn.outgoingUnbufferedMessagesNotEmpty.Clear() + cn.mu().Unlock() + connectionWriterFlush.Add(1) + if buf.Buffered() != 0 { + if buf.Flush() != nil { + return } - case <-cn.closed.C(): + keepAliveTimer.Reset(keepAliveTimeout) + } + select { + case <-cn.closed.LockedChan(cn.mu()): return + case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()): + case <-keepAliveTimer.C: + cn.mu().Lock() + cn.Post(pp.Message{Keepalive: true}) + cn.mu().Unlock() + postedKeepalives.Add(1) } } } diff --git a/connection_test.go b/connection_test.go index dc225ad5..29d032ac 100644 --- a/connection_test.go +++ b/connection_test.go @@ -1,16 +1,19 @@ package torrent import ( + "io" + "io/ioutil" + "net" "testing" "time" "github.com/anacrolix/missinggo/bitmap" "github.com/stretchr/testify/assert" - - "github.com/anacrolix/torrent/peer_protocol" + "github.com/stretchr/testify/require" ) func TestCancelRequestOptimized(t *testing.T) { + r, w := io.Pipe() c := &connection{ PeerMaxRequests: 1, peerPieces: func() bitmap.Bitmap { @@ -18,38 +21,39 @@ func TestCancelRequestOptimized(t *testing.T) { bm.Set(1, true) return bm }(), - post: make(chan peer_protocol.Message), - writeCh: make(chan []byte), + rw: struct { + io.Reader + io.Writer + }{ + Writer: w, + }, + conn: new(net.TCPConn), + // For the locks + t: &Torrent{cl: &Client{}}, } assert.Len(t, c.Requests, 0) - // Keepalive timeout of 0 works because I'm just that good. - go c.writeOptimizer(0 * time.Millisecond) c.Request(newRequest(1, 2, 3)) - if len(c.Requests) != 1 { - t.Fatal("request was not posted") - } + require.Len(t, c.Requests, 1) // Posting this message should removing the pending Request. - if !c.Cancel(newRequest(1, 2, 3)) { - t.Fatal("request was not found") - } - // Check that the write optimization has filtered out the Request message. - for _, b := range []string{ - // The initial request triggers an Interested message. - "\x00\x00\x00\x01\x02", - // Let a keep-alive through to verify there were no pending messages. - "\x00\x00\x00\x00", - } { - bb := string(<-c.writeCh) - if b != bb { - t.Fatalf("received message %q is not expected: %q", bb, b) - } - } - close(c.post) - // Drain the write channel until it closes. - for b := range c.writeCh { - bs := string(b) - if bs != "\x00\x00\x00\x00" { - t.Fatal("got unexpected non-keepalive") - } - } + require.True(t, c.Cancel(newRequest(1, 2, 3))) + assert.Len(t, c.Requests, 0) + // Check that write optimization filters out the Request, due to the + // Cancel. We should have received an Interested, due to the initial + // request, and then keep-alives until we close the connection. + go c.writer(0) + b := make([]byte, 9) + n, err := io.ReadFull(r, b) + require.NoError(t, err) + require.EqualValues(t, len(b), n) + require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b)) + time.Sleep(time.Millisecond) + c.mu().Lock() + c.Close() + c.mu().Unlock() + w.Close() + b, err = ioutil.ReadAll(r) + require.NoError(t, err) + // A single keep-alive will have gone through, as writer would be stuck + // trying to flush it, and then promptly close. + require.EqualValues(t, "\x00\x00\x00\x00", string(b)) } diff --git a/dht/server.go b/dht/server.go index 41187537..630fc421 100644 --- a/dht/server.go +++ b/dht/server.go @@ -585,7 +585,7 @@ func (s *Server) bootstrap() (err error) { }() s.mu.Unlock() select { - case <-s.closed.C(): + case <-s.closed.LockedChan(&s.mu): s.mu.Lock() return case <-time.After(15 * time.Second):