websocketTrackers websocketTrackers
activeAnnounceLimiter limiter.Instance
+
+ clientPieceRequestOrder
}
type ipStr string
},
}
+ go cl.requester()
+
return
}
webSeeds: make(map[string]*Peer),
}
t._pendingPieces.NewSet = priorityBitmapStableNewSet
- t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
t.logger = cl.logger.WithContextValue(t)
t.setChunkSize(defaultChunkSize)
return
// OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
- DefaultRequestStrategy requestStrategyMaker
-
Extensions PeerExtensionBits
DisableWebtorrent bool
CryptoSelector: mse.DefaultCryptoSelector,
CryptoProvides: mse.AllSupportedCrypto,
ListenPort: 42069,
-
- DefaultRequestStrategy: RequestStrategyDuplicateRequestTimeout(5 * time.Second),
-
- Extensions: defaultPeerExtensionBytes(),
+ Extensions: defaultPeerExtensionBytes(),
}
//cc.ConnTracker.SetNoMaxEntries()
//cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 }
drop()
String() string
connStatusString() string
+ writeBufferFull() bool
}
cn.statusFlags(),
cn.downloadRate()/(1<<10),
)
- fmt.Fprintf(w, " next pieces: %v%s\n",
- iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
- func() string {
- if cn == t.fastestPeer {
- return " (fastest)"
- } else {
- return ""
- }
- }(),
- )
+ //fmt.Fprintf(w, " next pieces: %v%s\n",
+ // iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
+ // func() string {
+ // if cn == t.fastestPeer {
+ // return " (fastest)"
+ // } else {
+ // return ""
+ // }
+ // }(),
+ //)
}
func (cn *Peer) close() {
cn.wroteMsg(&msg)
cn.writeBuffer.Write(msg.MustMarshalBinary())
torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
- return cn.writeBuffer.Len() < writeBufferHighWaterLen
+ cn.tickleWriter()
+ return !cn.writeBufferFull()
+}
+
+func (cn *PeerConn) writeBufferFull() bool {
+ return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}
func (cn *PeerConn) requestMetadataPiece(index int) {
// The actual value to use as the maximum outbound requests.
func (cn *Peer) nominalMaxRequests() (ret int) {
- return int(clamp(
- 1,
- int64(cn.PeerMaxRequests),
- int64(cn.t.requestStrategy.nominalMaxRequests(cn.requestStrategyConnection())),
- ))
+ return cn.PeerMaxRequests
}
func (cn *Peer) totalExpectingTime() (ret time.Duration) {
// are okay.
type messageWriter func(pp.Message) bool
-func (cn *Peer) request(r Request) bool {
+func (cn *Peer) request(r Request) (more bool, err error) {
if _, ok := cn.requests[r]; ok {
- panic("chunk already requested")
+ return true, nil
}
if !cn.peerHasPiece(pieceIndex(r.Index)) {
- panic("requesting piece peer doesn't have")
+ return true, errors.New("requesting piece peer doesn't have")
}
if !cn.t.peerIsActive(cn) {
panic("requesting but not in active conns")
if cn.peerAllowedFast.Get(int(r.Index)) {
torrent.Add("allowed fast requests sent", 1)
} else {
- panic("requesting while choking and not allowed fast")
+ return cn.setInterested(true), errors.New("requesting while choked and not allowed fast")
}
}
if cn.t.hashingPiece(pieceIndex(r.Index)) {
}
cn.validReceiveChunks[r]++
cn.t.pendingRequests[r]++
- cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, r})
}
- return cn.peerImpl.request(r)
+ return cn.peerImpl.request(r), nil
}
func (me *PeerConn) request(r Request) bool {
return me.write(makeCancelMessage(r))
}
-func (cn *Peer) doRequestState() bool {
- if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed {
- if !cn.setInterested(false) {
- return false
- }
- if len(cn.requests) != 0 {
- for r := range cn.requests {
- cn.deleteRequest(r)
- // log.Printf("%p: cancelling request: %v", cn, r)
- if !cn.peerImpl.cancel(r) {
- return false
- }
- }
- }
- } else if len(cn.requests) <= cn.requestsLowWater {
- filledBuffer := false
- cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
- cn.iterPendingRequests(pieceIndex, func(r Request) bool {
- if !cn.setInterested(true) {
- filledBuffer = true
- return false
- }
- if len(cn.requests) >= cn.nominalMaxRequests() {
- return false
- }
- // Choking is looked at here because our interest is dependent
- // on whether we'd make requests in its absence.
- if cn.peerChoking {
- if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
- return false
- }
- }
- if _, ok := cn.requests[r]; ok {
- return true
- }
- filledBuffer = !cn.request(r)
- return !filledBuffer
- })
- return !filledBuffer
- })
- if filledBuffer {
- // If we didn't completely top up the requests, we shouldn't mark
- // the low water, since we'll want to top up the requests as soon
- // as we have more write buffer space.
- return false
- }
- cn.requestsLowWater = len(cn.requests) / 2
- if len(cn.requests) == 0 {
- return cn.setInterested(false)
- }
- }
- return true
-}
-
func (cn *PeerConn) fillWriteBuffer() {
- if !cn.doRequestState() {
- return
- }
if cn.pex.IsEnabled() {
if flow := cn.pex.Share(cn.write); !flow {
return
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
return func(cb iter.Callback) {
for _, bm := range bms {
- bm.Sub(*skip)
if !iter.All(
- func(i interface{}) bool {
- skip.Add(i.(int))
+ func(_i interface{}) bool {
+ i := _i.(int)
+ if skip.Contains(i) {
+ return true
+ }
+ skip.Add(i)
return cb(i)
},
bm.Iter,
}
}
-func iterUnbiasedPieceRequestOrder(cn requestStrategyConnection, f func(piece pieceIndex) bool) bool {
- now, readahead := cn.torrent().readerPiecePriorities()
- skip := bitmap.Flip(cn.peerPieces(), 0, cn.torrent().numPieces())
- skip.Union(cn.torrent().ignorePieces())
- // Return an iterator over the different priority classes, minus the skip pieces.
- return iter.All(
- func(_piece interface{}) bool {
- return f(pieceIndex(_piece.(bitmap.BitIndex)))
- },
- iterBitmapsDistinct(&skip, now, readahead),
- // We have to iterate _pendingPieces separately because it isn't a Bitmap.
- func(cb iter.Callback) {
- cn.torrent().pendingPieces().IterTyped(func(piece int) bool {
- if skip.Contains(piece) {
- return true
- }
- more := cb(piece)
- skip.Add(piece)
- return more
- })
- },
- )
-}
-
-// The connection should download highest priority pieces first, without any inclination toward
-// avoiding wastage. Generally we might do this if there's a single connection, or this is the
-// fastest connection, and we have active readers that signal an ordering preference. It's
-// conceivable that the best connection should do this, since it's least likely to waste our time if
-// assigned to the highest priority pieces, and assigning more than one this role would cause
-// significant wasted bandwidth.
-func (cn *Peer) shouldRequestWithoutBias() bool {
- return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
-}
-
-func (cn *Peer) iterPendingPieces(f func(pieceIndex) bool) {
- if !cn.t.haveInfo() {
- return
- }
- if cn.closed.IsSet() {
- return
- }
- cn.t.requestStrategy.iterPendingPieces(cn, f)
-}
-func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) {
- cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
-}
-
-func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool {
- return cn.t.requestStrategy.iterUndirtiedChunks(
- cn.t.piece(piece).requestStrategyPiece(),
- func(cs ChunkSpec) bool {
- return f(Request{pp.Integer(piece), cs})
- },
- )
-}
-
// check callers updaterequests
func (cn *Peer) stopRequestingPiece(piece pieceIndex) bool {
return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
return cn.stopRequestingPiece(piece)
}
prio := cn.getPieceInclination()[piece]
- prio = cn.t.requestStrategy.piecePriority(cn, piece, tpp, prio)
- return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
+ return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio)
}
func (cn *Peer) getPieceInclination() []int {
f(PeerRequestEvent{c, r})
}
c.updateExpectingChunks()
- c.t.requestStrategy.hooks().deletedRequest(r)
pr := c.t.pendingRequests
pr[r]--
n := pr[r]
return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
}
-func (cn *Peer) requestStrategyConnection() requestStrategyConnection {
- return cn
-}
-
func (cn *Peer) chunksReceivedWhileExpecting() int64 {
return cn._chunksReceivedWhileExpecting
}
return &cn._stats
}
-func (cn *Peer) torrent() requestStrategyTorrent {
- return cn.t.requestStrategyTorrent()
-}
-
func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
pc, ok := p.peerImpl.(*PeerConn)
return pc, ok
func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request {
return Request{
pp.Integer(p.index),
- chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
+ p.chunkIndexSpec(chunkIndex),
}
}
return p._dirtyChunks.Len() == int(p.numChunks())
}
-func (p *Piece) requestStrategyPiece() requestStrategyPiece {
- return p
-}
-
func (p *Piece) dirtyChunks() bitmap.Bitmap {
return p._dirtyChunks
}
func (p *Piece) State() PieceState {
return p.t.PieceState(p.index)
}
+
+func (p *Piece) iterUndirtiedChunks(f func(ChunkSpec) bool) bool {
+ for i := pp.Integer(0); i < p.numChunks(); i++ {
+ if p.chunkIndexDirty(i) {
+ continue
+ }
+ if !f(p.chunkIndexSpec(i)) {
+ return false
+ }
+ }
+ return true
+}
package torrent
-
-import (
- "github.com/anacrolix/missinggo/iter"
- "github.com/anacrolix/missinggo/v2/bitmap"
- pp "github.com/anacrolix/torrent/peer_protocol"
-)
-
-// Provides default implementations for requestStrategy methods. Could be embedded, or delegated to.
-type requestStrategyDefaults struct{}
-
-func (requestStrategyDefaults) hooks() requestStrategyHooks {
- return requestStrategyHooks{
- sentRequest: func(Request) {},
- deletedRequest: func(Request) {},
- }
-}
-
-func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
- chunkIndices := p.dirtyChunks().Copy()
- chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
- return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
- ci, err := chunkIndices.RB.Select(uint32(i))
- if err != nil {
- panic(err)
- }
- return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
- })
-}
-
-func (requestStrategyDefaults) nominalMaxRequests(cn requestStrategyConnection) int {
- return int(
- max(
- 64,
- cn.stats().ChunksReadUseful.Int64()-
- (cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
-}
-
-func (requestStrategyDefaults) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
- return prio
-}
-
-func (requestStrategyDefaults) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
- return false
-}
package torrent
import (
- "math"
- "sync"
+ "sort"
"time"
- "github.com/anacrolix/missinggo/v2/bitmap"
- "github.com/anacrolix/missinggo/v2/prioritybitmap"
-
+ "github.com/anacrolix/log"
+ "github.com/anacrolix/multiless"
pp "github.com/anacrolix/torrent/peer_protocol"
+ "github.com/bradfitz/iter"
)
-type requestStrategyPiece interface {
- numChunks() pp.Integer
- dirtyChunks() bitmap.Bitmap
- chunkIndexRequest(i pp.Integer) Request
-}
-
-type requestStrategyTorrent interface {
- numConns() int
- numReaders() int
- numPieces() int
- readerPiecePriorities() (now, readahead bitmap.Bitmap)
- ignorePieces() bitmap.Bitmap
- pendingPieces() *prioritybitmap.PriorityBitmap
-}
-
-type requestStrategyConnection interface {
- torrent() requestStrategyTorrent
- peerPieces() bitmap.Bitmap
- pieceRequestOrder() *prioritybitmap.PriorityBitmap
- fastest() bool
- stats() *ConnStats
- totalExpectingTime() time.Duration
- peerMaxRequests() int
- chunksReceivedWhileExpecting() int64
-}
-
-type requestStrategy interface {
- iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
- iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool
- nominalMaxRequests(requestStrategyConnection) int
- shouldRequestWithoutBias(requestStrategyConnection) bool
- piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
- hooks() requestStrategyHooks
-}
-
-type requestStrategyHooks struct {
- sentRequest func(Request)
- deletedRequest func(Request)
-}
-
-type requestStrategyCallbacks interface {
- requestTimedOut(Request)
-}
-
-type requestStrategyFuzzing struct {
- requestStrategyDefaults
+type clientPieceRequestOrder struct {
+ pieces []pieceRequestOrderPiece
}
-type requestStrategyFastest struct {
- requestStrategyDefaults
+type pieceRequestOrderPiece struct {
+ t *Torrent
+ index pieceIndex
+ prio piecePriority
+ partial bool
+ availability int
}
-func newRequestStrategyMaker(rs requestStrategy) requestStrategyMaker {
- return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
- return rs
+func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
+ for i := range iter.N(numPieces) {
+ me.pieces = append(me.pieces, pieceRequestOrderPiece{
+ t: t,
+ index: i,
+ })
}
}
-// The fastest connection downloads strictly in order of priority, while all others adhere to their
-// piece inclinations.
-func RequestStrategyFastest() requestStrategyMaker {
- return newRequestStrategyMaker(requestStrategyFastest{})
-}
-
-// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
-// connections.
-func RequestStrategyFuzzing() requestStrategyMaker {
- return newRequestStrategyMaker(requestStrategyFuzzing{})
-}
-
-func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
- if cn.torrent().numReaders() == 0 {
- return false
- }
- if cn.torrent().numConns() == 1 {
- return true
- }
- if cn.fastest() {
- return true
+func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
+ newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
+ for _, p := range me.pieces {
+ if p.t != t {
+ newPieces = append(newPieces, p)
+ }
}
- return false
+ me.pieces = newPieces
}
-type requestStrategyDuplicateRequestTimeout struct {
- requestStrategyDefaults
- // How long to avoid duplicating a pending request.
- duplicateRequestTimeout time.Duration
-
- callbacks requestStrategyCallbacks
-
- // The last time we requested a chunk. Deleting the request from any connection will clear this
- // value.
- lastRequested map[Request]*time.Timer
- // The lock to take when running a request timeout handler.
- timeoutLocker sync.Locker
+func (me clientPieceRequestOrder) sort() {
+ sort.SliceStable(me.pieces, me.less)
}
-// Generates a request strategy instance for a given torrent. callbacks are probably specific to the torrent.
-type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
-
-// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
-// reached.
-func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) requestStrategyMaker {
- return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
- return requestStrategyDuplicateRequestTimeout{
- duplicateRequestTimeout: duplicateRequestTimeout,
- callbacks: callbacks,
- lastRequested: make(map[Request]*time.Timer),
- timeoutLocker: clientLocker,
- }
+func (me clientPieceRequestOrder) update() {
+ for i := range me.pieces {
+ p := &me.pieces[i]
+ p.prio = p.t.piece(p.index).uncachedPriority()
+ p.partial = p.t.piecePartiallyDownloaded(p.index)
+ p.availability = p.t.pieceAvailability(p.index)
}
}
-func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
- return requestStrategyHooks{
- deletedRequest: func(r Request) {
- if t, ok := rs.lastRequested[r]; ok {
- t.Stop()
- delete(rs.lastRequested, r)
- }
- },
- sentRequest: rs.onSentRequest,
+func (me clientPieceRequestOrder) less(_i, _j int) bool {
+ i := me.pieces[_i]
+ j := me.pieces[_j]
+ ml := multiless.New()
+ ml.Int(int(j.prio), int(i.prio))
+ ml.Bool(j.partial, i.partial)
+ ml.Int(i.availability, j.availability)
+ return ml.Less()
+}
+
+func (cl *Client) requester() {
+ for {
+ func() {
+ cl.lock()
+ defer cl.unlock()
+ cl.doRequests()
+ }()
+ select {
+ case <-cl.closed.LockedChan(cl.locker()):
+ return
+ case <-time.After(10 * time.Millisecond):
+ }
}
}
-func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
- for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
- if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
- continue
+func (cl *Client) doRequests() {
+ requestOrder := clientPieceRequestOrder{}
+ allPeers := make(map[*Torrent][]*Peer)
+ storageCapacity := make(map[*Torrent]*int64)
+ for _, t := range cl.torrents {
+ // TODO: We could do metainfo requests here.
+ if t.haveInfo() {
+ value := int64(t.usualPieceSize())
+ storageCapacity[t] = &value
+ requestOrder.addPieces(t, t.numPieces())
}
- r := p.chunkIndexRequest(i)
- if rs.wouldDuplicateRecent(r) {
+ var peers []*Peer
+ t.iterPeers(func(p *Peer) {
+ peers = append(peers, p)
+ })
+ allPeers[t] = peers
+ }
+ requestOrder.update()
+ requestOrder.sort()
+ for _, p := range requestOrder.pieces {
+ if p.t.ignorePieceForRequests(p.index) {
continue
}
- if !f(r.ChunkSpec) {
- return false
+ peers := allPeers[p.t]
+ torrentPiece := p.t.piece(p.index)
+ if left := storageCapacity[p.t]; left != nil {
+ if *left < int64(torrentPiece.length()) {
+ continue
+ }
+ *left -= int64(torrentPiece.length())
}
+ p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
+ for _, peer := range peers {
+ req := Request{pp.Integer(p.index), chunk}
+ _, err := peer.request(req)
+ if err == nil {
+ log.Printf("requested %v", req)
+ break
+ }
+ }
+ return true
+ })
}
- return true
-}
-
-func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
- switch tpp {
- case PiecePriorityNormal:
- case PiecePriorityReadahead:
- prio -= int(cn.torrent().numPieces())
- case PiecePriorityNext, PiecePriorityNow:
- prio -= 2 * int(cn.torrent().numPieces())
- default:
- panic(tpp)
- }
- prio += int(piece / 3)
- return prio
-}
-
-func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
- return iterUnbiasedPieceRequestOrder(cn, f)
-}
-func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
- if rs.shouldRequestWithoutBias(cn) {
- return iterUnbiasedPieceRequestOrder(cn, f)
- } else {
- return cn.pieceRequestOrder().IterTyped(func(i int) bool {
- return f(pieceIndex(i))
+ for _, t := range cl.torrents {
+ t.iterPeers(func(p *Peer) {
+ if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
+ p.setInterested(false)
+ }
})
}
}
-func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
- return defaultIterPendingPieces(rs, cn, cb)
-}
-func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
- return defaultIterPendingPieces(rs, cn, cb)
-}
-func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) {
- rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
- rs.timeoutLocker.Lock()
- delete(rs.lastRequested, r)
- rs.timeoutLocker.Unlock()
- rs.callbacks.requestTimedOut(r)
- })
-}
-
-// The actual value to use as the maximum outbound requests.
-func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
- expectingTime := int64(cn.totalExpectingTime())
- if expectingTime == 0 {
- expectingTime = math.MaxInt64
- } else {
- expectingTime *= 2
- }
- return int(clamp(
- 1,
- int64(cn.peerMaxRequests()),
- max(
- // It makes sense to always pipeline at least one connection, since latency must be
- // non-zero.
- 2,
- // Request only as many as we expect to receive in the duplicateRequestTimeout
- // window. We are trying to avoid having to duplicate requests.
- cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
- ),
- ))
-}
-func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool {
- // This piece has been requested on another connection, and the duplicate request timer is still
- // running.
- _, ok := rs.lastRequested[r]
- return ok
-}
+//func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
+// chunkIndices := p.dirtyChunks().Copy()
+// chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
+// return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
+// ci, err := chunkIndices.RB.Select(uint32(i))
+// if err != nil {
+// panic(err)
+// }
+// return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
+// })
+//}
+
+//
+//func iterUnbiasedPieceRequestOrder(
+// cn requestStrategyConnection,
+// f func(piece pieceIndex) bool,
+// pieceRequestOrder []pieceIndex,
+//) bool {
+// cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
+// for _, i := range pieceRequestOrder {
+// if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {
+// continue
+// }
+// if !f(i) {
+// return false
+// }
+// }
+// return true
+//}
dataUploadDisallowed bool
userOnWriteChunkErr func(error)
- // Determines what chunks to request from peers.
- requestStrategy requestStrategy
-
closed missinggo.Event
infoHash metainfo.Hash
pieces []Piece
pex pexState
}
+func (t *Torrent) pieceAvailability(i pieceIndex) (count int) {
+ t.iterPeers(func(peer *Peer) {
+ if peer.peerHasPiece(i) {
+ count++
+ }
+ })
+ return
+}
+
+func (t *Torrent) sortPieceRequestOrder(sl []pieceIndex) {
+ if len(sl) != t.numPieces() {
+ panic(len(sl))
+ }
+ availability := make([]int, len(sl))
+ t.iterPeers(func(peer *Peer) {
+ for i := range availability {
+ if peer.peerHasPiece(i) {
+ availability[i]++
+ }
+ }
+ })
+}
+
func (t *Torrent) numConns() int {
return len(t.conns)
}
return t._readerReadaheadPieces
}
-func (t *Torrent) ignorePieces() bitmap.Bitmap {
- ret := t._completedPieces.Copy()
- ret.Union(t.piecesQueuedForHash)
- for i := 0; i < t.numPieces(); i++ {
- if t.piece(i).hashing {
- ret.Set(i, true)
- }
- }
- return ret
+func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
+ return !t.wantPieceIndex(i)
}
func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
+ t.cl.clientPieceRequestOrder.addPieces(t, t.numPieces())
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
})
return &t.pieces[i]
}
-func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent {
- return t
-}
-
-type torrentRequestStrategyCallbacks struct {
- t *Torrent
-}
-
-func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) {
- torrent.Add("Request timeouts", 1)
- cb.t.cl.lock()
- defer cb.t.cl.unlock()
- cb.t.iterPeers(func(cn *Peer) {
- if cn.peerHasPiece(pieceIndex(r.Index)) {
- cn.updateRequests()
- }
- })
-
-}
-
-func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
- return torrentRequestStrategyCallbacks{t}
-}
-
func (t *Torrent) onWriteChunkErr(err error) {
if t.userOnWriteChunkErr != nil {
go t.userOnWriteChunkErr(err)
t.userOnWriteChunkErr = f
}
-func (t *Torrent) iterPeers(f func(*Peer)) {
+func (t *Torrent) iterPeers(f func(p *Peer)) {
for pc := range t.conns {
f(&pc.Peer)
}
var _ peerImpl = (*webseedPeer)(nil)
+func (me *webseedPeer) writeBufferFull() bool {
+ return false
+}
+
func (me *webseedPeer) connStatusString() string {
return me.client.Url
}
func (ws *webseedPeer) drop() {}
func (ws *webseedPeer) updateRequests() {
- ws.peer.doRequestState()
}
func (ws *webseedPeer) onClose() {