X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=requesting.go;h=8b9db971b537630e65dd41ca9618f8a708f6a698;hb=HEAD;hp=7e42babacf5ccf087b28bcc70b067aab72eebe54;hpb=cfb23e271f1f509cc39e59484c9bfe3e0075990d;p=btrtrc.git diff --git a/requesting.go b/requesting.go index 7e42baba..8b9db971 100644 --- a/requesting.go +++ b/requesting.go @@ -1,146 +1,310 @@ package torrent import ( + "context" + "encoding/gob" + "fmt" + "reflect" + "runtime/pprof" "time" "unsafe" - "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/generics/heap" + "github.com/anacrolix/log" + "github.com/anacrolix/multiless" - "github.com/anacrolix/chansync" - request_strategy "github.com/anacrolix/torrent/request-strategy" + requestStrategy "github.com/anacrolix/torrent/request-strategy" + typedRoaring "github.com/anacrolix/torrent/typed-roaring" ) -func (cl *Client) requester() { - for { - update := func() chansync.Signaled { - cl.lock() - defer cl.unlock() - cl.doRequests() - return cl.updateRequests.Signaled() - }() - minWait := time.After(100 * time.Millisecond) - maxWait := time.After(1000 * time.Millisecond) - select { - case <-cl.closed.Done(): - return - case <-minWait: - case <-maxWait: - } - select { - case <-cl.closed.Done(): - return - case <-update: - case <-maxWait: - } +type ( + // Since we have to store all the requests in memory, we can't reasonably exceed what could be + // indexed with the memory space available. + maxRequests = int +) + +func (t *Torrent) requestStrategyPieceOrderState(i int) requestStrategy.PieceRequestOrderState { + return requestStrategy.PieceRequestOrderState{ + Priority: t.piece(i).purePriority(), + Partial: t.piecePartiallyDownloaded(i), + Availability: t.piece(i).availability(), } } -func (cl *Client) tickleRequester() { - cl.updateRequests.Broadcast() +func init() { + gob.Register(peerId{}) } -func (cl *Client) getRequestStrategyInput() request_strategy.Input { - ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) - for _, t := range cl.torrents { - rst := request_strategy.Torrent{ - StableId: uintptr(unsafe.Pointer(t)), - } - if t.storage != nil { - rst.Capacity = t.storage.Capacity - } - rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces)) - for i := range t.pieces { - p := &t.pieces[i] - rst.Pieces = append(rst.Pieces, request_strategy.Piece{ - Request: !t.ignorePieceForRequests(i), - Priority: p.purePriority(), - Partial: t.piecePartiallyDownloaded(i), - Availability: p.availability, - Length: int64(p.length()), - NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: p.iterUndirtiedChunks, - }) - } - t.iterPeers(func(p *Peer) { - if p.closed.IsSet() { - return - } - if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates { - p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate - } - p.piecesReceivedSinceLastRequestUpdate = 0 - rst.Peers = append(rst.Peers, request_strategy.Peer{ - HasPiece: p.peerHasPiece, - MaxRequests: p.nominalMaxRequests(), - HasExistingRequest: func(r request_strategy.Request) bool { - _, ok := p.actualRequestState.Requests[r] - return ok - }, - Choking: p.peerChoking, - PieceAllowedFast: func(i pieceIndex) bool { - return p.peerAllowedFast.Contains(bitmap.BitIndex(i)) - }, - DownloadRate: p.downloadRate(), - Age: time.Since(p.completedHandshake), - Id: peerId{ - Peer: p, - ptr: uintptr(unsafe.Pointer(p)), - }, - }) - }) - ts = append(ts, rst) +type peerId struct { + *Peer + ptr uintptr +} + +func (p peerId) Uintptr() uintptr { + return p.ptr +} + +func (p peerId) GobEncode() (b []byte, _ error) { + *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&p.ptr)), + Len: int(unsafe.Sizeof(p.ptr)), + Cap: int(unsafe.Sizeof(p.ptr)), } - return request_strategy.Input{ - Torrents: ts, - MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes, + return +} + +func (p *peerId) GobDecode(b []byte) error { + if uintptr(len(b)) != unsafe.Sizeof(p.ptr) { + panic(len(b)) } + ptr := unsafe.Pointer(&b[0]) + p.ptr = *(*uintptr)(ptr) + log.Printf("%p", ptr) + dst := reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&p.Peer)), + Len: int(unsafe.Sizeof(p.Peer)), + Cap: int(unsafe.Sizeof(p.Peer)), + } + copy(*(*[]byte)(unsafe.Pointer(&dst)), b) + return nil } -func (cl *Client) doRequests() { - nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput()) - for p, state := range nextPeerStates { - setPeerNextRequestState(p, state) +type ( + RequestIndex = requestStrategy.RequestIndex + chunkIndexType = requestStrategy.ChunkIndex +) + +type desiredPeerRequests struct { + requestIndexes []RequestIndex + peer *Peer + pieceStates []requestStrategy.PieceRequestOrderState +} + +func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex) bool { + t := p.peer.t + leftPieceIndex := t.pieceIndexOfRequestIndex(leftRequest) + rightPieceIndex := t.pieceIndexOfRequestIndex(rightRequest) + ml := multiless.New() + // Push requests that can't be served right now to the end. But we don't throw them away unless + // there's a better alternative. This is for when we're using the fast extension and get choked + // but our requests could still be good when we get unchoked. + if p.peer.peerChoking { + ml = ml.Bool( + !p.peer.peerAllowedFast.Contains(leftPieceIndex), + !p.peer.peerAllowedFast.Contains(rightPieceIndex), + ) } + leftPiece := &p.pieceStates[leftPieceIndex] + rightPiece := &p.pieceStates[rightPieceIndex] + // Putting this first means we can steal requests from lesser-performing peers for our first few + // new requests. + priority := func() piecePriority { + // Technically we would be happy with the cached priority here, except we don't actually + // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve + // the priority through Piece.purePriority, which is probably slower. + leftPriority := leftPiece.Priority + rightPriority := rightPiece.Priority + ml = ml.Int( + -int(leftPriority), + -int(rightPriority), + ) + if !ml.Ok() { + if leftPriority != rightPriority { + panic("expected equal") + } + } + return leftPriority + }() + if ml.Ok() { + return ml.MustLess() + } + leftRequestState := t.requestState[leftRequest] + rightRequestState := t.requestState[rightRequest] + leftPeer := leftRequestState.peer + rightPeer := rightRequestState.peer + // Prefer chunks already requested from this peer. + ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer) + // Prefer unrequested chunks. + ml = ml.Bool(rightPeer == nil, leftPeer == nil) + if ml.Ok() { + return ml.MustLess() + } + if leftPeer != nil { + // The right peer should also be set, or we'd have resolved the computation by now. + ml = ml.Uint64( + rightPeer.requestState.Requests.GetCardinality(), + leftPeer.requestState.Requests.GetCardinality(), + ) + // Could either of the lastRequested be Zero? That's what checking an existing peer is for. + leftLast := leftRequestState.when + rightLast := rightRequestState.when + if leftLast.IsZero() || rightLast.IsZero() { + panic("expected non-zero last requested times") + } + // We want the most-recently requested on the left. Clients like Transmission serve requests + // in received order, so the most recently-requested is the one that has the longest until + // it will be served and therefore is the best candidate to cancel. + ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds()) + } + ml = ml.Int( + leftPiece.Availability, + rightPiece.Availability) + if priority == PiecePriorityReadahead { + // TODO: For readahead in particular, it would be even better to consider distance from the + // reader position so that reads earlier in a torrent don't starve reads later in the + // torrent. This would probably require reconsideration of how readahead priority works. + ml = ml.Int(leftPieceIndex, rightPieceIndex) + } else { + ml = ml.Int(t.pieceRequestOrder[leftPieceIndex], t.pieceRequestOrder[rightPieceIndex]) + } + return ml.Less() } -type peerId struct { - *Peer - ptr uintptr +type desiredRequestState struct { + Requests desiredPeerRequests + Interested bool } -func (p peerId) Uintptr() uintptr { - return p.ptr +func (p *Peer) getDesiredRequestState() (desired desiredRequestState) { + t := p.t + if !t.haveInfo() { + return + } + if t.closed.IsSet() { + return + } + input := t.getRequestStrategyInput() + requestHeap := desiredPeerRequests{ + peer: p, + pieceStates: t.requestPieceStates, + requestIndexes: t.requestIndexes, + } + // Caller-provided allocation for roaring bitmap iteration. + var it typedRoaring.Iterator[RequestIndex] + requestStrategy.GetRequestablePieces( + input, + t.getPieceRequestOrder(), + func(ih InfoHash, pieceIndex int, pieceExtra requestStrategy.PieceRequestOrderState) { + if ih != t.infoHash { + return + } + if !p.peerHasPiece(pieceIndex) { + return + } + requestHeap.pieceStates[pieceIndex] = pieceExtra + allowedFast := p.peerAllowedFast.Contains(pieceIndex) + t.iterUndirtiedRequestIndexesInPiece(&it, pieceIndex, func(r requestStrategy.RequestIndex) { + if !allowedFast { + // We must signal interest to request this. TODO: We could set interested if the + // peers pieces (minus the allowed fast set) overlap with our missing pieces if + // there are any readers, or any pending pieces. + desired.Interested = true + // We can make or will allow sustaining a request here if we're not choked, or + // have made the request previously (presumably while unchoked), and haven't had + // the peer respond yet (and the request was retained because we are using the + // fast extension). + if p.peerChoking && !p.requestState.Requests.Contains(r) { + // We can't request this right now. + return + } + } + if p.requestState.Cancelled.Contains(r) { + // Can't re-request while awaiting acknowledgement. + return + } + requestHeap.requestIndexes = append(requestHeap.requestIndexes, r) + }) + }, + ) + t.assertPendingRequests() + desired.Requests = requestHeap + return } -func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) { - p := _p.(peerId).Peer - p.nextRequestState = rp - p.onNextRequestStateChanged() +func (p *Peer) maybeUpdateActualRequestState() { + if p.closed.IsSet() { + return + } + if p.needRequestUpdate == "" { + return + } + if p.needRequestUpdate == peerUpdateRequestsTimerReason { + since := time.Since(p.lastRequestUpdate) + if since < updateRequestsTimerDuration { + panic(since) + } + } + pprof.Do( + context.Background(), + pprof.Labels("update request", p.needRequestUpdate), + func(_ context.Context) { + next := p.getDesiredRequestState() + p.applyRequestState(next) + p.t.requestIndexes = next.Requests.requestIndexes[:0] + }, + ) } -func (p *Peer) applyNextRequestState() bool { - next := p.nextRequestState - current := p.actualRequestState +// Transmit/action the request state to the peer. +func (p *Peer) applyRequestState(next desiredRequestState) { + current := &p.requestState if !p.setInterested(next.Interested) { - return false + return } - for req := range current.Requests { - if _, ok := next.Requests[req]; !ok { - if !p.cancel(req) { - return false + more := true + requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue) + heap.Init(requestHeap) + + t := p.t + originalRequestCount := current.Requests.GetCardinality() + // We're either here on a timer, or because we ran out of requests. Both are valid reasons to + // alter peakRequests. + if originalRequestCount != 0 && p.needRequestUpdate != peerUpdateRequestsTimerReason { + panic(fmt.Sprintf( + "expected zero existing requests (%v) for update reason %q", + originalRequestCount, p.needRequestUpdate)) + } + for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()+current.Cancelled.GetCardinality()) < p.nominalMaxRequests() { + req := heap.Pop(requestHeap) + existing := t.requestingPeer(req) + if existing != nil && existing != p { + // Don't steal from the poor. + diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1) + // Steal a request that leaves us with one more request than the existing peer + // connection if the stealer more recently received a chunk. + if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) { + continue } + t.cancelRequest(req) } - } - for req := range next.Requests { - more, err := p.request(req) - if err != nil { - panic(err) - } /* else { - log.Print(req) - } */ + more = p.mustRequest(req) if !more { - return false + break } } - return true + if !more { + // This might fail if we incorrectly determine that we can fit up to the maximum allowed + // requests into the available write buffer space. We don't want that to happen because it + // makes our peak requests dependent on how much was already in the buffer. + panic(fmt.Sprintf( + "couldn't fill apply entire request state [newRequests=%v]", + current.Requests.GetCardinality()-originalRequestCount)) + } + newPeakRequests := maxRequests(current.Requests.GetCardinality() - originalRequestCount) + // log.Printf( + // "requests %v->%v (peak %v->%v) reason %q (peer %v)", + // originalRequestCount, current.Requests.GetCardinality(), p.peakRequests, newPeakRequests, p.needRequestUpdate, p) + p.peakRequests = newPeakRequests + p.needRequestUpdate = "" + p.lastRequestUpdate = time.Now() + if enableUpdateRequestsTimer { + p.updateRequestsTimer.Reset(updateRequestsTimerDuration) + } } + +// This could be set to 10s to match the unchoke/request update interval recommended by some +// specifications. I've set it shorter to trigger it more often for testing for now. +const ( + updateRequestsTimerDuration = 3 * time.Second + enableUpdateRequestsTimer = false +)