"bytes"
"fmt"
"io"
- "math"
"math/rand"
"net"
"strconv"
))
}
-// The actual value to use as the maximum outbound requests.
-func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
- expectingTime := int64(cn.totalExpectingTime())
- if expectingTime == 0 {
- expectingTime = math.MaxInt64
- } else {
- expectingTime *= 2
- }
- return int(clamp(
- 1,
- int64(cn.peerMaxRequests()),
- max(
- // It makes sense to always pipeline at least one connection, since latency must be
- // non-zero.
- 2,
- // Request only as many as we expect to receive in the duplicateRequestTimeout
- // window. We are trying to avoid having to duplicate requests.
- cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
- ),
- ))
-}
-func defaultNominalMaxRequests(cn requestStrategyConnection) int {
- return int(
- max(64,
- cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
-}
-func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int {
- return defaultNominalMaxRequests(cn)
-}
-func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int {
- return defaultNominalMaxRequests(cn)
-}
-
func (cn *connection) totalExpectingTime() (ret time.Duration) {
ret = cn.cumulativeExpectedToReceiveChunks
if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
})
}
-func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
- rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
- rs.timeoutLocker.Lock()
- delete(rs.lastRequested, r)
- rs.timeoutLocker.Unlock()
- rs.callbacks.requestTimedOut(r)
- })
-}
-
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
if !cn.t.networkingEnabled {
if !cn.SetInterested(false, msg) {
return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
}
-func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool {
- return false
-}
-
-func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
- if cn.torrent().numReaders() == 0 {
- return false
- }
- if cn.torrent().numConns() == 1 {
- return true
- }
- if cn.fastest() {
- return true
- }
- return false
-}
-
-func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
- return defaultShouldRequestWithoutBias(cn)
-}
-
-func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
- return defaultShouldRequestWithoutBias(cn)
-}
-
func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
if !cn.t.haveInfo() {
return false
}
return cn.t.requestStrategy.iterPendingPieces(cn, f)
}
-func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
- return iterUnbiasedPieceRequestOrder(cn, f)
-}
-func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
- if rs.shouldRequestWithoutBias(cn) {
- return iterUnbiasedPieceRequestOrder(cn, f)
- } else {
- return cn.pieceRequestOrder().IterTyped(func(i int) bool {
- return f(pieceIndex(i))
- })
- }
-}
-func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
- return defaultIterPendingPieces(rs, cn, cb)
-}
-func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
- return defaultIterPendingPieces(rs, cn, cb)
-}
-
func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
}
)
}
-func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
- for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
- if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
- continue
- }
- r := p.chunkIndexRequest(i)
- if rs.wouldDuplicateRecent(r) {
- continue
- }
- if !f(r.chunkSpec) {
- return false
- }
- }
- return true
-}
-
-func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
- chunkIndices := p.dirtyChunks().Copy()
- chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
- return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
- ci, err := chunkIndices.RB.Select(uint32(i))
- if err != nil {
- panic(err)
- }
- return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
- })
-}
-
-func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
- return defaultIterUndirtiedChunks(p, f)
-}
-
-func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
- return defaultIterUndirtiedChunks(p, f)
-}
-
// check callers updaterequests
func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
}
-func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
- switch tpp {
- case PiecePriorityNormal:
- case PiecePriorityReadahead:
- prio -= int(cn.torrent().numPieces())
- case PiecePriorityNext, PiecePriorityNow:
- prio -= 2 * int(cn.torrent().numPieces())
- default:
- panic(tpp)
- }
- prio += int(piece / 3)
- return prio
-}
-
-func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
- return prio
-}
-
-func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
- return defaultPiecePriority(cn, piece, tpp, prio)
-}
-
-func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
- return defaultPiecePriority(cn, piece, tpp, prio)
-}
-
func (cn *connection) getPieceInclination() []int {
if cn.pieceInclination == nil {
cn.pieceInclination = cn.t.getConnPieceInclination()
package torrent
import (
+ "math"
"sync"
"time"
+ "github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
sentRequest: rs.onSentRequest,
}
}
+
+func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+ return prio
+}
+
+func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+ return defaultPiecePriority(cn, piece, tpp, prio)
+}
+
+func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+ return defaultPiecePriority(cn, piece, tpp, prio)
+}
+
+func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+ for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
+ if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
+ continue
+ }
+ r := p.chunkIndexRequest(i)
+ if rs.wouldDuplicateRecent(r) {
+ continue
+ }
+ if !f(r.chunkSpec) {
+ return false
+ }
+ }
+ return true
+}
+
+func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+ chunkIndices := p.dirtyChunks().Copy()
+ chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
+ return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
+ ci, err := chunkIndices.RB.Select(uint32(i))
+ if err != nil {
+ panic(err)
+ }
+ return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
+ })
+}
+
+func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+ return defaultIterUndirtiedChunks(p, f)
+}
+
+func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+ return defaultIterUndirtiedChunks(p, f)
+}
+
+func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+ switch tpp {
+ case PiecePriorityNormal:
+ case PiecePriorityReadahead:
+ prio -= int(cn.torrent().numPieces())
+ case PiecePriorityNext, PiecePriorityNow:
+ prio -= 2 * int(cn.torrent().numPieces())
+ default:
+ panic(tpp)
+ }
+ prio += int(piece / 3)
+ return prio
+}
+
+func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
+ return iterUnbiasedPieceRequestOrder(cn, f)
+}
+func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
+ if rs.shouldRequestWithoutBias(cn) {
+ return iterUnbiasedPieceRequestOrder(cn, f)
+ } else {
+ return cn.pieceRequestOrder().IterTyped(func(i int) bool {
+ return f(pieceIndex(i))
+ })
+ }
+}
+func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
+ return defaultIterPendingPieces(rs, cn, cb)
+}
+func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
+ return defaultIterPendingPieces(rs, cn, cb)
+}
+
+func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool {
+ return false
+}
+
+func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
+ if cn.torrent().numReaders() == 0 {
+ return false
+ }
+ if cn.torrent().numConns() == 1 {
+ return true
+ }
+ if cn.fastest() {
+ return true
+ }
+ return false
+}
+
+func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
+ return defaultShouldRequestWithoutBias(cn)
+}
+
+func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
+ return defaultShouldRequestWithoutBias(cn)
+}
+
+func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
+ rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
+ rs.timeoutLocker.Lock()
+ delete(rs.lastRequested, r)
+ rs.timeoutLocker.Unlock()
+ rs.callbacks.requestTimedOut(r)
+ })
+}
+
+// The actual value to use as the maximum outbound requests.
+func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
+ expectingTime := int64(cn.totalExpectingTime())
+ if expectingTime == 0 {
+ expectingTime = math.MaxInt64
+ } else {
+ expectingTime *= 2
+ }
+ return int(clamp(
+ 1,
+ int64(cn.peerMaxRequests()),
+ max(
+ // It makes sense to always pipeline at least one connection, since latency must be
+ // non-zero.
+ 2,
+ // Request only as many as we expect to receive in the duplicateRequestTimeout
+ // window. We are trying to avoid having to duplicate requests.
+ cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
+ ),
+ ))
+}
+func defaultNominalMaxRequests(cn requestStrategyConnection) int {
+ return int(
+ max(64,
+ cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
+}
+func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int {
+ return defaultNominalMaxRequests(cn)
+}
+func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int {
+ return defaultNominalMaxRequests(cn)
+}
+
+func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
+ // This piece has been requested on another connection, and the duplicate request timer is still
+ // running.
+ _, ok := rs.lastRequested[r]
+ return ok
+}