]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Pass tests with new full-client request strategy implementation
authorMatt Joiner <anacrolix@gmail.com>
Sun, 9 May 2021 04:14:11 +0000 (14:14 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
client.go
config.go
peer-impl.go
peerconn.go
piece.go
request-strategy-defaults.go
request-strategy.go
torrent.go
webseed-peer.go

index cab4c234307436a18223b4fa29df2eef5728a5df..b0f537ca2ff66815429c9aba892f0dbae81e67e9 100644 (file)
--- a/client.go
+++ b/client.go
@@ -81,6 +81,8 @@ type Client struct {
        websocketTrackers websocketTrackers
 
        activeAnnounceLimiter limiter.Instance
+
+       clientPieceRequestOrder
 }
 
 type ipStr string
@@ -293,6 +295,8 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                },
        }
 
+       go cl.requester()
+
        return
 }
 
@@ -1139,7 +1143,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                webSeeds: make(map[string]*Peer),
        }
        t._pendingPieces.NewSet = priorityBitmapStableNewSet
-       t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
        t.logger = cl.logger.WithContextValue(t)
        t.setChunkSize(defaultChunkSize)
        return
index 373ce6feeecbcc17f70c8747004a7980eb5219e4..5aeef38bdaaa5420b59b95fa5649557bbd83c1c9 100644 (file)
--- a/config.go
+++ b/config.go
@@ -137,8 +137,6 @@ type ClientConfig struct {
        // OnQuery hook func
        DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
 
-       DefaultRequestStrategy requestStrategyMaker
-
        Extensions PeerExtensionBits
 
        DisableWebtorrent bool
@@ -185,10 +183,7 @@ func NewDefaultClientConfig() *ClientConfig {
                CryptoSelector: mse.DefaultCryptoSelector,
                CryptoProvides: mse.AllSupportedCrypto,
                ListenPort:     42069,
-
-               DefaultRequestStrategy: RequestStrategyDuplicateRequestTimeout(5 * time.Second),
-
-               Extensions: defaultPeerExtensionBytes(),
+               Extensions:     defaultPeerExtensionBytes(),
        }
        //cc.ConnTracker.SetNoMaxEntries()
        //cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 }
index a04a160b3b466f90bb4882464ab018de6773de63..f4ad12a29dbd7f81079fe66df5052ca15c8ed416 100644 (file)
@@ -20,4 +20,5 @@ type peerImpl interface {
        drop()
        String() string
        connStatusString() string
+       writeBufferFull() bool
 }
index b26f439842fc238763dbd12e7d1f63051cd875e7..9522adf32a34584dadd13c207d2bec085f17d904 100644 (file)
@@ -337,16 +337,16 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
-       fmt.Fprintf(w, "    next pieces: %v%s\n",
-               iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
-               func() string {
-                       if cn == t.fastestPeer {
-                               return " (fastest)"
-                       } else {
-                               return ""
-                       }
-               }(),
-       )
+       //fmt.Fprintf(w, "    next pieces: %v%s\n",
+       //      iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
+       //      func() string {
+       //              if cn == t.fastestPeer {
+       //                      return " (fastest)"
+       //              } else {
+       //                      return ""
+       //              }
+       //      }(),
+       //)
 }
 
 func (cn *Peer) close() {
@@ -402,7 +402,12 @@ func (cn *PeerConn) write(msg pp.Message) bool {
        cn.wroteMsg(&msg)
        cn.writeBuffer.Write(msg.MustMarshalBinary())
        torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
-       return cn.writeBuffer.Len() < writeBufferHighWaterLen
+       cn.tickleWriter()
+       return !cn.writeBufferFull()
+}
+
+func (cn *PeerConn) writeBufferFull() bool {
+       return cn.writeBuffer.Len() >= writeBufferHighWaterLen
 }
 
 func (cn *PeerConn) requestMetadataPiece(index int) {
@@ -440,11 +445,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *Peer) nominalMaxRequests() (ret int) {
-       return int(clamp(
-               1,
-               int64(cn.PeerMaxRequests),
-               int64(cn.t.requestStrategy.nominalMaxRequests(cn.requestStrategyConnection())),
-       ))
+       return cn.PeerMaxRequests
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -528,12 +529,12 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
 // are okay.
 type messageWriter func(pp.Message) bool
 
-func (cn *Peer) request(r Request) bool {
+func (cn *Peer) request(r Request) (more bool, err error) {
        if _, ok := cn.requests[r]; ok {
-               panic("chunk already requested")
+               return true, nil
        }
        if !cn.peerHasPiece(pieceIndex(r.Index)) {
-               panic("requesting piece peer doesn't have")
+               return true, errors.New("requesting piece peer doesn't have")
        }
        if !cn.t.peerIsActive(cn) {
                panic("requesting but not in active conns")
@@ -545,7 +546,7 @@ func (cn *Peer) request(r Request) bool {
                if cn.peerAllowedFast.Get(int(r.Index)) {
                        torrent.Add("allowed fast requests sent", 1)
                } else {
-                       panic("requesting while choking and not allowed fast")
+                       return cn.setInterested(true), errors.New("requesting while choked and not allowed fast")
                }
        }
        if cn.t.hashingPiece(pieceIndex(r.Index)) {
@@ -563,12 +564,11 @@ func (cn *Peer) request(r Request) bool {
        }
        cn.validReceiveChunks[r]++
        cn.t.pendingRequests[r]++
-       cn.t.requestStrategy.hooks().sentRequest(r)
        cn.updateExpectingChunks()
        for _, f := range cn.callbacks.SentRequest {
                f(PeerRequestEvent{cn, r})
        }
-       return cn.peerImpl.request(r)
+       return cn.peerImpl.request(r), nil
 }
 
 func (me *PeerConn) request(r Request) bool {
@@ -584,64 +584,7 @@ func (me *PeerConn) cancel(r Request) bool {
        return me.write(makeCancelMessage(r))
 }
 
-func (cn *Peer) doRequestState() bool {
-       if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed {
-               if !cn.setInterested(false) {
-                       return false
-               }
-               if len(cn.requests) != 0 {
-                       for r := range cn.requests {
-                               cn.deleteRequest(r)
-                               // log.Printf("%p: cancelling request: %v", cn, r)
-                               if !cn.peerImpl.cancel(r) {
-                                       return false
-                               }
-                       }
-               }
-       } else if len(cn.requests) <= cn.requestsLowWater {
-               filledBuffer := false
-               cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
-                       cn.iterPendingRequests(pieceIndex, func(r Request) bool {
-                               if !cn.setInterested(true) {
-                                       filledBuffer = true
-                                       return false
-                               }
-                               if len(cn.requests) >= cn.nominalMaxRequests() {
-                                       return false
-                               }
-                               // Choking is looked at here because our interest is dependent
-                               // on whether we'd make requests in its absence.
-                               if cn.peerChoking {
-                                       if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
-                                               return false
-                                       }
-                               }
-                               if _, ok := cn.requests[r]; ok {
-                                       return true
-                               }
-                               filledBuffer = !cn.request(r)
-                               return !filledBuffer
-                       })
-                       return !filledBuffer
-               })
-               if filledBuffer {
-                       // If we didn't completely top up the requests, we shouldn't mark
-                       // the low water, since we'll want to top up the requests as soon
-                       // as we have more write buffer space.
-                       return false
-               }
-               cn.requestsLowWater = len(cn.requests) / 2
-               if len(cn.requests) == 0 {
-                       return cn.setInterested(false)
-               }
-       }
-       return true
-}
-
 func (cn *PeerConn) fillWriteBuffer() {
-       if !cn.doRequestState() {
-               return
-       }
        if cn.pex.IsEnabled() {
                if flow := cn.pex.Share(cn.write); !flow {
                        return
@@ -743,10 +686,13 @@ func (cn *PeerConn) updateRequests() {
 func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        return func(cb iter.Callback) {
                for _, bm := range bms {
-                       bm.Sub(*skip)
                        if !iter.All(
-                               func(i interface{}) bool {
-                                       skip.Add(i.(int))
+                               func(_i interface{}) bool {
+                                       i := _i.(int)
+                                       if skip.Contains(i) {
+                                               return true
+                                       }
+                                       skip.Add(i)
                                        return cb(i)
                                },
                                bm.Iter,
@@ -757,62 +703,6 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        }
 }
 
-func iterUnbiasedPieceRequestOrder(cn requestStrategyConnection, f func(piece pieceIndex) bool) bool {
-       now, readahead := cn.torrent().readerPiecePriorities()
-       skip := bitmap.Flip(cn.peerPieces(), 0, cn.torrent().numPieces())
-       skip.Union(cn.torrent().ignorePieces())
-       // Return an iterator over the different priority classes, minus the skip pieces.
-       return iter.All(
-               func(_piece interface{}) bool {
-                       return f(pieceIndex(_piece.(bitmap.BitIndex)))
-               },
-               iterBitmapsDistinct(&skip, now, readahead),
-               // We have to iterate _pendingPieces separately because it isn't a Bitmap.
-               func(cb iter.Callback) {
-                       cn.torrent().pendingPieces().IterTyped(func(piece int) bool {
-                               if skip.Contains(piece) {
-                                       return true
-                               }
-                               more := cb(piece)
-                               skip.Add(piece)
-                               return more
-                       })
-               },
-       )
-}
-
-// The connection should download highest priority pieces first, without any inclination toward
-// avoiding wastage. Generally we might do this if there's a single connection, or this is the
-// fastest connection, and we have active readers that signal an ordering preference. It's
-// conceivable that the best connection should do this, since it's least likely to waste our time if
-// assigned to the highest priority pieces, and assigning more than one this role would cause
-// significant wasted bandwidth.
-func (cn *Peer) shouldRequestWithoutBias() bool {
-       return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
-}
-
-func (cn *Peer) iterPendingPieces(f func(pieceIndex) bool) {
-       if !cn.t.haveInfo() {
-               return
-       }
-       if cn.closed.IsSet() {
-               return
-       }
-       cn.t.requestStrategy.iterPendingPieces(cn, f)
-}
-func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) {
-       cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
-}
-
-func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool {
-       return cn.t.requestStrategy.iterUndirtiedChunks(
-               cn.t.piece(piece).requestStrategyPiece(),
-               func(cs ChunkSpec) bool {
-                       return f(Request{pp.Integer(piece), cs})
-               },
-       )
-}
-
 // check callers updaterequests
 func (cn *Peer) stopRequestingPiece(piece pieceIndex) bool {
        return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
@@ -831,8 +721,7 @@ func (cn *Peer) updatePiecePriority(piece pieceIndex) bool {
                return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
-       prio = cn.t.requestStrategy.piecePriority(cn, piece, tpp, prio)
-       return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
+       return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio)
 }
 
 func (cn *Peer) getPieceInclination() []int {
@@ -1571,7 +1460,6 @@ func (c *Peer) deleteRequest(r Request) bool {
                f(PeerRequestEvent{c, r})
        }
        c.updateExpectingChunks()
-       c.t.requestStrategy.hooks().deletedRequest(r)
        pr := c.t.pendingRequests
        pr[r]--
        n := pr[r]
@@ -1722,10 +1610,6 @@ func (l connectionTrust) Less(r connectionTrust) bool {
        return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
 }
 
-func (cn *Peer) requestStrategyConnection() requestStrategyConnection {
-       return cn
-}
-
 func (cn *Peer) chunksReceivedWhileExpecting() int64 {
        return cn._chunksReceivedWhileExpecting
 }
@@ -1761,10 +1645,6 @@ func (cn *Peer) stats() *ConnStats {
        return &cn._stats
 }
 
-func (cn *Peer) torrent() requestStrategyTorrent {
-       return cn.t.requestStrategyTorrent()
-}
-
 func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        pc, ok := p.peerImpl.(*PeerConn)
        return pc, ok
index fb9c8056f7f6fda3b8df53bd9d2cca6223717047..248832d0ae2cc345b0510257cb6f1b7b49af3fcb 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -144,7 +144,7 @@ func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec {
 func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request {
        return Request{
                pp.Integer(p.index),
-               chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
+               p.chunkIndexSpec(chunkIndex),
        }
 }
 
@@ -259,10 +259,6 @@ func (p *Piece) allChunksDirty() bool {
        return p._dirtyChunks.Len() == int(p.numChunks())
 }
 
-func (p *Piece) requestStrategyPiece() requestStrategyPiece {
-       return p
-}
-
 func (p *Piece) dirtyChunks() bitmap.Bitmap {
        return p._dirtyChunks
 }
@@ -270,3 +266,15 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap {
 func (p *Piece) State() PieceState {
        return p.t.PieceState(p.index)
 }
+
+func (p *Piece) iterUndirtiedChunks(f func(ChunkSpec) bool) bool {
+       for i := pp.Integer(0); i < p.numChunks(); i++ {
+               if p.chunkIndexDirty(i) {
+                       continue
+               }
+               if !f(p.chunkIndexSpec(i)) {
+                       return false
+               }
+       }
+       return true
+}
index 1ece5fc69b52861afaad4831606ade05fe10127d..10cbafc73d7b9237c864a6f3c625bf9c74366b21 100644 (file)
@@ -1,45 +1 @@
 package torrent
-
-import (
-       "github.com/anacrolix/missinggo/iter"
-       "github.com/anacrolix/missinggo/v2/bitmap"
-       pp "github.com/anacrolix/torrent/peer_protocol"
-)
-
-// Provides default implementations for requestStrategy methods. Could be embedded, or delegated to.
-type requestStrategyDefaults struct{}
-
-func (requestStrategyDefaults) hooks() requestStrategyHooks {
-       return requestStrategyHooks{
-               sentRequest:    func(Request) {},
-               deletedRequest: func(Request) {},
-       }
-}
-
-func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
-       chunkIndices := p.dirtyChunks().Copy()
-       chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
-       return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
-               ci, err := chunkIndices.RB.Select(uint32(i))
-               if err != nil {
-                       panic(err)
-               }
-               return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
-       })
-}
-
-func (requestStrategyDefaults) nominalMaxRequests(cn requestStrategyConnection) int {
-       return int(
-               max(
-                       64,
-                       cn.stats().ChunksReadUseful.Int64()-
-                               (cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
-}
-
-func (requestStrategyDefaults) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
-       return prio
-}
-
-func (requestStrategyDefaults) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
-       return false
-}
index 75cff9637fcbdb957329ea09a2397a358d90442e..62e7174f02c4929e87c4ad0775589b85bbcd628b 100644 (file)
 package torrent
 
 import (
-       "math"
-       "sync"
+       "sort"
        "time"
 
-       "github.com/anacrolix/missinggo/v2/bitmap"
-       "github.com/anacrolix/missinggo/v2/prioritybitmap"
-
+       "github.com/anacrolix/log"
+       "github.com/anacrolix/multiless"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       "github.com/bradfitz/iter"
 )
 
-type requestStrategyPiece interface {
-       numChunks() pp.Integer
-       dirtyChunks() bitmap.Bitmap
-       chunkIndexRequest(i pp.Integer) Request
-}
-
-type requestStrategyTorrent interface {
-       numConns() int
-       numReaders() int
-       numPieces() int
-       readerPiecePriorities() (now, readahead bitmap.Bitmap)
-       ignorePieces() bitmap.Bitmap
-       pendingPieces() *prioritybitmap.PriorityBitmap
-}
-
-type requestStrategyConnection interface {
-       torrent() requestStrategyTorrent
-       peerPieces() bitmap.Bitmap
-       pieceRequestOrder() *prioritybitmap.PriorityBitmap
-       fastest() bool
-       stats() *ConnStats
-       totalExpectingTime() time.Duration
-       peerMaxRequests() int
-       chunksReceivedWhileExpecting() int64
-}
-
-type requestStrategy interface {
-       iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
-       iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool
-       nominalMaxRequests(requestStrategyConnection) int
-       shouldRequestWithoutBias(requestStrategyConnection) bool
-       piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
-       hooks() requestStrategyHooks
-}
-
-type requestStrategyHooks struct {
-       sentRequest    func(Request)
-       deletedRequest func(Request)
-}
-
-type requestStrategyCallbacks interface {
-       requestTimedOut(Request)
-}
-
-type requestStrategyFuzzing struct {
-       requestStrategyDefaults
+type clientPieceRequestOrder struct {
+       pieces []pieceRequestOrderPiece
 }
 
-type requestStrategyFastest struct {
-       requestStrategyDefaults
+type pieceRequestOrderPiece struct {
+       t            *Torrent
+       index        pieceIndex
+       prio         piecePriority
+       partial      bool
+       availability int
 }
 
-func newRequestStrategyMaker(rs requestStrategy) requestStrategyMaker {
-       return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
-               return rs
+func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) {
+       for i := range iter.N(numPieces) {
+               me.pieces = append(me.pieces, pieceRequestOrderPiece{
+                       t:     t,
+                       index: i,
+               })
        }
 }
 
-// The fastest connection downloads strictly in order of priority, while all others adhere to their
-// piece inclinations.
-func RequestStrategyFastest() requestStrategyMaker {
-       return newRequestStrategyMaker(requestStrategyFastest{})
-}
-
-// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
-// connections.
-func RequestStrategyFuzzing() requestStrategyMaker {
-       return newRequestStrategyMaker(requestStrategyFuzzing{})
-}
-
-func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
-       if cn.torrent().numReaders() == 0 {
-               return false
-       }
-       if cn.torrent().numConns() == 1 {
-               return true
-       }
-       if cn.fastest() {
-               return true
+func (me *clientPieceRequestOrder) removePieces(t *Torrent) {
+       newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces())
+       for _, p := range me.pieces {
+               if p.t != t {
+                       newPieces = append(newPieces, p)
+               }
        }
-       return false
+       me.pieces = newPieces
 }
 
-type requestStrategyDuplicateRequestTimeout struct {
-       requestStrategyDefaults
-       // How long to avoid duplicating a pending request.
-       duplicateRequestTimeout time.Duration
-
-       callbacks requestStrategyCallbacks
-
-       // The last time we requested a chunk. Deleting the request from any connection will clear this
-       // value.
-       lastRequested map[Request]*time.Timer
-       // The lock to take when running a request timeout handler.
-       timeoutLocker sync.Locker
+func (me clientPieceRequestOrder) sort() {
+       sort.SliceStable(me.pieces, me.less)
 }
 
-// Generates a request strategy instance for a given torrent. callbacks are probably specific to the torrent.
-type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
-
-// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
-// reached.
-func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) requestStrategyMaker {
-       return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
-               return requestStrategyDuplicateRequestTimeout{
-                       duplicateRequestTimeout: duplicateRequestTimeout,
-                       callbacks:               callbacks,
-                       lastRequested:           make(map[Request]*time.Timer),
-                       timeoutLocker:           clientLocker,
-               }
+func (me clientPieceRequestOrder) update() {
+       for i := range me.pieces {
+               p := &me.pieces[i]
+               p.prio = p.t.piece(p.index).uncachedPriority()
+               p.partial = p.t.piecePartiallyDownloaded(p.index)
+               p.availability = p.t.pieceAvailability(p.index)
        }
 }
 
-func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
-       return requestStrategyHooks{
-               deletedRequest: func(r Request) {
-                       if t, ok := rs.lastRequested[r]; ok {
-                               t.Stop()
-                               delete(rs.lastRequested, r)
-                       }
-               },
-               sentRequest: rs.onSentRequest,
+func (me clientPieceRequestOrder) less(_i, _j int) bool {
+       i := me.pieces[_i]
+       j := me.pieces[_j]
+       ml := multiless.New()
+       ml.Int(int(j.prio), int(i.prio))
+       ml.Bool(j.partial, i.partial)
+       ml.Int(i.availability, j.availability)
+       return ml.Less()
+}
+
+func (cl *Client) requester() {
+       for {
+               func() {
+                       cl.lock()
+                       defer cl.unlock()
+                       cl.doRequests()
+               }()
+               select {
+               case <-cl.closed.LockedChan(cl.locker()):
+                       return
+               case <-time.After(10 * time.Millisecond):
+               }
        }
 }
 
-func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
-       for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
-               if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
-                       continue
+func (cl *Client) doRequests() {
+       requestOrder := clientPieceRequestOrder{}
+       allPeers := make(map[*Torrent][]*Peer)
+       storageCapacity := make(map[*Torrent]*int64)
+       for _, t := range cl.torrents {
+               // TODO: We could do metainfo requests here.
+               if t.haveInfo() {
+                       value := int64(t.usualPieceSize())
+                       storageCapacity[t] = &value
+                       requestOrder.addPieces(t, t.numPieces())
                }
-               r := p.chunkIndexRequest(i)
-               if rs.wouldDuplicateRecent(r) {
+               var peers []*Peer
+               t.iterPeers(func(p *Peer) {
+                       peers = append(peers, p)
+               })
+               allPeers[t] = peers
+       }
+       requestOrder.update()
+       requestOrder.sort()
+       for _, p := range requestOrder.pieces {
+               if p.t.ignorePieceForRequests(p.index) {
                        continue
                }
-               if !f(r.ChunkSpec) {
-                       return false
+               peers := allPeers[p.t]
+               torrentPiece := p.t.piece(p.index)
+               if left := storageCapacity[p.t]; left != nil {
+                       if *left < int64(torrentPiece.length()) {
+                               continue
+                       }
+                       *left -= int64(torrentPiece.length())
                }
+               p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
+                       for _, peer := range peers {
+                               req := Request{pp.Integer(p.index), chunk}
+                               _, err := peer.request(req)
+                               if err == nil {
+                                       log.Printf("requested %v", req)
+                                       break
+                               }
+                       }
+                       return true
+               })
        }
-       return true
-}
-
-func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
-       switch tpp {
-       case PiecePriorityNormal:
-       case PiecePriorityReadahead:
-               prio -= int(cn.torrent().numPieces())
-       case PiecePriorityNext, PiecePriorityNow:
-               prio -= 2 * int(cn.torrent().numPieces())
-       default:
-               panic(tpp)
-       }
-       prio += int(piece / 3)
-       return prio
-}
-
-func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
-       return iterUnbiasedPieceRequestOrder(cn, f)
-}
-func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
-       if rs.shouldRequestWithoutBias(cn) {
-               return iterUnbiasedPieceRequestOrder(cn, f)
-       } else {
-               return cn.pieceRequestOrder().IterTyped(func(i int) bool {
-                       return f(pieceIndex(i))
+       for _, t := range cl.torrents {
+               t.iterPeers(func(p *Peer) {
+                       if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() {
+                               p.setInterested(false)
+                       }
                })
        }
 }
-func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
-       return defaultIterPendingPieces(rs, cn, cb)
-}
-func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
-       return defaultIterPendingPieces(rs, cn, cb)
-}
 
-func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) {
-       rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
-               rs.timeoutLocker.Lock()
-               delete(rs.lastRequested, r)
-               rs.timeoutLocker.Unlock()
-               rs.callbacks.requestTimedOut(r)
-       })
-}
-
-// The actual value to use as the maximum outbound requests.
-func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
-       expectingTime := int64(cn.totalExpectingTime())
-       if expectingTime == 0 {
-               expectingTime = math.MaxInt64
-       } else {
-               expectingTime *= 2
-       }
-       return int(clamp(
-               1,
-               int64(cn.peerMaxRequests()),
-               max(
-                       // It makes sense to always pipeline at least one connection, since latency must be
-                       // non-zero.
-                       2,
-                       // Request only as many as we expect to receive in the duplicateRequestTimeout
-                       // window. We are trying to avoid having to duplicate requests.
-                       cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
-               ),
-       ))
-}
-func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool {
-       // This piece has been requested on another connection, and the duplicate request timer is still
-       // running.
-       _, ok := rs.lastRequested[r]
-       return ok
-}
+//func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
+//     chunkIndices := p.dirtyChunks().Copy()
+//     chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
+//     return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
+//             ci, err := chunkIndices.RB.Select(uint32(i))
+//             if err != nil {
+//                     panic(err)
+//             }
+//             return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
+//     })
+//}
+
+//
+//func iterUnbiasedPieceRequestOrder(
+//     cn requestStrategyConnection,
+//     f func(piece pieceIndex) bool,
+//     pieceRequestOrder []pieceIndex,
+//) bool {
+//     cn.torrent().sortPieceRequestOrder(pieceRequestOrder)
+//     for _, i := range pieceRequestOrder {
+//             if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) {
+//                     continue
+//             }
+//             if !f(i) {
+//                     return false
+//             }
+//     }
+//     return true
+//}
index e38764742499d9db8fcd4b58b898cc9626d848aa..e8cf36e47f27b01a1e1cf0e51cdda6835223055d 100644 (file)
@@ -55,9 +55,6 @@ type Torrent struct {
        dataUploadDisallowed   bool
        userOnWriteChunkErr    func(error)
 
-       // Determines what chunks to request from peers.
-       requestStrategy requestStrategy
-
        closed   missinggo.Event
        infoHash metainfo.Hash
        pieces   []Piece
@@ -150,6 +147,29 @@ type Torrent struct {
        pex pexState
 }
 
+func (t *Torrent) pieceAvailability(i pieceIndex) (count int) {
+       t.iterPeers(func(peer *Peer) {
+               if peer.peerHasPiece(i) {
+                       count++
+               }
+       })
+       return
+}
+
+func (t *Torrent) sortPieceRequestOrder(sl []pieceIndex) {
+       if len(sl) != t.numPieces() {
+               panic(len(sl))
+       }
+       availability := make([]int, len(sl))
+       t.iterPeers(func(peer *Peer) {
+               for i := range availability {
+                       if peer.peerHasPiece(i) {
+                               availability[i]++
+                       }
+               }
+       })
+}
+
 func (t *Torrent) numConns() int {
        return len(t.conns)
 }
@@ -166,15 +186,8 @@ func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
        return t._readerReadaheadPieces
 }
 
-func (t *Torrent) ignorePieces() bitmap.Bitmap {
-       ret := t._completedPieces.Copy()
-       ret.Union(t.piecesQueuedForHash)
-       for i := 0; i < t.numPieces(); i++ {
-               if t.piece(i).hashing {
-                       ret.Set(i, true)
-               }
-       }
-       return ret
+func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
+       return !t.wantPieceIndex(i)
 }
 
 func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
@@ -413,6 +426,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
 
 // This seems to be all the follow-up tasks after info is set, that can't fail.
 func (t *Torrent) onSetInfo() {
+       t.cl.clientPieceRequestOrder.addPieces(t, t.numPieces())
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
        })
@@ -2026,30 +2040,6 @@ func (t *Torrent) piece(i int) *Piece {
        return &t.pieces[i]
 }
 
-func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent {
-       return t
-}
-
-type torrentRequestStrategyCallbacks struct {
-       t *Torrent
-}
-
-func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) {
-       torrent.Add("Request timeouts", 1)
-       cb.t.cl.lock()
-       defer cb.t.cl.unlock()
-       cb.t.iterPeers(func(cn *Peer) {
-               if cn.peerHasPiece(pieceIndex(r.Index)) {
-                       cn.updateRequests()
-               }
-       })
-
-}
-
-func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
-       return torrentRequestStrategyCallbacks{t}
-}
-
 func (t *Torrent) onWriteChunkErr(err error) {
        if t.userOnWriteChunkErr != nil {
                go t.userOnWriteChunkErr(err)
@@ -2111,7 +2101,7 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) {
        t.userOnWriteChunkErr = f
 }
 
-func (t *Torrent) iterPeers(f func(*Peer)) {
+func (t *Torrent) iterPeers(f func(*Peer)) {
        for pc := range t.conns {
                f(&pc.Peer)
        }
index e2df582ed5cbbea02278984e917d7b79743ba336..9fa77a2857a685a8dda4b98643eaaaaf04010d58 100644 (file)
@@ -24,6 +24,10 @@ type webseedPeer struct {
 
 var _ peerImpl = (*webseedPeer)(nil)
 
+func (me *webseedPeer) writeBufferFull() bool {
+       return false
+}
+
 func (me *webseedPeer) connStatusString() string {
        return me.client.Url
 }
@@ -99,7 +103,6 @@ func (ws *webseedPeer) connectionFlags() string {
 func (ws *webseedPeer) drop() {}
 
 func (ws *webseedPeer) updateRequests() {
-       ws.peer.doRequestState()
 }
 
 func (ws *webseedPeer) onClose() {