From: Matt Joiner Date: Thu, 31 Aug 2017 13:48:52 +0000 (+1000) Subject: Generate requests, cancels and interest state in the connection writer X-Git-Tag: v1.0.0~418 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=13e79039f292dbb886b15b17d12a3563eba3540c;p=btrtrc.git Generate requests, cancels and interest state in the connection writer --- diff --git a/client.go b/client.go index 6c999a98..d3ee0703 100644 --- a/client.go +++ b/client.go @@ -1035,23 +1035,11 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) { } } -func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) { - conn.updateRequests() -} - -func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) { - ok = cn.Cancel(r) - if ok { - postedCancels.Add(1) - } - return -} - func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool { if !cn.RequestPending(r) { return false } - delete(cn.Requests, r) + delete(cn.requests, r) return true } diff --git a/connection.go b/connection.go index 11075edf..20fec26f 100644 --- a/connection.go +++ b/connection.go @@ -66,7 +66,7 @@ type connection struct { // Stuff controlled by the local peer. Interested bool Choked bool - Requests map[request]struct{} + requests map[request]struct{} requestsLowWater int // Indexed by metadata piece, set to true if posted and pending a // response. @@ -204,7 +204,7 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, cn.chunksSent, - len(cn.Requests), + cn.numLocalRequests(), len(cn.PeerRequests), cn.statusFlags(), ) @@ -243,7 +243,7 @@ func (cn *connection) Post(msg pp.Message) { } func (cn *connection) RequestPending(r request) bool { - _, ok := cn.Requests[r] + _, ok := cn.requests[r] return ok } @@ -288,50 +288,6 @@ func (cn *connection) nominalMaxRequests() (ret int) { return } -// Returns true if more requests can be sent. -func (cn *connection) Request(chunk request) bool { - if len(cn.Requests) >= cn.nominalMaxRequests() { - return false - } - if !cn.PeerHasPiece(int(chunk.Index)) { - return true - } - if cn.RequestPending(chunk) { - return true - } - cn.SetInterested(true) - if cn.PeerChoked { - return false - } - if cn.Requests == nil { - cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests) - } - cn.Requests[chunk] = struct{}{} - cn.requestsLowWater = len(cn.Requests) / 2 - cn.Post(pp.Message{ - Type: pp.Request, - Index: chunk.Index, - Begin: chunk.Begin, - Length: chunk.Length, - }) - return true -} - -// Returns true if an unsatisfied request was canceled. -func (cn *connection) Cancel(r request) bool { - if !cn.RequestPending(r) { - return false - } - delete(cn.Requests, r) - cn.Post(pp.Message{ - Type: pp.Cancel, - Index: r.Index, - Begin: r.Begin, - Length: r.Length, - }) - return true -} - // Returns true if an unsatisfied request was canceled. func (cn *connection) PeerCancel(r request) bool { if cn.PeerRequests == nil { @@ -365,11 +321,13 @@ func (cn *connection) Unchoke() { cn.Choked = false } -func (cn *connection) SetInterested(interested bool) { +func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool { if cn.Interested == interested { - return + return true } - cn.Post(pp.Message{ + cn.Interested = interested + // log.Printf("%p: setting interest: %v", cn, interested) + return msg(pp.Message{ Type: func() pp.MessageType { if interested { return pp.Interested @@ -378,7 +336,6 @@ func (cn *connection) SetInterested(interested bool) { } }(), }) - cn.Interested = interested } var ( @@ -388,6 +345,44 @@ var ( connectionWriterWrite = expvar.NewInt("connectionWriterWrite") ) +func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { + rs, i := cn.desiredRequestState() + if !cn.SetInterested(i, msg) { + return + } + for r := range cn.requests { + if _, ok := rs[r]; !ok { + delete(cn.requests, r) + // log.Printf("%p: cancelling request: %v", cn, r) + if !msg(pp.Message{ + Type: pp.Cancel, + Index: r.Index, + Begin: r.Begin, + Length: r.Length, + }) { + return + } + } + } + for r := range rs { + if _, ok := cn.requests[r]; !ok { + if cn.requests == nil { + cn.requests = make(map[request]struct{}, cn.nominalMaxRequests()) + } + cn.requests[r] = struct{}{} + // log.Printf("%p: requesting %v", cn, r) + if !msg(pp.Message{ + Type: pp.Request, + Index: r.Index, + Begin: r.Begin, + Length: r.Length, + }) { + return + } + } + } +} + // Writes buffers to the socket from the write channel. func (cn *connection) writer(keepAliveTimeout time.Duration) { var ( @@ -410,6 +405,12 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) { for { buf.Write(cn.postedBuffer.Bytes()) cn.postedBuffer.Reset() + if buf.Len() == 0 { + cn.fillWriteBuffer(func(msg pp.Message) bool { + buf.Write(msg.MustMarshalBinary()) + return buf.Len() < 1<<16 + }) + } if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout { buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) postedKeepalives.Add(1) @@ -500,31 +501,21 @@ func nextRequestState( } func (cn *connection) updateRequests() { - rs, i := nextRequestState( + cn.writerCond.Broadcast() +} + +func (cn *connection) desiredRequestState() (map[request]struct{}, bool) { + return nextRequestState( cn.t.networkingEnabled, - cn.Requests, + cn.requests, cn.PeerChoked, &cn.pieceRequestOrder, func(piece int, f func(chunkSpec) bool) bool { return undirtiedChunks(piece, cn.t, f) }, cn.requestsLowWater, - cn.nominalMaxRequests()) - for r := range cn.Requests { - if _, ok := rs[r]; !ok { - if !cn.Cancel(r) { - panic("wat") - } - } - } - for r := range rs { - if _, ok := cn.Requests[r]; !ok { - if !cn.Request(r) { - panic("how") - } - } - } - cn.SetInterested(i) + cn.nominalMaxRequests(), + ) } func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool { @@ -536,7 +527,7 @@ func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool { func (cn *connection) stopRequestingPiece(piece int) { cn.pieceRequestOrder.Remove(piece) - cn.updateRequests() + cn.writerCond.Broadcast() } // This is distinct from Torrent piece priority, which is the user's @@ -754,7 +745,7 @@ func (c *connection) mainReadLoop() error { switch msg.Type { case pp.Choke: c.PeerChoked = true - c.Requests = nil + c.requests = nil // We can then reset our interest. c.updateRequests() case pp.Reject: @@ -762,7 +753,7 @@ func (c *connection) mainReadLoop() error { c.updateRequests() case pp.Unchoke: c.PeerChoked = false - cl.peerUnchoked(t, c) + c.writerCond.Broadcast() case pp.Interested: c.PeerInterested = true c.upload() @@ -989,9 +980,7 @@ func (c *connection) receiveChunk(msg *pp.Message) { // Cancel pending requests for this chunk. for c := range t.conns { - if cl.connCancel(t, c, req) { - c.updateRequests() - } + c.updateRequests() } cl.mu.Unlock() @@ -1096,3 +1085,7 @@ func (cn *connection) netGoodPiecesDirtied() int { func (c *connection) peerHasWantedPieces() bool { return !c.pieceRequestOrder.IsEmpty() } + +func (c *connection) numLocalRequests() int { + return len(c.requests) +} diff --git a/connection_test.go b/connection_test.go index 9fd4066d..4f1d715b 100644 --- a/connection_test.go +++ b/connection_test.go @@ -2,13 +2,10 @@ package torrent import ( "io" - "io/ioutil" - "net" "sync" "testing" "time" - "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/pubsub" "github.com/bradfitz/iter" "github.com/stretchr/testify/assert" @@ -19,49 +16,6 @@ import ( "github.com/anacrolix/torrent/storage" ) -func TestCancelRequestOptimized(t *testing.T) { - r, w := io.Pipe() - c := &connection{ - PeerMaxRequests: 1, - peerPieces: func() bitmap.Bitmap { - var bm bitmap.Bitmap - bm.Set(1, true) - return bm - }(), - w: w, - conn: new(net.TCPConn), - // For the locks - t: &Torrent{cl: &Client{}}, - } - assert.Len(t, c.Requests, 0) - c.Request(newRequest(1, 2, 3)) - require.Len(t, c.Requests, 1) - // Posting this message should removing the pending Request. - require.True(t, c.Cancel(newRequest(1, 2, 3))) - assert.Len(t, c.Requests, 0) - // Check that write optimization filters out the Request, due to the - // Cancel. We should have received an Interested, due to the initial - // request, and then keep-alives until we close the connection. - go c.writer(0) - b := make([]byte, 9) - n, err := io.ReadFull(r, b) - require.NoError(t, err) - require.EqualValues(t, len(b), n) - require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b)) - time.Sleep(time.Millisecond) - c.mu().Lock() - c.Close() - c.mu().Unlock() - w.Close() - b, err = ioutil.ReadAll(r) - require.NoError(t, err) - // A single keep-alive may have gone through, as writer would be stuck - // trying to flush it, and then promptly close. - if s := string(b); s != "\x00\x00\x00\x00" && s != "" { - t.Logf("expected zero or one keepalives, got %q", s) - } -} - // Ensure that no race exists between sending a bitfield, and a subsequent // Have that would potentially alter it. func TestSendBitfieldThenHave(t *testing.T) { diff --git a/torrent.go b/torrent.go index eb4f7a7d..b741dde3 100644 --- a/torrent.go +++ b/torrent.go @@ -1422,16 +1422,18 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { } } +func (t *Torrent) cancelRequestsForPiece(piece int) { + for cn := range t.conns { + cn.writerCond.Broadcast() + } +} + func (t *Torrent) onPieceCompleted(piece int) { t.pendingPieces.Remove(piece) t.pendAllChunkSpecs(piece) + t.cancelRequestsForPiece(piece) for conn := range t.conns { conn.Have(piece) - for r := range conn.Requests { - if int(r.Index) == piece { - conn.Cancel(r) - } - } // Could check here if peer doesn't have piece, but due to caching // some peers may have said they have a piece but they don't. conn.upload()