]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Apply next request state asynchronously
authorMatt Joiner <anacrolix@gmail.com>
Thu, 20 May 2021 10:23:45 +0000 (20:23 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:40 +0000 (13:01 +1000)
client.go
internal/chansync/broadcast-cond.go.go
internal/chansync/set-once.go
peer-impl.go
peerconn.go
pexconn_test.go
request-strategy.go
webseed-peer.go

index 7ac6b264d65aaed0df3b70243fa14bac52419406..1858b84816a5e064cd6c9d5579b1e0c564d008c6 100644 (file)
--- a/client.go
+++ b/client.go
@@ -33,12 +33,12 @@ import (
        "golang.org/x/xerrors"
 
        "github.com/anacrolix/torrent/bencode"
+       "github.com/anacrolix/torrent/internal/chansync"
        "github.com/anacrolix/torrent/internal/limiter"
        "github.com/anacrolix/torrent/iplist"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
-       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/anacrolix/torrent/storage"
        "github.com/anacrolix/torrent/tracker"
        "github.com/anacrolix/torrent/webtorrent"
@@ -81,7 +81,7 @@ type Client struct {
 
        activeAnnounceLimiter limiter.Instance
 
-       pieceRequestOrder request_strategy.ClientPieceOrder
+       updateRequests chansync.BroadcastCond
 }
 
 type ipStr string
index 9b8906920da1900d58d4a899534f3f0241e17284..6d96d3c4e6e02ab6707e7c82c1a3ae0796b60400 100644 (file)
@@ -22,7 +22,8 @@ func (me *BroadcastCond) Broadcast() {
 }
 
 // Should be called before releasing locks on resources that might trigger subsequent Broadcasts.
-func (me *BroadcastCond) WaitChan() <-chan struct{} {
+// The channel is closed when the condition changes.
+func (me *BroadcastCond) Signaled() Signaled {
        me.mu.Lock()
        defer me.mu.Unlock()
        if me.ch == nil {
index 523e5eaf5bd638ad9dc2f2c5de4259b947a9d77c..db0e6e89c04113cb3975314148488577a9f10988 100644 (file)
@@ -9,7 +9,8 @@ type SetOnce struct {
        closeOnce sync.Once
 }
 
-func (me *SetOnce) Chan() <-chan struct{} {
+// Returns a channel that is closed when the event is flagged.
+func (me *SetOnce) Done() Done {
        me.init()
        return me.ch
 }
index 880b8f3543fe885d0c95a190f3d84e6127d737c0..23c0fbb922413298e5ac548a245d01df26bcbb58 100644 (file)
@@ -8,13 +8,14 @@ import (
 // BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
 // legacy PeerConn methods.
 type peerImpl interface {
+       onNextRequestStateChanged()
        updateRequests()
        writeInterested(interested bool) bool
 
        // Neither of these return buffer room anymore, because they're currently both posted. There's
        // also PeerConn.writeBufferFull for when/where it matters.
-       _cancel(Request)
-       _request(Request)
+       _cancel(Request) bool
+       _request(Request) bool
 
        connectionFlags() string
        onClose()
index b2753d6a88d741e266d2e8ee2982df2c2f193822..40848e37e2c5e136991323cf9cefb5dc1bf35aaf 100644 (file)
@@ -24,6 +24,7 @@ import (
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
 type PeerSource string
@@ -48,7 +49,10 @@ type PeerRemoteAddr interface {
 
 // Since we have to store all the requests in memory, we can't reasonably exceed what would be
 // indexable with the memory space available.
-type maxRequests = int
+type (
+       maxRequests  = int
+       requestState = request_strategy.PeerNextRequestState
+)
 
 type Peer struct {
        // First to ensure 64-bit alignment for atomics. See #262.
@@ -78,7 +82,8 @@ type Peer struct {
        lastChunkSent           time.Time
 
        // Stuff controlled by the local peer.
-       interested           bool
+       nextRequestState     requestState
+       actualRequestState   requestState
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
@@ -87,7 +92,6 @@ type Peer struct {
        _chunksReceivedWhileExpecting       int64
 
        choking                                bool
-       requests                               map[Request]struct{}
        piecesReceivedSinceLastRequestUpdate   maxRequests
        maxPiecesReceivedBetweenRequestUpdates maxRequests
        // Chunks that we might reasonably expect to receive from the peer. Due to
@@ -171,13 +175,13 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-       if len(cn.requests) == 0 {
+       if len(cn.actualRequestState.Requests) == 0 {
                return false
        }
-       if !cn.interested {
+       if !cn.actualRequestState.Interested {
                return false
        }
-       for r := range cn.requests {
+       for r := range cn.actualRequestState.Requests {
                if !cn.remoteChokingPiece(r.Index.Int()) {
                        return true
                }
@@ -217,7 +221,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
 
 func (cn *Peer) cumInterest() time.Duration {
        ret := cn.priorInterest
-       if cn.interested {
+       if cn.actualRequestState.Interested {
                ret += time.Since(cn.lastBecameInterested)
        }
        return ret
@@ -301,7 +305,7 @@ func (cn *Peer) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
-       if cn.interested {
+       if cn.actualRequestState.Interested {
                c('i')
        }
        if cn.choking {
@@ -329,7 +333,7 @@ func (cn *Peer) downloadRate() float64 {
 
 func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
        ret = make(map[pieceIndex]int)
-       for r := range cn.requests {
+       for r := range cn.actualRequestState.Requests {
                ret[pieceIndex(r.Index)]++
        }
        return
@@ -437,18 +441,6 @@ func (cn *PeerConn) write(msg pp.Message) bool {
        return notFull
 }
 
-func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
-       cn.mu.Lock()
-       defer cn.mu.Unlock()
-       cn.writeBuffer.Write(msg.MustMarshalBinary())
-       cn.writeCond.Broadcast()
-       return !cn.writeBufferFull()
-}
-
-func (cn *peerConnMsgWriter) writeBufferFull() bool {
-       return cn.writeBuffer.Len() >= writeBufferHighWaterLen
-}
-
 func (cn *PeerConn) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
        if eID == 0 {
@@ -538,10 +530,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
 }
 
 func (cn *Peer) setInterested(interested bool) bool {
-       if cn.interested == interested {
+       if cn.actualRequestState.Interested == interested {
                return true
        }
-       cn.interested = interested
+       cn.actualRequestState.Interested = interested
        if interested {
                cn.lastBecameInterested = time.Now()
        } else if !cn.lastBecameInterested.IsZero() {
@@ -587,20 +579,20 @@ func (cn *Peer) shouldRequest(r Request) error {
        return nil
 }
 
-func (cn *Peer) request(r Request) error {
+func (cn *Peer) request(r Request) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
-       if _, ok := cn.requests[r]; ok {
-               return nil
+       if _, ok := cn.actualRequestState.Requests[r]; ok {
+               return true, nil
        }
        if cn.numLocalRequests() >= cn.nominalMaxRequests() {
-               return errors.New("too many outstanding requests")
+               return true, errors.New("too many outstanding requests")
        }
-       if cn.requests == nil {
-               cn.requests = make(map[Request]struct{})
+       if cn.actualRequestState.Requests == nil {
+               cn.actualRequestState.Requests = make(map[Request]struct{})
        }
-       cn.requests[r] = struct{}{}
+       cn.actualRequestState.Requests[r] = struct{}{}
        if cn.validReceiveChunks == nil {
                cn.validReceiveChunks = make(map[Request]int)
        }
@@ -610,12 +602,11 @@ func (cn *Peer) request(r Request) error {
        for _, f := range cn.callbacks.SentRequest {
                f(PeerRequestEvent{cn, r})
        }
-       cn.peerImpl._request(r)
-       return nil
+       return cn.peerImpl._request(r), nil
 }
 
-func (me *PeerConn) _request(r Request) {
-       me.write(pp.Message{
+func (me *PeerConn) _request(r Request) bool {
+       return me.write(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
@@ -623,17 +614,21 @@ func (me *PeerConn) _request(r Request) {
        })
 }
 
-func (me *Peer) cancel(r Request) {
+func (me *Peer) cancel(r Request) bool {
        if me.deleteRequest(r) {
-               me.peerImpl._cancel(r)
+               return me.peerImpl._cancel(r)
        }
+       return true
 }
 
-func (me *PeerConn) _cancel(r Request) {
-       me.write(makeCancelMessage(r))
+func (me *PeerConn) _cancel(r Request) bool {
+       return me.write(makeCancelMessage(r))
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
+       if !cn.applyNextRequestState() {
+               return
+       }
        if cn.pex.IsEnabled() {
                if flow := cn.pex.Share(cn.write); !flow {
                        return
@@ -668,8 +663,7 @@ func (cn *PeerConn) postBitfield() {
 }
 
 func (cn *PeerConn) updateRequests() {
-       // log.Print("update requests")
-       cn.tickleWriter()
+       cn.t.cl.tickleRequester()
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -1286,7 +1280,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        // out.
        deletedRequest := false
        {
-               if _, ok := c.requests[req]; ok {
+               if _, ok := c.actualRequestState.Requests[req]; ok {
                        for _, f := range c.callbacks.ReceivedRequested {
                                f(PeerMessageEvent{c, msg})
                        }
@@ -1468,14 +1462,15 @@ func (c *Peer) peerHasWantedPieces() bool {
 }
 
 func (c *Peer) numLocalRequests() int {
-       return len(c.requests)
+       return len(c.actualRequestState.Requests)
 }
 
 func (c *Peer) deleteRequest(r Request) bool {
-       if _, ok := c.requests[r]; !ok {
+       delete(c.nextRequestState.Requests, r)
+       if _, ok := c.actualRequestState.Requests[r]; !ok {
                return false
        }
-       delete(c.requests, r)
+       delete(c.actualRequestState.Requests, r)
        for _, f := range c.callbacks.DeletedRequest {
                f(PeerRequestEvent{c, r})
        }
@@ -1493,12 +1488,13 @@ func (c *Peer) deleteRequest(r Request) bool {
 }
 
 func (c *Peer) deleteAllRequests() {
-       for r := range c.requests {
+       for r := range c.actualRequestState.Requests {
                c.deleteRequest(r)
        }
-       if len(c.requests) != 0 {
-               panic(len(c.requests))
+       if l := len(c.actualRequestState.Requests); l != 0 {
+               panic(l)
        }
+       c.nextRequestState.Requests = nil
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
@@ -1635,3 +1631,7 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        pc, ok := p.peerImpl.(*PeerConn)
        return pc, ok
 }
+
+func (p *PeerConn) onNextRequestStateChanged() {
+       p.tickleWriter()
+}
index df9aa44ed670d45c362853ca93def22d2e19454a..7bb61ecdaaccb6461ae384f367e5387486ff94af 100644 (file)
@@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) {
                out = m
                return true
        }
-       <-c.messageWriter.writeCond.WaitChan()
+       <-c.messageWriter.writeCond.Signaled()
        c.pex.Share(testWriter)
        require.True(t, writerCalled)
        require.EqualValues(t, pp.Extended, out.Type)
index 72f7f3d4ae1b6ebf90118d16d197662605147819..7313c84a3fa1d6fff04aed1ad43364a7efd2a9d7 100644 (file)
@@ -5,25 +5,36 @@ import (
        "unsafe"
 
        "github.com/anacrolix/missinggo/v2/bitmap"
+
+       "github.com/anacrolix/torrent/internal/chansync"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/anacrolix/torrent/types"
 )
 
 func (cl *Client) requester() {
        for {
-               func() {
+               update := func() chansync.Signaled {
                        cl.lock()
                        defer cl.unlock()
                        cl.doRequests()
+                       return cl.updateRequests.Signaled()
                }()
+               // We can probably tune how often to heed this signal. TODO: Currently disabled to retain
+               // existing behaviour, while the signalling is worked out.
+               update = nil
                select {
                case <-cl.closed.LockedChan(cl.locker()):
                        return
+               case <-update:
                case <-time.After(100 * time.Millisecond):
                }
        }
 }
 
+func (cl *Client) tickleRequester() {
+       cl.updateRequests.Broadcast()
+}
+
 func (cl *Client) doRequests() {
        ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
        for _, t := range cl.torrents {
@@ -62,7 +73,7 @@ func (cl *Client) doRequests() {
                                HasPiece:    p.peerHasPiece,
                                MaxRequests: p.nominalMaxRequests(),
                                HasExistingRequest: func(r request_strategy.Request) bool {
-                                       _, ok := p.requests[r]
+                                       _, ok := p.actualRequestState.Requests[r]
                                        return ok
                                },
                                Choking: p.peerChoking,
@@ -81,7 +92,7 @@ func (cl *Client) doRequests() {
                MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
        })
        for p, state := range nextPeerStates {
-               applyPeerNextRequestState(p, state)
+               setPeerNextRequestState(p, state)
        }
 }
 
@@ -91,20 +102,35 @@ func (p *peerId) Uintptr() uintptr {
        return uintptr(unsafe.Pointer(p))
 }
 
-func applyPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
+func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
        p := (*Peer)(_p.(*peerId))
-       p.setInterested(rp.Interested)
-       for req := range p.requests {
-               if _, ok := rp.Requests[req]; !ok {
-                       p.cancel(req)
+       p.nextRequestState = rp
+       p.onNextRequestStateChanged()
+}
+
+func (p *Peer) applyNextRequestState() bool {
+       next := p.nextRequestState
+       current := p.actualRequestState
+       if !p.setInterested(next.Interested) {
+               return false
+       }
+       for req := range current.Requests {
+               if _, ok := next.Requests[req]; !ok {
+                       if !p.cancel(req) {
+                               return false
+                       }
                }
        }
-       for req := range rp.Requests {
-               err := p.request(req)
+       for req := range next.Requests {
+               more, err := p.request(req)
                if err != nil {
                        panic(err)
                } else {
                        //log.Print(req)
                }
+               if !more {
+                       return false
+               }
        }
+       return true
 }
index 5f2980c3e7ce56dda1ea8c82ebb38df38fefa0b5..7af892f0ff09579a011f0a3b628a869456c135ee 100644 (file)
@@ -45,19 +45,21 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) _cancel(r Request) {
+func (ws *webseedPeer) _cancel(r Request) bool {
        active, ok := ws.activeRequests[r]
        if ok {
                active.Cancel()
        }
+       return true
 }
 
 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
        return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
 }
 
-func (ws *webseedPeer) _request(r Request) {
+func (ws *webseedPeer) _request(r Request) bool {
        ws.requesterCond.Signal()
+       return true
 }
 
 func (ws *webseedPeer) doRequest(r Request) {
@@ -76,7 +78,7 @@ func (ws *webseedPeer) requester() {
        defer ws.requesterCond.L.Unlock()
 start:
        for !ws.peer.closed.IsSet() {
-               for r := range ws.peer.requests {
+               for r := range ws.peer.actualRequestState.Requests {
                        if _, ok := ws.activeRequests[r]; ok {
                                continue
                        }
@@ -142,3 +144,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                }
        }
 }
+
+func (me *webseedPeer) onNextRequestStateChanged() {
+       me.peer.applyNextRequestState()
+}