From 117ae28b38fa2162401de3703ac2e9441c6927e1 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 3 Dec 2021 21:30:41 +1100 Subject: [PATCH] Try request stealing --- peerconn.go | 8 ++-- requesting.go | 124 ++++++++++++-------------------------------------- torrent.go | 4 +- 3 files changed, 35 insertions(+), 101 deletions(-) diff --git a/peerconn.go b/peerconn.go index 7f8a78f1..0eec6037 100644 --- a/peerconn.go +++ b/peerconn.go @@ -619,7 +619,7 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { cn.validReceiveChunks = make(map[RequestIndex]int) } cn.validReceiveChunks[r]++ - cn.t.pendingRequests.Inc(r) + cn.t.pendingRequests[r] = cn cn.t.lastRequested[r] = time.Now() cn.updateExpectingChunks() ppReq := cn.t.requestIndexToRequest(r) @@ -1550,10 +1550,8 @@ func (c *Peer) deleteRequest(r RequestIndex) bool { f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) } c.updateExpectingChunks() - c.t.pendingRequests.Dec(r) - if c.t.pendingRequests.Get(r) == 0 { - delete(c.t.lastRequested, r) - } + delete(c.t.pendingRequests, r) + delete(c.t.lastRequested, r) return true } diff --git a/requesting.go b/requesting.go index 2a400ccd..50771025 100644 --- a/requesting.go +++ b/requesting.go @@ -4,7 +4,6 @@ import ( "container/heap" "context" "encoding/gob" - "math/rand" "reflect" "runtime/pprof" "time" @@ -82,20 +81,6 @@ func (p *peerRequests) Less(i, j int) bool { t := p.peer.t leftPieceIndex := leftRequest / t.chunksPerRegularPiece() rightPieceIndex := rightRequest / t.chunksPerRegularPiece() - leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest) - rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest) - pending := func(index RequestIndex, current bool) int { - ret := t.pendingRequests.Get(index) - if current { - ret-- - } - // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be - // resolved. - if ret < 0 { - panic(ret) - } - return ret - } ml := multiless.New() // Push requests that can't be served right now to the end. But we don't throw them away unless // there's a better alternative. This is for when we're using the fast extension and get choked @@ -106,28 +91,33 @@ func (p *peerRequests) Less(i, j int) bool { !p.peer.peerAllowedFast.Contains(rightPieceIndex), ) } - ml = ml.Int( - pending(leftRequest, leftCurrent), - pending(rightRequest, rightCurrent)) - ml = ml.Bool(!leftCurrent, !rightCurrent) + leftPeer := t.pendingRequests[leftRequest] + rightPeer := t.pendingRequests[rightRequest] + ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) + ml = ml.Bool(rightPeer == nil, leftPeer == nil) + if ml.Ok() { + return ml.MustLess() + } + if leftPeer != nil { + ml = ml.Uint64( + rightPeer.actualRequestState.Requests.GetCardinality(), + leftPeer.actualRequestState.Requests.GetCardinality(), + ) + } + ml = ml.CmpInt64(t.lastRequested[rightRequest].Sub(t.lastRequested[leftRequest]).Nanoseconds()) leftPiece := t.piece(int(leftPieceIndex)) rightPiece := t.piece(int(rightPieceIndex)) ml = ml.Int( + // Technically we would be happy with the cached priority here, except we don't actually + // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve + // the priority through Piece.purePriority, which is probably slower. -int(leftPiece.purePriority()), -int(rightPiece.purePriority()), ) ml = ml.Int( int(leftPiece.availability), int(rightPiece.availability)) - leftLastRequested := p.peer.t.lastRequested[leftRequest] - rightLastRequested := p.peer.t.lastRequested[rightRequest] - ml = ml.EagerSameLess( - leftLastRequested.Equal(rightLastRequested), - leftLastRequested.Before(rightLastRequested), - ) - ml = ml.Uint32(leftPieceIndex, rightPieceIndex) - ml = ml.Uint32(leftRequest, rightRequest) - return ml.MustLess() + return ml.Less() } func (p *peerRequests) Swap(i, j int) { @@ -146,7 +136,7 @@ func (p *peerRequests) Pop() interface{} { } type desiredRequestState struct { - Requests []RequestIndex + Requests peerRequests Interested bool } @@ -175,7 +165,9 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { // return // } if !allowedFast { - // We must signal interest to request this + // We must signal interest to request this. TODO: We could set interested if the + // peers pieces (minus the allowed fast set) overlap with our missing pieces if + // there are any readers, or any pending pieces. desired.Interested = true // We can make or will allow sustaining a request here if we're not choked, or // have made the request previously (presumably while unchoked), and haven't had @@ -186,23 +178,12 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { return } } - // Note that we can still be interested if we filter all requests due to being - // recently requested from another peer. - if !p.actualRequestState.Requests.Contains(r) { - if time.Since(p.t.lastRequested[r]) < time.Second { - return - } - } requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) }) }, ) p.t.assertPendingRequests() - heap.Init(&requestHeap) - for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() { - requestIndex := heap.Pop(&requestHeap).(RequestIndex) - desired.Requests = append(desired.Requests, requestIndex) - } + desired.Requests = requestHeap return } @@ -229,64 +210,19 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool { return false } more := true - cancel := current.Requests.Clone() - for _, ri := range next.Requests { - cancel.Remove(ri) - } - cancel.Iterate(func(req uint32) bool { - more = p.cancel(req) - return more - }) - if !more { - return false - } - shuffled := false - lastPending := 0 - for i := 0; i < len(next.Requests); i++ { - req := next.Requests[i] + requestHeap := &next.Requests + heap.Init(requestHeap) + for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() { + req := heap.Pop(requestHeap).(RequestIndex) if p.cancelledRequests.Contains(req) { // Waiting for a reject or piece message, which will suitably trigger us to update our // requests, so we can skip this one with no additional consideration. continue } - // The cardinality of our desired requests shouldn't exceed the max requests since it's used - // in the calculation of the requests. However, if we cancelled requests and they haven't - // been rejected or serviced yet with the fast extension enabled, we can end up with more - // extra outstanding requests. We could subtract the number of outstanding cancels from the - // next request cardinality, but peers might not like that. - if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { - // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", - // next.Requests.GetCardinality(), - // p.cancelledRequests.GetCardinality(), - // current.Requests.GetCardinality(), - // p.nominalMaxRequests(), - // ) - break - } - otherPending := p.t.pendingRequests.Get(next.Requests[0]) - if p.actualRequestState.Requests.Contains(next.Requests[0]) { - otherPending-- + existing := p.t.pendingRequests[req] + if existing != nil && existing != p && existing.actualRequestState.Requests.GetCardinality()-existing.cancelledRequests.GetCardinality() > current.Requests.GetCardinality() { + existing.cancel(req) } - if otherPending < lastPending { - // Pending should only rise. It's supposed to be the strongest ordering criteria. If it - // doesn't, our shuffling condition could be wrong. - panic(lastPending) - } - // If the request has already been requested by another peer, shuffle this and the rest of - // the requests (since according to the increasing condition, the rest of the indices - // already have an outstanding request with another peer). - if !shuffled && otherPending > 0 { - shuffleReqs := next.Requests[i:] - rand.Shuffle(len(shuffleReqs), func(i, j int) { - shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i] - }) - // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests)) - shuffled = true - // Repeat this index - i-- - continue - } - more = p.mustRequest(req) if !more { break diff --git a/torrent.go b/torrent.go index 30ccbb08..b2688994 100644 --- a/torrent.go +++ b/torrent.go @@ -138,7 +138,7 @@ type Torrent struct { initialPieceCheckDisabled bool // Count of each request across active connections. - pendingRequests pendingRequests + pendingRequests map[RequestIndex]*Peer lastRequested map[RequestIndex]time.Time // Chunks we've written to since the corresponding piece was last checked. dirtyChunks roaring.Bitmap @@ -463,7 +463,7 @@ func (t *Torrent) onSetInfo() { t.cl.event.Broadcast() close(t.gotMetainfoC) t.updateWantPeersEvent() - t.pendingRequests.Init(t.numRequests()) + t.pendingRequests = make(map[RequestIndex]*Peer) t.lastRequested = make(map[RequestIndex]time.Time) t.tryCreateMorePieceHashers() t.iterPeers(func(p *Peer) { -- 2.44.0