X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=requesting.go;h=8b9db971b537630e65dd41ca9618f8a708f6a698;hb=HEAD;hp=a3a7e1c3a9d5c5b35b7885274ac021d83193a41d;hpb=04beb8937fbebe3b651d69fadcd918fe401d4678;p=btrtrc.git diff --git a/requesting.go b/requesting.go index a3a7e1c3..8b9db971 100644 --- a/requesting.go +++ b/requesting.go @@ -1,25 +1,33 @@ package torrent import ( - "container/heap" "context" "encoding/gob" + "fmt" "reflect" "runtime/pprof" "time" "unsafe" + "github.com/anacrolix/generics/heap" "github.com/anacrolix/log" "github.com/anacrolix/multiless" - request_strategy "github.com/anacrolix/torrent/request-strategy" + requestStrategy "github.com/anacrolix/torrent/request-strategy" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" ) -func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState { - return request_strategy.PieceRequestOrderState{ +type ( + // Since we have to store all the requests in memory, we can't reasonably exceed what could be + // indexed with the memory space available. + maxRequests = int +) + +func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState { + return requestStrategy.PieceRequestOrderState{ Priority: t.piece(i).purePriority(), Partial: t.piecePartiallyDownloaded(i), - Availability: t.piece(i).availability, + Availability: t.piece(i).availability(), } } @@ -62,25 +70,20 @@ func (p *peerId) GobDecode(b []byte) error { } type ( - RequestIndex = request_strategy.RequestIndex - chunkIndexType = request_strategy.ChunkIndex + RequestIndex = requestStrategy.RequestIndex + chunkIndexType = requestStrategy.ChunkIndex ) -type peerRequests struct { +type desiredPeerRequests struct { requestIndexes []RequestIndex peer *Peer + pieceStates []requestStrategy.PieceRequestOrderState } -func (p *peerRequests) Len() int { - return len(p.requestIndexes) -} - -func (p *peerRequests) Less(i, j int) bool { - leftRequest := p.requestIndexes[i] - rightRequest := p.requestIndexes[j] +func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool { t := p.peer.t - leftPieceIndex := leftRequest / t.chunksPerRegularPiece() - rightPieceIndex := rightRequest / t.chunksPerRegularPiece() + leftPieceIndex := t.pieceIndexOfRequestIndex(leftRequest) + rightPieceIndex := t.pieceIndexOfRequestIndex(rightRequest) ml := multiless.New() // Push requests that can't be served right now to the end. But we don't throw them away unless // there's a better alternative. This is for when we're using the fast extension and get choked @@ -91,9 +94,37 @@ func (p *peerRequests) Less(i, j int) bool { !p.peer.peerAllowedFast.Contains(rightPieceIndex), ) } - leftPeer := t.pendingRequests[leftRequest] - rightPeer := t.pendingRequests[rightRequest] + leftPiece := &p.pieceStates[leftPieceIndex] + rightPiece := &p.pieceStates[rightPieceIndex] + // Putting this first means we can steal requests from lesser-performing peers for our first few + // new requests. + priority := func() piecePriority { + // Technically we would be happy with the cached priority here, except we don't actually + // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve + // the priority through Piece.purePriority, which is probably slower. + leftPriority := leftPiece.Priority + rightPriority := rightPiece.Priority + ml = ml.Int( + -int(leftPriority), + -int(rightPriority), + ) + if !ml.Ok() { + if leftPriority != rightPriority { + panic("expected equal") + } + } + return leftPriority + }() + if ml.Ok() { + return ml.MustLess() + } + leftRequestState := t.requestState[leftRequest] + rightRequestState := t.requestState[rightRequest] + leftPeer := leftRequestState.peer + rightPeer := rightRequestState.peer + // Prefer chunks already requested from this peer. ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) + // Prefer unrequested chunks. ml = ml.Bool(rightPeer == nil, leftPeer == nil) if ml.Ok() { return ml.MustLess() @@ -105,8 +136,8 @@ func (p *peerRequests) Less(i, j int) bool { leftPeer.requestState.Requests.GetCardinality(), ) // Could either of the lastRequested be Zero? That's what checking an existing peer is for. - leftLast := t.lastRequested[leftRequest] - rightLast := t.lastRequested[rightRequest] + leftLast := leftRequestState.when + rightLast := rightRequestState.when if leftLast.IsZero() || rightLast.IsZero() { panic("expected non-zero last requested times") } @@ -115,65 +146,54 @@ func (p *peerRequests) Less(i, j int) bool { // it will be served and therefore is the best candidate to cancel. ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds()) } - leftPiece := t.piece(int(leftPieceIndex)) - rightPiece := t.piece(int(rightPieceIndex)) ml = ml.Int( - // Technically we would be happy with the cached priority here, except we don't actually - // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve - // the priority through Piece.purePriority, which is probably slower. - -int(leftPiece.purePriority()), - -int(rightPiece.purePriority()), - ) - ml = ml.Int( - int(leftPiece.availability), - int(rightPiece.availability)) + leftPiece.Availability, + rightPiece.Availability) + if priority == PiecePriorityReadahead { + // TODO: For readahead in particular, it would be even better to consider distance from the + // reader position so that reads earlier in a torrent don't starve reads later in the + // torrent. This would probably require reconsideration of how readahead priority works. + ml = ml.Int(leftPieceIndex, rightPieceIndex) + } else { + ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex]) + } return ml.Less() } -func (p *peerRequests) Swap(i, j int) { - p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i] -} - -func (p *peerRequests) Push(x interface{}) { - p.requestIndexes = append(p.requestIndexes, x.(RequestIndex)) -} - -func (p *peerRequests) Pop() interface{} { - last := len(p.requestIndexes) - 1 - x := p.requestIndexes[last] - p.requestIndexes = p.requestIndexes[:last] - return x -} - type desiredRequestState struct { - Requests peerRequests + Requests desiredPeerRequests Interested bool } func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { - if !p.t.haveInfo() { + t := p.t + if !t.haveInfo() { return } - input := p.t.getRequestStrategyInput() - requestHeap := peerRequests{ - peer: p, + if t.closed.IsSet() { + return } - request_strategy.GetRequestablePieces( + input := t.getRequestStrategyInput() + requestHeap := desiredPeerRequests{ + peer: p, + pieceStates: t.requestPieceStates, + requestIndexes: t.requestIndexes, + } + // Caller-provided allocation for roaring bitmap iteration. + var it typedRoaring.Iterator[RequestIndex] + requestStrategy.GetRequestablePieces( input, - p.t.getPieceRequestOrder(), - func(ih InfoHash, pieceIndex int) { - if ih != p.t.infoHash { + t.getPieceRequestOrder(), + func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) { + if ih != t.infoHash { return } if !p.peerHasPiece(pieceIndex) { return } - allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) - p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) { - r := p.t.pieceRequestIndexOffset(pieceIndex) + ci - // if p.t.pendingRequests.Get(r) != 0 && !p.requestState.Requests.Contains(r) { - // return - // } + requestHeap.pieceStates[pieceIndex] = pieceExtra + allowedFast := p.peerAllowedFast.Contains(pieceIndex) + t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) { if !allowedFast { // We must signal interest to request this. TODO: We could set interested if the // peers pieces (minus the allowed fast set) overlap with our missing pieces if @@ -189,46 +209,63 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { } } if p.requestState.Cancelled.Contains(r) { - // Can't re-request. + // Can't re-request while awaiting acknowledgement. return } requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) }) }, ) - p.t.assertPendingRequests() + t.assertPendingRequests() desired.Requests = requestHeap return } -func (p *Peer) maybeUpdateActualRequestState() bool { +func (p *Peer) maybeUpdateActualRequestState() { + if p.closed.IsSet() { + return + } if p.needRequestUpdate == "" { - return true + return + } + if p.needRequestUpdate == peerUpdateRequestsTimerReason { + since := time.Since(p.lastRequestUpdate) + if since < updateRequestsTimerDuration { + panic(since) + } } - var more bool pprof.Do( context.Background(), pprof.Labels("update request", p.needRequestUpdate), func(_ context.Context) { next := p.getDesiredRequestState() - more = p.applyRequestState(next) + p.applyRequestState(next) + p.t.requestIndexes = next.Requests.requestIndexes[:0] }, ) - return more } // Transmit/action the request state to the peer. -func (p *Peer) applyRequestState(next desiredRequestState) bool { +func (p *Peer) applyRequestState(next desiredRequestState) { current := &p.requestState if !p.setInterested(next.Interested) { - return false + return } more := true - requestHeap := &next.Requests - t := p.t + requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue) heap.Init(requestHeap) - for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() { - req := heap.Pop(requestHeap).(RequestIndex) + + t := p.t + originalRequestCount := current.Requests.GetCardinality() + // We're either here on a timer, or because we ran out of requests. Both are valid reasons to + // alter peakRequests. + if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason { + panic(fmt.Sprintf( + "expected zero existing requests (%v) for update reason %q", + originalRequestCount, p.needRequestUpdate)) + } + for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() { + req := heap.Pop(requestHeap) existing := t.requestingPeer(req) if existing != nil && existing != p { // Don't steal from the poor. @@ -245,14 +282,29 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool { break } } - // TODO: This may need to change, we might want to update even if there were no requests due to - // filtering them for being recently requested already. - p.updateRequestsTimer.Stop() - if more { - p.needRequestUpdate = "" - if current.Interested { - p.updateRequestsTimer.Reset(3 * time.Second) - } + if !more { + // This might fail if we incorrectly determine that we can fit up to the maximum allowed + // requests into the available write buffer space. We don't want that to happen because it + // makes our peak requests dependent on how much was already in the buffer. + panic(fmt.Sprintf( + "couldn't fill apply entire request state [newRequests=%v]", + current.Requests.GetCardinality()-originalRequestCount)) + } + newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount) + // log.Printf( + // "requests %v->%v (peak %v->%v) reason %q (peer %v)", + // originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p) + p.peakRequests = newPeakRequests + p.needRequestUpdate = "" + p.lastRequestUpdate = time.Now() + if enableUpdateRequestsTimer { + p.updateRequestsTimer.Reset(updateRequestsTimerDuration) } - return more } + +// This could be set to 10s to match the unchoke/request update interval recommended by some +// specifications. I've set it shorter to trigger it more often for testing for now. +const ( + updateRequestsTimerDuration = 3 * time.Second + enableUpdateRequestsTimer = false +)