X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=requesting.go;h=8b9db971b537630e65dd41ca9618f8a708f6a698;hb=HEAD;hp=462d681836e44a7d051ef8c535463614aa28fe56;hpb=344ada8b7c5e50be08d90d96fa8babfc1ae33e30;p=btrtrc.git diff --git a/requesting.go b/requesting.go index 462d6818..8b9db971 100644 --- a/requesting.go +++ b/requesting.go @@ -1,82 +1,33 @@ package torrent import ( - "container/heap" "context" "encoding/gob" - "math/rand" + "fmt" "reflect" "runtime/pprof" "time" "unsafe" - "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/generics/heap" "github.com/anacrolix/log" "github.com/anacrolix/multiless" - request_strategy "github.com/anacrolix/torrent/request-strategy" + requestStrategy "github.com/anacrolix/torrent/request-strategy" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" ) -func (cl *Client) tickleRequester() { - cl.updateRequests.Broadcast() -} +type ( + // Since we have to store all the requests in memory, we can't reasonably exceed what could be + // indexed with the memory space available. + maxRequests = int +) -func (cl *Client) getRequestStrategyInput() request_strategy.Input { - ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) - for _, t := range cl.torrents { - if !t.haveInfo() { - // This would be removed if metadata is handled here. We have to guard against not - // knowing the piece size. If we have no info, we have no pieces too, so the end result - // is the same. - continue - } - rst := request_strategy.Torrent{ - InfoHash: t.infoHash, - ChunksPerPiece: t.chunksPerRegularPiece(), - } - if t.storage != nil { - rst.Capacity = t.storage.Capacity - } - rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces)) - for i := range t.pieces { - p := &t.pieces[i] - rst.Pieces = append(rst.Pieces, request_strategy.Piece{ - Request: !t.ignorePieceForRequests(i), - Priority: p.purePriority(), - Partial: t.piecePartiallyDownloaded(i), - Availability: p.availability, - Length: int64(p.length()), - NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: &p.undirtiedChunksIter, - }) - } - t.iterPeers(func(p *Peer) { - if p.closed.IsSet() { - return - } - if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates { - p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate - } - p.piecesReceivedSinceLastRequestUpdate = 0 - rst.Peers = append(rst.Peers, request_strategy.Peer{ - Pieces: *p.newPeerPieces(), - MaxRequests: p.nominalMaxRequests(), - ExistingRequests: p.actualRequestState.Requests, - Choking: p.peerChoking, - PieceAllowedFast: p.peerAllowedFast, - DownloadRate: p.downloadRate(), - Age: time.Since(p.completedHandshake), - Id: peerId{ - Peer: p, - ptr: uintptr(unsafe.Pointer(p)), - }, - }) - }) - ts = append(ts, rst) - } - return request_strategy.Input{ - Torrents: ts, - MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes, +func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState { + return requestStrategy.PieceRequestOrderState{ + Priority: t.piece(i).purePriority(), + Partial: t.piecePartiallyDownloaded(i), + Availability: t.piece(i).availability(), } } @@ -118,39 +69,21 @@ func (p *peerId) GobDecode(b []byte) error { return nil } -type RequestIndex = request_strategy.RequestIndex -type chunkIndexType = request_strategy.ChunkIndex - -type peerRequests struct { - requestIndexes []RequestIndex - peer *Peer - torrentStrategyInput request_strategy.Torrent -} +type ( + RequestIndex = requestStrategy.RequestIndex + chunkIndexType = requestStrategy.ChunkIndex +) -func (p *peerRequests) Len() int { - return len(p.requestIndexes) +type desiredPeerRequests struct { + requestIndexes []RequestIndex + peer *Peer + pieceStates []requestStrategy.PieceRequestOrderState } -func (p *peerRequests) Less(i, j int) bool { - leftRequest := p.requestIndexes[i] - rightRequest := p.requestIndexes[j] +func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool { t := p.peer.t - leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece - rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece - leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest) - rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest) - pending := func(index RequestIndex, current bool) int { - ret := t.pendingRequests.Get(index) - if current { - ret-- - } - // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be - // resolved. - if ret < 0 { - panic(ret) - } - return ret - } + leftPieceIndex := t.pieceIndexOfRequestIndex(leftRequest) + rightPieceIndex := t.pieceIndexOfRequestIndex(rightRequest) ml := multiless.New() // Push requests that can't be served right now to the end. But we don't throw them away unless // there's a better alternative. This is for when we're using the fast extension and get choked @@ -161,163 +94,217 @@ func (p *peerRequests) Less(i, j int) bool { !p.peer.peerAllowedFast.Contains(rightPieceIndex), ) } + leftPiece := &p.pieceStates[leftPieceIndex] + rightPiece := &p.pieceStates[rightPieceIndex] + // Putting this first means we can steal requests from lesser-performing peers for our first few + // new requests. + priority := func() piecePriority { + // Technically we would be happy with the cached priority here, except we don't actually + // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve + // the priority through Piece.purePriority, which is probably slower. + leftPriority := leftPiece.Priority + rightPriority := rightPiece.Priority + ml = ml.Int( + -int(leftPriority), + -int(rightPriority), + ) + if !ml.Ok() { + if leftPriority != rightPriority { + panic("expected equal") + } + } + return leftPriority + }() + if ml.Ok() { + return ml.MustLess() + } + leftRequestState := t.requestState[leftRequest] + rightRequestState := t.requestState[rightRequest] + leftPeer := leftRequestState.peer + rightPeer := rightRequestState.peer + // Prefer chunks already requested from this peer. + ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) + // Prefer unrequested chunks. + ml = ml.Bool(rightPeer == nil, leftPeer == nil) + if ml.Ok() { + return ml.MustLess() + } + if leftPeer != nil { + // The right peer should also be set, or we'd have resolved the computation by now. + ml = ml.Uint64( + rightPeer.requestState.Requests.GetCardinality(), + leftPeer.requestState.Requests.GetCardinality(), + ) + // Could either of the lastRequested be Zero? That's what checking an existing peer is for. + leftLast := leftRequestState.when + rightLast := rightRequestState.when + if leftLast.IsZero() || rightLast.IsZero() { + panic("expected non-zero last requested times") + } + // We want the most-recently requested on the left. Clients like Transmission serve requests + // in received order, so the most recently-requested is the one that has the longest until + // it will be served and therefore is the best candidate to cancel. + ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds()) + } ml = ml.Int( - pending(leftRequest, leftCurrent), - pending(rightRequest, rightCurrent)) - ml = ml.Bool(!leftCurrent, !rightCurrent) - ml = ml.Int( - -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority), - -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority), - ) - ml = ml.Int( - int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability), - int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability)) - ml = ml.Uint32(leftPieceIndex, rightPieceIndex) - ml = ml.Uint32(leftRequest, rightRequest) - return ml.MustLess() -} - -func (p *peerRequests) Swap(i, j int) { - p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i] -} - -func (p *peerRequests) Push(x interface{}) { - p.requestIndexes = append(p.requestIndexes, x.(RequestIndex)) + leftPiece.Availability, + rightPiece.Availability) + if priority == PiecePriorityReadahead { + // TODO: For readahead in particular, it would be even better to consider distance from the + // reader position so that reads earlier in a torrent don't starve reads later in the + // torrent. This would probably require reconsideration of how readahead priority works. + ml = ml.Int(leftPieceIndex, rightPieceIndex) + } else { + ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex]) + } + return ml.Less() } -func (p *peerRequests) Pop() interface{} { - last := len(p.requestIndexes) - 1 - x := p.requestIndexes[last] - p.requestIndexes = p.requestIndexes[:last] - return x +type desiredRequestState struct { + Requests desiredPeerRequests + Interested bool } -func (p *Peer) getDesiredRequestState() (desired requestState) { - input := p.t.cl.getRequestStrategyInput() - requestHeap := peerRequests{ - peer: p, +func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { + t := p.t + if !t.haveInfo() { + return } - for _, t := range input.Torrents { - if t.InfoHash == p.t.infoHash { - requestHeap.torrentStrategyInput = t - break - } + if t.closed.IsSet() { + return } - request_strategy.GetRequestablePieces( + input := t.getRequestStrategyInput() + requestHeap := desiredPeerRequests{ + peer: p, + pieceStates: t.requestPieceStates, + requestIndexes: t.requestIndexes, + } + // Caller-provided allocation for roaring bitmap iteration. + var it typedRoaring.Iterator[RequestIndex] + requestStrategy.GetRequestablePieces( input, - func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { - if t.InfoHash != p.t.infoHash { + t.getPieceRequestOrder(), + func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) { + if ih != t.infoHash { return } if !p.peerHasPiece(pieceIndex) { return } - allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex) - rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) { - r := p.t.pieceRequestIndexOffset(pieceIndex) + ci - //if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) { - // return - //} + requestHeap.pieceStates[pieceIndex] = pieceExtra + allowedFast := p.peerAllowedFast.Contains(pieceIndex) + t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) { if !allowedFast { - // We must signal interest to request this + // We must signal interest to request this. TODO: We could set interested if the + // peers pieces (minus the allowed fast set) overlap with our missing pieces if + // there are any readers, or any pending pieces. desired.Interested = true // We can make or will allow sustaining a request here if we're not choked, or // have made the request previously (presumably while unchoked), and haven't had // the peer respond yet (and the request was retained because we are using the // fast extension). - if p.peerChoking && !p.actualRequestState.Requests.Contains(r) { + if p.peerChoking && !p.requestState.Requests.Contains(r) { // We can't request this right now. return } } + if p.requestState.Cancelled.Contains(r) { + // Can't re-request while awaiting acknowledgement. + return + } requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) }) }, ) - p.t.assertPendingRequests() - heap.Init(&requestHeap) - for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) { - requestIndex := heap.Pop(&requestHeap).(RequestIndex) - desired.Requests.Add(requestIndex) - } + t.assertPendingRequests() + desired.Requests = requestHeap return } -func (p *Peer) maybeUpdateActualRequestState() bool { +func (p *Peer) maybeUpdateActualRequestState() { + if p.closed.IsSet() { + return + } if p.needRequestUpdate == "" { - return true + return + } + if p.needRequestUpdate == peerUpdateRequestsTimerReason { + since := time.Since(p.lastRequestUpdate) + if since < updateRequestsTimerDuration { + panic(since) + } } - var more bool pprof.Do( context.Background(), pprof.Labels("update request", p.needRequestUpdate), func(_ context.Context) { next := p.getDesiredRequestState() - more = p.applyRequestState(next) + p.applyRequestState(next) + p.t.requestIndexes = next.Requests.requestIndexes[:0] }, ) - return more } // Transmit/action the request state to the peer. -func (p *Peer) applyRequestState(next requestState) bool { - current := &p.actualRequestState +func (p *Peer) applyRequestState(next desiredRequestState) { + current := &p.requestState if !p.setInterested(next.Interested) { - return false + return } more := true - cancel := roaring.AndNot(¤t.Requests, &next.Requests) - cancel.Iterate(func(req uint32) bool { - more = p.cancel(req) - return more - }) - if !more { - return false + requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue) + heap.Init(requestHeap) + + t := p.t + originalRequestCount := current.Requests.GetCardinality() + // We're either here on a timer, or because we ran out of requests. Both are valid reasons to + // alter peakRequests. + if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason { + panic(fmt.Sprintf( + "expected zero existing requests (%v) for update reason %q", + originalRequestCount, p.needRequestUpdate)) } - // We randomize the order in which requests are issued, to reduce the overlap with requests to - // other peers. Note that although it really depends on what order the peer services the - // requests, if we are only able to issue some requests before buffering, or the peer starts - // handling our requests before they've all arrived, then this randomization should reduce - // overlap. Note however that if we received the desired requests in priority order, then - // randomizing would throw away that benefit. - for _, x := range rand.Perm(int(next.Requests.GetCardinality())) { - req, err := next.Requests.Select(uint32(x)) - if err != nil { - panic(err) - } - if p.cancelledRequests.Contains(req) { - // Waiting for a reject or piece message, which will suitably trigger us to update our - // requests, so we can skip this one with no additional consideration. - continue - } - // The cardinality of our desired requests shouldn't exceed the max requests since it's used - // in the calculation of the requests. However if we cancelled requests and they haven't - // been rejected or serviced yet with the fast extension enabled, we can end up with more - // extra outstanding requests. We could subtract the number of outstanding cancels from the - // next request cardinality, but peers might not like that. - if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { - //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", - // next.Requests.GetCardinality(), - // p.cancelledRequests.GetCardinality(), - // current.Requests.GetCardinality(), - // p.nominalMaxRequests(), - //) - break - } - more, err = p.request(req) - if err != nil { - panic(err) + for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() { + req := heap.Pop(requestHeap) + existing := t.requestingPeer(req) + if existing != nil && existing != p { + // Don't steal from the poor. + diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1) + // Steal a request that leaves us with one more request than the existing peer + // connection if the stealer more recently received a chunk. + if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) { + continue + } + t.cancelRequest(req) } + more = p.mustRequest(req) if !more { break } } - p.updateRequestsTimer.Stop() - if more { - p.needRequestUpdate = "" - if !current.Requests.IsEmpty() { - p.updateRequestsTimer.Reset(3 * time.Second) - } + if !more { + // This might fail if we incorrectly determine that we can fit up to the maximum allowed + // requests into the available write buffer space. We don't want that to happen because it + // makes our peak requests dependent on how much was already in the buffer. + panic(fmt.Sprintf( + "couldn't fill apply entire request state [newRequests=%v]", + current.Requests.GetCardinality()-originalRequestCount)) + } + newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount) + // log.Printf( + // "requests %v->%v (peak %v->%v) reason %q (peer %v)", + // originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p) + p.peakRequests = newPeakRequests + p.needRequestUpdate = "" + p.lastRequestUpdate = time.Now() + if enableUpdateRequestsTimer { + p.updateRequestsTimer.Reset(updateRequestsTimerDuration) } - return more } + +// This could be set to 10s to match the unchoke/request update interval recommended by some +// specifications. I've set it shorter to trigger it more often for testing for now. +const ( + updateRequestsTimerDuration = 3 * time.Second + enableUpdateRequestsTimer = false +)