From: Matt Joiner Date: Fri, 10 Jan 2020 04:09:21 +0000 (+1100) Subject: Extract the request strategy logic X-Git-Tag: v1.12.0~12 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=4c989da21e0fbd04972b28a82a2aae06afb61fd1;p=btrtrc.git Extract the request strategy logic Bit messy. Now it'll be easier to clean-up what it depends on, and test. --- diff --git a/client.go b/client.go index 260219cb..0dab418c 100644 --- a/client.go +++ b/client.go @@ -1039,11 +1039,11 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( maxEstablishedConns: cl.config.EstablishedConnsPerTorrent, networkingEnabled: true, - requestStrategy: 2, + requestStrategy: requestStrategyOne{}, metadataChanged: sync.Cond{ L: cl.locker(), }, - duplicateRequestTimeout: 1 * time.Second, + _duplicateRequestTimeout: 1 * time.Second, } t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string { return fmt.Sprintf("%v: %s", t, m.Text()) diff --git a/connection.go b/connection.go index e882fec3..24c15b92 100644 --- a/connection.go +++ b/connection.go @@ -16,9 +16,9 @@ import ( "github.com/anacrolix/dht/v2" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" - "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/iter" - "github.com/anacrolix/missinggo/prioritybitmap" + "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/multiless" "github.com/pkg/errors" @@ -40,7 +40,7 @@ const ( // Maintains the state of a connection with a peer. type connection struct { // First to ensure 64-bit alignment for atomics. See #262. - stats ConnStats + _stats ConnStats t *Torrent // The actual Conn, used for closing, and setting socket options. @@ -74,7 +74,7 @@ type connection struct { lastStartedExpectingToReceiveChunks time.Time cumulativeExpectedToReceiveChunks time.Duration - chunksReceivedWhileExpecting int64 + _chunksReceivedWhileExpecting int64 Choked bool requests map[request]struct{} @@ -95,7 +95,7 @@ type connection struct { PeerRequests map[request]struct{} PeerExtensionBytes pp.PeerExtensionBits // The pieces the peer has claimed to have. - peerPieces bitmap.Bitmap + _peerPieces bitmap.Bitmap // The peer has everything. This can occur due to a special message, when // we may not even know the number of pieces in the torrent yet. peerSentHaveAll bool @@ -111,8 +111,8 @@ type connection struct { PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber PeerClientName string - pieceInclination []int - pieceRequestOrder prioritybitmap.PriorityBitmap + pieceInclination []int + _pieceRequestOrder prioritybitmap.PriorityBitmap writeBuffer *bytes.Buffer uploadTimer *time.Timer @@ -179,7 +179,7 @@ func (cn *connection) peerHasAllPieces() (all bool, known bool) { if !cn.t.haveInfo() { return false, false } - return bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true + return bitmap.Flip(cn._peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())).IsEmpty(), true } func (cn *connection) mu() sync.Locker { @@ -204,7 +204,7 @@ func (cn *connection) bestPeerNumPieces() pieceIndex { } func (cn *connection) completedString() string { - have := pieceIndex(cn.peerPieces.Len()) + have := pieceIndex(cn._peerPieces.Len()) if cn.peerSentHaveAll { have = cn.bestPeerNumPieces() } @@ -215,7 +215,7 @@ func (cn *connection) completedString() string { // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE // messages. 
func (cn *connection) setNumPieces(num pieceIndex) error { - cn.peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd) + cn._peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd) cn.peerPiecesChanged() return nil } @@ -277,7 +277,7 @@ func (cn *connection) statusFlags() (ret string) { // } func (cn *connection) downloadRate() float64 { - return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds() + return float64(cn._stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds() } func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { @@ -294,9 +294,9 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { " %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n", cn.completedString(), len(cn.peerTouchedPieces), - &cn.stats.ChunksReadUseful, - &cn.stats.ChunksRead, - &cn.stats.ChunksWritten, + &cn._stats.ChunksReadUseful, + &cn._stats.ChunksRead, + &cn._stats.ChunksWritten, cn.requestsLowWater, cn.numLocalRequests(), cn.nominalMaxRequests(), @@ -307,12 +307,13 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { fmt.Fprintf(w, " next pieces: %v%s\n", iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)), func() string { - if cn.shouldRequestWithoutBias() { + if cn == t.fastestConn { return " (fastest)" } else { return "" } - }()) + }(), + ) } func (cn *connection) Close() { @@ -321,14 +322,14 @@ func (cn *connection) Close() { } cn.tickleWriter() cn.discardPieceInclination() - cn.pieceRequestOrder.Clear() + cn._pieceRequestOrder.Clear() if cn.conn != nil { go cn.conn.Close() } } func (cn *connection) PeerHasPiece(piece pieceIndex) bool { - return cn.peerSentHaveAll || cn.peerPieces.Contains(bitmap.BitIndex(piece)) + return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece)) } // Writes a message into the write buffer. @@ -379,31 +380,44 @@ func (cn *connection) requestedMetadataPiece(index int) bool { // The actual value to use as the maximum outbound requests. func (cn *connection) nominalMaxRequests() (ret int) { - if cn.t.requestStrategy == 3 { - expectingTime := int64(cn.totalExpectingTime()) - if expectingTime == 0 { - expectingTime = math.MaxInt64 - } else { - expectingTime *= 2 - } - return int(clamp( - 1, - int64(cn.PeerMaxRequests), - max( - // It makes sense to always pipeline at least one connection, since latency must be - // non-zero. - 2, - // Request only as many as we expect to receive in the duplicateRequestTimeout - // window. We are trying to avoid having to duplicate requests. - cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime, - ), - )) - } return int(clamp( 1, int64(cn.PeerMaxRequests), + int64(cn.t.requestStrategy.nominalMaxRequests(cn.requestStrategyConnection())), + )) +} + +// The actual value to use as the maximum outbound requests. +func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) { + expectingTime := int64(cn.totalExpectingTime()) + if expectingTime == 0 { + expectingTime = math.MaxInt64 + } else { + expectingTime *= 2 + } + return int(clamp( + 1, + int64(cn.peerMaxRequests()), + max( + // It makes sense to always pipeline at least one connection, since latency must be + // non-zero. + 2, + // Request only as many as we expect to receive in the duplicateRequestTimeout + // window. We are trying to avoid having to duplicate requests. 
+ cn.chunksReceivedWhileExpecting()*int64(cn.torrent().duplicateRequestTimeout())/expectingTime, + ), + )) +} +func defaultNominalMaxRequests(cn requestStrategyConnection) int { + return int( max(64, - cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64())))) + cn.stats().ChunksReadUseful.Int64()-(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64()))) +} +func (rs requestStrategyOne) nominalMaxRequests(cn requestStrategyConnection) int { + return defaultNominalMaxRequests(cn) +} +func (rs requestStrategyTwo) nominalMaxRequests(cn requestStrategyConnection) int { + return defaultNominalMaxRequests(cn) } func (cn *connection) totalExpectingTime() (ret time.Duration) { @@ -519,7 +533,7 @@ func (cn *connection) request(r request, mw messageWriter) bool { } cn.validReceiveChunks[r] = struct{}{} cn.t.pendingRequests[r]++ - cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() { + cn.t.lastRequested[r] = time.AfterFunc(cn.t._duplicateRequestTimeout, func() { torrent.Add("duplicate request timeouts", 1) cn.mu().Lock() defer cn.mu().Unlock() @@ -675,7 +689,7 @@ func (cn *connection) PostBitfield() { Type: pp.Bitfield, Bitfield: cn.t.bitfield(), }) - cn.sentHaves = cn.t.completedPieces.Copy() + cn.sentHaves = cn.t._completedPieces.Copy() } func (cn *connection) updateRequests() { @@ -702,29 +716,19 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { } } -func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) bool) bool { - now, readahead := cn.t.readerPiecePriorities() - var skip bitmap.Bitmap - if !cn.peerSentHaveAll { - // Pieces to skip include pieces the peer doesn't have. - skip = bitmap.Flip(cn.peerPieces, 0, bitmap.BitIndex(cn.t.numPieces())) - } - // And pieces that we already have. - skip.Union(cn.t.completedPieces) - skip.Union(cn.t.piecesQueuedForHash) +func iterUnbiasedPieceRequestOrder(cn requestStrategyConnection, f func(piece pieceIndex) bool) bool { + now, readahead := cn.torrent().readerPiecePriorities() + skip := bitmap.Flip(cn.peerPieces(), 0, cn.torrent().numPieces()) + skip.Union(cn.torrent().ignorePieces()) // Return an iterator over the different priority classes, minus the skip pieces. return iter.All( func(_piece interface{}) bool { - i := _piece.(bitmap.BitIndex) - if cn.t.hashingPiece(pieceIndex(i)) { - return true - } - return f(pieceIndex(i)) + return f(pieceIndex(_piece.(bitmap.BitIndex))) }, iterBitmapsDistinct(&skip, now, readahead), - // We have to iterate pendingPieces separately because it isn't a Bitmap. + // We have to iterate _pendingPieces separately because it isn't a Bitmap. func(cb iter.Callback) { - cn.t.pendingPieces.IterTyped(func(piece int) bool { + cn.torrent().pendingPieces().IterTyped(func(piece int) bool { if skip.Contains(piece) { return true } @@ -743,80 +747,109 @@ func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece pieceIndex) boo // assigned to the highest priority pieces, and assigning more than one this role would cause // significant wasted bandwidth. 
func (cn *connection) shouldRequestWithoutBias() bool { - if cn.t.requestStrategy != 2 { - return false - } - if len(cn.t.readers) == 0 { + return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection()) +} + +func defaultShouldRequestWithoutBias(cn requestStrategyConnection) bool { + return false +} + +func (requestStrategyTwo) shouldRequestWithoutBias(cn requestStrategyConnection) bool { + if cn.torrent().numReaders() == 0 { return false } - if len(cn.t.conns) == 1 { + if cn.torrent().numConns() == 1 { return true } - if cn == cn.t.fastestConn { + if cn.fastest() { return true } return false } +func (requestStrategyOne) shouldRequestWithoutBias(cn requestStrategyConnection) bool { + return defaultShouldRequestWithoutBias(cn) +} + +func (requestStrategyThree) shouldRequestWithoutBias(cn requestStrategyConnection) bool { + return defaultShouldRequestWithoutBias(cn) +} + func (cn *connection) iterPendingPieces(f func(pieceIndex) bool) bool { if !cn.t.haveInfo() { return false } - if cn.t.requestStrategy == 3 { - return cn.iterUnbiasedPieceRequestOrder(f) - } - if cn.shouldRequestWithoutBias() { - return cn.iterUnbiasedPieceRequestOrder(f) + return cn.t.requestStrategy.iterPendingPieces(cn, f) +} +func (requestStrategyThree) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool { + return iterUnbiasedPieceRequestOrder(cn, f) +} +func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool { + if rs.shouldRequestWithoutBias(cn) { + return iterUnbiasedPieceRequestOrder(cn, f) } else { - return cn.pieceRequestOrder.IterTyped(func(i int) bool { + return cn.pieceRequestOrder().IterTyped(func(i int) bool { return f(pieceIndex(i)) }) } } +func (rs requestStrategyOne) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { + return defaultIterPendingPieces(rs, cn, cb) +} +func (rs requestStrategyTwo) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { + return defaultIterPendingPieces(rs, cn, cb) +} func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) { cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) }) } func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool { - return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool { - r := request{pp.Integer(piece), cs} - if cn.t.requestStrategy == 3 { - if _, ok := cn.t.lastRequested[r]; ok { - // This piece has been requested on another connection, and - // the duplicate request timer is still running. 
- return true - } - } - return f(r) - }) + return cn.t.requestStrategy.iterUndirtiedChunks(cn.t.piece(piece).requestStrategyPiece(), + func(cs chunkSpec) bool { + return f(request{pp.Integer(piece), cs}) + }) } -func iterUndirtiedChunks(piece pieceIndex, t *Torrent, f func(chunkSpec) bool) bool { - p := &t.pieces[piece] - if t.requestStrategy == 3 { - for i := pp.Integer(0); i < p.numChunks(); i++ { - if !p.dirtyChunks.Get(bitmap.BitIndex(i)) { - if !f(t.chunkIndexSpec(i, piece)) { - return false - } - } +func (requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ { + if p.dirtyChunks().Get(bitmap.BitIndex(i)) { + continue + } + ci := p.chunkIndexSpec(i) + if p.wouldDuplicateRecent(ci) { + continue + } + if !f(p.chunkIndexSpec(i)) { + return false } - return true } - chunkIndices := t.pieces[piece].undirtiedChunkIndices() + return true +} + +func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + chunkIndices := p.dirtyChunks().Copy() + chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks())) return iter.ForPerm(chunkIndices.Len(), func(i int) bool { ci, err := chunkIndices.RB.Select(uint32(i)) if err != nil { panic(err) } - return f(t.chunkIndexSpec(pp.Integer(ci), piece)) + return f(p.chunkIndexSpec(pp.Integer(ci))) }) } +func (rs requestStrategyOne) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + return defaultIterUndirtiedChunks(p, f) +} + +func (rs requestStrategyTwo) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool { + return defaultIterUndirtiedChunks(p, f) +} + // check callers updaterequests func (cn *connection) stopRequestingPiece(piece pieceIndex) bool { - return cn.pieceRequestOrder.Remove(bitmap.BitIndex(piece)) + return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece)) } // This is distinct from Torrent piece priority, which is the user's @@ -832,21 +865,34 @@ func (cn *connection) updatePiecePriority(piece pieceIndex) bool { return cn.stopRequestingPiece(piece) } prio := cn.getPieceInclination()[piece] - switch cn.t.requestStrategy { - case 1: - switch tpp { - case PiecePriorityNormal: - case PiecePriorityReadahead: - prio -= int(cn.t.numPieces()) - case PiecePriorityNext, PiecePriorityNow: - prio -= 2 * int(cn.t.numPieces()) - default: - panic(tpp) - } - prio += int(piece / 3) + prio = cn.t.requestStrategy.piecePriority(cn, piece, tpp, prio) + return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias() +} + +func (requestStrategyOne) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + switch tpp { + case PiecePriorityNormal: + case PiecePriorityReadahead: + prio -= int(cn.torrent().numPieces()) + case PiecePriorityNext, PiecePriorityNow: + prio -= 2 * int(cn.torrent().numPieces()) default: + panic(tpp) } - return cn.pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias() + prio += int(piece / 3) + return prio +} + +func defaultPiecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + return prio +} + +func (requestStrategyTwo) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + return defaultPiecePriority(cn, piece, tpp, prio) +} + +func (requestStrategyThree) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { + return 
defaultPiecePriority(cn, piece, tpp, prio) } func (cn *connection) getPieceInclination() []int { @@ -892,7 +938,7 @@ func (cn *connection) peerSentHave(piece pieceIndex) error { return nil } cn.raisePeerMinPieces(piece + 1) - cn.peerPieces.Set(bitmap.BitIndex(piece), true) + cn._peerPieces.Set(bitmap.BitIndex(piece), true) if cn.updatePiecePriority(piece) { cn.updateRequests() } @@ -915,7 +961,7 @@ func (cn *connection) peerSentBitfield(bf []bool) error { if have { cn.raisePeerMinPieces(pieceIndex(i) + 1) } - cn.peerPieces.Set(i, have) + cn._peerPieces.Set(i, have) } cn.peerPiecesChanged() return nil @@ -923,13 +969,13 @@ func (cn *connection) peerSentBitfield(bf []bool) error { func (cn *connection) onPeerSentHaveAll() error { cn.peerSentHaveAll = true - cn.peerPieces.Clear() + cn._peerPieces.Clear() cn.peerPiecesChanged() return nil } func (cn *connection) peerSentHaveNone() error { - cn.peerPieces.Clear() + cn._peerPieces.Clear() cn.peerSentHaveAll = false cn.peerPiecesChanged() return nil @@ -977,7 +1023,7 @@ func (cn *connection) postHandshakeStats(f func(*ConnStats)) { // until the handshake is complete, after which it's expected to reconcile the // differences. func (cn *connection) allStats(f func(*ConnStats)) { - f(&cn.stats) + f(&cn._stats) if cn.reconciledHandshakeStats { cn.postHandshakeStats(f) } @@ -1295,7 +1341,7 @@ func (c *connection) receiveChunk(msg *pp.Message) error { // Request has been satisfied. if c.deleteRequest(req) { if c.expectingChunks() { - c.chunksReceivedWhileExpecting++ + c._chunksReceivedWhileExpecting++ } } else { torrent.Add("chunks received unwanted", 1) @@ -1391,7 +1437,7 @@ func (c *connection) uploadAllowed() bool { return false } // Don't upload more than 100 KiB more than we download. - if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 { + if c._stats.BytesWrittenData.Int64() >= c._stats.BytesReadData.Int64()+100<<10 { return false } return true @@ -1461,11 +1507,11 @@ func (cn *connection) Drop() { } func (cn *connection) netGoodPiecesDirtied() int64 { - return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64() + return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64() } func (c *connection) peerHasWantedPieces() bool { - return !c.pieceRequestOrder.IsEmpty() + return !c._pieceRequestOrder.IsEmpty() } func (c *connection) numLocalRequests() int { @@ -1584,3 +1630,39 @@ type connectionTrust struct { func (l connectionTrust) Less(r connectionTrust) bool { return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less() } + +func (cn *connection) requestStrategyConnection() requestStrategyConnection { + return cn +} + +func (cn *connection) chunksReceivedWhileExpecting() int64 { + return cn._chunksReceivedWhileExpecting +} + +func (cn *connection) fastest() bool { + return cn == cn.t.fastestConn +} + +func (cn *connection) peerMaxRequests() int { + return cn.PeerMaxRequests +} + +func (cn *connection) peerPieces() bitmap.Bitmap { + ret := cn._peerPieces.Copy() + if cn.peerSentHaveAll { + ret.AddRange(0, cn.t.numPieces()) + } + return ret +} + +func (cn *connection) pieceRequestOrder() *prioritybitmap.PriorityBitmap { + return &cn._pieceRequestOrder +} + +func (cn *connection) stats() *ConnStats { + return &cn._stats +} + +func (cn *connection) torrent() requestStrategyTorrent { + return cn.t.requestStrategyTorrent() +} diff --git a/connection_test.go b/connection_test.go index d9924d18..58153fa5 100644 --- a/connection_test.go +++ 
b/connection_test.go @@ -33,7 +33,7 @@ func TestSendBitfieldThenHave(t *testing.T) { c.w = w go c.writer(time.Minute) c.mu().Lock() - c.t.completedPieces.Add(1) + c.t._completedPieces.Add(1) c.PostBitfield( /*[]bool{false, true, false}*/ ) c.mu().Unlock() c.mu().Lock() @@ -105,7 +105,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { PieceLength: 1 << 20, })) t.setChunkSize(defaultChunkSize) - t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority()) + t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority()) r, w := net.Pipe() cn := cl.newConnection(r, true, IpPort{}, "") cn.setTorrent(t) @@ -131,7 +131,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { cl.lock() // The chunk must be written to storage everytime, to ensure the // writeSem is unlocked. - t.pieces[0].dirtyChunks.Clear() + t.pieces[0]._dirtyChunks.Clear() cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): {}} cl.unlock() n, err := w.Write(wb) @@ -141,5 +141,5 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { } }() require.NoError(b, <-mrlErr) - require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64()) + require.EqualValues(b, b.N, cn._stats.ChunksReadUseful.Int64()) } diff --git a/file.go b/file.go index 2f0e6b60..b9548eba 100644 --- a/file.go +++ b/file.go @@ -3,7 +3,7 @@ package torrent import ( "strings" - "github.com/anacrolix/missinggo/bitmap" + "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/torrent/metainfo" ) @@ -58,7 +58,7 @@ func (f *File) bytesLeft() (left int64) { pieceSize := int64(f.t.usualPieceSize()) firstPieceIndex := f.firstPieceIndex() endPieceIndex := f.endPieceIndex() - 1 - bitmap.Flip(f.t.completedPieces, firstPieceIndex+1, endPieceIndex).IterTyped(func(piece int) bool { + bitmap.Flip(f.t._completedPieces, firstPieceIndex+1, endPieceIndex).IterTyped(func(piece int) bool { if piece >= endPieceIndex { return false } diff --git a/misc_test.go b/misc_test.go index b7272151..a5ff2fea 100644 --- a/misc_test.go +++ b/misc_test.go @@ -5,8 +5,8 @@ import ( "strings" "testing" - "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/iter" + "github.com/anacrolix/missinggo/v2/bitmap" "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/assert" ) diff --git a/piece.go b/piece.go index 2ebe3f8b..af4385c3 100644 --- a/piece.go +++ b/piece.go @@ -4,7 +4,7 @@ import ( "fmt" "sync" - "github.com/anacrolix/missinggo/bitmap" + "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" @@ -46,7 +46,7 @@ type Piece struct { files []*File // Chunks we've written to since the last check. The chunk offset and // length can be determined by the request chunkSize in use. 
- dirtyChunks bitmap.Bitmap + _dirtyChunks bitmap.Bitmap hashing bool numVerifies int64 @@ -77,7 +77,7 @@ func (p *Piece) Storage() storage.Piece { } func (p *Piece) pendingChunkIndex(chunkIndex int) bool { - return !p.dirtyChunks.Contains(chunkIndex) + return !p._dirtyChunks.Contains(chunkIndex) } func (p *Piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { @@ -85,20 +85,20 @@ func (p *Piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { } func (p *Piece) hasDirtyChunks() bool { - return p.dirtyChunks.Len() != 0 + return p._dirtyChunks.Len() != 0 } func (p *Piece) numDirtyChunks() pp.Integer { - return pp.Integer(p.dirtyChunks.Len()) + return pp.Integer(p._dirtyChunks.Len()) } func (p *Piece) unpendChunkIndex(i int) { - p.dirtyChunks.Add(i) + p._dirtyChunks.Add(i) p.t.tickleReaders() } func (p *Piece) pendChunkIndex(i int) { - p.dirtyChunks.Remove(i) + p._dirtyChunks.Remove(i) } func (p *Piece) numChunks() pp.Integer { @@ -106,7 +106,7 @@ func (p *Piece) numChunks() pp.Integer { } func (p *Piece) undirtiedChunkIndices() (ret bitmap.Bitmap) { - ret = p.dirtyChunks.Copy() + ret = p._dirtyChunks.Copy() ret.FlipRange(0, bitmap.BitIndex(p.numChunks())) return } @@ -138,7 +138,7 @@ func (p *Piece) waitNoPendingWrites() { } func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool { - return p.dirtyChunks.Contains(bitmap.BitIndex(chunk)) + return p._dirtyChunks.Contains(bitmap.BitIndex(chunk)) } func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec { @@ -225,13 +225,13 @@ func (p *Piece) uncachedPriority() (ret piecePriority) { for _, f := range p.files { ret.Raise(f.prio) } - if p.t.readerNowPieces.Contains(int(p.index)) { + if p.t.readerNowPieces().Contains(int(p.index)) { ret.Raise(PiecePriorityNow) } - // if t.readerNowPieces.Contains(piece - 1) { + // if t._readerNowPieces.Contains(piece - 1) { // return PiecePriorityNext // } - if p.t.readerReadaheadPieces.Contains(bitmap.BitIndex(p.index)) { + if p.t.readerReadaheadPieces().Contains(bitmap.BitIndex(p.index)) { ret.Raise(PiecePriorityReadahead) } ret.Raise(p.priority) @@ -245,5 +245,20 @@ func (p *Piece) completion() (ret storage.Completion) { } func (p *Piece) allChunksDirty() bool { - return p.dirtyChunks.Len() == int(p.numChunks()) + return p._dirtyChunks.Len() == int(p.numChunks()) +} + +func (p *Piece) requestStrategyPiece() requestStrategyPiece { + return p +} + +func (p *Piece) dirtyChunks() bitmap.Bitmap { + return p._dirtyChunks +} + +func (p *Piece) wouldDuplicateRecent(cs chunkSpec) bool { + // This piece has been requested on another connection, and the duplicate request timer is still + // running. 
+ _, ok := p.t.lastRequested[request{pp.Integer(p.index), cs}] + return ok } diff --git a/request_strategy.go b/request_strategy.go new file mode 100644 index 00000000..a09932a3 --- /dev/null +++ b/request_strategy.go @@ -0,0 +1,72 @@ +package torrent + +import ( + "time" + + "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/prioritybitmap" + + pp "github.com/anacrolix/torrent/peer_protocol" +) + +type requestStrategyPiece interface { + numChunks() pp.Integer + dirtyChunks() bitmap.Bitmap + chunkIndexSpec(i pp.Integer) chunkSpec + wouldDuplicateRecent(chunkSpec) bool +} + +type requestStrategyTorrent interface { + numConns() int + numReaders() int + numPieces() int + readerPiecePriorities() (now, readahead bitmap.Bitmap) + ignorePieces() bitmap.Bitmap + pendingPieces() *prioritybitmap.PriorityBitmap + duplicateRequestTimeout() time.Duration +} + +type requestStrategyConnection interface { + torrent() requestStrategyTorrent + peerPieces() bitmap.Bitmap + pieceRequestOrder() *prioritybitmap.PriorityBitmap + fastest() bool + stats() *ConnStats + totalExpectingTime() time.Duration + peerMaxRequests() int + chunksReceivedWhileExpecting() int64 +} + +type requestStrategy interface { + iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool + iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool + nominalMaxRequests(requestStrategyConnection) int + shouldRequestWithoutBias(requestStrategyConnection) bool + piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int +} + +// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across +// connections. +type requestStrategyOne struct{} + +// The fastest connection downloads strictly in order of priority, while all others adhere to their +// piece inclinations. +type requestStrategyTwo struct{} + +func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection) bool { + if cn.torrent().numReaders() == 0 { + return false + } + if cn.torrent().numConns() == 1 { + return true + } + if cn.fastest() { + return true + } + return false +} + +// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is +// reached. +type requestStrategyThree struct { +} diff --git a/torrent.go b/torrent.go index 423614b4..b29fd9f8 100644 --- a/torrent.go +++ b/torrent.go @@ -18,11 +18,11 @@ import ( "github.com/anacrolix/dht/v2" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" - "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/perf" - "github.com/anacrolix/missinggo/prioritybitmap" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" + "github.com/anacrolix/missinggo/v2/bitmap" + "github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" @@ -46,14 +46,10 @@ type Torrent struct { networkingEnabled bool - // Determines what chunks to request from peers. 1: Favour higher priority pieces with some - // fuzzing to reduce overlaps and wastage across connections. 2: The fastest connection - // downloads strictly in order of priority, while all others adhere to their piece inclinations. - // 3: Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout - // is reached. - requestStrategy int + // Determines what chunks to request from peers. + requestStrategy requestStrategy // How long to avoid duplicating a pending request. 
- duplicateRequestTimeout time.Duration + _duplicateRequestTimeout time.Duration closed missinggo.Event infoHash metainfo.Hash @@ -119,15 +115,15 @@ type Torrent struct { // Set when .Info is obtained. gotMetainfo missinggo.Event - readers map[*reader]struct{} - readerNowPieces bitmap.Bitmap - readerReadaheadPieces bitmap.Bitmap + readers map[*reader]struct{} + _readerNowPieces bitmap.Bitmap + _readerReadaheadPieces bitmap.Bitmap // A cache of pieces we need to get. Calculated from various piece and // file priorities and completion states elsewhere. - pendingPieces prioritybitmap.PriorityBitmap + _pendingPieces prioritybitmap.PriorityBitmap // A cache of completed piece indices. - completedPieces bitmap.Bitmap + _completedPieces bitmap.Bitmap // Pieces that need to be hashed. piecesQueuedForHash bitmap.Bitmap activePieceHashes int @@ -144,6 +140,37 @@ type Torrent struct { lastRequested map[request]*time.Timer } +func (t *Torrent) numConns() int { + return len(t.conns) +} + +func (t *Torrent) numReaders() int { + return len(t.readers) +} + +func (t *Torrent) readerNowPieces() bitmap.Bitmap { + return t._readerNowPieces +} + +func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap { + return t._readerReadaheadPieces +} + +func (t *Torrent) ignorePieces() bitmap.Bitmap { + ret := t._completedPieces.Copy() + ret.Union(t.piecesQueuedForHash) + for i := 0; i < t.numPieces(); i++ { + if t.piece(i).hashing { + ret.Set(i, true) + } + } + return ret +} + +func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap { + return &t._pendingPieces +} + func (t *Torrent) tickleReaders() { t.cl.event.Broadcast() } @@ -198,7 +225,7 @@ func (t *Torrent) setChunkSize(size pp.Integer) { } func (t *Torrent) pieceComplete(piece pieceIndex) bool { - return t.completedPieces.Get(bitmap.BitIndex(piece)) + return t._completedPieces.Get(bitmap.BitIndex(piece)) } func (t *Torrent) pieceCompleteUncached(piece pieceIndex) storage.Completion { @@ -636,7 +663,7 @@ func (t *Torrent) bytesMissingLocked() int64 { } func (t *Torrent) bytesLeft() (left int64) { - bitmap.Flip(t.completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool { + bitmap.Flip(t._completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool { p := &t.pieces[piece] left += int64(p.length() - p.numDirtyBytes()) return true @@ -672,7 +699,7 @@ func (t *Torrent) numPieces() pieceIndex { } func (t *Torrent) numPiecesCompleted() (num int) { - return t.completedPieces.Len() + return t._completedPieces.Len() } func (t *Torrent) close() (err error) { @@ -713,7 +740,7 @@ func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) { func (t *Torrent) bitfield() (bf []bool) { bf = make([]bool, t.numPieces()) - t.completedPieces.IterTyped(func(piece int) (again bool) { + t._completedPieces.IterTyped(func(piece int) (again bool) { bf[piece] = true return true }) @@ -725,7 +752,7 @@ func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer { } func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) { - t.pieces[pieceIndex].dirtyChunks.Clear() + t.pieces[pieceIndex]._dirtyChunks.Clear() } func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer { @@ -754,14 +781,14 @@ func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, copyErr error) } func (t *Torrent) haveAnyPieces() bool { - return t.completedPieces.Len() != 0 + return t._completedPieces.Len() != 0 } func (t *Torrent) haveAllPieces() bool { if !t.haveInfo() { return false } - return t.completedPieces.Len() == 
bitmap.BitIndex(t.numPieces()) + return t._completedPieces.Len() == bitmap.BitIndex(t.numPieces()) } func (t *Torrent) havePiece(index pieceIndex) bool { @@ -803,7 +830,7 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool { if t.pieceComplete(index) { return false } - if t.pendingPieces.Contains(bitmap.BitIndex(index)) { + if t._pendingPieces.Contains(bitmap.BitIndex(index)) { return true } // t.logger.Printf("piece %d not pending", index) @@ -821,7 +848,7 @@ func (t *Torrent) worstBadConn() *connection { heap.Init(&wcs) for wcs.Len() != 0 { c := heap.Pop(&wcs).(*connection) - if c.stats.ChunksReadWasted.Int64() >= 6 && c.stats.ChunksReadWasted.Int64() > c.stats.ChunksReadUseful.Int64() { + if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() { return c } // If the connection is in the worst half of the established @@ -863,7 +890,7 @@ func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer { } func (t *Torrent) pieceAllDirty(piece pieceIndex) bool { - return t.pieces[piece].dirtyChunks.Len() == int(t.pieceNumChunks(piece)) + return t.pieces[piece]._dirtyChunks.Len() == int(t.pieceNumChunks(piece)) } func (t *Torrent) readersChanged() { @@ -872,7 +899,7 @@ func (t *Torrent) readersChanged() { } func (t *Torrent) updateReaderPieces() { - t.readerNowPieces, t.readerReadaheadPieces = t.readerPiecePriorities() + t._readerNowPieces, t._readerReadaheadPieces = t.readerPiecePriorities() } func (t *Torrent) readerPosChanged(from, to pieceRange) { @@ -922,11 +949,11 @@ func (t *Torrent) updatePiecePriority(piece pieceIndex) { newPrio := p.uncachedPriority() // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) if newPrio == PiecePriorityNone { - if !t.pendingPieces.Remove(bitmap.BitIndex(piece)) { + if !t._pendingPieces.Remove(bitmap.BitIndex(piece)) { return } } else { - if !t.pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) { + if !t._pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) { return } } @@ -982,7 +1009,7 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool } func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { - prio, ok := t.pendingPieces.GetPriority(bitmap.BitIndex(piece)) + prio, ok := t._pendingPieces.GetPriority(bitmap.BitIndex(piece)) if !ok { return PiecePriorityNone } @@ -1067,7 +1094,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { changed := cached != uncached complete := uncached.Complete p.storageCompletionOk = uncached.Ok - t.completedPieces.Set(bitmap.BitIndex(piece), complete) + t._completedPieces.Set(bitmap.BitIndex(piece), complete) if complete && len(p.dirtiers) != 0 { t.logger.Printf("marked piece %v complete but still has dirtiers", piece) } @@ -1131,7 +1158,7 @@ func (t *Torrent) needData() bool { if !t.haveInfo() { return true } - return t.pendingPieces.Len() != 0 + return t._pendingPieces.Len() != 0 } func appendMissingStrings(old, new []string) (ret []string) { @@ -1429,16 +1456,16 @@ func (t *Torrent) numTotalPeers() int { // Reconcile bytes transferred before connection was associated with a // torrent. 
func (t *Torrent) reconcileHandshakeStats(c *connection) { - if c.stats != (ConnStats{ + if c._stats != (ConnStats{ // Handshakes should only increment these fields: - BytesWritten: c.stats.BytesWritten, - BytesRead: c.stats.BytesRead, + BytesWritten: c._stats.BytesWritten, + BytesRead: c._stats.BytesRead, }) { panic("bad stats") } c.postHandshakeStats(func(cs *ConnStats) { - cs.BytesRead.Add(c.stats.BytesRead.Int64()) - cs.BytesWritten.Add(c.stats.BytesWritten.Int64()) + cs.BytesRead.Add(c._stats.BytesRead.Int64()) + cs.BytesWritten.Add(c._stats.BytesWritten.Int64()) }) c.reconciledHandshakeStats = true } @@ -1536,7 +1563,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) { t.allStats((*ConnStats).incrementPiecesDirtiedGood) } for c := range p.dirtiers { - c.stats.incrementPiecesDirtiedGood() + c._stats.incrementPiecesDirtiedGood() } t.clearPieceTouchers(piece) err := p.Storage().MarkComplete() @@ -1550,7 +1577,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) { t.allStats((*ConnStats).incrementPiecesDirtiedBad) for c := range p.dirtiers { // Y u do dis peer?! - c.stats.incrementPiecesDirtiedBad() + c._stats.incrementPiecesDirtiedBad() } bannableTouchers := make([]*connection, 0, len(p.dirtiers)) @@ -1770,3 +1797,11 @@ func (t *Torrent) dialTimeout() time.Duration { func (t *Torrent) piece(i int) *Piece { return &t.pieces[i] } + +func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent { + return t +} + +func (t *Torrent) duplicateRequestTimeout() time.Duration { + return t._duplicateRequestTimeout +} diff --git a/torrent_test.go b/torrent_test.go index 9502b57e..6133f5df 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -96,7 +96,7 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) { } assert.Len(b, t.readers, 7) for i := 0; i < int(t.numPieces()); i += 3 { - t.completedPieces.Set(i, true) + t._completedPieces.Set(i, true) } t.DownloadPieces(0, t.numPieces()) for range iter.N(b.N) { @@ -155,7 +155,7 @@ func TestPieceHashFailed(t *testing.T) { tt.setChunkSize(2) require.NoError(t, tt.setInfoBytes(mi.InfoBytes)) tt.cl.lock() - tt.pieces[1].dirtyChunks.AddRange(0, 3) + tt.pieces[1]._dirtyChunks.AddRange(0, 3) require.True(t, tt.pieceAllDirty(1)) tt.pieceHashed(1, false) // Dirty chunks should be cleared so we can try again.
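
An illustrative aside, not part of the patch above: with the strategy logic behind the new requestStrategy interface, the value assigned in newTorrent (currently requestStrategyOne{}) can be swapped or composed per torrent. The following is only a rough sketch of that; cappedRequestStrategy and maxOutstanding are invented names, and it assumes the interface and requestStrategyOne exactly as added in request_strategy.go and connection.go.

package torrent

// cappedRequestStrategy reuses requestStrategyOne's piece selection but caps the
// number of outstanding requests. Hypothetical example only.
type cappedRequestStrategy struct {
	requestStrategyOne // promoted methods supply the rest of the interface
	maxOutstanding     int
}

// Override only the pipelining decision; iterPendingPieces, iterUndirtiedChunks,
// shouldRequestWithoutBias and piecePriority come from the embedded strategy.
func (s cappedRequestStrategy) nominalMaxRequests(cn requestStrategyConnection) int {
	n := s.requestStrategyOne.nominalMaxRequests(cn)
	if n > s.maxOutstanding {
		return s.maxOutstanding
	}
	return n
}

var _ requestStrategy = cappedRequestStrategy{}

Inside the package, newTorrent could then set requestStrategy: cappedRequestStrategy{maxOutstanding: 64} in place of requestStrategyOne{}.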
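
The commit message's point about testability can be shown with in-package stubs of the two new interfaces. This is a hypothetical sketch, not code from the patch: the stub types, the values they return, and the test name are all invented, and it assumes requestStrategyTorrent, requestStrategyConnection and requestStrategyThree exactly as they appear in this diff.

package torrent

import (
	"testing"
	"time"

	"github.com/anacrolix/missinggo/v2/bitmap"
	"github.com/anacrolix/missinggo/v2/prioritybitmap"
	"github.com/stretchr/testify/assert"
)

// Test doubles for the extracted interfaces. Only the methods used by
// requestStrategyThree.nominalMaxRequests return meaningful values.
type stubRequestStrategyTorrent struct{}

func (stubRequestStrategyTorrent) numConns() int   { return 1 }
func (stubRequestStrategyTorrent) numReaders() int { return 0 }
func (stubRequestStrategyTorrent) numPieces() int  { return 4 }
func (stubRequestStrategyTorrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
	return
}
func (stubRequestStrategyTorrent) ignorePieces() bitmap.Bitmap { return bitmap.Bitmap{} }
func (stubRequestStrategyTorrent) pendingPieces() *prioritybitmap.PriorityBitmap {
	return new(prioritybitmap.PriorityBitmap)
}
func (stubRequestStrategyTorrent) duplicateRequestTimeout() time.Duration { return time.Second }

type stubRequestStrategyConnection struct{}

func (stubRequestStrategyConnection) torrent() requestStrategyTorrent {
	return stubRequestStrategyTorrent{}
}
func (stubRequestStrategyConnection) peerPieces() bitmap.Bitmap { return bitmap.Bitmap{} }
func (stubRequestStrategyConnection) pieceRequestOrder() *prioritybitmap.PriorityBitmap {
	return new(prioritybitmap.PriorityBitmap)
}
func (stubRequestStrategyConnection) fastest() bool                       { return false }
func (stubRequestStrategyConnection) stats() *ConnStats                   { return new(ConnStats) }
func (stubRequestStrategyConnection) totalExpectingTime() time.Duration   { return 2 * time.Second }
func (stubRequestStrategyConnection) peerMaxRequests() int                { return 250 }
func (stubRequestStrategyConnection) chunksReceivedWhileExpecting() int64 { return 100 }

// Strategy three pipelines roughly as many requests as it expects to be answered
// within the duplicate-request timeout: ~100 chunks over 2s of expecting time with
// a 1s timeout suggests about 25 outstanding requests, clamped to [1, peerMaxRequests].
func TestRequestStrategyThreeNominalMaxRequests(t *testing.T) {
	got := requestStrategyThree{}.nominalMaxRequests(stubRequestStrategyConnection{})
	assert.True(t, got >= 1 && got <= 250, "result should stay within [1, peerMaxRequests]")
}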