]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Update webseeds for peer requesting
authorMatt Joiner <anacrolix@gmail.com>
Wed, 20 Oct 2021 23:28:57 +0000 (10:28 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 20 Oct 2021 23:28:57 +0000 (10:28 +1100)
client.go
peer-impl.go
peerconn.go
requesting.go
torrent.go
webseed-peer.go

index 1efd98966182d4bbfe3f7a6504ad8790918c85fb..72013b99bf13ecbe34f400341ce41033114021b8 100644 (file)
--- a/client.go
+++ b/client.go
@@ -957,8 +957,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        defer t.dropConnection(c)
        c.startWriter()
        cl.sendInitialMessages(c, t)
-       c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, c.updateRequestsTimerFunc)
-       c.updateRequestsTimer.Stop()
+       c.initUpdateRequestsTimer()
        err := c.mainReadLoop()
        if err != nil {
                return fmt.Errorf("main read loop: %w", err)
@@ -966,7 +965,19 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        return nil
 }
 
-func (c *PeerConn) updateRequestsTimerFunc() {
+const check = false
+
+func (p *Peer) initUpdateRequestsTimer() {
+       if check {
+               if p.updateRequestsTimer != nil {
+                       panic(p.updateRequestsTimer)
+               }
+       }
+       p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
+       p.updateRequestsTimer.Stop()
+}
+
+func (c *Peer) updateRequestsTimerFunc() {
        c.locker().Lock()
        defer c.locker().Unlock()
        if c.needRequestUpdate != "" {
index eb926c748d46cf5d75825201511f1832b7b74f9a..4dbc6b4c08046832804203eb80a638d944e28619 100644 (file)
@@ -8,7 +8,8 @@ import (
 // BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
 // legacy PeerConn methods.
 type peerImpl interface {
-       updateRequests(reason string)
+       // Trigger the actual request state to get updated
+       handleUpdateRequests()
        writeInterested(interested bool) bool
 
        // Neither of these return buffer room anymore, because they're currently both posted. There's
index 48453f2bc571ba4431d2e92eb2fae0d5c1acb3d9..a9d0baad2696e0fc5e0161b2a9dc723bc162dcea 100644 (file)
@@ -243,7 +243,7 @@ func (cn *Peer) peerHasAllPieces() (all bool, known bool) {
        return roaring.Flip(&cn._peerPieces, 0, bitmap.BitRange(cn.t.numPieces())).IsEmpty(), true
 }
 
-func (cn *PeerConn) locker() *lockWithDeferreds {
+func (cn *Peer) locker() *lockWithDeferreds {
        return cn.t.cl.locker()
 }
 
@@ -403,6 +403,9 @@ func (p *Peer) close() {
        if !p.closed.Set() {
                return
        }
+       if p.updateRequestsTimer != nil {
+               p.updateRequestsTimer.Stop()
+       }
        p.peerImpl.onClose()
        if p.t != nil {
                p.t.decPeerPieceAvailability(p)
@@ -416,9 +419,6 @@ func (cn *PeerConn) onClose() {
        if cn.pex.IsEnabled() {
                cn.pex.Close()
        }
-       if cn.updateRequestsTimer != nil {
-               cn.updateRequestsTimer.Stop()
-       }
        cn.tickleWriter()
        if cn.conn != nil {
                cn.conn.Close()
@@ -638,7 +638,7 @@ func (me *PeerConn) _cancel(r RequestIndex) bool {
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
-       if !cn.applyNextRequestState() {
+       if !cn.maybeUpdateActualRequestState() {
                return
        }
        if cn.pex.IsEnabled() {
@@ -674,11 +674,17 @@ func (cn *PeerConn) postBitfield() {
        cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
 }
 
-func (cn *PeerConn) updateRequests(reason string) {
+// Sets a reason to update requests, and if there wasn't already one, handle it.
+func (cn *Peer) updateRequests(reason string) {
        if cn.needRequestUpdate != "" {
                return
        }
        cn.needRequestUpdate = reason
+       cn.handleUpdateRequests()
+}
+
+func (cn *PeerConn) handleUpdateRequests() {
+       // The writer determines the request state as needed when it can write.
        cn.tickleWriter()
 }
 
index 97e46e9cee86598a803188a346507f100302d85c..6d707fdef229ab4605b978366987069458e45c15 100644 (file)
@@ -237,7 +237,7 @@ func (p *Peer) getDesiredRequestState() (desired requestState) {
        return
 }
 
-func (p *Peer) applyNextRequestState() bool {
+func (p *Peer) maybeUpdateActualRequestState() bool {
        if p.needRequestUpdate == "" {
                return true
        }
@@ -253,6 +253,7 @@ func (p *Peer) applyNextRequestState() bool {
        return more
 }
 
+// Transmit/action the request state to the peer.
 func (p *Peer) applyRequestState(next requestState) bool {
        current := &p.actualRequestState
        if !p.setInterested(next.Interested) {
@@ -267,6 +268,12 @@ func (p *Peer) applyRequestState(next requestState) bool {
        if !more {
                return false
        }
+       // We randomize the order in which requests are issued, to reduce the overlap with requests to
+       // other peers. Note that although it really depends on what order the peer services the
+       // requests, if we are only able to issue some requests before buffering, or the peer starts
+       // handling our requests before they've all arrived, then this randomization should reduce
+       // overlap. Note however that if we received the desired requests in priority order, then
+       // randomizing would throw away that benefit.
        for _, x := range rand.Perm(int(next.Requests.GetCardinality())) {
                req, err := next.Requests.Select(uint32(x))
                if err != nil {
@@ -277,6 +284,11 @@ func (p *Peer) applyRequestState(next requestState) bool {
                        // requests, so we can skip this one with no additional consideration.
                        continue
                }
+               // The cardinality of our desired requests shouldn't exceed the max requests since it's used
+               // in the calculation of the requests. However if we cancelled requests and they haven't
+               // been rejected or serviced yet with the fast extension enabled, we can end up with more
+               // extra outstanding requests. We could subtract the number of outstanding cancels from the
+               // next request cardinality, but peers might not like that.
                if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
                        //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
                        //      next.Requests.GetCardinality(),
index 9af6a44d4fdf135c821995dd61800948e7bf06bf..9227734271db72514e0aa9b592dc2bd6b0460ee2 100644 (file)
@@ -2226,6 +2226,7 @@ func (t *Torrent) addWebSeed(url string) {
                },
                activeRequests: make(map[Request]webseed.Request, maxRequests),
        }
+       ws.peer.initUpdateRequestsTimer()
        ws.requesterCond.L = t.cl.locker()
        for i := 0; i < maxRequests; i += 1 {
                go ws.requester()
index 01518f450740f4de473b26dcbdd5bc0eaefc5229..68fbc3e7458d707c11851ee6935d848443d5756e 100644 (file)
@@ -8,6 +8,7 @@ import (
        "strings"
        "sync"
 
+       "github.com/anacrolix/log"
        "github.com/anacrolix/torrent/common"
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -105,11 +106,12 @@ func (ws *webseedPeer) connectionFlags() string {
 // return bool if this is even possible, and if it isn't, skip to the next drop candidate.
 func (ws *webseedPeer) drop() {}
 
-func (ws *webseedPeer) updateRequests(reason string) {
+func (ws *webseedPeer) handleUpdateRequests() {
+       ws.peer.maybeUpdateActualRequestState()
 }
 
 func (ws *webseedPeer) onClose() {
-       ws.peer.logger.Print("closing")
+       ws.peer.logger.WithLevel(log.Debug).Print("closing")
        for _, r := range ws.activeRequests {
                r.Cancel()
        }