"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
"github.com/anacrolix/torrent/internal/limiter"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webtorrent"
"github.com/davecgh/go-spew/spew"
activeAnnounceLimiter limiter.Instance
- pieceRequestOrder clientPieceRequestOrder
+ pieceRequestOrder request_strategy.ClientPieceOrder
}
type ipStr string
t: tt,
}}
cn.peerImpl = cn
+ cl.lock()
+ defer cl.unlock()
assert.NoError(t, cn.peerSentHave(0))
assert.Error(t, cn.peerSentHave(1))
}
"net"
"github.com/anacrolix/missinggo/v2"
+ "github.com/anacrolix/torrent/types"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
)
-type ChunkSpec struct {
- Begin, Length pp.Integer
-}
-
-type Request struct {
- Index pp.Integer
- ChunkSpec
-}
+type (
+ Request = types.Request
+ ChunkSpec = types.ChunkSpec
+ piecePriority = types.PiecePriority
+)
-func (r Request) ToMsg(mt pp.MessageType) pp.Message {
- return pp.Message{
- Type: mt,
- Index: r.Index,
- Begin: r.Begin,
- Length: r.Length,
- }
-}
+const (
+ PiecePriorityNormal = types.PiecePriorityNormal
+ PiecePriorityNone = types.PiecePriorityNone
+ PiecePriorityNow = types.PiecePriorityNow
+ PiecePriorityReadahead = types.PiecePriorityReadahead
+ PiecePriorityNext = types.PiecePriorityNext
+ PiecePriorityHigh = types.PiecePriorityHigh
+)
func newRequest(index, begin, length pp.Integer) Request {
return Request{index, ChunkSpec{begin, length}}
"github.com/anacrolix/torrent/storage"
)
-// Describes the importance of obtaining a particular piece.
-type piecePriority byte
-
-func (pp *piecePriority) Raise(maybe piecePriority) bool {
- if maybe > *pp {
- *pp = maybe
- return true
- }
- return false
-}
-
-// Priority for use in PriorityBitmap
-func (me piecePriority) BitmapPriority() int {
- return -int(me)
-}
-
-const (
- PiecePriorityNone piecePriority = iota // Not wanted. Must be the zero value.
- PiecePriorityNormal // Wanted.
- PiecePriorityHigh // Wanted a lot.
- PiecePriorityReadahead // May be required soon.
- // Succeeds a piece where a read occurred. Currently the same as Now,
- // apparently due to issues with caching.
- PiecePriorityNext
- PiecePriorityNow // A Reader is reading in this piece. Highest urgency.
-)
-
type Piece struct {
// The completed piece SHA1 hash, from the metainfo "pieces" field.
hash *metainfo.Hash
return p.t.PieceState(p.index)
}
-func (p *Piece) iterUndirtiedChunks(f func(ChunkSpec) bool) bool {
+func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec) bool) bool {
for i := pp.Integer(0); i < p.numChunks(); i++ {
if p.chunkIndexDirty(i) {
continue
package torrent
import (
- "sort"
+ "log"
"time"
"unsafe"
- "github.com/anacrolix/multiless"
- pp "github.com/anacrolix/torrent/peer_protocol"
- "github.com/bradfitz/iter"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
+ "github.com/anacrolix/torrent/types"
)
-type clientPieceRequestOrder struct {
- pieces []pieceRequestOrderPiece
-}
-
-type pieceRequestOrderPiece struct {
- t *Torrent
- index pieceIndex
- prio piecePriority
- partial bool
- availability int64
- request bool
-}
-
-func (me *clientPieceRequestOrder) Len() int {
- return len(me.pieces)
-}
-
-func (me clientPieceRequestOrder) sort() {
- sort.Slice(me.pieces, me.less)
-}
-
-func (me clientPieceRequestOrder) less(_i, _j int) bool {
- i := me.pieces[_i]
- j := me.pieces[_j]
- return multiless.New().Int(
- int(j.prio), int(i.prio),
- ).Bool(
- j.partial, i.partial,
- ).Int64(i.availability, j.availability).Int(i.index, j.index).Less()
-}
-
func (cl *Client) requester() {
for {
func() {
}
}
-type requestsPeer struct {
- cur *Peer
- nextRequests map[Request]struct{}
- nextInterest bool
- requestablePiecesRemaining int
-}
-
-func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool {
- return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p))
-}
-
-func (rp *requestsPeer) hasPiece(i pieceIndex) bool {
- return rp.cur.peerHasPiece(i)
-}
-
-func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool {
- return rp.cur.peerAllowedFast.Contains(p)
-}
-
-func (rp *requestsPeer) choking() bool {
- return rp.cur.peerChoking
-}
-
-func (rp *requestsPeer) hasExistingRequest(r Request) bool {
- _, ok := rp.cur.requests[r]
- return ok
-}
-
-func (rp *requestsPeer) canFitRequest() bool {
- return len(rp.nextRequests) < rp.cur.nominalMaxRequests()
-}
-
-// Returns true if it is added and wasn't there before.
-func (rp *requestsPeer) addNextRequest(r Request) bool {
- _, ok := rp.nextRequests[r]
- if ok {
- return false
- }
- rp.nextRequests[r] = struct{}{}
- return true
-}
-
-type peersForPieceRequests struct {
- requestsInPiece int
- *requestsPeer
-}
-
-func (me *peersForPieceRequests) addNextRequest(r Request) {
- if me.requestsPeer.addNextRequest(r) {
- return
- me.requestsInPiece++
- }
-}
-
func (cl *Client) doRequests() {
- requestOrder := &cl.pieceRequestOrder
- requestOrder.pieces = requestOrder.pieces[:0]
- allPeers := make(map[*Torrent][]*requestsPeer)
- // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
- // TorrentImpl.
- storageLeft := make(map[*func() *int64]*int64)
+ ts := make([]*request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
- // TODO: We could do metainfo requests here.
- if !t.haveInfo() {
- continue
+ rst := &request_strategy.Torrent{}
+ if t.storage != nil {
+ rst.Capacity = t.storage.Capacity
}
- key := t.storage.Capacity
- if key != nil {
- if _, ok := storageLeft[key]; !ok {
- storageLeft[key] = (*key)()
- }
- }
- var peers []*requestsPeer
- t.iterPeers(func(p *Peer) {
- if !p.closed.IsSet() {
- peers = append(peers, &requestsPeer{
- cur: p,
- nextRequests: make(map[Request]struct{}),
- })
- }
- })
- for i := range iter.N(t.numPieces()) {
- tp := t.piece(i)
- pp := tp.purePriority()
- request := !t.ignorePieceForRequests(i)
- requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
- t: t,
- index: i,
- prio: pp,
- partial: t.piecePartiallyDownloaded(i),
- availability: tp.availability,
- request: request,
+ for i := range t.pieces {
+ p := &t.pieces[i]
+ rst.Pieces = append(rst.Pieces, request_strategy.Piece{
+ Request: !t.ignorePieceForRequests(i),
+ Priority: p.purePriority(),
+ Partial: t.piecePartiallyDownloaded(i),
+ Availability: p.availability,
+ Length: int64(p.length()),
+ NumPendingChunks: int(t.pieceNumPendingChunks(i)),
+ IterPendingChunks: func(f func(types.ChunkSpec)) {
+ p.iterUndirtiedChunks(func(cs ChunkSpec) bool {
+ f(cs)
+ return true
+ })
+ },
})
- if request {
- for _, p := range peers {
- if p.canRequestPiece(i) {
- p.requestablePiecesRemaining++
- }
- }
- }
}
- allPeers[t] = peers
- }
- requestOrder.sort()
- for _, p := range requestOrder.pieces {
- torrentPiece := p.t.piece(p.index)
- if left := storageLeft[p.t.storage.Capacity]; left != nil {
- if *left < int64(torrentPiece.length()) {
- continue
+ t.iterPeers(func(p *Peer) {
+ if p.closed.IsSet() {
+ return
}
- *left -= int64(torrentPiece.length())
- }
- if !p.request {
- continue
- }
- peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
- for _, peer := range allPeers[p.t] {
- peersForPiece = append(peersForPiece, &peersForPieceRequests{
- requestsInPiece: 0,
- requestsPeer: peer,
+ rst.Peers = append(rst.Peers, &request_strategy.Peer{
+ HasPiece: p.peerHasPiece,
+ MaxRequests: p.nominalMaxRequests,
+ HasExistingRequest: func(r request_strategy.Request) bool {
+ _, ok := p.requests[r]
+ return ok
+ },
+ Choking: p.peerChoking,
+ PieceAllowedFast: func(i pieceIndex) bool {
+ return p.peerAllowedFast.Contains(i)
+ },
+ DownloadRate: p.downloadRate(),
+ Age: time.Since(p.completedHandshake),
+ Id: unsafe.Pointer(p),
})
- }
- sortPeersForPiece := func() {
- sort.Slice(peersForPiece, func(i, j int) bool {
- return multiless.New().Int(
- peersForPiece[i].requestsInPiece,
- peersForPiece[j].requestsInPiece,
- ).Int(
- peersForPiece[i].requestablePiecesRemaining,
- peersForPiece[j].requestablePiecesRemaining,
- ).Float64(
- peersForPiece[j].cur.downloadRate(),
- peersForPiece[i].cur.downloadRate(),
- ).EagerSameLess(
- peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake),
- peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake),
- // TODO: Probably peer priority can come next
- ).Uintptr(
- uintptr(unsafe.Pointer(peersForPiece[j].cur)),
- uintptr(unsafe.Pointer(peersForPiece[i].cur)),
- ).Less()
- })
- }
- pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index))
- torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool {
- req := Request{pp.Integer(p.index), chunk}
- pendingChunksRemaining--
- sortPeersForPiece()
- skipped := 0
- // Try up to the number of peers that could legitimately receive the request equal to
- // the number of chunks left. This should ensure that only the best peers serve the last
- // few chunks in a piece.
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() || !peer.hasPiece(p.index) || (!peer.pieceAllowedFast(p.index) && peer.choking()) {
- continue
- }
- if skipped > pendingChunksRemaining {
- break
- }
- if !peer.hasExistingRequest(req) {
- skipped++
- continue
- }
- if !peer.pieceAllowedFast(p.index) {
- // We must stay interested for this.
- peer.nextInterest = true
- }
- peer.addNextRequest(req)
- return true
- }
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
- continue
- }
- if !peer.hasPiece(p.index) {
- continue
- }
- if !peer.pieceAllowedFast(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast
- // pieces.
- peer.nextInterest = true
- if peer.choking() {
- continue
- }
- }
- peer.addNextRequest(req)
- return true
- }
- return true
})
- if pendingChunksRemaining != 0 {
- panic(pendingChunksRemaining)
- }
- for _, peer := range peersForPiece {
- if peer.canRequestPiece(p.index) {
- peer.requestablePiecesRemaining--
- }
- }
+ ts = append(ts, rst)
}
- for _, peers := range allPeers {
- for _, rp := range peers {
- if rp.requestablePiecesRemaining != 0 {
- panic(rp.requestablePiecesRemaining)
- }
- applyPeerNextRequests(rp)
- }
+ nextPeerStates := cl.pieceRequestOrder.DoRequests(ts)
+ for p, state := range nextPeerStates {
+ applyPeerNextRequestState(p, state)
}
}
-func applyPeerNextRequests(rp *requestsPeer) {
- p := rp.cur
- p.setInterested(rp.nextInterest)
+func applyPeerNextRequestState(_p request_strategy.PeerPointer, rp request_strategy.PeerNextRequestState) {
+ p := (*Peer)(_p)
+ p.setInterested(rp.Interested)
for req := range p.requests {
- if _, ok := rp.nextRequests[req]; !ok {
+ if _, ok := rp.Requests[req]; !ok {
p.cancel(req)
}
}
- for req := range rp.nextRequests {
+ for req := range rp.Requests {
err := p.request(req)
if err != nil {
panic(err)
} else {
- //log.Print(req)
+ log.Print(req)
}
}
}
--- /dev/null
+package request_strategy
+
+import (
+ "sort"
+
+ "github.com/anacrolix/multiless"
+ pp "github.com/anacrolix/torrent/peer_protocol"
+ "github.com/anacrolix/torrent/types"
+)
+
+type (
+ Request = types.Request
+ pieceIndex = types.PieceIndex
+ piecePriority = types.PiecePriority
+)
+
+type ClientPieceOrder struct {
+ pieces []pieceRequestOrderPiece
+}
+
+type pieceRequestOrderPiece struct {
+ t *Torrent
+ index pieceIndex
+ Piece
+}
+
+func (me *ClientPieceOrder) Len() int {
+ return len(me.pieces)
+}
+
+func (me ClientPieceOrder) sort() {
+ sort.Slice(me.pieces, me.less)
+}
+
+func (me ClientPieceOrder) less(_i, _j int) bool {
+ i := me.pieces[_i]
+ j := me.pieces[_j]
+ return multiless.New().Int(
+ int(j.Priority), int(i.Priority),
+ ).Bool(
+ j.Partial, i.Partial,
+ ).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
+}
+
+type requestsPeer struct {
+ *Peer
+ nextState PeerNextRequestState
+ requestablePiecesRemaining int
+}
+
+func (rp *requestsPeer) canFitRequest() bool {
+ return len(rp.nextState.Requests) < rp.MaxRequests()
+}
+
+// Returns true if it is added and wasn't there before.
+func (rp *requestsPeer) addNextRequest(r Request) bool {
+ _, ok := rp.nextState.Requests[r]
+ if ok {
+ return false
+ }
+ rp.nextState.Requests[r] = struct{}{}
+ return true
+}
+
+type peersForPieceRequests struct {
+ requestsInPiece int
+ *requestsPeer
+}
+
+func (me *peersForPieceRequests) addNextRequest(r Request) {
+ if me.requestsPeer.addNextRequest(r) {
+ return
+ me.requestsInPiece++
+ }
+}
+
+type Torrent struct {
+ Pieces []Piece
+ Capacity *func() *int64
+ Peers []*Peer // not closed.
+}
+
+func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPointer]PeerNextRequestState {
+ requestOrder.pieces = requestOrder.pieces[:0]
+ allPeers := make(map[*Torrent][]*requestsPeer)
+ // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
+ // TorrentImpl.
+ storageLeft := make(map[*func() *int64]*int64)
+ for _, t := range torrents {
+ // TODO: We could do metainfo requests here.
+ key := t.Capacity
+ if key != nil {
+ if _, ok := storageLeft[key]; !ok {
+ storageLeft[key] = (*key)()
+ }
+ }
+ var peers []*requestsPeer
+ for _, p := range t.Peers {
+ peers = append(peers, &requestsPeer{
+ Peer: p,
+ nextState: PeerNextRequestState{
+ Requests: make(map[Request]struct{}),
+ },
+ })
+ }
+ for i, tp := range t.Pieces {
+ requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
+ t: t,
+ index: i,
+ Piece: tp,
+ })
+ if tp.Request {
+ for _, p := range peers {
+ if p.canRequestPiece(i) {
+ p.requestablePiecesRemaining++
+ }
+ }
+ }
+ }
+ allPeers[t] = peers
+ }
+ requestOrder.sort()
+ for _, p := range requestOrder.pieces {
+ torrentPiece := p
+ if left := storageLeft[p.t.Capacity]; left != nil {
+ if *left < int64(torrentPiece.Length) {
+ continue
+ }
+ *left -= int64(torrentPiece.Length)
+ }
+ if !p.Request {
+ continue
+ }
+ peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
+ for _, peer := range allPeers[p.t] {
+ peersForPiece = append(peersForPiece, &peersForPieceRequests{
+ requestsInPiece: 0,
+ requestsPeer: peer,
+ })
+ }
+ sortPeersForPiece := func() {
+ sort.Slice(peersForPiece, func(i, j int) bool {
+ return multiless.New().Int(
+ peersForPiece[i].requestsInPiece,
+ peersForPiece[j].requestsInPiece,
+ ).Int(
+ peersForPiece[i].requestablePiecesRemaining,
+ peersForPiece[j].requestablePiecesRemaining,
+ ).Float64(
+ peersForPiece[j].DownloadRate,
+ peersForPiece[i].DownloadRate,
+ ).Int64(
+ int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
+ // TODO: Probably peer priority can come next
+ ).Uintptr(
+ uintptr(peersForPiece[j].Id),
+ uintptr(peersForPiece[i].Id),
+ ).MustLess()
+ })
+ }
+ pendingChunksRemaining := int(p.NumPendingChunks)
+ torrentPiece.IterPendingChunks(func(chunk types.ChunkSpec) {
+ req := Request{pp.Integer(p.index), chunk}
+ pendingChunksRemaining--
+ sortPeersForPiece()
+ skipped := 0
+ // Try up to the number of peers that could legitimately receive the request equal to
+ // the number of chunks left. This should ensure that only the best peers serve the last
+ // few chunks in a piece.
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.PieceAllowedFast(p.index) && peer.Choking) {
+ continue
+ }
+ if skipped > pendingChunksRemaining {
+ break
+ }
+ if !peer.HasExistingRequest(req) {
+ skipped++
+ continue
+ }
+ if !peer.PieceAllowedFast(p.index) {
+ // We must stay interested for this.
+ peer.nextState.Interested = true
+ }
+ peer.addNextRequest(req)
+ return
+ }
+ for _, peer := range peersForPiece {
+ if !peer.canFitRequest() {
+ continue
+ }
+ if !peer.HasPiece(p.index) {
+ continue
+ }
+ if !peer.PieceAllowedFast(p.index) {
+ // TODO: Verify that's okay to stay uninterested if we request allowed fast
+ // pieces.
+ peer.nextState.Interested = true
+ if peer.Choking {
+ continue
+ }
+ }
+ peer.addNextRequest(req)
+ return
+ }
+ })
+ if pendingChunksRemaining != 0 {
+ panic(pendingChunksRemaining)
+ }
+ for _, peer := range peersForPiece {
+ if peer.canRequestPiece(p.index) {
+ peer.requestablePiecesRemaining--
+ }
+ }
+ }
+ ret := make(map[PeerPointer]PeerNextRequestState)
+ for _, peers := range allPeers {
+ for _, rp := range peers {
+ if rp.requestablePiecesRemaining != 0 {
+ panic(rp.requestablePiecesRemaining)
+ }
+ ret[rp.Id] = rp.nextState
+ }
+ }
+ return ret
+}
--- /dev/null
+package request_strategy
--- /dev/null
+package request_strategy
+
+import (
+ "time"
+ "unsafe"
+)
+
+type PeerNextRequestState struct {
+ Interested bool
+ Requests map[Request]struct{}
+}
+
+type PeerPointer = unsafe.Pointer
+
+type Peer struct {
+ HasPiece func(pieceIndex) bool
+ MaxRequests func() int
+ HasExistingRequest func(Request) bool
+ Choking bool
+ PieceAllowedFast func(pieceIndex) bool
+ DownloadRate float64
+ Age time.Duration
+ Id PeerPointer
+}
+
+// TODO: This might be used in more places I think.
+func (p *Peer) canRequestPiece(i pieceIndex) bool {
+ return p.HasPiece(i) && (!p.Choking || p.PieceAllowedFast(i))
+}
--- /dev/null
+package request_strategy
+
+import (
+ "github.com/anacrolix/torrent/types"
+)
+
+type Piece struct {
+ Request bool
+ Priority piecePriority
+ Partial bool
+ Availability int64
+ Length int64
+ NumPendingChunks int
+ IterPendingChunks func(func(types.ChunkSpec))
+}
--- /dev/null
+package types
+
+import (
+ pp "github.com/anacrolix/torrent/peer_protocol"
+)
+
+type PieceIndex = int
+
+type ChunkSpec struct {
+ Begin, Length pp.Integer
+}
+
+type Request struct {
+ Index pp.Integer
+ ChunkSpec
+}
+
+func (r Request) ToMsg(mt pp.MessageType) pp.Message {
+ return pp.Message{
+ Type: mt,
+ Index: r.Index,
+ Begin: r.Begin,
+ Length: r.Length,
+ }
+}
+
+// Describes the importance of obtaining a particular piece.
+type PiecePriority byte
+
+func (pp *PiecePriority) Raise(maybe PiecePriority) bool {
+ if maybe > *pp {
+ *pp = maybe
+ return true
+ }
+ return false
+}
+
+// Priority for use in PriorityBitmap
+func (me PiecePriority) BitmapPriority() int {
+ return -int(me)
+}
+
+const (
+ PiecePriorityNone PiecePriority = iota // Not wanted. Must be the zero value.
+ PiecePriorityNormal // Wanted.
+ PiecePriorityHigh // Wanted a lot.
+ PiecePriorityReadahead // May be required soon.
+ // Succeeds a piece where a read occurred. Currently the same as Now,
+ // apparently due to issues with caching.
+ PiecePriorityNext
+ PiecePriorityNow // A Reader is reading in this piece. Highest urgency.
+)