X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=requesting.go;h=8b9db971b537630e65dd41ca9618f8a708f6a698;hb=HEAD;hp=2a400ccd05e25c3b6fe4a147a5ef3125ee37fdbe;hpb=73be571f500f649c0fca55ab22b2d8aeebfa8c27;p=btrtrc.git diff --git a/requesting.go b/requesting.go index 2a400ccd..8b9db971 100644 --- a/requesting.go +++ b/requesting.go @@ -1,26 +1,33 @@ package torrent import ( - "container/heap" "context" "encoding/gob" - "math/rand" + "fmt" "reflect" "runtime/pprof" "time" "unsafe" + "github.com/anacrolix/generics/heap" "github.com/anacrolix/log" "github.com/anacrolix/multiless" - request_strategy "github.com/anacrolix/torrent/request-strategy" + requestStrategy "github.com/anacrolix/torrent/request-strategy" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" ) -func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState { - return request_strategy.PieceRequestOrderState{ +type ( + // Since we have to store all the requests in memory, we can't reasonably exceed what could be + // indexed with the memory space available. + maxRequests = int +) + +func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState { + return requestStrategy.PieceRequestOrderState{ Priority: t.piece(i).purePriority(), Partial: t.piecePartiallyDownloaded(i), - Availability: t.piece(i).availability, + Availability: t.piece(i).availability(), } } @@ -63,39 +70,20 @@ func (p *peerId) GobDecode(b []byte) error { } type ( - RequestIndex = request_strategy.RequestIndex - chunkIndexType = request_strategy.ChunkIndex + RequestIndex = requestStrategy.RequestIndex + chunkIndexType = requestStrategy.ChunkIndex ) -type peerRequests struct { +type desiredPeerRequests struct { requestIndexes []RequestIndex peer *Peer + pieceStates []requestStrategy.PieceRequestOrderState } -func (p *peerRequests) Len() int { - return len(p.requestIndexes) -} - -func (p *peerRequests) Less(i, j int) bool { - leftRequest := p.requestIndexes[i] - rightRequest := p.requestIndexes[j] +func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool { t := p.peer.t - leftPieceIndex := leftRequest / t.chunksPerRegularPiece() - rightPieceIndex := rightRequest / t.chunksPerRegularPiece() - leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest) - rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest) - pending := func(index RequestIndex, current bool) int { - ret := t.pendingRequests.Get(index) - if current { - ret-- - } - // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be - // resolved. - if ret < 0 { - panic(ret) - } - return ret - } + leftPieceIndex := t.pieceIndexOfRequestIndex(leftRequest) + rightPieceIndex := t.pieceIndexOfRequestIndex(rightRequest) ml := multiless.New() // Push requests that can't be served right now to the end. But we don't throw them away unless // there's a better alternative. This is for when we're using the fast extension and get choked @@ -106,200 +94,217 @@ func (p *peerRequests) Less(i, j int) bool { !p.peer.peerAllowedFast.Contains(rightPieceIndex), ) } + leftPiece := &p.pieceStates[leftPieceIndex] + rightPiece := &p.pieceStates[rightPieceIndex] + // Putting this first means we can steal requests from lesser-performing peers for our first few + // new requests. + priority := func() piecePriority { + // Technically we would be happy with the cached priority here, except we don't actually + // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve + // the priority through Piece.purePriority, which is probably slower. + leftPriority := leftPiece.Priority + rightPriority := rightPiece.Priority + ml = ml.Int( + -int(leftPriority), + -int(rightPriority), + ) + if !ml.Ok() { + if leftPriority != rightPriority { + panic("expected equal") + } + } + return leftPriority + }() + if ml.Ok() { + return ml.MustLess() + } + leftRequestState := t.requestState[leftRequest] + rightRequestState := t.requestState[rightRequest] + leftPeer := leftRequestState.peer + rightPeer := rightRequestState.peer + // Prefer chunks already requested from this peer. + ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) + // Prefer unrequested chunks. + ml = ml.Bool(rightPeer == nil, leftPeer == nil) + if ml.Ok() { + return ml.MustLess() + } + if leftPeer != nil { + // The right peer should also be set, or we'd have resolved the computation by now. + ml = ml.Uint64( + rightPeer.requestState.Requests.GetCardinality(), + leftPeer.requestState.Requests.GetCardinality(), + ) + // Could either of the lastRequested be Zero? That's what checking an existing peer is for. + leftLast := leftRequestState.when + rightLast := rightRequestState.when + if leftLast.IsZero() || rightLast.IsZero() { + panic("expected non-zero last requested times") + } + // We want the most-recently requested on the left. Clients like Transmission serve requests + // in received order, so the most recently-requested is the one that has the longest until + // it will be served and therefore is the best candidate to cancel. + ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds()) + } ml = ml.Int( - pending(leftRequest, leftCurrent), - pending(rightRequest, rightCurrent)) - ml = ml.Bool(!leftCurrent, !rightCurrent) - leftPiece := t.piece(int(leftPieceIndex)) - rightPiece := t.piece(int(rightPieceIndex)) - ml = ml.Int( - -int(leftPiece.purePriority()), - -int(rightPiece.purePriority()), - ) - ml = ml.Int( - int(leftPiece.availability), - int(rightPiece.availability)) - leftLastRequested := p.peer.t.lastRequested[leftRequest] - rightLastRequested := p.peer.t.lastRequested[rightRequest] - ml = ml.EagerSameLess( - leftLastRequested.Equal(rightLastRequested), - leftLastRequested.Before(rightLastRequested), - ) - ml = ml.Uint32(leftPieceIndex, rightPieceIndex) - ml = ml.Uint32(leftRequest, rightRequest) - return ml.MustLess() -} - -func (p *peerRequests) Swap(i, j int) { - p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i] -} - -func (p *peerRequests) Push(x interface{}) { - p.requestIndexes = append(p.requestIndexes, x.(RequestIndex)) -} - -func (p *peerRequests) Pop() interface{} { - last := len(p.requestIndexes) - 1 - x := p.requestIndexes[last] - p.requestIndexes = p.requestIndexes[:last] - return x + leftPiece.Availability, + rightPiece.Availability) + if priority == PiecePriorityReadahead { + // TODO: For readahead in particular, it would be even better to consider distance from the + // reader position so that reads earlier in a torrent don't starve reads later in the + // torrent. This would probably require reconsideration of how readahead priority works. + ml = ml.Int(leftPieceIndex, rightPieceIndex) + } else { + ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex]) + } + return ml.Less() } type desiredRequestState struct { - Requests []RequestIndex + Requests desiredPeerRequests Interested bool } func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { - if !p.t.haveInfo() { + t := p.t + if !t.haveInfo() { return } - input := p.t.getRequestStrategyInput() - requestHeap := peerRequests{ - peer: p, + if t.closed.IsSet() { + return + } + input := t.getRequestStrategyInput() + requestHeap := desiredPeerRequests{ + peer: p, + pieceStates: t.requestPieceStates, + requestIndexes: t.requestIndexes, } - request_strategy.GetRequestablePieces( + // Caller-provided allocation for roaring bitmap iteration. + var it typedRoaring.Iterator[RequestIndex] + requestStrategy.GetRequestablePieces( input, - p.t.cl.pieceRequestOrder[p.t.storage.Capacity], - func(ih InfoHash, pieceIndex int) { - if ih != p.t.infoHash { + t.getPieceRequestOrder(), + func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) { + if ih != t.infoHash { return } if !p.peerHasPiece(pieceIndex) { return } - allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) - p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) { - r := p.t.pieceRequestIndexOffset(pieceIndex) + ci - // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { - // return - // } + requestHeap.pieceStates[pieceIndex] = pieceExtra + allowedFast := p.peerAllowedFast.Contains(pieceIndex) + t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) { if !allowedFast { - // We must signal interest to request this + // We must signal interest to request this. TODO: We could set interested if the + // peers pieces (minus the allowed fast set) overlap with our missing pieces if + // there are any readers, or any pending pieces. desired.Interested = true // We can make or will allow sustaining a request here if we're not choked, or // have made the request previously (presumably while unchoked), and haven't had // the peer respond yet (and the request was retained because we are using the // fast extension). - if p.peerChoking && !p.actualRequestState.Requests.Contains(r) { + if p.peerChoking && !p.requestState.Requests.Contains(r) { // We can't request this right now. return } } - // Note that we can still be interested if we filter all requests due to being - // recently requested from another peer. - if !p.actualRequestState.Requests.Contains(r) { - if time.Since(p.t.lastRequested[r]) < time.Second { - return - } + if p.requestState.Cancelled.Contains(r) { + // Can't re-request while awaiting acknowledgement. + return } requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) }) }, ) - p.t.assertPendingRequests() - heap.Init(&requestHeap) - for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() { - requestIndex := heap.Pop(&requestHeap).(RequestIndex) - desired.Requests = append(desired.Requests, requestIndex) - } + t.assertPendingRequests() + desired.Requests = requestHeap return } -func (p *Peer) maybeUpdateActualRequestState() bool { +func (p *Peer) maybeUpdateActualRequestState() { + if p.closed.IsSet() { + return + } if p.needRequestUpdate == "" { - return true + return + } + if p.needRequestUpdate == peerUpdateRequestsTimerReason { + since := time.Since(p.lastRequestUpdate) + if since < updateRequestsTimerDuration { + panic(since) + } } - var more bool pprof.Do( context.Background(), pprof.Labels("update request", p.needRequestUpdate), func(_ context.Context) { next := p.getDesiredRequestState() - more = p.applyRequestState(next) + p.applyRequestState(next) + p.t.requestIndexes = next.Requests.requestIndexes[:0] }, ) - return more } // Transmit/action the request state to the peer. -func (p *Peer) applyRequestState(next desiredRequestState) bool { - current := &p.actualRequestState +func (p *Peer) applyRequestState(next desiredRequestState) { + current := &p.requestState if !p.setInterested(next.Interested) { - return false + return } more := true - cancel := current.Requests.Clone() - for _, ri := range next.Requests { - cancel.Remove(ri) - } - cancel.Iterate(func(req uint32) bool { - more = p.cancel(req) - return more - }) - if !more { - return false + requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue) + heap.Init(requestHeap) + + t := p.t + originalRequestCount := current.Requests.GetCardinality() + // We're either here on a timer, or because we ran out of requests. Both are valid reasons to + // alter peakRequests. + if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason { + panic(fmt.Sprintf( + "expected zero existing requests (%v) for update reason %q", + originalRequestCount, p.needRequestUpdate)) } - shuffled := false - lastPending := 0 - for i := 0; i < len(next.Requests); i++ { - req := next.Requests[i] - if p.cancelledRequests.Contains(req) { - // Waiting for a reject or piece message, which will suitably trigger us to update our - // requests, so we can skip this one with no additional consideration. - continue - } - // The cardinality of our desired requests shouldn't exceed the max requests since it's used - // in the calculation of the requests. However, if we cancelled requests and they haven't - // been rejected or serviced yet with the fast extension enabled, we can end up with more - // extra outstanding requests. We could subtract the number of outstanding cancels from the - // next request cardinality, but peers might not like that. - if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { - // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", - // next.Requests.GetCardinality(), - // p.cancelledRequests.GetCardinality(), - // current.Requests.GetCardinality(), - // p.nominalMaxRequests(), - // ) - break - } - otherPending := p.t.pendingRequests.Get(next.Requests[0]) - if p.actualRequestState.Requests.Contains(next.Requests[0]) { - otherPending-- - } - if otherPending < lastPending { - // Pending should only rise. It's supposed to be the strongest ordering criteria. If it - // doesn't, our shuffling condition could be wrong. - panic(lastPending) - } - // If the request has already been requested by another peer, shuffle this and the rest of - // the requests (since according to the increasing condition, the rest of the indices - // already have an outstanding request with another peer). - if !shuffled && otherPending > 0 { - shuffleReqs := next.Requests[i:] - rand.Shuffle(len(shuffleReqs), func(i, j int) { - shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i] - }) - // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests)) - shuffled = true - // Repeat this index - i-- - continue + for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() { + req := heap.Pop(requestHeap) + existing := t.requestingPeer(req) + if existing != nil && existing != p { + // Don't steal from the poor. + diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1) + // Steal a request that leaves us with one more request than the existing peer + // connection if the stealer more recently received a chunk. + if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) { + continue + } + t.cancelRequest(req) } - more = p.mustRequest(req) if !more { break } } - // TODO: This may need to change, we might want to update even if there were no requests due to - // filtering them for being recently requested already. - p.updateRequestsTimer.Stop() - if more { - p.needRequestUpdate = "" - if current.Interested { - p.updateRequestsTimer.Reset(3 * time.Second) - } + if !more { + // This might fail if we incorrectly determine that we can fit up to the maximum allowed + // requests into the available write buffer space. We don't want that to happen because it + // makes our peak requests dependent on how much was already in the buffer. + panic(fmt.Sprintf( + "couldn't fill apply entire request state [newRequests=%v]", + current.Requests.GetCardinality()-originalRequestCount)) + } + newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount) + // log.Printf( + // "requests %v->%v (peak %v->%v) reason %q (peer %v)", + // originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p) + p.peakRequests = newPeakRequests + p.needRequestUpdate = "" + p.lastRequestUpdate = time.Now() + if enableUpdateRequestsTimer { + p.updateRequestsTimer.Reset(updateRequestsTimerDuration) } - return more } + +// This could be set to 10s to match the unchoke/request update interval recommended by some +// specifications. I've set it shorter to trigger it more often for testing for now. +const ( + updateRequestsTimerDuration = 3 * time.Second + enableUpdateRequestsTimer = false +)