diff --git a/requesting.go b/requesting.go
index f4813516..8b9db971 100644
--- a/requesting.go
+++ b/requesting.go
@@ -1,82 +1,33 @@
 package torrent
 
 import (
-	"container/heap"
 	"context"
 	"encoding/gob"
+	"fmt"
 	"reflect"
 	"runtime/pprof"
 	"time"
 	"unsafe"
 
-	"github.com/RoaringBitmap/roaring"
+	"github.com/anacrolix/generics/heap"
 	"github.com/anacrolix/log"
-	"github.com/anacrolix/missinggo/v2/bitmap"
 	"github.com/anacrolix/multiless"
 
-	request_strategy "github.com/anacrolix/torrent/request-strategy"
+	requestStrategy "github.com/anacrolix/torrent/request-strategy"
+	typedRoaring "github.com/anacrolix/torrent/typed-roaring"
 )
 
-func (cl *Client) tickleRequester() {
-	cl.updateRequests.Broadcast()
-}
+type (
+	// Since we have to store all the requests in memory, we can't reasonably exceed what could be
+	// indexed with the memory space available.
+	maxRequests = int
+)
 
-func (cl *Client) getRequestStrategyInput() request_strategy.Input {
-	ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
-	for _, t := range cl.torrents {
-		if !t.haveInfo() {
-			// This would be removed if metadata is handled here. We have to guard against not
-			// knowing the piece size. If we have no info, we have no pieces too, so the end result
-			// is the same.
-			continue
-		}
-		rst := request_strategy.Torrent{
-			InfoHash:       t.infoHash,
-			ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
-		}
-		if t.storage != nil {
-			rst.Capacity = t.storage.Capacity
-		}
-		rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
-		for i := range t.pieces {
-			p := &t.pieces[i]
-			rst.Pieces = append(rst.Pieces, request_strategy.Piece{
-				Request:           !t.ignorePieceForRequests(i),
-				Priority:          p.purePriority(),
-				Partial:           t.piecePartiallyDownloaded(i),
-				Availability:      p.availability,
-				Length:            int64(p.length()),
-				NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
-				IterPendingChunks: p.undirtiedChunksIter(),
-			})
-		}
-		t.iterPeers(func(p *Peer) {
-			if p.closed.IsSet() {
-				return
-			}
-			if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
-				p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
-			}
-			p.piecesReceivedSinceLastRequestUpdate = 0
-			rst.Peers = append(rst.Peers, request_strategy.Peer{
-				Pieces:           *p.newPeerPieces(),
-				MaxRequests:      p.nominalMaxRequests(),
-				ExistingRequests: p.actualRequestState.Requests,
-				Choking:          p.peerChoking,
-				PieceAllowedFast: p.peerAllowedFast,
-				DownloadRate:     p.downloadRate(),
-				Age:              time.Since(p.completedHandshake),
-				Id: peerId{
-					Peer: p,
-					ptr:  uintptr(unsafe.Pointer(p)),
-				},
-			})
-		})
-		ts = append(ts, rst)
-	}
-	return request_strategy.Input{
-		Torrents:           ts,
-		MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
+func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState {
+	return requestStrategy.PieceRequestOrderState{
+		Priority:     t.piece(i).purePriority(),
+		Partial:      t.piecePartiallyDownloaded(i),
+		Availability: t.piece(i).availability(),
 	}
 }
 
@@ -118,155 +69,242 @@ func (p *peerId) GobDecode(b []byte) error {
 	return nil
 }
 
-type RequestIndex = request_strategy.RequestIndex
-type chunkIndexType = request_strategy.ChunkIndex
-
-type peerRequests struct {
-	requestIndexes       []RequestIndex
-	peer                 *Peer
-	torrentStrategyInput request_strategy.Torrent
-}
+type (
+	RequestIndex   = requestStrategy.RequestIndex
+	chunkIndexType = requestStrategy.ChunkIndex
+)
 
-func (p peerRequests) Len() int {
-	return len(p.requestIndexes)
+type desiredPeerRequests struct {
+	requestIndexes []RequestIndex
+	peer           *Peer
+	pieceStates    []requestStrategy.PieceRequestOrderState
 }
 
-func (p peerRequests) Less(i, j int) bool {
-	leftRequest := p.requestIndexes[i]
-	rightRequest := p.requestIndexes[j]
+func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool {
 	t := p.peer.t
-	leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
-	rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
-	leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
-	rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
-	pending := func(index RequestIndex, current bool) int {
-		ret := t.pendingRequests[index]
-		if current {
-			ret--
+	leftPieceIndex := t.pieceIndexOfRequestIndex(leftRequest)
+	rightPieceIndex := t.pieceIndexOfRequestIndex(rightRequest)
+	ml := multiless.New()
+	// Push requests that can't be served right now to the end. But we don't throw them away unless
+	// there's a better alternative. This is for when we're using the fast extension and get choked
+	// but our requests could still be good when we get unchoked.
+	if p.peer.peerChoking {
+		ml = ml.Bool(
+			!p.peer.peerAllowedFast.Contains(leftPieceIndex),
+			!p.peer.peerAllowedFast.Contains(rightPieceIndex),
+		)
+	}
+	leftPiece := &p.pieceStates[leftPieceIndex]
+	rightPiece := &p.pieceStates[rightPieceIndex]
+	// Putting this first means we can steal requests from lesser-performing peers for our first few
+	// new requests.
+	priority := func() piecePriority {
+		// Technically we would be happy with the cached priority here, except we don't actually
+		// cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
+		// the priority through Piece.purePriority, which is probably slower.
+		leftPriority := leftPiece.Priority
+		rightPriority := rightPiece.Priority
+		ml = ml.Int(
+			-int(leftPriority),
+			-int(rightPriority),
+		)
+		if !ml.Ok() {
+			if leftPriority != rightPriority {
+				panic("expected equal")
+			}
		}
-		return ret
+		return leftPriority
+	}()
+	if ml.Ok() {
+		return ml.MustLess()
+	}
+	leftRequestState := t.requestState[leftRequest]
+	rightRequestState := t.requestState[rightRequest]
+	leftPeer := leftRequestState.peer
+	rightPeer := rightRequestState.peer
+	// Prefer chunks already requested from this peer.
+	ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
+	// Prefer unrequested chunks.
+	ml = ml.Bool(rightPeer == nil, leftPeer == nil)
+	if ml.Ok() {
+		return ml.MustLess()
+	}
+	if leftPeer != nil {
+		// The right peer should also be set, or we'd have resolved the computation by now.
+		ml = ml.Uint64(
+			rightPeer.requestState.Requests.GetCardinality(),
+			leftPeer.requestState.Requests.GetCardinality(),
+		)
+		// Could either lastRequested time be zero? Checking for an existing peer above rules that out.
+		leftLast := leftRequestState.when
+		rightLast := rightRequestState.when
+		if leftLast.IsZero() || rightLast.IsZero() {
+			panic("expected non-zero last requested times")
+		}
+		// We want the most recently requested on the left. Clients like Transmission serve
+		// requests in received order, so the most recently requested is the one that will take the
+		// longest to be served and is therefore the best candidate to cancel.
+		ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds())
 	}
-	ml := multiless.New()
-	ml = ml.Int(
-		pending(leftRequest, leftCurrent),
-		pending(rightRequest, rightCurrent))
-	ml = ml.Bool(rightCurrent, leftCurrent)
-	ml = ml.Int(
-		int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
-		int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority))
 	ml = ml.Int(
-		int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
-		int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
-	ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
-	ml = ml.Uint32(leftRequest, rightRequest)
-	return ml.MustLess()
-}
-
-func (p peerRequests) Swap(i, j int) {
-	p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
-}
-
-func (p *peerRequests) Push(x interface{}) {
-	p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
+		leftPiece.Availability,
+		rightPiece.Availability)
+	if priority == PiecePriorityReadahead {
+		// TODO: For readahead in particular, it would be even better to consider distance from the
+		// reader position so that reads earlier in a torrent don't starve reads later in the
+		// torrent. This would probably require reconsideration of how readahead priority works.
+		ml = ml.Int(leftPieceIndex, rightPieceIndex)
+	} else {
+		ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex])
+	}
+	return ml.Less()
 }
 
-func (p *peerRequests) Pop() interface{} {
-	last := len(p.requestIndexes) - 1
-	x := p.requestIndexes[last]
-	p.requestIndexes = p.requestIndexes[:last]
-	return x
+type desiredRequestState struct {
+	Requests   desiredPeerRequests
+	Interested bool
 }
 
-func (p *Peer) getDesiredRequestState() (desired requestState) {
-	input := p.t.cl.getRequestStrategyInput()
-	requestHeap := peerRequests{
-		requestIndexes: nil,
-		peer:           p,
+func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+	t := p.t
+	if !t.haveInfo() {
+		return
 	}
-	for _, t := range input.Torrents {
-		if t.InfoHash == p.t.infoHash {
-			requestHeap.torrentStrategyInput = t
-			break
-		}
+	if t.closed.IsSet() {
+		return
 	}
-	request_strategy.GetRequestablePieces(
+	input := t.getRequestStrategyInput()
+	requestHeap := desiredPeerRequests{
+		peer:           p,
+		pieceStates:    t.requestPieceStates,
+		requestIndexes: t.requestIndexes,
+	}
+	// Caller-provided allocation for roaring bitmap iteration.
+	var it typedRoaring.Iterator[RequestIndex]
+	requestStrategy.GetRequestablePieces(
 		input,
-		func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
-			if t.InfoHash != p.t.infoHash {
+		t.getPieceRequestOrder(),
+		func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) {
+			if ih != t.infoHash {
 				return
 			}
 			if !p.peerHasPiece(pieceIndex) {
 				return
 			}
-			rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
-				requestHeap.requestIndexes = append(
-					requestHeap.requestIndexes,
-					p.t.pieceRequestIndexOffset(pieceIndex)+ci)
+			requestHeap.pieceStates[pieceIndex] = pieceExtra
+			allowedFast := p.peerAllowedFast.Contains(pieceIndex)
+			t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) {
+				if !allowedFast {
+					// We must signal interest to request this. TODO: We could set interested if the
+					// peer's pieces (minus the allowed fast set) overlap with our missing pieces if
+					// there are any readers, or any pending pieces.
+					desired.Interested = true
+					// We can make a request here, or keep sustaining an existing one, if we're not
+					// choked, or if we made the request previously (presumably while unchoked) and
+					// the peer hasn't responded to it yet (the request was retained because we're
+					// using the fast extension).
+					if p.peerChoking && !p.requestState.Requests.Contains(r) {
+						// We can't request this right now.
+						return
+					}
+				}
+				if p.requestState.Cancelled.Contains(r) {
+					// Can't re-request while awaiting acknowledgement.
+					return
+				}
+				requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
 			})
 		},
 	)
-	heap.Init(&requestHeap)
-	for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
-		requestIndex := heap.Pop(&requestHeap).(RequestIndex)
-		pieceIndex := requestIndex / p.t.chunksPerRegularPiece()
-		allowedFast := p.peerAllowedFast.Contains(pieceIndex)
-		if !allowedFast {
-			desired.Interested = true
-		}
-		if allowedFast || !p.peerChoking {
-			desired.Requests.Add(requestIndex)
-		}
-	}
+	t.assertPendingRequests()
+	desired.Requests = requestHeap
 	return
 }
 
-func (p *Peer) applyNextRequestState() bool {
+func (p *Peer) maybeUpdateActualRequestState() {
+	if p.closed.IsSet() {
+		return
+	}
 	if p.needRequestUpdate == "" {
-		return true
+		return
+	}
+	if p.needRequestUpdate == peerUpdateRequestsTimerReason {
+		since := time.Since(p.lastRequestUpdate)
+		if since < updateRequestsTimerDuration {
+			panic(since)
+		}
 	}
-	var more bool
 	pprof.Do(
 		context.Background(),
 		pprof.Labels("update request", p.needRequestUpdate),
 		func(_ context.Context) {
 			next := p.getDesiredRequestState()
-			more = p.applyRequestState(next)
+			p.applyRequestState(next)
+			p.t.requestIndexes = next.Requests.requestIndexes[:0]
 		},
 	)
-	return more
 }
 
-func (p *Peer) applyRequestState(next requestState) bool {
-	current := p.actualRequestState
+// Transmit/apply the request state to the peer.
+func (p *Peer) applyRequestState(next desiredRequestState) {
+	current := &p.requestState
 	if !p.setInterested(next.Interested) {
-		return false
+		return
 	}
 	more := true
-	cancel := roaring.AndNot(&current.Requests, &next.Requests)
-	cancel.Iterate(func(req uint32) bool {
-		more = p.cancel(req)
-		return more
-	})
-	if !more {
-		return false
+	requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue)
+	heap.Init(requestHeap)
+
+	t := p.t
+	originalRequestCount := current.Requests.GetCardinality()
+	// We're either here on a timer, or because we ran out of requests. Both are valid reasons to
+	// alter peakRequests.
+	if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason {
+		panic(fmt.Sprintf(
+			"expected zero existing requests (%v) for update reason %q",
+			originalRequestCount, p.needRequestUpdate))
 	}
-	next.Requests.Iterate(func(req uint32) bool {
-		// This could happen if the peer chokes us between the next state being generated, and us
-		// trying to transmit the state.
-		if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) {
-			return true
+	for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() {
+		req := heap.Pop(requestHeap)
+		existing := t.requestingPeer(req)
+		if existing != nil && existing != p {
+			// Don't steal from the poor.
+			diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
+			// Steal a request that leaves us with one more request than the existing peer
+			// connection if the stealer more recently received a chunk.
+			if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+				continue
+			}
+			t.cancelRequest(req)
+		}
+		more = p.mustRequest(req)
+		if !more {
+			break
 		}
-		var err error
-		more, err = p.request(req)
-		if err != nil {
-			panic(err)
-		} /* else {
-			log.Print(req)
-		} */
-		return more
-	})
-	if more {
-		p.needRequestUpdate = ""
 	}
-	return more
+	if !more {
+		// This might fail if we incorrectly determine that we can fit up to the maximum allowed
		// requests into the available write buffer space. We don't want that to happen because it
+		// makes our peak requests dependent on how much was already in the buffer.
+		panic(fmt.Sprintf(
+			"couldn't apply entire request state [newRequests=%v]",
+			current.Requests.GetCardinality()-originalRequestCount))
+	}
+	newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount)
+	// log.Printf(
+	// 	"requests %v->%v (peak %v->%v) reason %q (peer %v)",
+	// 	originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p)
+	p.peakRequests = newPeakRequests
+	p.needRequestUpdate = ""
+	p.lastRequestUpdate = time.Now()
+	if enableUpdateRequestsTimer {
+		p.updateRequestsTimer.Reset(updateRequestsTimerDuration)
+	}
 }
+
+// This could be set to 10s to match the unchoke/request update interval recommended by some
+// specifications. I've set it shorter to trigger it more often for testing for now.
+const (
+	updateRequestsTimerDuration = 3 * time.Second
+	enableUpdateRequestsTimer   = false
+)
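For reference, the ordering-and-drain pattern the new code introduces (candidate request indexes kept in a plain slice, a multiless computation chaining the tie-breaks in lessByValue, and heap.InterfaceForSlice from github.com/anacrolix/generics/heap adapting that slice into a priority queue that applyRequestState pops from up to the per-peer limit) can be seen in isolation in this minimal sketch. It uses only the multiless and generics/heap calls that appear in the diff above; the candidate type, its fields, and maxOutstanding are hypothetical stand-ins for illustration, not identifiers from this repository.

package main

import (
	"fmt"

	"github.com/anacrolix/generics/heap"
	"github.com/anacrolix/multiless"
)

// candidate is a hypothetical stand-in for a request index plus the piece
// state that desiredPeerRequests.lessByValue consults.
type candidate struct {
	index        int // request index: the final tie-break
	priority     int // higher is more urgent
	availability int // rarer pieces (fewer peers have them) come first
}

// less chains comparisons the way lessByValue does: each later rung only
// matters if every earlier rung compared equal.
func less(l, r candidate) bool {
	ml := multiless.New()
	// Negate so that higher priority sorts first, as in lessByValue.
	ml = ml.Int(-l.priority, -r.priority)
	if ml.Ok() {
		return ml.MustLess()
	}
	ml = ml.Int(l.availability, r.availability)
	ml = ml.Int(l.index, r.index)
	return ml.Less()
}

func main() {
	candidates := []candidate{
		{index: 3, priority: 1, availability: 5},
		{index: 1, priority: 2, availability: 9},
		{index: 2, priority: 1, availability: 2},
	}
	// Heap-ify the slice in place, then drain the best candidates, the way
	// applyRequestState pops until the request limit is reached.
	h := heap.InterfaceForSlice(&candidates, less)
	heap.Init(h)
	const maxOutstanding = 2 // stand-in for the per-peer request limit
	for made := 0; h.Len() != 0 && made < maxOutstanding; made++ {
		fmt.Printf("requesting %+v\n", heap.Pop(h))
	}
}

This pops index 1 first (priority dominates), then index 2 (equal priority, rarer piece), and stops before index 3 once the hypothetical limit is hit, which is the same reason applyRequestState leaves lower-ranked candidates unrequested rather than cancelling better ones.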