package torrent
import (
+ "container/heap"
"encoding/gob"
"reflect"
"time"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
+ "github.com/anacrolix/multiless"
request_strategy "github.com/anacrolix/torrent/request-strategy"
)
// Calculate requests individually for each peer.
-const peerRequesting = false
+const peerRequesting = true
func (cl *Client) requester() {
for {
type RequestIndex = request_strategy.RequestIndex
type chunkIndexType = request_strategy.ChunkIndex
-func (p *Peer) applyNextRequestState() bool {
- if peerRequesting {
- if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
- return true
+type peerRequests struct {
+ requestIndexes []RequestIndex
+ peer *Peer
+ torrentStrategyInput request_strategy.Torrent
+}
+
+func (p peerRequests) Len() int {
+ return len(p.requestIndexes)
+}
+
+func (p peerRequests) Less(i, j int) bool {
+ leftRequest := p.requestIndexes[i]
+ rightRequest := p.requestIndexes[j]
+ t := p.peer.t
+ leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
+ rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
+ leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
+ rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
+ pending := func(index RequestIndex, current bool) int {
+ ret := t.pendingRequests[index]
+ if current {
+ ret--
}
- type piece struct {
- index int
- endGame bool
+ return ret
+ }
+ ml := multiless.New()
+ ml = ml.Int(
+ pending(leftRequest, leftCurrent),
+ pending(rightRequest, rightCurrent))
+ ml = ml.Bool(rightCurrent, leftCurrent)
+ ml = ml.Int(
+ int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
+ int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority))
+ ml = ml.Int(
+ int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
+ int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
+ ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
+ ml = ml.Uint32(leftRequest, rightRequest)
+ return ml.MustLess()
+}
+
+func (p peerRequests) Swap(i, j int) {
+ p.requestIndexes[i], p.requestIndexes[j] = p.requestIndexes[j], p.requestIndexes[i]
+}
+
+func (p *peerRequests) Push(x interface{}) {
+ p.requestIndexes = append(p.requestIndexes, x.(RequestIndex))
+}
+
+func (p *peerRequests) Pop() interface{} {
+ last := len(p.requestIndexes) - 1
+ x := p.requestIndexes[last]
+ p.requestIndexes = p.requestIndexes[:last]
+ return x
+}
+
+func (p *Peer) getDesiredRequestState() (desired requestState) {
+ input := p.t.cl.getRequestStrategyInput()
+ requestHeap := peerRequests{
+ requestIndexes: nil,
+ peer: p,
+ }
+ for _, t := range input.Torrents {
+ if t.InfoHash == p.t.infoHash {
+ requestHeap.torrentStrategyInput = t
+ break
}
- var pieceOrder []piece
- request_strategy.GetRequestablePieces(
- p.t.cl.getRequestStrategyInput(),
- func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
- if t.InfoHash != p.t.infoHash {
- return
- }
- if !p.peerHasPiece(pieceIndex) {
- return
- }
- pieceOrder = append(pieceOrder, piece{
- index: pieceIndex,
- endGame: rsp.Priority == PiecePriorityNow,
- })
- },
- )
- more := true
- interested := false
- for _, endGameIter := range []bool{false, true} {
- for _, piece := range pieceOrder {
- tp := p.t.piece(piece.index)
- tp.iterUndirtiedChunks(func(cs chunkIndexType) {
- req := cs + tp.requestIndexOffset()
- if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
- return
- }
- interested = true
- more = p.setInterested(true)
- if !more {
- return
- }
- if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
- return
- }
- if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
- return
- }
- var err error
- more, err = p.request(req)
- if err != nil {
- panic(err)
- }
- })
- if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
- break
- }
- if !more {
- break
- }
+ }
+ request_strategy.GetRequestablePieces(
+ input,
+ func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
+ if t.InfoHash != p.t.infoHash {
+ return
}
- if !more {
- break
+ if !p.peerHasPiece(pieceIndex) {
+ return
}
+ rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
+ requestHeap.requestIndexes = append(
+ requestHeap.requestIndexes,
+ p.t.pieceRequestIndexOffset(pieceIndex)+ci)
+ })
+ },
+ )
+ heap.Init(&requestHeap)
+ for requestHeap.Len() != 0 && desired.Requests.GetCardinality() < uint64(p.nominalMaxRequests()) {
+ requestIndex := heap.Pop(&requestHeap).(RequestIndex)
+ pieceIndex := requestIndex / p.t.chunksPerRegularPiece()
+ allowedFast := p.peerAllowedFast.Contains(pieceIndex)
+ if !allowedFast {
+ desired.Interested = true
}
- if !more {
- return false
- }
- if !interested {
- p.setInterested(false)
+ if allowedFast || !p.peerChoking {
+ desired.Requests.Add(requestIndex)
}
- return more
}
+ return
+}
+func (p *Peer) applyNextRequestState() bool {
next := p.nextRequestState
current := p.actualRequestState
if !p.setInterested(next.Interested) {