]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Generate requests, cancels and interest state in the connection writer
authorMatt Joiner <anacrolix@gmail.com>
Thu, 31 Aug 2017 13:48:52 +0000 (23:48 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 31 Aug 2017 13:48:52 +0000 (23:48 +1000)
client.go
connection.go
connection_test.go
torrent.go

index 6c999a98e2d260c3a24e940f861396ea5253b993..d3ee0703c3e725e30051ed3a71847b00de5eb87d 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1035,23 +1035,11 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
        }
 }
 
-func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
-       conn.updateRequests()
-}
-
-func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
-       ok = cn.Cancel(r)
-       if ok {
-               postedCancels.Add(1)
-       }
-       return
-}
-
 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
        if !cn.RequestPending(r) {
                return false
        }
-       delete(cn.Requests, r)
+       delete(cn.requests, r)
        return true
 }
 
index 11075edf4c0898024f022791e4457511c8d4f73c..20fec26f4a3516db04be0e2df1ad23869b499e14 100644 (file)
@@ -66,7 +66,7 @@ type connection struct {
        // Stuff controlled by the local peer.
        Interested       bool
        Choked           bool
-       Requests         map[request]struct{}
+       requests         map[request]struct{}
        requestsLowWater int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
@@ -204,7 +204,7 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
                cn.UsefulChunksReceived,
                cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
                cn.chunksSent,
-               len(cn.Requests),
+               cn.numLocalRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
        )
@@ -243,7 +243,7 @@ func (cn *connection) Post(msg pp.Message) {
 }
 
 func (cn *connection) RequestPending(r request) bool {
-       _, ok := cn.Requests[r]
+       _, ok := cn.requests[r]
        return ok
 }
 
@@ -288,50 +288,6 @@ func (cn *connection) nominalMaxRequests() (ret int) {
        return
 }
 
