]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Move all the request strategy stuff into its file
authorMatt Joiner <anacrolix@gmail.com>
Fri, 24 Jan 2020 06:55:20 +0000 (17:55 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 24 Jan 2020 06:55:20 +0000 (17:55 +1100)
connection.go
piece.go
request_strategy.go

index 1b595e7caee426394540a7a2a29bbb52f4132b41..f5a586275ff158edb6eb78a07c8e96ad99f192c7 100644 (file)
@@ -5,7 +5,6 @@ import (
        "bytes"
        "fmt"
        "io"
-       "math"
        "math/rand"
        "net"
        "strconv"
@@ -387,39 +386,6 @@ func (cn *connection) nominalMaxRequests() (ret int) {
        ))
 }
 
-// The actual value to use as the maximum outbound requests.
-func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
-       expectingTime := int64(cn.totalExpectingTime())
-       if expectingTime == 0 {
-               expectingTime = math.MaxInt64
-       } else {
-               expectingTime *= 2
-       }
-       return int(clamp(
-               1,
-               int64(cn.peerMaxRequests()),
-               max(
-                       // It makes sense to always pipeline at least one connection, since latency must be
-                       // non-zero.
-                       2,
-                       // Request only as many as we expect to receive in the duplicateRequestTimeout
-                       // window. We are trying to avoid having to duplicate requests.
-                       cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
-               ),
-       ))
-}
-func defaultNominalMaxRequests(cn requestStrategyConnection) int {
-       return int(
-               max(64,
-                       cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
-}
-func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int {
-       return defaultNominalMaxRequests(cn)
-}
-func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int {
-       return defaultNominalMaxRequests(cn)
-}
-
 func (cn *connection) totalExpectingTime() (ret time.Duration) {
        ret = cn.cumulativeExpectedToReceiveChunks
        if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
@@ -543,15 +509,6 @@ func (cn *connection) request(r request, mw messageWriter) bool {
        })
 }
 
-func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
-       rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
-               rs.timeoutLocker.Lock()
-               delete(rs.lastRequested, r)
-               rs.timeoutLocker.Unlock()
-               rs.callbacks.requestTimedOut(r)
-       })
-}
-
 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        if !cn.t.networkingEnabled {
                if !cn.SetInterested(false, msg) {
@@ -749,56 +706,12 @@ func (cn *connection) shouldRequestWithoutBias() bool {
        return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
 }
 
-func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool {
-       return false
-}
-
-func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
-       if cn.torrent().numReaders() == 0 {
-               return false
-       }
-       if cn.torrent().numConns() == 1 {
-               return true
-       }
-       if cn.fastest() {
-               return true
-       }
-       return false
-}
-
-func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
-       return defaultShouldRequestWithoutBias(cn)
-}
-
-func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
-       return defaultShouldRequestWithoutBias(cn)
-}
-
 func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool {
        if !cn.t.haveInfo() {
                return false
        }
        return cn.t.requestStrategy.iterPendingPieces(cn, f)
 }
-func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
-       return iterUnbiasedPieceRequestOrder(cn, f)
-}
-func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
-       if rs.shouldRequestWithoutBias(cn) {
-               return iterUnbiasedPieceRequestOrder(cn, f)
-       } else {
-               return cn.pieceRequestOrder().IterTyped(func(i int) bool {
-                       return f(pieceIndex(i))
-               })
-       }
-}
-func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
-       return defaultIterPendingPieces(rs, cn, cb)
-}
-func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
-       return defaultIterPendingPieces(rs, cn, cb)
-}
-
 func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
        cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
 }
@@ -812,42 +725,6 @@ func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool
        )
 }
 
-func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
-       for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
-               if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
-                       continue
-               }
-               r := p.chunkIndexRequest(i)
-               if rs.wouldDuplicateRecent(r) {
-                       continue
-               }
-               if !f(r.chunkSpec) {
-                       return false
-               }
-       }
-       return true
-}
-
-func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
-       chunkIndices := p.dirtyChunks().Copy()
-       chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
-       return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
-               ci, err := chunkIndices.RB.Select(uint32(i))
-               if err != nil {
-                       panic(err)
-               }
-               return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
-       })
-}
-
-func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
-       return defaultIterUndirtiedChunks(p, f)
-}
-
-func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
-       return defaultIterUndirtiedChunks(p, f)
-}
-
 // check callers updaterequests
 func (cn *connection) stopRequestingPiece(piece pieceIndex) bool {
        return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
@@ -870,32 +747,6 @@ func (cn *connection) updatePiecePriority(piece pieceIndex) bool {
        return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
 }
 
-func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
-       switch tpp {
-       case PiecePriorityNormal:
-       case PiecePriorityReadahead:
-               prio -= int(cn.torrent().numPieces())
-       case PiecePriorityNext, PiecePriorityNow:
-               prio -= 2 * int(cn.torrent().numPieces())
-       default:
-               panic(tpp)
-       }
-       prio += int(piece / 3)
-       return prio
-}
-
-func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
-       return prio
-}
-
-func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
-       return defaultPiecePriority(cn, piece, tpp, prio)
-}
-
-func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
-       return defaultPiecePriority(cn, piece, tpp, prio)
-}
-
 func (cn *connection) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
