diff --git a/requesting.go b/requesting.go
index 411fac53..8b9db971 100644
--- a/requesting.go
+++ b/requesting.go
@@ -9,15 +9,22 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/anacrolix/generics/heap"
 	"github.com/anacrolix/log"
 	"github.com/anacrolix/multiless"
-	"github.com/lispad/go-generics-tools/binheap"
 
-	request_strategy "github.com/anacrolix/torrent/request-strategy"
+	requestStrategy "github.com/anacrolix/torrent/request-strategy"
+	typedRoaring "github.com/anacrolix/torrent/typed-roaring"
 )
 
-func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
-	return request_strategy.PieceRequestOrderState{
+type (
+	// Since we have to store all the requests in memory, we can't reasonably exceed what could be
+	// indexed with the memory space available.
+	maxRequests = int
+)
+
+func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+	return requestStrategy.PieceRequestOrderState{
 		Priority:     t.piece(i).purePriority(),
 		Partial:      t.piecePartiallyDownloaded(i),
 		Availability: t.piece(i).availability(),
@@ -63,21 +70,14 @@ func (p *peerId) GobDecode(b []byte) error {
 }
 
 type (
-	RequestIndex   = request_strategy.RequestIndex
-	chunkIndexType = request_strategy.ChunkIndex
+	RequestIndex   = requestStrategy.RequestIndex
+	chunkIndexType = requestStrategy.ChunkIndex
 )
 
 type desiredPeerRequests struct {
 	requestIndexes []RequestIndex
 	peer           *Peer
-}
-
-func (p *desiredPeerRequests) Len() int {
-	return len(p.requestIndexes)
-}
-
-func (p *desiredPeerRequests) Less(i, j int) bool {
-	return p.lessByValue(p.requestIndexes[i], p.requestIndexes[j])
+	pieceStates    []requestStrategy.PieceRequestOrderState
 }
 
 func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool {
@@ -94,16 +94,16 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
 			!p.peer.peerAllowedFast.Contains(rightPieceIndex),
 		)
 	}
-	leftPiece := t.piece(leftPieceIndex)
-	rightPiece := t.piece(rightPieceIndex)
+	leftPiece := &p.pieceStates[leftPieceIndex]
+	rightPiece := &p.pieceStates[rightPieceIndex]
 	// Putting this first means we can steal requests from lesser-performing peers for our first few
 	// new requests.
 	priority := func() piecePriority {
 		// Technically we would be happy with the cached priority here, except we don't actually
 		// cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
 		// the priority through Piece.purePriority, which is probably slower.
-		leftPriority := leftPiece.purePriority()
-		rightPriority := rightPiece.purePriority()
+		leftPriority := leftPiece.Priority
+		rightPriority := rightPiece.Priority
 		ml = ml.Int(
 			-int(leftPriority),
 			-int(rightPriority),
@@ -118,8 +118,10 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
 	if ml.Ok() {
 		return ml.MustLess()
 	}
-	leftPeer := t.pendingRequests[leftRequest]
-	rightPeer := t.pendingRequests[rightRequest]
+	leftRequestState := t.requestState[leftRequest]
+	rightRequestState := t.requestState[rightRequest]
+	leftPeer := leftRequestState.peer
+	rightPeer := rightRequestState.peer
 	// Prefer chunks already requested from this peer.
 	ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
 	// Prefer unrequested chunks.
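The rewritten lessByValue above chains tie-breakers with github.com/anacrolix/multiless: each comparison decides the ordering only if every earlier one tied, which is why piece priority is checked first and later fields only break ties. A minimal standalone sketch of that pattern follows; pieceState and its fields are hypothetical stand-ins, and only multiless calls that appear in this file (New, Int, Less) are used.

package main

import (
	"fmt"

	"github.com/anacrolix/multiless"
)

// pieceState is a hypothetical stand-in for the per-piece fields that
// lessByValue actually compares.
type pieceState struct {
	priority     int // higher is more urgent
	availability int // lower (rarer) is preferred
}

func less(left, right pieceState) bool {
	ml := multiless.New()
	// Negate so that higher priority sorts first, mirroring lessByValue.
	ml = ml.Int(-left.priority, -right.priority)
	// Only decisive when priorities tie: rarer pieces first.
	ml = ml.Int(left.availability, right.availability)
	return ml.Less()
}

func main() {
	a := pieceState{priority: 1, availability: 3}
	b := pieceState{priority: 1, availability: 7}
	fmt.Println(less(a, b)) // true: priorities tie, a is rarer
}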
@@ -134,8 +136,8 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
 		leftPeer.requestState.Requests.GetCardinality(),
 	)
 	// Could either of the lastRequested be Zero? That's what checking an existing peer is for.
-	leftLast := t.lastRequested[leftRequest]
-	rightLast := t.lastRequested[rightRequest]
+	leftLast := leftRequestState.when
+	rightLast := rightRequestState.when
 	if leftLast.IsZero() || rightLast.IsZero() {
 		panic("expected non-zero last requested times")
 	}
@@ -145,65 +147,53 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
 		ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds())
 	}
 	ml = ml.Int(
-		leftPiece.relativeAvailability,
-		rightPiece.relativeAvailability)
+		leftPiece.Availability,
+		rightPiece.Availability)
 	if priority == PiecePriorityReadahead {
 		// TODO: For readahead in particular, it would be even better to consider distance from the
 		// reader position so that reads earlier in a torrent don't starve reads later in the
 		// torrent. This would probably require reconsideration of how readahead priority works.
 		ml = ml.Int(leftPieceIndex, rightPieceIndex)
 	} else {
-		// TODO: To prevent unnecessarily requesting from disparate pieces, and to ensure pieces are
-		// selected randomly when availability is even, there should be some fixed ordering of
-		// pieces.
+		ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex])
 	}
 	return ml.Less()
 }
 
-func (p *desiredPeerRequests) Swap(i, j int) {
-	p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
-}
-
-func (p *desiredPeerRequests) Push(x interface{}) {
-	p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
-}
-
-func (p *desiredPeerRequests) Pop() interface{} {
-	last := len(p.requestIndexes) - 1
-	x := p.requestIndexes[last]
-	p.requestIndexes = p.requestIndexes[:last]
-	return x
-}
-
 type desiredRequestState struct {
 	Requests   desiredPeerRequests
 	Interested bool
 }
 
 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
-	if !p.t.haveInfo() {
+	t := p.t
+	if !t.haveInfo() {
 		return
 	}
-	if p.t.closed.IsSet() {
+	if t.closed.IsSet() {
 		return
 	}
-	input := p.t.getRequestStrategyInput()
+	input := t.getRequestStrategyInput()
 	requestHeap := desiredPeerRequests{
-		peer: p,
+		peer:           p,
+		pieceStates:    t.requestPieceStates,
+		requestIndexes: t.requestIndexes,
 	}
-	request_strategy.GetRequestablePieces(
+	// Caller-provided allocation for roaring bitmap iteration.
+	var it typedRoaring.Iterator[RequestIndex]
+	requestStrategy.GetRequestablePieces(
 		input,
-		p.t.getPieceRequestOrder(),
-		func(ih InfoHash, pieceIndex int) {
-			if ih != p.t.infoHash {
+		t.getPieceRequestOrder(),
+		func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) {
+			if ih != t.infoHash {
 				return
 			}
 			if !p.peerHasPiece(pieceIndex) {
 				return
 			}
+			requestHeap.pieceStates[pieceIndex] = pieceExtra
 			allowedFast := p.peerAllowedFast.Contains(pieceIndex)
-			p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
-				r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
+			t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) {
 				if !allowedFast {
 					// We must signal interest to request this. TODO: We could set interested if the
 					// peers pieces (minus the allowed fast set) overlap with our missing pieces if
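getDesiredRequestState above now borrows requestIndexes and requestPieceStates from the Torrent, and takes a caller-allocated typedRoaring.Iterator, so the hot request-update path reuses one set of allocations instead of allocating per call; a later hunk hands the slice back with requestIndexes[:0]. A self-contained sketch of that reuse idiom, with hypothetical names (collector, scratch, yield) rather than the package's own:

package main

import "fmt"

// collector shows the reuse idiom: the scratch slice survives between rounds,
// so steady-state calls append into already-allocated capacity.
type collector struct {
	scratch []int // plays the role of Torrent.requestIndexes
}

// collect appends everything the producer yields into the reused slice and
// returns it, leaving ownership with the caller.
func (c *collector) collect(produce func(yield func(int))) []int {
	out := c.scratch
	produce(func(v int) { out = append(out, v) })
	return out
}

func main() {
	var c collector
	for round := 1; round <= 2; round++ {
		got := c.collect(func(yield func(int)) {
			for i := 0; i < 3; i++ {
				yield(i * round)
			}
		})
		fmt.Println(got)
		// Hand the slice back with zero length but full capacity, as
		// maybeUpdateActualRequestState does with requestIndexes[:0].
		c.scratch = got[:0]
	}
}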
@@ -226,7 +216,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
 			})
 		},
 	)
-	p.t.assertPendingRequests()
+	t.assertPendingRequests()
 	desired.Requests = requestHeap
 	return
 }
@@ -250,6 +240,7 @@ func (p *Peer) maybeUpdateActualRequestState() {
 		func(_ context.Context) {
 			next := p.getDesiredRequestState()
 			p.applyRequestState(next)
+			p.t.requestIndexes = next.Requests.requestIndexes[:0]
 		},
 	)
 }
@@ -258,10 +249,12 @@ func (p *Peer) maybeUpdateActualRequestState() {
 func (p *Peer) applyRequestState(next desiredRequestState) {
 	current := &p.requestState
 	if !p.setInterested(next.Interested) {
-		panic("insufficient write buffer")
+		return
 	}
 	more := true
-	requestHeap := binheap.FromSlice(next.Requests.requestIndexes, next.Requests.lessByValue)
+	requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue)
+	heap.Init(requestHeap)
+
 	t := p.t
 	originalRequestCount := current.Requests.GetCardinality()
 	// We're either here on a timer, or because we ran out of requests. Both are valid reasons to
@@ -272,7 +265,7 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
 			originalRequestCount, p.needRequestUpdate))
 	}
 	for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
-		req := requestHeap.Pop()
+		req := heap.Pop(requestHeap)
 		existing := t.requestingPeer(req)
 		if existing != nil && existing != p {
 			// Don't steal from the poor.
@@ -304,9 +297,14 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
 	p.peakRequests = newPeakRequests
 	p.needRequestUpdate = ""
 	p.lastRequestUpdate = time.Now()
-	p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
+	if enableUpdateRequestsTimer {
+		p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
+	}
 }
 
 // This could be set to 10s to match the unchoke/request update interval recommended by some
 // specifications. I've set it shorter to trigger it more often for testing for now.
-const updateRequestsTimerDuration = 3 * time.Second
+const (
+	updateRequestsTimerDuration = 3 * time.Second
+	enableUpdateRequestsTimer   = false
+)
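The lispad binheap dependency is replaced above by github.com/anacrolix/generics/heap, which adapts a slice plus a by-value comparator into a heap interface; note that unlike binheap.FromSlice, InterfaceForSlice does not heapify on its own, hence the explicit heap.Init. A minimal sketch using only the calls visible in this diff (InterfaceForSlice, Init, Len, Pop); the exact generic signatures are assumed from that usage:

package main

import (
	"fmt"

	"github.com/anacrolix/generics/heap"
)

func main() {
	// Any ordered payload works; RequestIndex in the real code is an integer type.
	requests := []int{5, 2, 9, 1}
	h := heap.InterfaceForSlice(&requests, func(l, r int) bool { return l < r })
	// The adapter wraps the slice in place; Init establishes the heap invariant.
	heap.Init(h)
	for h.Len() != 0 {
		fmt.Println(heap.Pop(h)) // pops in comparator order: 1, 2, 5, 9
	}
}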