]> Sergey Matveev's repositories - btrtrc.git/blobdiff - requesting.go
cmd/btrtrc client
[btrtrc.git] / requesting.go
index af609356da0c858f1e91739efbd1727e113b1236..7d32012edd231d12ee20aec20e7b8df58ef0f24d 100644 (file)
@@ -10,6 +10,7 @@ import (
        "unsafe"
 
        "github.com/RoaringBitmap/roaring"
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/generics/heap"
        "github.com/anacrolix/log"
        "github.com/anacrolix/multiless"
@@ -25,6 +26,7 @@ type (
 )
 
 func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+       t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i)
        return requestStrategy.PieceRequestOrderState{
                Priority:     t.piece(i).purePriority(),
                Partial:      t.piecePartiallyDownloaded(i),
@@ -78,7 +80,7 @@ type (
 type desiredPeerRequests struct {
        requestIndexes []RequestIndex
        peer           *Peer
-       pieceStates    []requestStrategy.PieceRequestOrderState
+       pieceStates    []g.Option[requestStrategy.PieceRequestOrderState]
 }
 
 func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool {
@@ -95,8 +97,8 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
                        !p.peer.peerAllowedFast.Contains(rightPieceIndex),
                )
        }
-       leftPiece := &p.pieceStates[leftPieceIndex]
-       rightPiece := &p.pieceStates[rightPieceIndex]
+       leftPiece := p.pieceStates[leftPieceIndex].UnwrapPtr()
+       rightPiece := p.pieceStates[rightPieceIndex].UnwrapPtr()
        // Putting this first means we can steal requests from lesser-performing peers for our first few
        // new requests.
        priority := func() PiecePriority {
@@ -183,19 +185,21 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                pieceStates:    t.requestPieceStates,
                requestIndexes: t.requestIndexes,
        }
+       clear(requestHeap.pieceStates)
+       t.logPieceRequestOrder()
        // Caller-provided allocation for roaring bitmap iteration.
        var it typedRoaring.Iterator[RequestIndex]
        requestStrategy.GetRequestablePieces(
                input,
                t.getPieceRequestOrder(),
-               func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) {
+               func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) bool {
                        if ih != *t.canonicalShortInfohash() {
-                               return
+                               return false
                        }
                        if !p.peerHasPiece(pieceIndex) {
-                               return
+                               return false
                        }
-                       requestHeap.pieceStates[pieceIndex] = pieceExtra
+                       requestHeap.pieceStates[pieceIndex].Set(pieceExtra)
                        allowedFast := p.peerAllowedFast.Contains(pieceIndex)
                        t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) {
                                if !allowedFast {
@@ -219,6 +223,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                                }
                                requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
                        })
+                       return true
                },
        )
        t.assertPendingRequests()
@@ -239,9 +244,10 @@ func (p *Peer) maybeUpdateActualRequestState() {
                        panic(since)
                }
        }
+       p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
        pprof.Do(
                context.Background(),
-               pprof.Labels("update request", p.needRequestUpdate),
+               pprof.Labels("update request", string(p.needRequestUpdate)),
                func(_ context.Context) {
                        next := p.getDesiredRequestState()
                        p.applyRequestState(next)
@@ -308,13 +314,29 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
                if cap(next.Requests.requestIndexes) != cap(orig) {
                        panic("changed")
                }
+
+               // don't add requests on reciept of a reject - because this causes request back
+               // to potentially permanently unresponive peers - which just adds network noise.  If
+               // the peer can handle more requests it will send an "unchoked" message - which
+               // will cause it to get added back to the request queue
+               if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason {
+                       continue
+               }
+
                existing := t.requestingPeer(req)
                if existing != nil && existing != p {
+                       // don't steal on cancel - because this is triggered by t.cancelRequest below
+                       // which means that the cancelled can immediately try to steal back a request
+                       // it has lost which can lead to circular cancel/add processing
+                       if p.needRequestUpdate == peerUpdateRequestsPeerCancelReason {
+                               continue
+                       }
+
                        // Don't steal from the poor.
                        diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
                        // Steal a request that leaves us with one more request than the existing peer
                        // connection if the stealer more recently received a chunk.
-                       if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+                       if diff > 1 || (diff == 1 && !p.lastUsefulChunkReceived.After(existing.lastUsefulChunkReceived)) {
                                continue
                        }
                        t.cancelRequest(req)