From 53e334d3f20db8986e9b58e455398cf9419a178f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 24 Jan 2020 17:55:20 +1100 Subject: [PATCH] Move all the request strategy stuff into its file --- connection.go | 149 ----------------------------------------- piece.go | 7 -- request_strategy.go | 157 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 156 deletions(-) diff --git a/connection.go b/connection.go index 1b595e7c..f5a58627 100644 --- a/connection.go +++ b/connection.go @@ -5,7 +5,6 @@ import ( "bytes" "fmt" "io" - "math" "math/rand" "net" "strconv" @@ -387,39 +386,6 @@ func (cn *connection) nominalMaxRequests() (ret int) { )) } -// The actual value to use as the maximum outbound requests. -func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) { - expectingTime := int64(cn.totalExpectingTime()) - if expectingTime == 0 { - expectingTime = math.MaxInt64 - } else { - expectingTime *= 2 - } - return int(clamp( - 1, - int64(cn.peerMaxRequests()), - max( - // It makes sense to always pipeline at least one connection, since latency must be - // non-zero. - 2, - // Request only as many as we expect to receive in the duplicateRequestTimeout - // window. We are trying to avoid having to duplicate requests. - cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime, - ), - )) -} -func defaultNominalMaxRequests(cn requestStrategyConnection) int { - return int( - max(64, - cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64()))) -} -func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int { - return defaultNominalMaxRequests(cn) -} -func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int { - return defaultNominalMaxRequests(cn) -} - func (cn *connection) totalExpectingTime() (ret time.Duration) { ret = cn.cumulativeExpectedToReceiveChunks if !cn.lastStartedExpectingToReceiveChunks.IsZero() { @@ -543,15 +509,6 @@ func (cn *connection) request(r request, mw messageWriter) bool { }) } -func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) { - rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() { - rs.timeoutLocker.Lock() - delete(rs.lastRequested, r) - rs.timeoutLocker.Unlock() - rs.callbacks.requestTimedOut(r) - }) -} - func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { if !cn.t.networkingEnabled { if !cn.SetInterested(false, msg) { @@ -749,56 +706,12 @@ func (cn *connection) shouldRequestWithoutBias() bool { return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection()) } -func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool { - return false -} - -func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool { - if cn.torrent().numReaders() == 0 { - return false - } - if cn.torrent().numConns() == 1 { - return true - } - if cn.fastest() { - return true - } - return false -} - -func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool { - return defaultShouldRequestWithoutBias(cn) -} - -func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool { - return defaultShouldRequestWithoutBias(cn) -} - func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool { if !cn.t.haveInfo() { return false } return cn.t.requestStrategy.iterPendingPieces(cn, f) } -func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool { - return iterUnbiasedPieceRequestOrder(cn, f) -} -func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool { - if rs.shouldRequestWithoutBias(cn) { - return iterUnbiasedPieceRequestOrder(cn, f) - } else { - return cn.pieceRequestOrder().IterTyped(func(i int) bool { - return f(pieceIndex(i)) - }) - } -} -func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { - return defaultIterPendingPieces(rs, cn, cb) -} -func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { - return defaultIterPendingPieces(rs, cn, cb) -} - func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) { cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) }) } @@ -812,42 +725,6 @@ func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool ) } -func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { - for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ { - if p.dirtyChunks().Get(bitmap.BitIndex(i)) { - continue - } - r := p.chunkIndexRequest(i) - if rs.wouldDuplicateRecent(r) { - continue - } - if !f(r.chunkSpec) { - return false - } - } - return true -} - -func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { - chunkIndices := p.dirtyChunks().Copy() - chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks())) - return iter.ForPerm(chunkIndices.Len(), func(i int) bool { - ci, err := chunkIndices.RB.Select(uint32(i)) - if err != nil { - panic(err) - } - return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec) - }) -} - -func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { - return defaultIterUndirtiedChunks(p, f) -} - -func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { - return defaultIterUndirtiedChunks(p, f) -} - // check callers updaterequests func (cn *connection) stopRequestingPiece(piece pieceIndex) bool { return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece)) @@ -870,32 +747,6 @@ func (cn *connection) updatePiecePriority(piece pieceIndex) bool { return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias() } -func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { - switch tpp { - case PiecePriorityNormal: - case PiecePriorityReadahead: - prio -= int(cn.torrent().numPieces()) - case PiecePriorityNext, PiecePriorityNow: - prio -= 2 * int(cn.torrent().numPieces()) - default: - panic(tpp) - } - prio += int(piece / 3) - return prio -} - -func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { - return prio -} - -func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { - return defaultPiecePriority(cn, piece, tpp, prio) -} - -func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { - return defaultPiecePriority(cn, piece, tpp, prio) -} - func (cn *connection) getPieceInclination() []int { if cn.pieceInclination == nil { cn.pieceInclination = cn.t.getConnPieceInclination() diff --git a/piece.go b/piece.go index 80fda070..1c68d3ed 100644 --- a/piece.go +++ b/piece.go @@ -262,10 +262,3 @@ func (p *Piece) requestStrategyPiece() requestStrategyPiece { func (p *Piece) dirtyChunks() bitmap.Bitmap { return p._dirtyChunks } - -func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool { - // This piece has been requested on another connection, and the duplicate request timer is still - // running. - _, ok := rs.lastRequested[r] - return ok -} diff --git a/request_strategy.go b/request_strategy.go index dc7538d1..fd403de9 100644 --- a/request_strategy.go +++ b/request_strategy.go @@ -1,9 +1,11 @@ package torrent import ( + "math" "sync" "time" + "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/prioritybitmap" @@ -141,3 +143,158 @@ func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks { sentRequest: rs.onSentRequest, } } + +func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + return prio +} + +func (requestStrategyFastest) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + return defaultPiecePriority(cn, piece, tpp, prio) +} + +func (requestStrategyDuplicateRequestTimeout) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + return defaultPiecePriority(cn, piece, tpp, prio) +} + +func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ { + if p.dirtyChunks().Get(bitmap.BitIndex(i)) { + continue + } + r := p.chunkIndexRequest(i) + if rs.wouldDuplicateRecent(r) { + continue + } + if !f(r.chunkSpec) { + return false + } + } + return true +} + +func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + chunkIndices := p.dirtyChunks().Copy() + chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks())) + return iter.ForPerm(chunkIndices.Len(), func(i int) bool { + ci, err := chunkIndices.RB.Select(uint32(i)) + if err != nil { + panic(err) + } + return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec) + }) +} + +func (rs requestStrategyFuzzing) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + return defaultIterUndirtiedChunks(p, f) +} + +func (rs requestStrategyFastest) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + return defaultIterUndirtiedChunks(p, f) +} + +func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + switch tpp { + case PiecePriorityNormal: + case PiecePriorityReadahead: + prio -= int(cn.torrent().numPieces()) + case PiecePriorityNext, PiecePriorityNow: + prio -= 2 * int(cn.torrent().numPieces()) + default: + panic(tpp) + } + prio += int(piece / 3) + return prio +} + +func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool { + return iterUnbiasedPieceRequestOrder(cn, f) +} +func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool { + if rs.shouldRequestWithoutBias(cn) { + return iterUnbiasedPieceRequestOrder(cn, f) + } else { + return cn.pieceRequestOrder().IterTyped(func(i int) bool { + return f(pieceIndex(i)) + }) + } +} +func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { + return defaultIterPendingPieces(rs, cn, cb) +} +func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { + return defaultIterPendingPieces(rs, cn, cb) +} + +func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool { + return false +} + +func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool { + if cn.torrent().numReaders() == 0 { + return false + } + if cn.torrent().numConns() == 1 { + return true + } + if cn.fastest() { + return true + } + return false +} + +func (requestStrategyFuzzing) shouldRequestWithoutBias(cn requestStrategyConnection) bool { + return defaultShouldRequestWithoutBias(cn) +} + +func (requestStrategyDuplicateRequestTimeout) shouldRequestWithoutBias(cn requestStrategyConnection) bool { + return defaultShouldRequestWithoutBias(cn) +} + +func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) { + rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() { + rs.timeoutLocker.Lock() + delete(rs.lastRequested, r) + rs.timeoutLocker.Unlock() + rs.callbacks.requestTimedOut(r) + }) +} + +// The actual value to use as the maximum outbound requests. +func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) { + expectingTime := int64(cn.totalExpectingTime()) + if expectingTime == 0 { + expectingTime = math.MaxInt64 + } else { + expectingTime *= 2 + } + return int(clamp( + 1, + int64(cn.peerMaxRequests()), + max( + // It makes sense to always pipeline at least one connection, since latency must be + // non-zero. + 2, + // Request only as many as we expect to receive in the duplicateRequestTimeout + // window. We are trying to avoid having to duplicate requests. + cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime, + ), + )) +} +func defaultNominalMaxRequests(cn requestStrategyConnection) int { + return int( + max(64, + cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64()))) +} +func (rs requestStrategyFuzzing) nominalMaxRequests(cn requestStrategyConnection) int { + return defaultNominalMaxRequests(cn) +} +func (rs requestStrategyFastest) nominalMaxRequests(cn requestStrategyConnection) int { + return defaultNominalMaxRequests(cn) +} + +func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool { + // This piece has been requested on another connection, and the duplicate request timer is still + // running. + _, ok := rs.lastRequested[r] + return ok +} -- 2.44.0