index 80fda0706167ccb0bd7e8e48efba055674b0580c..1c68d3edb5533c1c670a97a174434aeaa5eaefb8 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -262,10 +262,3 @@ func (p *Piece) requestStrategyPiece() requestStrategyPiece {
 func (p *Piece) dirtyChunks() bitmap.Bitmap {
        return p._dirtyChunks
 }
-
-func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
-       // This piece has been requested on another connection, and the duplicate request timer is still
-       // running.
-       _, ok := rs.lastRequested[r]
-       return ok
-}
index dc7538d1d32c39a888b4566cf27555c70666a835..fd403de9c762dc224770ad556ddb60be62f1676b 100644 (file)
@@ -1,9 +1,11 @@
 package torrent
 
 import (
+       "math"
        "sync"
        "time"
 
+       "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/prioritybitmap"
 
@@ -141,3 +143,158 @@ func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
                sentRequest: rs.onSentRequest,
        }
 }
+
+func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+       return prio
+}
+
+func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+       return defaultPiecePriority(cn, piece, tpp, prio)
+}
+
+func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+       return defaultPiecePriority(cn, piece, tpp, prio)
+}
+
+func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+       for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
+               if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
+                       continue
+               }
+               r := p.chunkIndexRequest(i)
+               if rs.wouldDuplicateRecent(r) {
+                       continue
+               }
+               if !f(r.chunkSpec) {
+                       return false
+               }
+       }
+       return true
+}
+
+func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+       chunkIndices := p.dirtyChunks().Copy()
+       chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
+       return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
+               ci, err := chunkIndices.RB.Select(uint32(i))
+               if err != nil {
+                       panic(err)
+               }
+               return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
+       })
+}
+
+func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+       return defaultIterUndirtiedChunks(p, f)
+}
+
+func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+       return defaultIterUndirtiedChunks(p, f)
+}
+
+func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
+       switch tpp {
+       case PiecePriorityNormal:
+       case PiecePriorityReadahead:
+               prio -= int(cn.torrent().numPieces())
+       case PiecePriorityNext, PiecePriorityNow:
+               prio -= 2 * int(cn.torrent().numPieces())
+       default:
+               panic(tpp)
+       }
+       prio += int(piece / 3)
+       return prio
+}
+
+func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
+       return iterUnbiasedPieceRequestOrder(cn, f)
+}
+func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
+       if rs.shouldRequestWithoutBias(cn) {
+               return iterUnbiasedPieceRequestOrder(cn, f)
+       } else {
+               return cn.pieceRequestOrder().IterTyped(func(i int) bool {
+                       return f(pieceIndex(i))
+               })
+       }
+}
+func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
+       return defaultIterPendingPieces(rs, cn, cb)
+}
+func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
+       return defaultIterPendingPieces(rs, cn, cb)
+}
+
+func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool {
+       return false
+}
+
+func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
+       if cn.torrent().numReaders() == 0 {
+               return false
+       }
+       if cn.torrent().numConns() == 1 {
+               return true
+       }
+       if cn.fastest() {
+               return true
+       }
+       return false
+}
+
+func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
+       return defaultShouldRequestWithoutBias(cn)
+}
+
+func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
+       return defaultShouldRequestWithoutBias(cn)
+}
+
+func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
+       rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
+               rs.timeoutLocker.Lock()
+               delete(rs.lastRequested, r)
+               rs.timeoutLocker.Unlock()
+               rs.callbacks.requestTimedOut(r)
+       })
+}
+
+// The actual value to use as the maximum outbound requests.
+func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
+       expectingTime := int64(cn.totalExpectingTime())
+       if expectingTime == 0 {
+               expectingTime = math.MaxInt64
+       } else {
+               expectingTime *= 2
+       }
+       return int(clamp(
+               1,
+               int64(cn.peerMaxRequests()),
+               max(
+                       // It makes sense to always pipeline at least one connection, since latency must be
+                       // non-zero.
+                       2,
+                       // Request only as many as we expect to receive in the duplicateRequestTimeout
+                       // window. We are trying to avoid having to duplicate requests.
+                       cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
+               ),
+       ))
+}
+func defaultNominalMaxRequests(cn requestStrategyConnection) int {
+       return int(
+               max(64,
+                       cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
+}
+func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int {
+       return defaultNominalMaxRequests(cn)
+}
+func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int {
+       return defaultNominalMaxRequests(cn)
+}
+
+func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
+       // This piece has been requested on another connection, and the duplicate request timer is still
+       // running.
+       _, ok := rs.lastRequested[r]
+       return ok
+}