]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Try request stealing
authorMatt Joiner <anacrolix@gmail.com>
Fri, 3 Dec 2021 10:30:41 +0000 (21:30 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 12 Dec 2021 07:35:01 +0000 (18:35 +1100)
peerconn.go
requesting.go
torrent.go

index 7f8a78f13afc1b5d3a356c14d7ca2002dcccb83c..0eec60370bfc2713a8644af2a067f75db47cb032 100644 (file)
@@ -619,7 +619,7 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
                cn.validReceiveChunks = make(map[RequestIndex]int)
        }
        cn.validReceiveChunks[r]++
-       cn.t.pendingRequests.Inc(r)
+       cn.t.pendingRequests[r] = cn
        cn.t.lastRequested[r] = time.Now()
        cn.updateExpectingChunks()
        ppReq := cn.t.requestIndexToRequest(r)
@@ -1550,10 +1550,8 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
                f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
        }
        c.updateExpectingChunks()
-       c.t.pendingRequests.Dec(r)
-       if c.t.pendingRequests.Get(r) == 0 {
-               delete(c.t.lastRequested, r)
-       }
+       delete(c.t.pendingRequests, r)
+       delete(c.t.lastRequested, r)
        return true
 }
 
index 2a400ccd05e25c3b6fe4a147a5ef3125ee37fdbe..50771025d88a67df603ab2da3e14bc6fe3838751 100644 (file)
@@ -4,7 +4,6 @@ import (
        "container/heap"
        "context"
        "encoding/gob"
-       "math/rand"
        "reflect"
        "runtime/pprof"
        "time"
@@ -82,20 +81,6 @@ func (p *peerRequests) Less(i, j int) bool {
        t := p.peer.t
        leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
        rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
-       leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
-       rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
-       pending := func(index RequestIndex, current bool) int {
-               ret := t.pendingRequests.Get(index)
-               if current {
-                       ret--
-               }
-               // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
-               // resolved.
-               if ret < 0 {
-                       panic(ret)
-               }
-               return ret
-       }
        ml := multiless.New()
        // Push requests that can't be served right now to the end. But we don't throw them away unless
        // there's a better alternative. This is for when we're using the fast extension and get choked
@@ -106,28 +91,33 @@ func (p *peerRequests) Less(i, j int) bool {
                        !p.peer.peerAllowedFast.Contains(rightPieceIndex),
                )
        }
-       ml = ml.Int(
-               pending(leftRequest, leftCurrent),
-               pending(rightRequest, rightCurrent))
-       ml = ml.Bool(!leftCurrent, !rightCurrent)
+       leftPeer := t.pendingRequests[leftRequest]
+       rightPeer := t.pendingRequests[rightRequest]
+       ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
+       ml = ml.Bool(rightPeer == nil, leftPeer == nil)
+       if ml.Ok() {
+               return ml.MustLess()
+       }
+       if leftPeer != nil {
+               ml = ml.Uint64(
+                       rightPeer.actualRequestState.Requests.GetCardinality(),
+                       leftPeer.actualRequestState.Requests.GetCardinality(),
+               )
+       }
+       ml = ml.CmpInt64(t.lastRequested[rightRequest].Sub(t.lastRequested[leftRequest]).Nanoseconds())
        leftPiece := t.piece(int(leftPieceIndex))
        rightPiece := t.piece(int(rightPieceIndex))
        ml = ml.Int(
+               // Technically we would be happy with the cached priority here, except we don't actually
+               // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
+               // the priority through Piece.purePriority, which is probably slower.
                -int(leftPiece.purePriority()),
                -int(rightPiece.purePriority()),
        )
        ml = ml.Int(
                int(leftPiece.availability),
                int(rightPiece.availability))
-       leftLastRequested := p.peer.t.lastRequested[leftRequest]
-       rightLastRequested := p.peer.t.lastRequested[rightRequest]
-       ml = ml.EagerSameLess(
-               leftLastRequested.Equal(rightLastRequested),
-               leftLastRequested.Before(rightLastRequested),
-       )
-       ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
-       ml = ml.Uint32(leftRequest, rightRequest)
-       return ml.MustLess()
+       return ml.Less()
 }
 
 func (p *peerRequests) Swap(i, j int) {
@@ -146,7 +136,7 @@ func (p *peerRequests) Pop() interface{} {
 }
 
 type desiredRequestState struct {
-       Requests   []RequestIndex
+       Requests   peerRequests
        Interested bool
 }
 
@@ -175,7 +165,9 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                                //      return
                                // }
                                if !allowedFast {
-                                       // We must signal interest to request this
+                                       // We must signal interest to request this. TODO: We could set interested if the
+                                       // peers pieces (minus the allowed fast set) overlap with our missing pieces if
+                                       // there are any readers, or any pending pieces.
                                        desired.Interested = true
                                        // We can make or will allow sustaining a request here if we're not choked, or
                                        // have made the request previously (presumably while unchoked), and haven't had
@@ -186,23 +178,12 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
                                                return
                                        }
                                }