-// Returns true if more requests can be sent.
-func (cn *connection) Request(chunk request) bool {
-       if len(cn.Requests) >= cn.nominalMaxRequests() {
-               return false
-       }
-       if !cn.PeerHasPiece(int(chunk.Index)) {
-               return true
-       }
-       if cn.RequestPending(chunk) {
-               return true
-       }
-       cn.SetInterested(true)
-       if cn.PeerChoked {
-               return false
-       }
-       if cn.Requests == nil {
-               cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
-       }
-       cn.Requests[chunk] = struct{}{}
-       cn.requestsLowWater = len(cn.Requests) / 2
-       cn.Post(pp.Message{
-               Type:   pp.Request,
-               Index:  chunk.Index,
-               Begin:  chunk.Begin,
-               Length: chunk.Length,
-       })
-       return true
-}
-
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) Cancel(r request) bool {
-       if !cn.RequestPending(r) {
-               return false
-       }
-       delete(cn.Requests, r)
-       cn.Post(pp.Message{
-               Type:   pp.Cancel,
-               Index:  r.Index,
-               Begin:  r.Begin,
-               Length: r.Length,
-       })
-       return true
-}
-
 // Returns true if an unsatisfied request was canceled.
 func (cn *connection) PeerCancel(r request) bool {
        if cn.PeerRequests == nil {
@@ -365,11 +321,13 @@ func (cn *connection) Unchoke() {
        cn.Choked = false
 }
 
-func (cn *connection) SetInterested(interested bool) {
+func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
-               return
+               return true
        }
-       cn.Post(pp.Message{
+       cn.Interested = interested
+       // log.Printf("%p: setting interest: %v", cn, interested)
+       return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
@@ -378,7 +336,6 @@ func (cn *connection) SetInterested(interested bool) {
                        }
                }(),
        })
-       cn.Interested = interested
 }
 
 var (
@@ -388,6 +345,44 @@ var (
        connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
 )
 
+func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
+       rs, i := cn.desiredRequestState()
+       if !cn.SetInterested(i, msg) {
+               return
+       }
+       for r := range cn.requests {
+               if _, ok := rs[r]; !ok {
+                       delete(cn.requests, r)
+                       // log.Printf("%p: cancelling request: %v", cn, r)
+                       if !msg(pp.Message{
+                               Type:   pp.Cancel,
+                               Index:  r.Index,
+                               Begin:  r.Begin,
+                               Length: r.Length,
+                       }) {
+                               return
+                       }
+               }
+       }
+       for r := range rs {
+               if _, ok := cn.requests[r]; !ok {
+                       if cn.requests == nil {
+                               cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
+                       }
+                       cn.requests[r] = struct{}{}
+                       // log.Printf("%p: requesting %v", cn, r)
+                       if !msg(pp.Message{
+                               Type:   pp.Request,
+                               Index:  r.Index,
+                               Begin:  r.Begin,
+                               Length: r.Length,
+                       }) {
+                               return
+                       }
+               }
+       }
+}
+
 // Writes buffers to the socket from the write channel.
 func (cn *connection) writer(keepAliveTimeout time.Duration) {
        var (
@@ -410,6 +405,12 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
        for {
                buf.Write(cn.postedBuffer.Bytes())
                cn.postedBuffer.Reset()
+               if buf.Len() == 0 {
+                       cn.fillWriteBuffer(func(msg pp.Message) bool {
+                               buf.Write(msg.MustMarshalBinary())
+                               return buf.Len() < 1<<16
+                       })
+               }
                if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        postedKeepalives.Add(1)
@@ -500,31 +501,21 @@ func nextRequestState(
 }
 
 func (cn *connection) updateRequests() {
-       rs, i := nextRequestState(
+       cn.writerCond.Broadcast()
+}
+
+func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
+       return nextRequestState(
                cn.t.networkingEnabled,
-               cn.Requests,
+               cn.requests,
                cn.PeerChoked,
                &cn.pieceRequestOrder,
                func(piece int, f func(chunkSpec) bool) bool {
                        return undirtiedChunks(piece, cn.t, f)
                },
                cn.requestsLowWater,
-               cn.nominalMaxRequests())
-       for r := range cn.Requests {
-               if _, ok := rs[r]; !ok {
-                       if !cn.Cancel(r) {
-                               panic("wat")
-                       }
-               }
-       }
-       for r := range rs {
-               if _, ok := cn.Requests[r]; !ok {
-                       if !cn.Request(r) {
-                               panic("how")
-                       }
-               }
-       }
-       cn.SetInterested(i)
+               cn.nominalMaxRequests(),
+       )
 }
 
 func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
@@ -536,7 +527,7 @@ func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
 
 func (cn *connection) stopRequestingPiece(piece int) {
        cn.pieceRequestOrder.Remove(piece)
-       cn.updateRequests()
+       cn.writerCond.Broadcast()
 }
 
 // This is distinct from Torrent piece priority, which is the user's
@@ -754,7 +745,7 @@ func (c *connection) mainReadLoop() error {
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
-                       c.Requests = nil
+                       c.requests = nil
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
@@ -762,7 +753,7 @@ func (c *connection) mainReadLoop() error {
                        c.updateRequests()
                case pp.Unchoke:
                        c.PeerChoked = false
-                       cl.peerUnchoked(t, c)
+                       c.writerCond.Broadcast()
                case pp.Interested:
                        c.PeerInterested = true
                        c.upload()
@@ -989,9 +980,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 
        // Cancel pending requests for this chunk.
        for c := range t.conns {
-               if cl.connCancel(t, c, req) {
-                       c.updateRequests()
-               }
+               c.updateRequests()
        }
 
        cl.mu.Unlock()
@@ -1096,3 +1085,7 @@ func (cn *connection) netGoodPiecesDirtied() int {
 func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
 }
+
+func (c *connection) numLocalRequests() int {
+       return len(c.requests)
+}
index 9fd4066da60b163fd16cd3358173a601fd69d45a..4f1d715b5196082a9e070a1f674dded9df03cdd3 100644 (file)
@@ -2,13 +2,10 @@ package torrent
 
 import (
        "io"
-       "io/ioutil"
-       "net"
        "sync"
        "testing"
        "time"
 
-       "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/bradfitz/iter"
        "github.com/stretchr/testify/assert"
@@ -19,49 +16,6 @@ import (
        "github.com/anacrolix/torrent/storage"
 )
 
-func TestCancelRequestOptimized(t *testing.T) {
-       r, w := io.Pipe()
-       c := &connection{
-               PeerMaxRequests: 1,
-               peerPieces: func() bitmap.Bitmap {
-                       var bm bitmap.Bitmap
-                       bm.Set(1, true)
-                       return bm
-               }(),
-               w:    w,
-               conn: new(net.TCPConn),
-               // For the locks
-               t: &Torrent{cl: &Client{}},
-       }
-       assert.Len(t, c.Requests, 0)
-       c.Request(newRequest(1, 2, 3))
-       require.Len(t, c.Requests, 1)
-       // Posting this message should removing the pending Request.
-       require.True(t, c.Cancel(newRequest(1, 2, 3)))
-       assert.Len(t, c.Requests, 0)
-       // Check that write optimization filters out the Request, due to the
-       // Cancel. We should have received an Interested, due to the initial
-       // request, and then keep-alives until we close the connection.
-       go c.writer(0)
-       b := make([]byte, 9)
-       n, err := io.ReadFull(r, b)
-       require.NoError(t, err)
-       require.EqualValues(t, len(b), n)
-       require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b))
-       time.Sleep(time.Millisecond)
-       c.mu().Lock()
-       c.Close()
-       c.mu().Unlock()
-       w.Close()
-       b, err = ioutil.ReadAll(r)
-       require.NoError(t, err)
-       // A single keep-alive may have gone through, as writer would be stuck
-       // trying to flush it, and then promptly close.
-       if s := string(b); s != "\x00\x00\x00\x00" && s != "" {
-               t.Logf("expected zero or one keepalives, got %q", s)
-       }
-}
-
 // Ensure that no race exists between sending a bitfield, and a subsequent
 // Have that would potentially alter it.
 func TestSendBitfieldThenHave(t *testing.T) {
index eb4f7a7da06b8e7d3fdb602533f01ff1085a4187..b741dde32cf15c2d333126dd1be56b8c31441f73 100644 (file)
@@ -1422,16 +1422,18 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
        }
 }
 
+func (t *Torrent) cancelRequestsForPiece(piece int) {
+       for cn := range t.conns {
+               cn.writerCond.Broadcast()
+       }
+}
+
 func (t *Torrent) onPieceCompleted(piece int) {
        t.pendingPieces.Remove(piece)
        t.pendAllChunkSpecs(piece)
+       t.cancelRequestsForPiece(piece)
        for conn := range t.conns {
                conn.Have(piece)
-               for r := range conn.Requests {
-                       if int(r.Index) == piece {
-                               conn.Cancel(r)
-                       }
-               }
                // Could check here if peer doesn't have piece, but due to caching
                // some peers may have said they have a piece but they don't.
                conn.upload()