From: Matt Joiner <anacrolix@gmail.com>
Date: Thu, 20 May 2021 10:23:45 +0000 (+1000)
Subject: Apply next request state asynchronously
X-Git-Tag: v1.29.0~31^2~14
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=36f52d7a145142e682b79563c84b229159fdcb58;p=btrtrc.git

Apply next request state asynchronously
---

diff --git a/client.go b/client.go
index 7ac6b264..1858b848 100644
--- a/client.go
+++ b/client.go
@@ -33,12 +33,12 @@ import (
 	"golang.org/x/xerrors"
 
 	"github.com/anacrolix/torrent/bencode"
+	"github.com/anacrolix/torrent/internal/chansync"
 	"github.com/anacrolix/torrent/internal/limiter"
 	"github.com/anacrolix/torrent/iplist"
 	"github.com/anacrolix/torrent/metainfo"
 	"github.com/anacrolix/torrent/mse"
 	pp "github.com/anacrolix/torrent/peer_protocol"
-	request_strategy "github.com/anacrolix/torrent/request-strategy"
 	"github.com/anacrolix/torrent/storage"
 	"github.com/anacrolix/torrent/tracker"
 	"github.com/anacrolix/torrent/webtorrent"
@@ -81,7 +81,7 @@ type Client struct {
 
 	activeAnnounceLimiter limiter.Instance
 
-	pieceRequestOrder request_strategy.ClientPieceOrder
+	updateRequests chansync.BroadcastCond
 }
 
 type ipStr string
diff --git a/internal/chansync/broadcast-cond.go.go b/internal/chansync/broadcast-cond.go.go
index 9b890692..6d96d3c4 100644
--- a/internal/chansync/broadcast-cond.go.go
+++ b/internal/chansync/broadcast-cond.go.go
@@ -22,7 +22,8 @@ func (me *BroadcastCond) Broadcast() {
 }
 
 // Should be called before releasing locks on resources that might trigger subsequent Broadcasts.
-func (me *BroadcastCond) WaitChan() <-chan struct{} {
+// The channel is closed when the condition changes.
+func (me *BroadcastCond) Signaled() Signaled {
 	me.mu.Lock()
 	defer me.mu.Unlock()
 	if me.ch == nil {
diff --git a/internal/chansync/set-once.go b/internal/chansync/set-once.go
index 523e5eaf..db0e6e89 100644
--- a/internal/chansync/set-once.go
+++ b/internal/chansync/set-once.go
@@ -9,7 +9,8 @@ type SetOnce struct {
 	closeOnce sync.Once
 }
 
-func (me *SetOnce) Chan() <-chan struct{} {
+// Returns a channel that is closed when the event is flagged.
+func (me *SetOnce) Done() Done {
 	me.init()
 	return me.ch
 }
diff --git a/peer-impl.go b/peer-impl.go
index 880b8f35..23c0fbb9 100644
--- a/peer-impl.go
+++ b/peer-impl.go
@@ -8,13 +8,14 @@ import (
 // BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
 // legacy PeerConn methods.
 type peerImpl interface {
+	onNextRequestStateChanged()
 	updateRequests()
 	writeInterested(interested bool) bool
 
 	// Neither of these return buffer room anymore, because they're currently both posted. There's
 	// also PeerConn.writeBufferFull for when/where it matters.
-	_cancel(Request)
-	_request(Request)
+	_cancel(Request) bool
+	_request(Request) bool
 
 	connectionFlags() string
 	onClose()
diff --git a/peerconn.go b/peerconn.go
index b2753d6a..40848e37 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -24,6 +24,7 @@ import (
 	"github.com/anacrolix/torrent/metainfo"
 	"github.com/anacrolix/torrent/mse"
 	pp "github.com/anacrolix/torrent/peer_protocol"
+	request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
 type PeerSource string
@@ -48,7 +49,10 @@ type PeerRemoteAddr interface {
 
 // Since we have to store all the requests in memory, we can't reasonably exceed what would be
 // indexable with the memory space available.
-type maxRequests = int
+type (
+	maxRequests  = int
+	requestState = request_strategy.PeerNextRequestState
+)
 
 type Peer struct {
 	// First to ensure 64-bit alignment for atomics. See #262.
@@ -78,7 +82,8 @@ type Peer struct {
 	lastChunkSent           time.Time
 
 	// Stuff controlled by the local peer.
-	interested           bool
+	nextRequestState     requestState
+	actualRequestState   requestState
 	lastBecameInterested time.Time
 	priorInterest        time.Duration
 
@@ -87,7 +92,6 @@ type Peer struct {
 	_chunksReceivedWhileExpecting       int64
 
 	choking                                bool
-	requests                               map[Request]struct{}
 	piecesReceivedSinceLastRequestUpdate   maxRequests
 	maxPiecesReceivedBetweenRequestUpdates maxRequests
 	// Chunks that we might reasonably expect to receive from the peer. Due to
@@ -171,13 +175,13 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-	if len(cn.requests) == 0 {
+	if len(cn.actualRequestState.Requests) == 0 {
 		return false
 	}
-	if !cn.interested {
+	if !cn.actualRequestState.Interested {
 		return false
 	}
-	for r := range cn.requests {
+	for r := range cn.actualRequestState.Requests {
 		if !cn.remoteChokingPiece(r.Index.Int()) {
 			return true
 		}
@@ -217,7 +221,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
 
 func (cn *Peer) cumInterest() time.Duration {
 	ret := cn.priorInterest
-	if cn.interested {
+	if cn.actualRequestState.Interested {
 		ret += time.Since(cn.lastBecameInterested)
 	}
 	return ret
@@ -301,7 +305,7 @@ func (cn *Peer) statusFlags() (ret string) {
 	c := func(b byte) {
 		ret += string([]byte{b})
 	}
-	if cn.interested {
+	if cn.actualRequestState.Interested {
 		c('i')
 	}
 	if cn.choking {
@@ -329,7 +333,7 @@ func (cn *Peer) downloadRate() float64 {
 
 func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
 	ret = make(map[pieceIndex]int)
-	for r := range cn.requests {
+	for r := range cn.actualRequestState.Requests {
 		ret[pieceIndex(r.Index)]++
 	}
 	return
@@ -437,18 +441,6 @@ func (cn *PeerConn) write(msg pp.Message) bool {
 	return notFull
 }
 
-func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
-	cn.mu.Lock()
-	defer cn.mu.Unlock()
-	cn.writeBuffer.Write(msg.MustMarshalBinary())
-	cn.writeCond.Broadcast()
-	return !cn.writeBufferFull()
-}
-
-func (cn *peerConnMsgWriter) writeBufferFull() bool {
-	return cn.writeBuffer.Len() >= writeBufferHighWaterLen
-}
-
 func (cn *PeerConn) requestMetadataPiece(index int) {
 	eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
 	if eID == 0 {
@@ -538,10 +530,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
 }
 
 func (cn *Peer) setInterested(interested bool) bool {
-	if cn.interested == interested {
+	if cn.actualRequestState.Interested == interested {
 		return true
 	}
-	cn.interested = interested
+	cn.actualRequestState.Interested = interested
 	if interested {
 		cn.lastBecameInterested = time.Now()
 	} else if !cn.lastBecameInterested.IsZero() {
@@ -587,20 +579,20 @@ func (cn *Peer) shouldRequest(r Request) error {
 	return nil
 }
 
-func (cn *Peer) request(r Request) error {
+func (cn *Peer) request(r Request) (more bool, err error) {
 	if err := cn.shouldRequest(r); err != nil {
 		panic(err)
 	}
-	if _, ok := cn.requests[r]; ok {
-		return nil
+	if _, ok := cn.actualRequestState.Requests[r]; ok {
+		return true, nil
 	}
 	if cn.numLocalRequests() >= cn.nominalMaxRequests() {
-		return errors.New("too many outstanding requests")
+		return true, errors.New("too many outstanding requests")
 	}
-	if cn.requests == nil {
-		cn.requests = make(map[Request]struct{})
+	if cn.actualRequestState.Requests == nil {
+		cn.actualRequestState.Requests = make(map[Request]struct{})
 	}
-	cn.requests[r] = struct{}{}
+	cn.actualRequestState.Requests[r] = struct{}{}
 	if cn.validReceiveChunks == nil {
 		cn.validReceiveChunks = make(map[Request]int)
 	}
@@ -610,12 +602,11 @@ func (cn *Peer) request(r Request) error {
 	for _, f := range cn.callbacks.SentRequest {
 		f(PeerRequestEvent{cn, r})
 	}
-	cn.peerImpl._request(r)
-	return nil
+	return cn.peerImpl._request(r), nil
 }
 
-func (me *PeerConn) _request(r Request) {
-	me.write(pp.Message{
+func (me *PeerConn) _request(r Request) bool {
+	return me.write(pp.Message{
 		Type:   pp.Request,
 		Index:  r.Index,
 		Begin:  r.Begin,
@@ -623,17 +614,21 @@ func (me *PeerConn) _request(r Request) {
 	})
 }
 
-func (me *Peer) cancel(r Request) {
+func (me *Peer) cancel(r Request) bool {
 	if me.deleteRequest(r) {
-		me.peerImpl._cancel(r)
+		return me.peerImpl._cancel(r)
 	}
+	return true
 }
 
-func (me *PeerConn) _cancel(r Request) {
-	me.write(makeCancelMessage(r))
+func (me *PeerConn) _cancel(r Request) bool {
+	return me.write(makeCancelMessage(r))
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
+	if !cn.applyNextRequestState() {
+		return
+	}
 	if cn.pex.IsEnabled() {
 		if flow := cn.pex.Share(cn.write); !flow {
 			return
@@ -668,8 +663,7 @@ func (cn *PeerConn) postBitfield() {
 }
 
 func (cn *PeerConn) updateRequests() {
-	// log.Print("update requests")
-	cn.tickleWriter()
+	cn.t.cl.tickleRequester()
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -1286,7 +1280,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 	// out.
 	deletedRequest := false
 	{
-		if _, ok := c.requests[req]; ok {
+		if _, ok := c.actualRequestState.Requests[req]; ok {
 			for _, f := range c.callbacks.ReceivedRequested {
 				f(PeerMessageEvent{c, msg})
 			}
@@ -1468,14 +1462,15 @@ func (c *Peer) peerHasWantedPieces() bool {
 }
 
 func (c *Peer) numLocalRequests() int {
-	return len(c.requests)
+	return len(c.actualRequestState.Requests)
 }
 
 func (c *Peer) deleteRequest(r Request) bool {
-	if _, ok := c.requests[r]; !ok {
+	delete(c.nextRequestState.Requests, r)
+	if _, ok := c.actualRequestState.Requests[r]; !ok {
 		return false
 	}
-	delete(c.requests, r)
+	delete(c.actualRequestState.Requests, r)
 	for _, f := range c.callbacks.DeletedRequest {
 		f(PeerRequestEvent{c, r})
 	}
@@ -1493,12 +1488,13 @@ func (c *Peer) deleteRequest(r Request) bool {
 }
 
 func (c *Peer) deleteAllRequests() {
-	for r := range c.requests {
+	for r := range c.actualRequestState.Requests {
 		c.deleteRequest(r)
 	}
-	if len(c.requests) != 0 {
-		panic(len(c.requests))
+	if l := len(c.actualRequestState.Requests); l != 0 {
+		panic(l)
 	}
+	c.nextRequestState.Requests = nil
 	// for c := range c.t.conns {
 	// 	c.tickleWriter()
 	// }
@@ -1635,3 +1631,7 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
 	pc, ok := p.peerImpl.(*PeerConn)
 	return pc, ok
 }
+
+func (p *PeerConn) onNextRequestStateChanged() {
+	p.tickleWriter()
+}
diff --git a/pexconn_test.go b/pexconn_test.go
index df9aa44e..7bb61ecd 100644
--- a/pexconn_test.go
+++ b/pexconn_test.go
@@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) {
 		out = m
 		return true
 	}
-	<-c.messageWriter.writeCond.WaitChan()
+	<-c.messageWriter.writeCond.Signaled()
 	c.pex.Share(testWriter)
 	require.True(t, writerCalled)
 	require.EqualValues(t, pp.Extended, out.Type)
diff --git a/request-strategy.go b/request-strategy.go
index 72f7f3d4..7313c84a 100644
--- a/request-strategy.go
+++ b/request-strategy.go
@@ -5,25 +5,36 @@ import (
 	"unsafe"
 
 	"github.com/anacrolix/missinggo/v2/bitmap"
+
+	"github.com/anacrolix/torrent/internal/chansync"
 	request_strategy "github.com/anacrolix/torrent/request-strategy"
 	"github.com/anacrolix/torrent/types"
 )
 
 func (cl *Client) requester() {
 	for {
-		func() {
+		update := func() chansync.Signaled {
 			cl.lock()
 			defer cl.unlock()
 			cl.doRequests()
+			return cl.updateRequests.Signaled()
 		}()
+		// We can probably tune how often to heed this signal. TODO: Currently disabled to retain
+		// existing behaviour, while the signalling is worked out.
+		update = nil
 		select {
 		case <-cl.closed.LockedChan(cl.locker()):
 			return
+		case <-update:
 		case <-time.After(100 * time.Millisecond):
 		}
 	}
 }
 
+func (cl *Client) tickleRequester() {
+	cl.updateRequests.Broadcast()
+}
+
 func (cl *Client) doRequests() {
 	ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
 	for _, t := range cl.torrents {
@@ -62,7 +73,7 @@ func (cl *Client) doRequests() {
 				HasPiece:    p.peerHasPiece,
 				MaxRequests: p.nominalMaxRequests(),
 				HasExistingRequest: func(r request_strategy.Request) bool {
-					_, ok := p.requests[r]
+					_, ok := p.actualRequestState.Requests[r]
 					return ok
 				},
 				Choking: p.peerChoking,
@@ -81,7 +92,7 @@ func (cl *Client) doRequests() {
 		MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
 	})
 	for p, state := range nextPeerStates {
-		applyPeerNextRequestState(p, state)
+		setPeerNextRequestState(p, state)
 	}
 }
 
@@ -91,20 +102,35 @@ func (p *peerId) Uintptr() uintptr {
 	return uintptr(unsafe.Pointer(p))
 }
 
-func applyPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
+func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
 	p := (*Peer)(_p.(*peerId))
-	p.setInterested(rp.Interested)
-	for req := range p.requests {
-		if _, ok := rp.Requests[req]; !ok {
-			p.cancel(req)
+	p.nextRequestState = rp
+	p.onNextRequestStateChanged()
+}
+
+func (p *Peer) applyNextRequestState() bool {
+	next := p.nextRequestState
+	current := p.actualRequestState
+	if !p.setInterested(next.Interested) {
+		return false
+	}
+	for req := range current.Requests {
+		if _, ok := next.Requests[req]; !ok {
+			if !p.cancel(req) {
+				return false
+			}
 		}
 	}
-	for req := range rp.Requests {
-		err := p.request(req)
+	for req := range next.Requests {
+		more, err := p.request(req)
 		if err != nil {
 			panic(err)
 		} else {
 			//log.Print(req)
 		}
+		if !more {
+			return false
+		}
 	}
+	return true
 }
diff --git a/webseed-peer.go b/webseed-peer.go
index 5f2980c3..7af892f0 100644
--- a/webseed-peer.go
+++ b/webseed-peer.go
@@ -45,19 +45,21 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
 	return true
 }
 
-func (ws *webseedPeer) _cancel(r Request) {
+func (ws *webseedPeer) _cancel(r Request) bool {
 	active, ok := ws.activeRequests[r]
 	if ok {
 		active.Cancel()
 	}
+	return true
 }
 
 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
 	return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
 }
 
-func (ws *webseedPeer) _request(r Request) {
+func (ws *webseedPeer) _request(r Request) bool {
 	ws.requesterCond.Signal()
+	return true
 }
 
 func (ws *webseedPeer) doRequest(r Request) {
@@ -76,7 +78,7 @@ func (ws *webseedPeer) requester() {
 	defer ws.requesterCond.L.Unlock()
 start:
 	for !ws.peer.closed.IsSet() {
-		for r := range ws.peer.requests {
+		for r := range ws.peer.actualRequestState.Requests {
 			if _, ok := ws.activeRequests[r]; ok {
 				continue
 			}
@@ -142,3 +144,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
 		}
 	}
 }
+
+func (me *webseedPeer) onNextRequestStateChanged() {
+	me.peer.applyNextRequestState()
+}