-                               // Note that we can still be interested if we filter all requests due to being
-                               // recently requested from another peer.
-                               if !p.actualRequestState.Requests.Contains(r) {
-                                       if time.Since(p.t.lastRequested[r]) < time.Second {
-                                               return
-                                       }
-                               }
                                requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
                        })
                },
        )
        p.t.assertPendingRequests()
-       heap.Init(&requestHeap)
-       for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
-               requestIndex := heap.Pop(&requestHeap).(RequestIndex)
-               desired.Requests = append(desired.Requests, requestIndex)
-       }
+       desired.Requests = requestHeap
        return
 }
 
@@ -229,64 +210,19 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
                return false
        }
        more := true
-       cancel := current.Requests.Clone()
-       for _, ri := range next.Requests {
-               cancel.Remove(ri)
-       }
-       cancel.Iterate(func(req uint32) bool {
-               more = p.cancel(req)
-               return more
-       })
-       if !more {
-               return false
-       }
-       shuffled := false
-       lastPending := 0
-       for i := 0; i < len(next.Requests); i++ {
-               req := next.Requests[i]
+       requestHeap := &next.Requests
+       heap.Init(requestHeap)
+       for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
+               req := heap.Pop(requestHeap).(RequestIndex)
                if p.cancelledRequests.Contains(req) {
                        // Waiting for a reject or piece message, which will suitably trigger us to update our
                        // requests, so we can skip this one with no additional consideration.
                        continue
                }
-               // The cardinality of our desired requests shouldn't exceed the max requests since it's used
-               // in the calculation of the requests. However, if we cancelled requests and they haven't
-               // been rejected or serviced yet with the fast extension enabled, we can end up with more
-               // extra outstanding requests. We could subtract the number of outstanding cancels from the
-               // next request cardinality, but peers might not like that.
-               if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
-                       // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
-                       //      next.Requests.GetCardinality(),
-                       //      p.cancelledRequests.GetCardinality(),
-                       //      current.Requests.GetCardinality(),
-                       //      p.nominalMaxRequests(),
-                       // )
-                       break
-               }
-               otherPending := p.t.pendingRequests.Get(next.Requests[0])
-               if p.actualRequestState.Requests.Contains(next.Requests[0]) {
-                       otherPending--
+               existing := p.t.pendingRequests[req]
+               if existing != nil && existing != p && existing.actualRequestState.Requests.GetCardinality()-existing.cancelledRequests.GetCardinality() > current.Requests.GetCardinality() {
+                       existing.cancel(req)
                }
-               if otherPending < lastPending {
-                       // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
-                       // doesn't, our shuffling condition could be wrong.
-                       panic(lastPending)
-               }
-               // If the request has already been requested by another peer, shuffle this and the rest of
-               // the requests (since according to the increasing condition, the rest of the indices
-               // already have an outstanding request with another peer).
-               if !shuffled && otherPending > 0 {
-                       shuffleReqs := next.Requests[i:]
-                       rand.Shuffle(len(shuffleReqs), func(i, j int) {
-                               shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
-                       })
-                       // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
-                       shuffled = true
-                       // Repeat this index
-                       i--
-                       continue
-               }
-
                more = p.mustRequest(req)
                if !more {
                        break
index 30ccbb08a001cd52c84088b599a2451c26168899..b2688994cc4a5ba9d88f2b2cc77deed6f341cf97 100644 (file)
@@ -138,7 +138,7 @@ type Torrent struct {
        initialPieceCheckDisabled bool
 
        // Count of each request across active connections.
-       pendingRequests pendingRequests
+       pendingRequests map[RequestIndex]*Peer
        lastRequested   map[RequestIndex]time.Time
        // Chunks we've written to since the corresponding piece was last checked.
        dirtyChunks roaring.Bitmap
@@ -463,7 +463,7 @@ func (t *Torrent) onSetInfo() {
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.pendingRequests.Init(t.numRequests())
+       t.pendingRequests = make(map[RequestIndex]*Peer)
        t.lastRequested = make(map[RequestIndex]time.Time)
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {