[btrtrc.git] / requesting.go
index b70f2645e990b234b28703ab64927d42ae31d1a9..7d32012edd231d12ee20aec20e7b8df58ef0f24d 100644
@@ -9,6 +9,8 @@ import (
        "time"
        "unsafe"
 
+       "github.com/RoaringBitmap/roaring"
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/generics/heap"
        "github.com/anacrolix/log"
        "github.com/anacrolix/multiless"
@@ -24,6 +26,7 @@ type (
 )
 
 func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+       t.slogger().Debug("requestStrategyPieceOrderState", "pieceIndex", i)
        return requestStrategy.PieceRequestOrderState{
                Priority:     t.piece(i).purePriority(),
                Partial:      t.piecePartiallyDownloaded(i),
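The added debug call uses structured key/value logging in the log/slog style, which t.slogger() presumably wraps. A minimal standalone equivalent (note the default handler drops Debug records unless the level is lowered; slog.SetLogLoggerLevel needs Go 1.22+):

package main

import "log/slog"

func main() {
	// Lower the default handler's level so Debug records are emitted.
	slog.SetLogLoggerLevel(slog.LevelDebug)
	// Key/value attributes keep the record machine-parseable.
	slog.Debug("requestStrategyPieceOrderState", "pieceIndex", 3)
}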
@@ -77,7 +80,7 @@ type (
 type desiredPeerRequests struct {
        requestIndexes []RequestIndex
        peer           *Peer
-       pieceStates    []requestStrategy.PieceRequestOrderState
+       pieceStates    []g.Option[requestStrategy.PieceRequestOrderState]
 }
 
 func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool {
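Switching pieceStates to g.Option values lets the comparator distinguish a piece whose state was never populated for this peer from one whose state is a legitimate zero value. A minimal sketch of the Option behaviour assumed here (the real type is in github.com/anacrolix/generics; only the two methods this diff uses are sketched):

package main

import "fmt"

type Option[T any] struct {
	value T
	ok    bool
}

// Set stores a value and marks the Option as present.
func (o *Option[T]) Set(v T) { o.value, o.ok = v, true }

// UnwrapPtr returns a pointer to the value, panicking if it was never set,
// so callers can assume the state was populated.
func (o *Option[T]) UnwrapPtr() *T {
	if !o.ok {
		panic("option not set")
	}
	return &o.value
}

func main() {
	var st Option[int]
	st.Set(7)
	fmt.Println(*st.UnwrapPtr()) // 7
}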
@@ -94,13 +97,13 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
                        !p.peer.peerAllowedFast.Contains(rightPieceIndex),
                )
        }
-       leftPiece := &p.pieceStates[leftPieceIndex]
-       rightPiece := &p.pieceStates[rightPieceIndex]
+       leftPiece := p.pieceStates[leftPieceIndex].UnwrapPtr()
+       rightPiece := p.pieceStates[rightPieceIndex].UnwrapPtr()
        // Putting this first means we can steal requests from lesser-performing peers for our first few
        // new requests.
-       priority := func() piecePriority {
+       priority := func() PiecePriority {
                // Technically we would be happy with the cached priority here, except we don't actually
-               // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
+               // cache it anymore, and Torrent.PiecePriority just does another lookup of *Piece to resolve
                // the priority through Piece.purePriority, which is probably slower.
                leftPriority := leftPiece.Priority
                rightPriority := rightPiece.Priority
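lessByValue resolves ties through a sequence of keys: allowed-fast status while the peer is choking us, then piece priority, and so on. A simplified, self-contained sketch of that comparator shape, with hypothetical fields standing in for the real per-request state:

package main

import "fmt"

// pieceKey is a hypothetical stand-in for the state compared in lessByValue.
type pieceKey struct {
	priority int  // higher is more urgent
	partial  bool // partially downloaded pieces sort first
	index    int  // stable tiebreak
}

func less(a, b pieceKey) bool {
	if a.priority != b.priority {
		return a.priority > b.priority
	}
	if a.partial != b.partial {
		return a.partial
	}
	return a.index < b.index
}

func main() {
	fmt.Println(less(pieceKey{priority: 2}, pieceKey{priority: 1})) // true
}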
@@ -182,19 +185,21 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                pieceStates:    t.requestPieceStates,
                requestIndexes: t.requestIndexes,
        }
+       clear(requestHeap.pieceStates)
+       t.logPieceRequestOrder()
        // Caller-provided allocation for roaring bitmap iteration.
        var it typedRoaring.Iterator[RequestIndex]
        requestStrategy.GetRequestablePieces(
                input,
                t.getPieceRequestOrder(),
-               func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) {
-                       if ih != t.infoHash {
-                               return
+               func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) bool {
+                       if ih != *t.canonicalShortInfohash() {
+                               return false
                        }
                        if !p.peerHasPiece(pieceIndex) {
-                               return
+                               return false
                        }
-                       requestHeap.pieceStates[pieceIndex] = pieceExtra
+                       requestHeap.pieceStates[pieceIndex].Set(pieceExtra)
                        allowedFast := p.peerAllowedFast.Contains(pieceIndex)
                        t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) {
                                if !allowedFast {
@@ -211,12 +216,14 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                                                return
                                        }
                                }
-                               if p.requestState.Cancelled.Contains(r) {
+                               cancelled := &p.requestState.Cancelled
+                               if !cancelled.IsEmpty() && cancelled.Contains(r) {
                                        // Can't re-request while awaiting acknowledgement.
                                        return
                                }
                                requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
                        })
+                       return true
                },
        )
        t.assertPendingRequests()
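Two changes in this hunk are worth calling out: the GetRequestablePieces callback now returns a bool so the request strategy can tell whether the piece was actually taken, and the Cancelled lookup gains an IsEmpty fast path that skips the container lookup entirely when nothing is cancelled. A runnable check of that bitmap guard, using the RoaringBitmap API the diff imports (the request index value is made up):

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func main() {
	cancelled := roaring.New()
	r := uint32(42) // hypothetical request index
	// IsEmpty is a cheap guard; Contains would otherwise consult the
	// container index even when nothing has been cancelled.
	if !cancelled.IsEmpty() && cancelled.Contains(r) {
		fmt.Println("can't re-request", r)
	}
	cancelled.Add(r)
	fmt.Println(cancelled.Contains(r)) // true
}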
@@ -237,25 +244,60 @@ func (p *Peer) maybeUpdateActualRequestState() {
                        panic(since)
                }
        }
+       p.logger.Slogger().Debug("updating requests", "reason", p.needRequestUpdate)
        pprof.Do(
                context.Background(),
-               pprof.Labels("update request", p.needRequestUpdate),
+               pprof.Labels("update request", string(p.needRequestUpdate)),
                func(_ context.Context) {
                        next := p.getDesiredRequestState()
                        p.applyRequestState(next)
-                       p.t.requestIndexes = next.Requests.requestIndexes[:0]
+                       p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
                },
        )
 }
 
+func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
+       // The incoming slice can be smaller when getDesiredRequestState short-circuits on some
+       // conditions.
+       if cap(slice) > cap(t.requestIndexes) {
+               t.requestIndexes = slice[:0]
+       }
+}
+
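cacheNextRequestIndexesForReuse keeps whichever backing array has the larger capacity, so the next getDesiredRequestState call can append without reallocating. The reslice-to-zero idiom it relies on:

package main

import "fmt"

func main() {
	buf := make([]int, 0, 8)
	buf = append(buf, 1, 2, 3)
	reused := buf[:0]          // length 0, same backing array
	fmt.Println(cap(reused))   // 8: the allocation survives
	reused = append(reused, 9) // reuses buf's storage, no new allocation
	fmt.Println(reused)        // [9]
}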
+// Whether we should allow sending "not interested" (losing interest) to the peer. I noticed
+// qBittorrent seems to punish us for sending "not interested" when we're streaming and don't
+// currently need anything.
+func (p *Peer) allowSendNotInterested() bool {
+       // Except for caching, we're not likely to lose pieces very soon.
+       if p.t.haveAllPieces() {
+               return true
+       }
+       all, known := p.peerHasAllPieces()
+       if all || !known {
+               return false
+       }
+       // Allow losing interest if we have all the pieces the peer has.
+       return roaring.AndNot(p.peerPieces(), &p.t._completedPieces).IsEmpty()
+}
+
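roaring.AndNot computes the set difference peerPieces \ completedPieces; an empty result means the peer has nothing we still lack, so dropping interest is safe. A quick check of those semantics (the piece numbers are made up):

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

func main() {
	peerPieces := roaring.BitmapOf(1, 2, 3)
	completed := roaring.BitmapOf(1, 2, 3, 7)
	// Pieces the peer has that we have not completed.
	diff := roaring.AndNot(peerPieces, completed)
	fmt.Println(diff.IsEmpty()) // true: nothing left to want from this peer
}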
 // Transmit/action the request state to the peer.
 func (p *Peer) applyRequestState(next desiredRequestState) {
        current := &p.requestState
+       // Make interest sticky
+       if !next.Interested && p.requestState.Interested {
+               if !p.allowSendNotInterested() {
+                       next.Interested = true
+               }
+       }
        if !p.setInterested(next.Interested) {
                return
        }
        more := true
-       requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue)
+       orig := next.Requests.requestIndexes
+       requestHeap := heap.InterfaceForSlice(
+               &next.Requests.requestIndexes,
+               next.Requests.lessByValue,
+       )
        heap.Init(requestHeap)
 
        t := p.t
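The request indexes are ordered lazily via a heap rather than a full sort, since the loop below may stop early once the request limit is reached. heap.InterfaceForSlice adapts a slice plus a comparator; a sketch of the same pattern with the standard library's container/heap:

package main

import (
	"container/heap"
	"fmt"
)

// sliceHeap adapts a slice and a comparator, similar in spirit to
// anacrolix/generics/heap.InterfaceForSlice.
type sliceHeap struct {
	s    []int
	less func(a, b int) bool
}

func (h *sliceHeap) Len() int           { return len(h.s) }
func (h *sliceHeap) Less(i, j int) bool { return h.less(h.s[i], h.s[j]) }
func (h *sliceHeap) Swap(i, j int)      { h.s[i], h.s[j] = h.s[j], h.s[i] }
func (h *sliceHeap) Push(x any)         { h.s = append(h.s, x.(int)) }
func (h *sliceHeap) Pop() any           { x := h.s[len(h.s)-1]; h.s = h.s[:len(h.s)-1]; return x }

func main() {
	h := &sliceHeap{s: []int{5, 1, 4}, less: func(a, b int) bool { return a < b }}
	heap.Init(h)             // O(n), cheaper than sorting when few items are popped
	fmt.Println(heap.Pop(h)) // 1
}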
@@ -269,13 +311,32 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
                        break
                }
                req := heap.Pop(requestHeap)
+               if cap(next.Requests.requestIndexes) != cap(orig) {
+                       panic("changed")
+               }
+
+               // Don't add requests on receipt of a reject, because that sends requests back
+               // to potentially permanently unresponsive peers, which just adds network noise. If
+               // the peer can handle more requests it will send an "unchoke" message, which
+               // will cause it to be added back to the request queue.
+               if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason {
+                       continue
+               }
+
                existing := t.requestingPeer(req)
                if existing != nil && existing != p {
+                       // Don't steal on cancel, because this is triggered by t.cancelRequest below,
+                       // which means the cancelled peer could immediately try to steal back a request
+                       // it has lost, which can lead to circular cancel/add processing.
+                       if p.needRequestUpdate == peerUpdateRequestsPeerCancelReason {
+                               continue
+                       }
+
                        // Don't steal from the poor.
                        diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
                        // Steal a request that leaves us with one more request than the existing peer
                        // connection if the stealer more recently received a chunk.
-                       if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+                       if diff > 1 || (diff == 1 && !p.lastUsefulChunkReceived.After(existing.lastUsefulChunkReceived)) {
                                continue
                        }
                        t.cancelRequest(req)
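The guard change from Before to !After means a timestamp tie no longer justifies a steal when diff == 1: the stealer must have received a useful chunk strictly more recently than the current holder. A worked example of the arithmetic, with assumed request counts:

package main

import (
	"fmt"
	"time"
)

func main() {
	currentRequests := int64(4)  // assumed: the stealer's outstanding requests
	existingRequests := int64(6) // assumed: the holder's uncancelled requests
	diff := currentRequests + 1 - (existingRequests - 1)

	stealerLast := time.Now()
	existingLast := stealerLast // a tie no longer wins at diff == 1
	skip := diff > 1 || (diff == 1 && !stealerLast.After(existingLast))
	fmt.Println("diff:", diff, "steal:", !skip) // diff: 0 steal: true
}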