Sergey Matveev's repositories - btrtrc.git/commitdiff
Merge branch 'request-strategy-experiments' v1.39.0
author Matt Joiner <anacrolix@gmail.com>
Sun, 12 Dec 2021 07:38:33 +0000 (18:38 +1100)
committer Matt Joiner <anacrolix@gmail.com>
Sun, 12 Dec 2021 07:38:33 +0000 (18:38 +1100)
This should bring in significant performance improvements, fixing issues with peer requesting that have existed since v1.34.0.

15 files changed:
client.go
peer-impl.go
peerconn.go
piece.go
request-strategy-impls.go [new file with mode: 0644]
request-strategy/order.go
request-strategy/order_test.go [deleted file]
request-strategy/peer.go
request-strategy/piece-request-order.go [new file with mode: 0644]
request-strategy/piece.go
request-strategy/torrent.go
requesting.go
torrent-piece-request-order.go [new file with mode: 0644]
torrent.go
webseed-peer.go
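
The heart of the change, visible in the hunks below, is that every outstanding request is now owned by at most one peer: Torrent.pendingRequests becomes a map from request index to the requesting peer, Torrent.lastRequested records when each request was issued, and requests that have been cancelled but not yet acknowledged move into a per-peer Cancelled bitmap instead of the old cancelledRequests field. A minimal, self-contained sketch of that bookkeeping, with illustrative names standing in for the real Peer and Torrent types:

package main

import (
	"fmt"
	"time"

	"github.com/RoaringBitmap/roaring"
)

// RequestIndex identifies a chunk torrent-wide, as in the diffs below
// (piece index * chunks per piece + chunk index).
type RequestIndex = uint32

// peerRequestState mirrors the new request_strategy.PeerRequestState:
// requests currently outstanding, plus requests cancelled but not yet
// acknowledged by the remote peer.
type peerRequestState struct {
	Interested bool
	Requests   roaring.Bitmap
	Cancelled  roaring.Bitmap
}

// torrentRequestBook mirrors the new per-torrent bookkeeping; the real fields
// are Torrent.pendingRequests and Torrent.lastRequested.
type torrentRequestBook struct {
	pendingRequests map[RequestIndex]*peerRequestState
	lastRequested   map[RequestIndex]time.Time
}

func main() {
	book := torrentRequestBook{
		pendingRequests: make(map[RequestIndex]*peerRequestState),
		lastRequested:   make(map[RequestIndex]time.Time),
	}
	p := &peerRequestState{Interested: true}
	const r RequestIndex = 42

	// Issue a request: record it on the peer and claim exclusive ownership.
	p.Requests.Add(r)
	book.pendingRequests[r] = p
	book.lastRequested[r] = time.Now()

	// Cancel it: drop ownership immediately, and park it in Cancelled until
	// the peer rejects or satisfies it.
	if p.Requests.CheckedRemove(r) {
		p.Cancelled.Add(r)
	}
	delete(book.pendingRequests, r)
	delete(book.lastRequested, r)

	fmt.Println(p.Cancelled.Contains(r)) // true
}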

index c491118b04d905413e2ec08556641d56ac774072..9d0777e40cb559d0c242c1ac961c358763938b23 100644 (file)
--- a/client.go
+++ b/client.go
@@ -27,6 +27,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        "github.com/google/btree"
@@ -74,6 +75,7 @@ type Client struct {
        dopplegangerAddrs map[string]struct{}
        badPeerIPs        map[string]struct{}
        torrents          map[InfoHash]*Torrent
+       pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
 
        acceptLimiter   map[ipStr]int
        dialRateLimiter *rate.Limiter
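
The new Client.pieceRequestOrder field keeps one request_strategy.PieceRequestOrder per storage group, so torrents that share a capacity also share a single piece ordering. The map key type is interface{}; the key helper lives in the new torrent-piece-request-order.go, which is not shown in this excerpt, so the sketch below is only a plausible, hypothetical scheme:

// Hypothetical sketch: one way Client.pieceRequestOrder could be keyed. The
// real helper in torrent-piece-request-order.go may differ.
func (cl *Client) pieceRequestOrderKey(t *Torrent) interface{} {
	if t.storage.Capacity == nil {
		// Unlimited storage: the torrent gets its own ordering.
		return t.infoHash
	}
	// Torrents sharing a capacity pointer share one request order.
	return t.storage.Capacity
}
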
index f7140377388125fb073a3ae3a55359855b7ae8a6..47b4345ae628f629003fde3da6848090edaccf23 100644 (file)
--- a/peer-impl.go
+++ b/peer-impl.go
@@ -15,11 +15,10 @@ type peerImpl interface {
        isLowOnRequests() bool
        writeInterested(interested bool) bool
 
-       // Neither of these return buffer room anymore, because they're currently both posted. There's
-       // also PeerConn.writeBufferFull for when/where it matters.
-       _cancel(RequestIndex) bool
+       // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
+       // handled by a follow-up event.
+       _cancel(RequestIndex) (acked bool)
        _request(Request) bool
-
        connectionFlags() string
        onClose()
        onGotInfo(*metainfo.Info)
index fc3d9df3cb6132d8f36900f2e2b472e64c5144e1..098d93fc46a6ba8f59833eaa9dfbce07200ae6e9 100644 (file)
--- a/peerconn.go
+++ b/peerconn.go
@@ -52,7 +52,7 @@ type PeerRemoteAddr interface {
 // indexable with the memory space available.
 type (
        maxRequests  = int
-       requestState = request_strategy.PeerNextRequestState
+       requestState = request_strategy.PeerRequestState
 )
 
 type Peer struct {
@@ -84,9 +84,8 @@ type Peer struct {
 
        // Stuff controlled by the local peer.
        needRequestUpdate    string
-       actualRequestState   requestState
+       requestState         requestState
        updateRequestsTimer  *time.Timer
-       cancelledRequests    roaring.Bitmap
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
@@ -97,9 +96,9 @@ type Peer struct {
        choking                                bool
        piecesReceivedSinceLastRequestUpdate   maxRequests
        maxPiecesReceivedBetweenRequestUpdates maxRequests
-       // Chunks that we might reasonably expect to receive from the peer. Due to
-       // latency, buffering, and implementation differences, we may receive
-       // chunks that are no longer in the set of requests actually want.
+       // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
+       // and implementation differences, we may receive chunks that are no longer in the set of
+       // requests we actually want. This could use a roaring.BSI if the memory use becomes noticeable.
        validReceiveChunks map[RequestIndex]int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
@@ -177,10 +176,10 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-       if cn.actualRequestState.Requests.IsEmpty() {
+       if cn.requestState.Requests.IsEmpty() {
                return false
        }
-       if !cn.actualRequestState.Interested {
+       if !cn.requestState.Interested {
                return false
        }
        if !cn.peerChoking {
@@ -189,7 +188,7 @@ func (cn *Peer) expectingChunks() bool {
        haveAllowedFastRequests := false
        cn.peerAllowedFast.Iterate(func(i uint32) bool {
                haveAllowedFastRequests = roaringBitmapRangeCardinality(
-                       &cn.actualRequestState.Requests,
+                       &cn.requestState.Requests,
                        cn.t.pieceRequestIndexOffset(pieceIndex(i)),
                        cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
                ) == 0
@@ -230,7 +229,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
 
 func (cn *Peer) cumInterest() time.Duration {
        ret := cn.priorInterest
-       if cn.actualRequestState.Interested {
+       if cn.requestState.Interested {
                ret += time.Since(cn.lastBecameInterested)
        }
        return ret
@@ -318,7 +317,7 @@ func (cn *Peer) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
-       if cn.actualRequestState.Interested {
+       if cn.requestState.Interested {
                c('i')
        }
        if cn.choking {
@@ -346,7 +345,7 @@ func (cn *Peer) downloadRate() float64 {
 
 func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
        ret = make(map[pieceIndex]int)
-       cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
+       cn.requestState.Requests.Iterate(func(x uint32) bool {
                ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
                return true
        })
@@ -373,14 +372,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
-               "    %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+               "    %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn._stats.ChunksReadUseful,
                &cn._stats.ChunksRead,
                &cn._stats.ChunksWritten,
-               cn.actualRequestState.Requests.GetCardinality(),
-               cn.cancelledRequests.GetCardinality(),
+               cn.requestState.Requests.GetCardinality(),
+               cn.requestState.Cancelled.GetCardinality(),
                cn.nominalMaxRequests(),
                cn.PeerMaxRequests,
                len(cn.peerRequests),
@@ -537,10 +536,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
 }
 
 func (cn *Peer) setInterested(interested bool) bool {
-       if cn.actualRequestState.Interested == interested {
+       if cn.requestState.Interested == interested {
                return true
        }
-       cn.actualRequestState.Interested = interested
+       cn.requestState.Interested = interested
        if interested {
                cn.lastBecameInterested = time.Now()
        } else if !cn.lastBecameInterested.IsZero() {
@@ -589,7 +588,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
        if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
                // This could occur if we made a request with the fast extension, and then got choked and
                // haven't had the request rejected yet.
-               if !cn.actualRequestState.Requests.Contains(r) {
+               if !cn.requestState.Requests.Contains(r) {
                        panic("peer choking and piece not allowed fast")
                }
        }
@@ -608,18 +607,19 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
-       if cn.actualRequestState.Requests.Contains(r) {
+       if cn.requestState.Requests.Contains(r) {
                return true, nil
        }
-       if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+       if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
                return true, errors.New("too many outstanding requests")
        }
-       cn.actualRequestState.Requests.Add(r)
+       cn.requestState.Requests.Add(r)
        if cn.validReceiveChunks == nil {
                cn.validReceiveChunks = make(map[RequestIndex]int)
        }
        cn.validReceiveChunks[r]++
-       cn.t.pendingRequests.Inc(r)
+       cn.t.pendingRequests[r] = cn
+       cn.t.lastRequested[r] = time.Now()
        cn.updateExpectingChunks()
        ppReq := cn.t.requestIndexToRequest(r)
        for _, f := range cn.callbacks.SentRequest {
@@ -637,31 +637,25 @@ func (me *PeerConn) _request(r Request) bool {
        })
 }
 
-func (me *Peer) cancel(r RequestIndex) bool {
-       if !me.actualRequestState.Requests.Contains(r) {
-               return true
+func (me *Peer) cancel(r RequestIndex) {
+       if !me.deleteRequest(r) {
+               panic("request not existing should have been guarded")
+       }
+       if me._cancel(r) {
+               if !me.requestState.Cancelled.CheckedAdd(r) {
+                       panic("request already cancelled")
+               }
+       }
+       if me.isLowOnRequests() {
+               me.updateRequests("Peer.cancel")
        }
-       return me._cancel(r)
 }
 
 func (me *PeerConn) _cancel(r RequestIndex) bool {
-       if me.cancelledRequests.Contains(r) {
-               // Already cancelled and waiting for a response.
-               return true
-       }
+       me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
        // Transmission does not send rejects for received cancels. See
        // https://github.com/transmission/transmission/pull/2275.
-       if me.fastEnabled() && !me.remoteIsTransmission() {
-               me.cancelledRequests.Add(r)
-       } else {
-               if !me.deleteRequest(r) {
-                       panic("request not existing should have been guarded")
-               }
-               if me.isLowOnRequests() {
-                       me.updateRequests("Peer.cancel")
-               }
-       }
-       return me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
+       return me.fastEnabled() && !me.remoteIsTransmission()
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
@@ -1101,18 +1095,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
                                c.deleteAllRequests()
                        } else {
                                // We don't decrement pending requests here, let's wait for the peer to either
-                               // reject or satisfy the outstanding requests. Additionally some peers may unchoke
+                               // reject or satisfy the outstanding requests. Additionally, some peers may unchoke
                                // us and resume where they left off, we don't want to have piled on to those chunks
-                               // in the meanwhile. I think a peers ability to abuse this should be limited: they
+                               // in the meanwhile. I think a peer's ability to abuse this should be limited: they
                                // could let us request a lot of stuff, then choke us and never reject, but they're
                                // only a single peer, our chunk balancing should smooth over this abuse.
                        }
                        c.peerChoking = true
-                       // We can now reset our interest. I think we do this after setting the flag in case the
-                       // peerImpl updates synchronously (webseeds?).
-                       if !c.actualRequestState.Requests.IsEmpty() {
-                               c.updateRequests("choked")
-                       }
                        c.updateExpectingChunks()
                case pp.Unchoke:
                        if !c.peerChoking {
@@ -1123,7 +1112,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        }
                        c.peerChoking = false
                        preservedCount := 0
-                       c.actualRequestState.Requests.Iterate(func(x uint32) bool {
+                       c.requestState.Requests.Iterate(func(x uint32) bool {
                                if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
                                        preservedCount++
                                }
@@ -1193,7 +1182,11 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Reject:
-                       c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
+                       req := newRequestFromMessage(&msg)
+                       if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
+                               log.Printf("received invalid reject [request=%v, peer=%v]", req, c)
+                               err = fmt.Errorf("received invalid reject [request=%v]", req)
+                       }
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@@ -1209,13 +1202,16 @@ func (c *PeerConn) mainReadLoop() (err error) {
        }
 }
 
-func (c *Peer) remoteRejectedRequest(r RequestIndex) {
-       if c.deleteRequest(r) {
-               if c.isLowOnRequests() {
-                       c.updateRequests("Peer.remoteRejectedRequest")
-               }
-               c.decExpectedChunkReceive(r)
+// Returns true if it was valid to reject the request.
+func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
+       if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+               return false
        }
+       if c.isLowOnRequests() {
+               c.updateRequests("Peer.remoteRejectedRequest")
+       }
+       c.decExpectedChunkReceive(r)
+       return true
 }
 
 func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
@@ -1341,16 +1337,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
        // have actually already received the piece, while we have the Client unlocked to write the data
        // out.
-       deletedRequest := false
+       intended := false
        {
-               if c.actualRequestState.Requests.Contains(req) {
+               if c.requestState.Requests.Contains(req) {
                        for _, f := range c.callbacks.ReceivedRequested {
                                f(PeerMessageEvent{c, msg})
                        }
                }
                // Request has been satisfied.
-               if c.deleteRequest(req) {
-                       deletedRequest = true
+               if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
+                       intended = true
                        if !c.peerChoking {
                                c._chunksReceivedWhileExpecting++
                        }
@@ -1358,7 +1354,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                                c.updateRequests("Peer.receiveChunk deleted request")
                        }
                } else {
-                       chunksReceived.Add("unwanted", 1)
+                       chunksReceived.Add("unintended", 1)
                }
        }
 
@@ -1368,7 +1364,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        // Do we actually want this chunk?
        if t.haveChunk(ppReq) {
                // panic(fmt.Sprintf("%+v", ppReq))
-               chunksReceived.Add("wasted", 1)
+               chunksReceived.Add("redundant", 1)
                c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
                return nil
        }
@@ -1377,7 +1373,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
-       if deletedRequest {
+       if intended {
                c.piecesReceivedSinceLastRequestUpdate++
                c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
        }
@@ -1394,12 +1390,12 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
 
        // Cancel pending requests for this chunk from *other* peers.
-       t.iterPeers(func(p *Peer) {
+       if p := t.pendingRequests[req]; p != nil {
                if p == c {
-                       return
+                       panic("should not be pending request from conn that just received it")
                }
                p.cancel(req)
-       })
+       }
 
        err := func() error {
                cl.unlock()
@@ -1540,26 +1536,33 @@ func (c *Peer) peerHasWantedPieces() bool {
        return c.peerPieces().Intersects(&c.t._pendingPieces)
 }
 
+// Returns true if an outstanding request is removed. Cancelled requests should be handled
+// separately.
 func (c *Peer) deleteRequest(r RequestIndex) bool {
-       if !c.actualRequestState.Requests.CheckedRemove(r) {
+       if !c.requestState.Requests.CheckedRemove(r) {
                return false
        }
-       c.cancelledRequests.Remove(r)
        for _, f := range c.callbacks.DeletedRequest {
                f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
        }
        c.updateExpectingChunks()
-       c.t.pendingRequests.Dec(r)
+       if c.t.requestingPeer(r) != c {
+               panic("only one peer should have a given request at a time")
+       }
+       delete(c.t.pendingRequests, r)
+       delete(c.t.lastRequested, r)
        return true
 }
 
 func (c *Peer) deleteAllRequests() {
-       c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
-               c.deleteRequest(x)
+       c.requestState.Requests.Clone().Iterate(func(x uint32) bool {
+               if !c.deleteRequest(x) {
+                       panic("request should exist")
+               }
                return true
        })
-       if !c.actualRequestState.Requests.IsEmpty() {
-               panic(c.actualRequestState.Requests.GetCardinality())
+       if !c.requestState.Requests.IsEmpty() {
+               panic(c.requestState.Requests.GetCardinality())
        }
 }
 
@@ -1689,7 +1692,11 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
 }
 
 func (pc *PeerConn) isLowOnRequests() bool {
-       return pc.actualRequestState.Requests.IsEmpty()
+       return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
+}
+
+func (p *Peer) uncancelledRequests() uint64 {
+       return p.requestState.Requests.GetCardinality()
 }
 
 func (pc *PeerConn) remoteIsTransmission() bool {
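
Taken together, the peerconn.go changes give cancels a two-phase life: Peer.cancel always removes the request from the outstanding set, and only parks it in requestState.Cancelled when the peer is expected to acknowledge the cancel with a Reject (fast extension enabled and the remote is not Transmission, which doesn't reject received cancels). A condensed, self-contained sketch of that state machine, with the wire I/O stubbed out:

package main

import (
	"fmt"

	"github.com/RoaringBitmap/roaring"
)

type requestIndex = uint32

type peer struct {
	requests  roaring.Bitmap
	cancelled roaring.Bitmap
	// Assumption standing in for fastEnabled() && !remoteIsTransmission().
	acksCancels bool
}

// cancel mirrors Peer.cancel above: always delete the outstanding request,
// and only track it as cancelled if an acknowledgement is expected.
func (p *peer) cancel(r requestIndex) {
	if !p.requests.CheckedRemove(r) {
		panic("request should have been outstanding")
	}
	if p.acksCancels {
		p.cancelled.Add(r)
	}
}

// onReject mirrors remoteRejectedRequest: a Reject is valid only if it
// matches an outstanding or a cancelled request.
func (p *peer) onReject(r requestIndex) bool {
	return p.requests.CheckedRemove(r) || p.cancelled.CheckedRemove(r)
}

func main() {
	p := &peer{acksCancels: true}
	p.requests.Add(7)
	p.cancel(7)
	fmt.Println(p.onReject(7)) // true: the expected acknowledgement
	fmt.Println(p.onReject(7)) // false: would be reported as an invalid reject
}
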
index bef5f59e3d676b92fa36bc2eedafebbe5c9f37d5..6caa76284d90f2982226149c32942d9d89c2ffc6 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -77,11 +77,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType {
 
 func (p *Piece) unpendChunkIndex(i chunkIndexType) {
        p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
+       p.t.updatePieceRequestOrder(p.index)
        p.readerCond.Broadcast()
 }
 
 func (p *Piece) pendChunkIndex(i RequestIndex) {
        p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
+       p.t.updatePieceRequestOrder(p.index)
 }
 
 func (p *Piece) numChunks() chunkIndexType {
diff --git a/request-strategy-impls.go b/request-strategy-impls.go
new file mode 100644 (file)
index 0000000..f4c1264
--- /dev/null
+++ b/request-strategy-impls.go
@@ -0,0 +1,74 @@
+package torrent
+
+import (
+       "github.com/anacrolix/torrent/metainfo"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
+       "github.com/anacrolix/torrent/storage"
+)
+
+type requestStrategyInput struct {
+       cl      *Client
+       capFunc storage.TorrentCapacity
+}
+
+func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
+       return requestStrategyTorrent{r.cl.torrents[ih]}
+}
+
+func (r requestStrategyInput) Capacity() (int64, bool) {
+       if r.capFunc == nil {
+               return 0, false
+       }
+       return (*r.capFunc)()
+}
+
+func (r requestStrategyInput) MaxUnverifiedBytes() int64 {
+       return r.cl.config.MaxUnverifiedBytes
+}
+
+var _ request_strategy.Input = requestStrategyInput{}
+
+// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
+func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
+       return requestStrategyInput{
+               cl:      cl,
+               capFunc: primaryTorrent.storage.Capacity,
+       }
+}
+
+func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
+       return t.cl.getRequestStrategyInput(t)
+}
+
+type requestStrategyTorrent struct {
+       t *Torrent
+}
+
+func (r requestStrategyTorrent) Piece(i int) request_strategy.Piece {
+       return requestStrategyPiece{r.t, i}
+}
+
+func (r requestStrategyTorrent) ChunksPerPiece() uint32 {
+       return r.t.chunksPerRegularPiece()
+}
+
+func (r requestStrategyTorrent) PieceLength() int64 {
+       return r.t.info.PieceLength
+}
+
+var _ request_strategy.Torrent = requestStrategyTorrent{}
+
+type requestStrategyPiece struct {
+       t *Torrent
+       i pieceIndex
+}
+
+func (r requestStrategyPiece) Request() bool {
+       return !r.t.ignorePieceForRequests(r.i)
+}
+
+func (r requestStrategyPiece) NumPendingChunks() int {
+       return int(r.t.pieceNumPendingChunks(r.i))
+}
+
+var _ request_strategy.Piece = requestStrategyPiece{}
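
requestStrategyInput.Capacity above dereferences a capacity that is a pointer to a function; the pointer's identity is what groups torrents onto the same piece request order. A small self-contained sketch of that pattern (the type name is illustrative; the library's is storage.TorrentCapacity):

package main

import "fmt"

// torrentCapacity stands in for storage.TorrentCapacity: a pointer to a
// function, so that sharing the pointer means sharing the capacity.
type torrentCapacity *func() (int64, bool)

func capacityOrUnlimited(capFunc torrentCapacity) (int64, bool) {
	if capFunc == nil {
		return 0, false // no capacity limit
	}
	return (*capFunc)()
}

func main() {
	f := func() (int64, bool) { return 64 << 20, true }
	shared := torrentCapacity(&f)
	fmt.Println(capacityOrUnlimited(shared)) // 67108864 true
	fmt.Println(capacityOrUnlimited(nil))    // 0 false
}
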
index 9fb45002b234975d368e1db3194ea2b4a1981903..0fa516985ed654e67d474b9af710140680734ca7 100644 (file)
--- a/request-strategy/order.go
+++ b/request-strategy/order.go
@@ -2,10 +2,11 @@ package request_strategy
 
 import (
        "bytes"
-       "sort"
-       "sync"
+       "expvar"
 
        "github.com/anacrolix/multiless"
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/google/btree"
 
        "github.com/anacrolix/torrent/types"
 )
@@ -20,333 +21,71 @@ type (
        ChunkSpec = types.ChunkSpec
 )
 
-type ClientPieceOrder struct{}
-
-type filterTorrent struct {
-       *Torrent
-       unverifiedBytes int64
-}
-
-func sortFilterPieces(pieces []filterPiece) {
-       sort.Slice(pieces, func(_i, _j int) bool {
-               i := &pieces[_i]
-               j := &pieces[_j]
-               return multiless.New().Int(
-                       int(j.Priority), int(i.Priority),
-               ).Bool(
-                       j.Partial, i.Partial,
-               ).Int64(
-                       i.Availability, j.Availability,
-               ).Int(
-                       i.index, j.index,
-               ).Lazy(func() multiless.Computation {
-                       return multiless.New().Cmp(bytes.Compare(
-                               i.t.InfoHash[:],
-                               j.t.InfoHash[:],
-                       ))
-               }).MustLess()
+type pieceOrderInput struct {
+       PieceRequestOrderState
+       PieceRequestOrderKey
+}
+
+func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
+       return multiless.New().Int(
+               int(j.Priority), int(i.Priority),
+       ).Bool(
+               j.Partial, i.Partial,
+       ).Int64(
+               i.Availability, j.Availability,
+       ).Int(
+               i.Index, j.Index,
+       ).Lazy(func() multiless.Computation {
+               return multiless.New().Cmp(bytes.Compare(
+                       i.InfoHash[:],
+                       j.InfoHash[:],
+               ))
        })
 }
 
-type requestsPeer struct {
-       Peer
-       nextState                  PeerNextRequestState
-       requestablePiecesRemaining int
-}
-
-func (rp *requestsPeer) canFitRequest() bool {
-       return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
-}
-
-func (rp *requestsPeer) addNextRequest(r RequestIndex) {
-       if !rp.nextState.Requests.CheckedAdd(r) {
-               panic("should only add once")
-       }
-}
-
-type peersForPieceRequests struct {
-       requestsInPiece int
-       *requestsPeer
-}
-
-func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
-       me.requestsPeer.addNextRequest(r)
-       me.requestsInPiece++
-}
-
-type requestablePiece struct {
-       index             pieceIndex
-       t                 *Torrent
-       alwaysReallocate  bool
-       NumPendingChunks  int
-       IterPendingChunks ChunksIterFunc
-}
-
-func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
-       return p.t.ChunksPerPiece*uint32(p.index) + c
-}
-
-type filterPiece struct {
-       t     *filterTorrent
-       index pieceIndex
-       *Piece
-}
+var packageExpvarMap = expvar.NewMap("request-strategy")
 
 // Calls f with requestable pieces in order.
-func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
-       maxPieces := 0
-       for i := range input.Torrents {
-               maxPieces += len(input.Torrents[i].Pieces)
-       }
-       pieces := make([]filterPiece, 0, maxPieces)
+func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainfo.Hash, pieceIndex int)) {
        // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
        // TorrentImpl. A nil value means no capacity limit.
        var storageLeft *int64
-       if input.Capacity != nil {
-               storageLeft = new(int64)
-               *storageLeft = *input.Capacity
-       }
-       for _t := range input.Torrents {
-               // TODO: We could do metainfo requests here.
-               t := &filterTorrent{
-                       Torrent:         &input.Torrents[_t],
-                       unverifiedBytes: 0,
-               }
-               for i := range t.Pieces {
-                       pieces = append(pieces, filterPiece{
-                               t:     t,
-                               index: i,
-                               Piece: &t.Pieces[i],
-                       })
-               }
+       if cap, ok := input.Capacity(); ok {
+               storageLeft = &cap
        }
-       sortFilterPieces(pieces)
        var allTorrentsUnverifiedBytes int64
-       for _, piece := range pieces {
-               if left := storageLeft; left != nil {
-                       if *left < piece.Length {
-                               continue
+       pro.tree.Ascend(func(i btree.Item) bool {
+               _i := i.(*pieceRequestOrderItem)
+               ih := _i.key.InfoHash
+               var t Torrent = input.Torrent(ih)
+               var piece Piece = t.Piece(_i.key.Index)
+               pieceLength := t.PieceLength()
+               if storageLeft != nil {
+                       if *storageLeft < pieceLength {
+                               return false
                        }
-                       *left -= piece.Length
+                       *storageLeft -= pieceLength
                }
-               if !piece.Request || piece.NumPendingChunks == 0 {
+               if !piece.Request() || piece.NumPendingChunks() == 0 {
                        // TODO: Clarify exactly what is verified. Stuff that's being hashed should be
                        // considered unverified and hold up further requests.
-                       continue
+                       return true
                }
-               if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
-                       continue
+               if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
+                       return true
                }
-               if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
-                       continue
-               }
-               piece.t.unverifiedBytes += piece.Length
-               allTorrentsUnverifiedBytes += piece.Length
-               f(piece.t.Torrent, piece.Piece, piece.index)
-       }
+               allTorrentsUnverifiedBytes += pieceLength
+               f(ih, _i.key.Index)
+               return true
+       })
        return
 }
 
-type Input struct {
-       // This is all torrents that share the same capacity below (or likely a single torrent if there
-       // is infinite capacity, since you could just run it separately for each Torrent if that's the
-       // case).
-       Torrents []Torrent
-       // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
-       // that share the same capacity key must be incorporated in piece ordering.
-       Capacity *int64
+type Input interface {
+       Torrent(metainfo.Hash) Torrent
+       // Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in
+       // their storage.Torrent references.
+       Capacity() (cap int64, capped bool)
        // Across all the Torrents. This might be partitioned by storage capacity key now.
-       MaxUnverifiedBytes int64
-}
-
-// Checks that a sorted peersForPiece slice makes sense.
-func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
-       if !sort.IsSorted(peers) {
-               panic("not sorted")
-       }
-       peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
-       for _, p := range peers.peersForPiece {
-               if _, ok := peerMap[p]; ok {
-                       panic(p)
-               }
-               peerMap[p] = struct{}{}
-       }
-}
-
-var peersForPiecesPool sync.Pool
-
-func makePeersForPiece(cap int) []*peersForPieceRequests {
-       got := peersForPiecesPool.Get()
-       if got == nil {
-               return make([]*peersForPieceRequests, 0, cap)
-       }
-       return got.([]*peersForPieceRequests)[:0]
-}
-
-type peersForPieceSorter struct {
-       peersForPiece []*peersForPieceRequests
-       req           *RequestIndex
-       p             requestablePiece
-}
-
-func (me *peersForPieceSorter) Len() int {
-       return len(me.peersForPiece)
-}
-
-func (me *peersForPieceSorter) Swap(i, j int) {
-       me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
-}
-
-func (me *peersForPieceSorter) Less(_i, _j int) bool {
-       i := me.peersForPiece[_i]
-       j := me.peersForPiece[_j]
-       req := me.req
-       p := &me.p
-       byHasRequest := func() multiless.Computation {
-               ml := multiless.New()
-               if req != nil {
-                       iHas := i.nextState.Requests.Contains(*req)
-                       jHas := j.nextState.Requests.Contains(*req)
-                       ml = ml.Bool(jHas, iHas)
-               }
-               return ml
-       }()
-       ml := multiless.New()
-       // We always "reallocate", that is force even striping amongst peers that are either on
-       // the last piece they can contribute too, or for pieces marked for this behaviour.
-       // Striping prevents starving peers of requests, and will always re-balance to the
-       // fastest known peers.
-       if !p.alwaysReallocate {
-               ml = ml.Bool(
-                       j.requestablePiecesRemaining == 1,
-                       i.requestablePiecesRemaining == 1)
-       }
-       if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
-               ml = ml.Int(
-                       i.requestsInPiece,
-                       j.requestsInPiece)
-       } else {
-               ml = ml.AndThen(byHasRequest)
-       }
-       ml = ml.Int(
-               i.requestablePiecesRemaining,
-               j.requestablePiecesRemaining,
-       ).Float64(
-               j.DownloadRate,
-               i.DownloadRate,
-       )
-       if ml.Ok() {
-               return ml.Less()
-       }
-       ml = ml.AndThen(byHasRequest)
-       return ml.Int64(
-               int64(j.Age), int64(i.Age),
-               // TODO: Probably peer priority can come next
-       ).Uintptr(
-               i.Id.Uintptr(),
-               j.Id.Uintptr(),
-       ).MustLess()
-}
-
-func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
-       peersForPiece := makePeersForPiece(len(peers))
-       for _, peer := range peers {
-               if !peer.canRequestPiece(p.index) {
-                       continue
-               }
-               if !peer.canFitRequest() {
-                       peer.requestablePiecesRemaining--
-                       continue
-               }
-               peersForPiece = append(peersForPiece, &peersForPieceRequests{
-                       requestsInPiece: 0,
-                       requestsPeer:    peer,
-               })
-       }
-       defer func() {
-               for _, peer := range peersForPiece {
-                       peer.requestablePiecesRemaining--
-               }
-               peersForPiecesPool.Put(peersForPiece)
-       }()
-       peersForPieceSorter := peersForPieceSorter{
-               peersForPiece: peersForPiece,
-               p:             p,
-       }
-       sortPeersForPiece := func(req *RequestIndex) {
-               peersForPieceSorter.req = req
-               sort.Sort(&peersForPieceSorter)
-               // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
-       }
-       // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
-       // with "next" request state before another request strategy run occurs.
-       preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
-       p.IterPendingChunks(func(spec ChunkIndex) {
-               req := p.chunkIndexToRequestIndex(spec)
-               for _, peer := range peersForPiece {
-                       if !peer.ExistingRequests.Contains(req) {
-                               continue
-                       }
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       preallocated[spec] = append(preallocated[spec], peer)
-                       peer.addNextRequest(req)
-               }
-       })
-       pendingChunksRemaining := int(p.NumPendingChunks)
-       p.IterPendingChunks(func(chunk ChunkIndex) {
-               if len(preallocated[chunk]) != 0 {
-                       return
-               }
-               req := p.chunkIndexToRequestIndex(chunk)
-               defer func() { pendingChunksRemaining-- }()
-               sortPeersForPiece(nil)
-               for _, peer := range peersForPiece {
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
-                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
-                               peer.nextState.Interested = true
-                               if peer.Choking {
-                                       continue
-                               }
-                       }
-                       peer.addNextRequest(req)
-                       break
-               }
-       })
-chunk:
-       for chunk, prePeers := range preallocated {
-               if len(prePeers) == 0 {
-                       continue
-               }
-               pendingChunksRemaining--
-               req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
-               for _, pp := range prePeers {
-                       pp.requestsInPiece--
-               }
-               sortPeersForPiece(&req)
-               for _, pp := range prePeers {
-                       pp.nextState.Requests.Remove(req)
-               }
-               for _, peer := range peersForPiece {
-                       if !peer.canFitRequest() {
-                               continue
-                       }
-                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
-                               // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
-                               peer.nextState.Interested = true
-                               if peer.Choking {
-                                       continue
-                               }
-                       }
-                       peer.addNextRequest(req)
-                       continue chunk
-               }
-       }
-       if pendingChunksRemaining != 0 {
-               panic(pendingChunksRemaining)
-       }
+       MaxUnverifiedBytes() int64
 }
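
GetRequestablePieces no longer rebuilds and sorts a slice of every piece on each call; pieces now live in a google/btree that is kept ordered by the pieceOrderLess criteria, and the function simply ascends the tree, stopping early once storage capacity is exhausted. A self-contained sketch of that iteration pattern (the item type and ordering are simplified stand-ins that omit the Partial and InfoHash tiebreaks):

package main

import (
	"fmt"

	"github.com/google/btree"
)

type pieceItem struct {
	index        int
	priority     int
	availability int64
}

// Less orders by priority descending, then availability ascending (rarest
// first), then index, mirroring the shape of pieceOrderLess above.
func (a *pieceItem) Less(b btree.Item) bool {
	o := b.(*pieceItem)
	if a.priority != o.priority {
		return a.priority > o.priority
	}
	if a.availability != o.availability {
		return a.availability < o.availability
	}
	return a.index < o.index
}

func main() {
	tree := btree.New(32)
	tree.ReplaceOrInsert(&pieceItem{index: 0, priority: 1, availability: 5})
	tree.ReplaceOrInsert(&pieceItem{index: 1, priority: 2, availability: 9})
	tree.ReplaceOrInsert(&pieceItem{index: 2, priority: 2, availability: 3})
	tree.Ascend(func(i btree.Item) bool {
		fmt.Println(i.(*pieceItem).index) // 2, 1, 0
		return true // returning false stops early, e.g. when capacity runs out
	})
}
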
diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go
deleted file mode 100644 (file)
index bcac41d..0000000
--- a/request-strategy/order_test.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package request_strategy
-
-import (
-       "encoding/gob"
-       "testing"
-
-       "github.com/RoaringBitmap/roaring"
-       qt "github.com/frankban/quicktest"
-       "github.com/google/go-cmp/cmp"
-)
-
-func init() {
-       gob.Register(chunkIterRange(0))
-       gob.Register(sliceChunksIter{})
-}
-
-type chunkIterRange ChunkIndex
-
-func (me chunkIterRange) Iter(f func(ChunkIndex)) {
-       for offset := ChunkIndex(0); offset < ChunkIndex(me); offset += 1 {
-               f(offset)
-       }
-}
-
-type sliceChunksIter []ChunkIndex
-
-func chunkIter(offsets ...ChunkIndex) ChunksIter {
-       return sliceChunksIter(offsets)
-}
-
-func (offsets sliceChunksIter) Iter(f func(ChunkIndex)) {
-       for _, offset := range offsets {
-               f(offset)
-       }
-}
-
-func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) {
-       ret.AddMany(rs)
-       return
-}
-
-func init() {
-       gob.Register(intPeerId(0))
-}
-
-type intPeerId int
-
-func (i intPeerId) Uintptr() uintptr {
-       return uintptr(i)
-}
-
-var hasAllRequests = func() (all roaring.Bitmap) {
-       all.AddRange(0, roaring.MaxRange)
-       return
-}()
-
-func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) {
-       addressableBm := next.Requests
-       c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num)
-       c.Check(next.Interested, qt.Equals, interest)
-}
-
-func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
-       qt.Check(t, reqs.GetCardinality(), qt.Equals, l)
-}
-
-var peerNextRequestStateChecker = qt.CmpEquals(
-       cmp.Transformer(
-               "bitmap",
-               func(bm roaring.Bitmap) []uint32 {
-                       return bm.ToArray()
-               }))
index b031d28e7fc1b2bdee3717f3a618366b3a456202..6a69535f955c70fd90f9721051e2971de7cb5298 100644 (file)
--- a/request-strategy/peer.go
+++ b/request-strategy/peer.go
@@ -1,37 +1,13 @@
 package request_strategy
 
 import (
-       "time"
-
        "github.com/RoaringBitmap/roaring"
 )
 
-type PeerNextRequestState struct {
+type PeerRequestState struct {
        Interested bool
-       Requests   roaring.Bitmap
-}
-
-type PeerId interface {
-       Uintptr() uintptr
-}
-
-type Peer struct {
-       Pieces           roaring.Bitmap
-       MaxRequests      int
-       ExistingRequests roaring.Bitmap
-       Choking          bool
-       PieceAllowedFast roaring.Bitmap
-       DownloadRate     float64
-       Age              time.Duration
-       // This is passed back out at the end, so must support equality. Could be a type-param later.
-       Id PeerId
-}
-
-// TODO: This might be used in more places I think.
-func (p *Peer) canRequestPiece(i pieceIndex) bool {
-       return (!p.Choking || p.PieceAllowedFast.Contains(uint32(i))) && p.HasPiece(i)
-}
-
-func (p *Peer) HasPiece(i pieceIndex) bool {
-       return p.Pieces.Contains(uint32(i))
+       // Expecting
+       Requests roaring.Bitmap
+       // Cancelled and waiting response
+       Cancelled roaring.Bitmap
 }
diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go
new file mode 100644 (file)
index 0000000..d906b83
--- /dev/null
+++ b/request-strategy/piece-request-order.go
@@ -0,0 +1,98 @@
+package request_strategy
+
+import (
+       "fmt"
+
+       "github.com/anacrolix/torrent/metainfo"
+       "github.com/google/btree"
+)
+
+func NewPieceOrder() *PieceRequestOrder {
+       return &PieceRequestOrder{
+               tree: btree.New(32),
+               keys: make(map[PieceRequestOrderKey]PieceRequestOrderState),
+       }
+}
+
+type PieceRequestOrder struct {
+       tree *btree.BTree
+       keys map[PieceRequestOrderKey]PieceRequestOrderState
+}
+
+type PieceRequestOrderKey struct {
+       InfoHash metainfo.Hash
+       Index    int
+}
+
+type PieceRequestOrderState struct {
+       Priority     piecePriority
+       Partial      bool
+       Availability int64
+}
+
+type pieceRequestOrderItem struct {
+       key   PieceRequestOrderKey
+       state PieceRequestOrderState
+}
+
+func (me *pieceRequestOrderItem) Less(other btree.Item) bool {
+       otherConcrete := other.(*pieceRequestOrderItem)
+       return pieceOrderLess(
+               pieceOrderInput{
+                       PieceRequestOrderState: me.state,
+                       PieceRequestOrderKey:   me.key,
+               },
+               pieceOrderInput{
+                       PieceRequestOrderState: otherConcrete.state,
+                       PieceRequestOrderKey:   otherConcrete.key,
+               },
+       ).Less()
+}
+
+func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
+       if _, ok := me.keys[key]; ok {
+               panic(key)
+       }
+       if me.tree.ReplaceOrInsert(&pieceRequestOrderItem{
+               key:   key,
+               state: state,
+       }) != nil {
+               panic("shouldn't already have this")
+       }
+       me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) {
+       item := me.existingItemForKey(key)
+       if item.state == state {
+               return
+       }
+       if me.tree.Delete(&item) == nil {
+               panic(fmt.Sprintf("%#v", key))
+       }
+       item.state = state
+       if me.tree.ReplaceOrInsert(&item) != nil {
+               panic(key)
+       }
+       me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
+       return pieceRequestOrderItem{
+               key:   key,
+               state: me.keys[key],
+       }
+}
+
+func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
+       item := me.existingItemForKey(key)
+       if me.tree.Delete(&item) == nil {
+               panic(key)
+       }
+       delete(me.keys, key)
+       // log.Printf("deleting %#v", key)
+}
+
+func (me *PieceRequestOrder) Len() int {
+       return len(me.keys)
+}
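
A short usage sketch for the new PieceRequestOrder, written as if it were a test inside package request_strategy so the unexported priority type stays out of the way; the identifiers are exactly those added above, while the availabilities are made-up example values:

package request_strategy

import (
	"testing"

	"github.com/anacrolix/torrent/metainfo"
)

func TestPieceRequestOrderSketch(t *testing.T) {
	pro := NewPieceOrder()
	key := PieceRequestOrderKey{
		InfoHash: metainfo.Hash{1},
		Index:    0,
	}
	pro.Add(key, PieceRequestOrderState{Availability: 3})
	// An identical state is a no-op; a changed state deletes the old tree item
	// and reinserts it at its new position.
	pro.Update(key, PieceRequestOrderState{Availability: 2})
	if pro.Len() != 1 {
		t.Fatalf("expected 1 item, got %d", pro.Len())
	}
	pro.Delete(key)
	if pro.Len() != 0 {
		t.Fatalf("expected empty order, got %d", pro.Len())
	}
}
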
index 8a038e67dafe8a9ea73531085e6057f0cf500077..626cc75bdb3a12207b8fc94e567562bf758d1036 100644 (file)
--- a/request-strategy/piece.go
+++ b/request-strategy/piece.go
@@ -6,19 +6,7 @@ type ChunksIter interface {
        Iter(func(ci ChunkIndex))
 }
 
-type Piece struct {
-       Request           bool
-       Priority          piecePriority
-       Partial           bool
-       Availability      int64
-       Length            int64
-       NumPendingChunks  int
-       IterPendingChunks ChunksIter
-}
-
-func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
-       i := p.IterPendingChunks
-       if i != nil {
-               i.Iter(f)
-       }
+type Piece interface {
+       Request() bool
+       NumPendingChunks() int
 }
index dbb41df33652ae480dab608aebb553d527e3b8c6..51fc1a6cc3516e86fbf1772fe68c916c9294ada6 100644 (file)
--- a/request-strategy/torrent.go
+++ b/request-strategy/torrent.go
@@ -1,14 +1,7 @@
 package request_strategy
 
-import (
-       "github.com/anacrolix/torrent/metainfo"
-)
-
-type Torrent struct {
-       Pieces []Piece
-       // Some value that's unique and stable between runs.
-       InfoHash       metainfo.Hash
-       ChunksPerPiece uint32
-       // TODO: This isn't actually configurable anywhere yet.
-       MaxUnverifiedBytes int64
+type Torrent interface {
+       Piece(int) Piece
+       ChunksPerPiece() uint32
+       PieceLength() int64
 }
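
With Torrent and Piece reduced to the interfaces above, request_strategy can be driven entirely through thin adapters, which is what request-strategy-impls.go does for the real client. A self-contained sketch wiring trivial in-memory stubs to GetRequestablePieces (all stub* names are invented for illustration):

package main

import (
	"fmt"

	"github.com/anacrolix/torrent/metainfo"
	request_strategy "github.com/anacrolix/torrent/request-strategy"
)

type stubPiece struct{ pending int }

func (p stubPiece) Request() bool         { return p.pending > 0 }
func (p stubPiece) NumPendingChunks() int { return p.pending }

type stubTorrent struct{ pieces []stubPiece }

func (t stubTorrent) Piece(i int) request_strategy.Piece { return t.pieces[i] }
func (t stubTorrent) ChunksPerPiece() uint32             { return 16 }
func (t stubTorrent) PieceLength() int64                 { return 16 << 14 } // 16 chunks of 16 KiB

type stubInput struct{ t stubTorrent }

func (in stubInput) Torrent(metainfo.Hash) request_strategy.Torrent { return in.t }
func (in stubInput) Capacity() (int64, bool)                        { return 0, false }
func (in stubInput) MaxUnverifiedBytes() int64                      { return 0 }

func main() {
	ih := metainfo.Hash{1}
	in := stubInput{t: stubTorrent{pieces: []stubPiece{{pending: 16}, {pending: 0}}}}
	pro := request_strategy.NewPieceOrder()
	for i := range in.t.pieces {
		pro.Add(
			request_strategy.PieceRequestOrderKey{InfoHash: ih, Index: i},
			request_strategy.PieceRequestOrderState{Availability: int64(i)},
		)
	}
	// Only piece 0 has pending chunks, so only it is offered for requesting.
	request_strategy.GetRequestablePieces(in, pro, func(h metainfo.Hash, pieceIndex int) {
		fmt.Println(h, pieceIndex)
	})
}
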
index 5ad98ea86b68ffae9eec34ef05f075357536cee7..a3a7e1c3a9d5c5b35b7885274ac021d83193a41d 100644 (file)
--- a/requesting.go
+++ b/requesting.go
@@ -4,7 +4,6 @@ import (
        "container/heap"
        "context"
        "encoding/gob"
-       "math/rand"
        "reflect"
        "runtime/pprof"
        "time"
@@ -16,60 +15,12 @@ import (
        request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
-// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
-func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
-       input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
-       if !primaryTorrent.haveInfo() {
-               return
-       }
-       if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
-               if cap, ok := (*capFunc)(); ok {
-                       input.Capacity = &cap
-               }
+func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
+       return request_strategy.PieceRequestOrderState{
+               Priority:     t.piece(i).purePriority(),
+               Partial:      t.piecePartiallyDownloaded(i),
+               Availability: t.piece(i).availability,
        }
-       if input.Capacity == nil {
-               input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
-               return
-       }
-       input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
-       for _, t := range cl.torrents {
-               if !t.haveInfo() {
-                       // This would be removed if metadata is handled here. Determining chunks per piece
-                       // requires the info. If we have no info, we have no pieces too, so the end result is
-                       // the same.
-                       continue
-               }
-               if t.storage.Capacity != primaryTorrent.storage.Capacity {
-                       continue
-               }
-               input.Torrents = append(input.Torrents, t.requestStrategyTorrentInput())
-       }
-       return
-}
-
-func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
-       return t.cl.getRequestStrategyInput(t)
-}
-
-func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
-       rst := request_strategy.Torrent{
-               InfoHash:       t.infoHash,
-               ChunksPerPiece: t.chunksPerRegularPiece(),
-       }
-       rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
-       for i := range t.pieces {
-               p := &t.pieces[i]
-               rst.Pieces = append(rst.Pieces, request_strategy.Piece{
-                       Request:           !t.ignorePieceForRequests(i),
-                       Priority:          p.purePriority(),
-                       Partial:           t.piecePartiallyDownloaded(i),
-                       Availability:      p.availability,
-                       Length:            int64(p.length()),
-                       NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
-                       IterPendingChunks: &p.undirtiedChunksIter,
-               })
-       }
-       return rst
 }
 
 func init() {
@@ -116,9 +67,8 @@ type (
 )
 
 type peerRequests struct {
-       requestIndexes       []RequestIndex
-       peer                 *Peer
-       torrentStrategyInput *request_strategy.Torrent
+       requestIndexes []RequestIndex
+       peer           *Peer
 }
 
 func (p *peerRequests) Len() int {
@@ -129,22 +79,8 @@ func (p *peerRequests) Less(i, j int) bool {
        leftRequest := p.requestIndexes[i]
        rightRequest := p.requestIndexes[j]
        t := p.peer.t
-       leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
-       rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
-       leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
-       rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
-       pending := func(index RequestIndex, current bool) int {
-               ret := t.pendingRequests.Get(index)
-               if current {
-                       ret--
-               }
-               // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
-               // resolved.
-               if ret < 0 {
-                       panic(ret)
-               }
-               return ret
-       }
+       leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
+       rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
        ml := multiless.New()
        // Push requests that can't be served right now to the end. But we don't throw them away unless
        // there's a better alternative. This is for when we're using the fast extension and get choked
@@ -155,20 +91,43 @@ func (p *peerRequests) Less(i, j int) bool {
                        !p.peer.peerAllowedFast.Contains(rightPieceIndex),
                )
        }
+       leftPeer := t.pendingRequests[leftRequest]
+       rightPeer := t.pendingRequests[rightRequest]
+       ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
+       ml = ml.Bool(rightPeer == nil, leftPeer == nil)
+       if ml.Ok() {
+               return ml.MustLess()
+       }
+       if leftPeer != nil {
+               // The right peer should also be set, or we'd have resolved the computation by now.
+               ml = ml.Uint64(
+                       rightPeer.requestState.Requests.GetCardinality(),
+                       leftPeer.requestState.Requests.GetCardinality(),
+               )
+               // Could either of the lastRequested be Zero? That's what checking an existing peer is for.
+               leftLast := t.lastRequested[leftRequest]
+               rightLast := t.lastRequested[rightRequest]
+               if leftLast.IsZero() || rightLast.IsZero() {
+                       panic("expected non-zero last requested times")
+               }
+               // We want the most-recently requested on the left. Clients like Transmission serve requests
+               // in received order, so the most recently-requested is the one that has the longest until
+               // it will be served and therefore is the best candidate to cancel.
+               ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds())
+       }
+       leftPiece := t.piece(int(leftPieceIndex))
+       rightPiece := t.piece(int(rightPieceIndex))
        ml = ml.Int(
-               pending(leftRequest, leftCurrent),
-               pending(rightRequest, rightCurrent))
-       ml = ml.Bool(!leftCurrent, !rightCurrent)
-       ml = ml.Int(
-               -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
-               -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
+               // Technically we would be happy with the cached priority here, except we don't actually
+               // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
+               // the priority through Piece.purePriority, which is probably slower.
+               -int(leftPiece.purePriority()),
+               -int(rightPiece.purePriority()),
        )
        ml = ml.Int(
-               int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
-               int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
-       ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
-       ml = ml.Uint32(leftRequest, rightRequest)
-       return ml.MustLess()
+               int(leftPiece.availability),
+               int(rightPiece.availability))
+       return ml.Less()
 }
 
 func (p *peerRequests) Swap(i, j int) {
@@ -187,59 +146,58 @@ func (p *peerRequests) Pop() interface{} {
 }
 
 type desiredRequestState struct {
-       Requests   []RequestIndex
+       Requests   peerRequests
        Interested bool
 }
 
 func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+       if !p.t.haveInfo() {
+               return
+       }
        input := p.t.getRequestStrategyInput()
        requestHeap := peerRequests{
                peer: p,
        }
-       for i := range input.Torrents {
-               t := &input.Torrents[i]
-               if t.InfoHash == p.t.infoHash {
-                       requestHeap.torrentStrategyInput = t
-                       break
-               }
-       }
        request_strategy.GetRequestablePieces(
                input,
-               func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
-                       if t.InfoHash != p.t.infoHash {
+               p.t.getPieceRequestOrder(),
+               func(ih InfoHash, pieceIndex int) {
+                       if ih != p.t.infoHash {
                                return
                        }
                        if !p.peerHasPiece(pieceIndex) {
                                return
                        }
                        allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
-                       rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
+                       p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
                                r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
-                               // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
+                               // if p.t.pendingRequests.Get(r) != 0 && !p.requestState.Requests.Contains(r) {
                                //      return
                                // }
                                if !allowedFast {
-                                       // We must signal interest to request this
+                                       // We must signal interest to request this. TODO: We could set interested if the
+                                       // peers pieces (minus the allowed fast set) overlap with our missing pieces if
+                                       // there are any readers, or any pending pieces.
                                        desired.Interested = true
                                        // We can make or will allow sustaining a request here if we're not choked, or
                                        // have made the request previously (presumably while unchoked), and haven't had
                                        // the peer respond yet (and the request was retained because we are using the
                                        // fast extension).
-                                       if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
+                                       if p.peerChoking && !p.requestState.Requests.Contains(r) {
                                                // We can't request this right now.
                                                return
                                        }
                                }
+                               if p.requestState.Cancelled.Contains(r) {
+                                       // Can't re-request.
+                                       return
+                               }
                                requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
                        })
                },
        )
        p.t.assertPendingRequests()
-       heap.Init(&requestHeap)
-       for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
-               requestIndex := heap.Pop(&requestHeap).(RequestIndex)
-               desired.Requests = append(desired.Requests, requestIndex)
-       }
+       desired.Requests = requestHeap
        return
 }
 
@@ -261,78 +219,38 @@ func (p *Peer) maybeUpdateActualRequestState() bool {
 
 // Transmit/action the request state to the peer.
 func (p *Peer) applyRequestState(next desiredRequestState) bool {
-       current := &p.actualRequestState
+       current := &p.requestState
        if !p.setInterested(next.Interested) {
                return false
        }
        more := true
-       cancel := current.Requests.Clone()
-       for _, ri := range next.Requests {
-               cancel.Remove(ri)
-       }
-       cancel.Iterate(func(req uint32) bool {
-               more = p.cancel(req)
-               return more
-       })
-       if !more {
-               return false
-       }
-       shuffled := false
-       lastPending := 0
-       for i := 0; i < len(next.Requests); i++ {
-               req := next.Requests[i]
-               if p.cancelledRequests.Contains(req) {
-                       // Waiting for a reject or piece message, which will suitably trigger us to update our
-                       // requests, so we can skip this one with no additional consideration.
-                       continue
-               }
-               // The cardinality of our desired requests shouldn't exceed the max requests since it's used
-               // in the calculation of the requests. However, if we cancelled requests and they haven't
-               // been rejected or serviced yet with the fast extension enabled, we can end up with more
-               // extra outstanding requests. We could subtract the number of outstanding cancels from the
-               // next request cardinality, but peers might not like that.
-               if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
-                       // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
-                       //      next.Requests.GetCardinality(),
-                       //      p.cancelledRequests.GetCardinality(),
-                       //      current.Requests.GetCardinality(),
-                       //      p.nominalMaxRequests(),
-                       // )
-                       break
-               }
-               otherPending := p.t.pendingRequests.Get(next.Requests[0])
-               if p.actualRequestState.Requests.Contains(next.Requests[0]) {
-                       otherPending--
-               }
-               if otherPending < lastPending {
-                       // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
-                       // doesn't, our shuffling condition could be wrong.
-                       panic(lastPending)
-               }
-               // If the request has already been requested by another peer, shuffle this and the rest of
-               // the requests (since according to the increasing condition, the rest of the indices
-               // already have an outstanding request with another peer).
-               if !shuffled && otherPending > 0 {
-                       shuffleReqs := next.Requests[i:]
-                       rand.Shuffle(len(shuffleReqs), func(i, j int) {
-                               shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
-                       })
-                       // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
-                       shuffled = true
-                       // Repeat this index
-                       i--
-                       continue
+       requestHeap := &next.Requests
+       t := p.t
+       heap.Init(requestHeap)
+       for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
+               req := heap.Pop(requestHeap).(RequestIndex)
+               existing := t.requestingPeer(req)
+               if existing != nil && existing != p {
+                       // Don't steal from the poor.
+                       diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
+                       // Only steal if it leaves us with at most one more outstanding request than the
+                       // existing peer connection, and, when it would leave us exactly one ahead, only if
+                       // we received a useful chunk at least as recently as it did.
+                       if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+                               continue
+                       }
+                       t.cancelRequest(req)
                }
-
                more = p.mustRequest(req)
                if !more {
                        break
                }
        }
+       // TODO: This may need to change; we might want to update even if there were no requests,
+       // because they may have been filtered out for having been requested too recently.
        p.updateRequestsTimer.Stop()
        if more {
                p.needRequestUpdate = ""
-               if !current.Requests.IsEmpty() {
+               if current.Interested {
                        p.updateRequestsTimer.Reset(3 * time.Second)
                }
        }
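
The guard above can be read as a small predicate over post-steal request counts plus a recency tie-break. A sketch under assumed names (shouldSteal and its parameters are illustrative; the real check is inline in applyRequestState):

package main

import (
        "fmt"
        "time"
)

// shouldSteal reports whether a peer with ourRequests outstanding requests may take
// a request away from the peer that currently holds it (theirRequests outstanding).
func shouldSteal(ourRequests, theirRequests int64, ourLastChunk, theirLastChunk time.Time) bool {
        // diff is the difference in outstanding requests after the steal: we gain one, they lose one.
        diff := (ourRequests + 1) - (theirRequests - 1)
        if diff > 1 {
                // Stealing would leave us more than one request ahead.
                return false
        }
        if diff == 1 && ourLastChunk.Before(theirLastChunk) {
                // Ending up exactly one ahead is only allowed if we received a useful chunk
                // at least as recently as the other peer did.
                return false
        }
        return true
}

func main() {
        now := time.Now()
        // We have 2 requests, they have 5: stealing still leaves them ahead, so it's allowed.
        fmt.Println(shouldSteal(2, 5, now, now)) // true
        // We have 5, they have 5: we'd end up two ahead, so it's not allowed.
        fmt.Println(shouldSteal(5, 5, now, now)) // false
}
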
diff --git a/torrent-piece-request-order.go b/torrent-piece-request-order.go
new file mode 100644 (file)
index 0000000..7b1ef97
--- /dev/null
@@ -0,0 +1,51 @@
+package torrent
+
+import (
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
+)
+
+func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
+       t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Update(
+               t.pieceRequestOrderKey(pieceIndex),
+               t.requestStrategyPieceOrderState(pieceIndex))
+}
+
+func (t *Torrent) clientPieceRequestOrderKey() interface{} {
+       if t.storage.Capacity == nil {
+               return t
+       }
+       return t.storage.Capacity
+}
+
+func (t *Torrent) deletePieceRequestOrder() {
+       cpro := t.cl.pieceRequestOrder
+       key := t.clientPieceRequestOrderKey()
+       pro := cpro[key]
+       for i := 0; i < t.numPieces(); i++ {
+               pro.Delete(t.pieceRequestOrderKey(i))
+       }
+       if pro.Len() == 0 {
+               delete(cpro, key)
+       }
+}
+
+func (t *Torrent) initPieceRequestOrder() {
+       if t.cl.pieceRequestOrder == nil {
+               t.cl.pieceRequestOrder = make(map[interface{}]*request_strategy.PieceRequestOrder)
+       }
+       key := t.clientPieceRequestOrderKey()
+       cpro := t.cl.pieceRequestOrder
+       if cpro[key] == nil {
+               cpro[key] = request_strategy.NewPieceOrder()
+       }
+}
+
+func (t *Torrent) addRequestOrderPiece(i int) {
+       t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Add(
+               t.pieceRequestOrderKey(i),
+               t.requestStrategyPieceOrderState(i))
+}
+
+func (t *Torrent) getPieceRequestOrder() *request_strategy.PieceRequestOrder {
+       return t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]
+}
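
Keyed this way, torrents that share a storage capacity share a single PieceRequestOrder on the client, while a torrent without a capacity limit gets its own order, keyed by the Torrent itself. A rough sketch of that grouping, with illustrative stand-ins rather than the package's types:

package main

import "fmt"

// orderKey mirrors the keying choice above: group by the shared capacity when
// present, otherwise fall back to the torrent's own identity.
func orderKey(torrent interface{}, capacity interface{}) interface{} {
        if capacity == nil {
                return torrent
        }
        return capacity
}

func main() {
        sharedCapacity := new(int) // stands in for a shared storage capacity handle
        orders := map[interface{}]*struct{ pieces int }{}
        for _, t := range []struct {
                name     string
                capacity interface{}
        }{
                {"t1", sharedCapacity},
                {"t2", sharedCapacity},
                {"t3", nil},
        } {
                key := orderKey(t.name, t.capacity)
                if orders[key] == nil {
                        orders[key] = &struct{ pieces int }{}
                }
                orders[key].pieces++
        }
        fmt.Println(len(orders)) // 2: t1 and t2 share one order, t3 has its own
}
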
index 7ccf13f680444b8fea5ba1d0a77813b1eb4e6aaf..508de54c34ba4654ed7459076b813daa8880fe50 100644 (file)
@@ -27,6 +27,7 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/davecgh/go-spew/spew"
        "github.com/pion/datachannel"
 
@@ -137,7 +138,8 @@ type Torrent struct {
        initialPieceCheckDisabled bool
 
        // Count of each request across active connections.
-       pendingRequests pendingRequests
+       pendingRequests map[RequestIndex]*Peer
+       lastRequested   map[RequestIndex]time.Time
        // Chunks we've written to since the corresponding piece was last checked.
        dirtyChunks roaring.Bitmap
 
@@ -165,6 +167,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
                panic(p.availability)
        }
        p.availability--
+       t.updatePieceRequestOrder(i)
 }
 
 func (t *Torrent) incPieceAvailability(i pieceIndex) {
@@ -172,6 +175,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
        if t.haveInfo() {
                p := t.piece(i)
                p.availability++
+               t.updatePieceRequestOrder(i)
        }
 }
 
@@ -424,8 +428,16 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
        return nil
 }
 
+func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
+       return request_strategy.PieceRequestOrderKey{
+               InfoHash: t.infoHash,
+               Index:    i,
+       }
+}
+
 // This seems to be all the follow-up tasks after info is set that can't fail.
 func (t *Torrent) onSetInfo() {
+       t.initPieceRequestOrder()
        for i := range t.pieces {
                p := &t.pieces[i]
                // Need to add availability before updating piece completion, as that may result in conns
@@ -434,6 +446,7 @@ func (t *Torrent) onSetInfo() {
                        panic(p.availability)
                }
                p.availability = int64(t.pieceAvailabilityFromPeers(i))
+               t.addRequestOrderPiece(i)
                t.updatePieceCompletion(pieceIndex(i))
                if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
                        // t.logger.Printf("piece %s completion unknown, queueing check", p)
@@ -443,7 +456,8 @@ func (t *Torrent) onSetInfo() {
        t.cl.event.Broadcast()
        close(t.gotMetainfoC)
        t.updateWantPeersEvent()
-       t.pendingRequests.Init(t.numRequests())
+       t.pendingRequests = make(map[RequestIndex]*Peer)
+       t.lastRequested = make(map[RequestIndex]time.Time)
        t.tryCreateMorePieceHashers()
        t.iterPeers(func(p *Peer) {
                p.onGotInfo(t.info)
@@ -816,6 +830,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        t.iterPeers(func(p *Peer) {
                p.close()
        })
+       if t.storage != nil {
+               t.deletePieceRequestOrder()
+       }
        t.pex.Reset()
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
@@ -1082,9 +1099,9 @@ func (t *Torrent) maybeNewConns() {
 func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
        if t._pendingPieces.Contains(uint32(piece)) {
                t.iterPeers(func(c *Peer) {
-                       if c.actualRequestState.Interested {
-                               return
-                       }
+                       // if c.requestState.Interested {
+                       //      return
+                       // }
                        if !c.isLowOnRequests() {
                                return
                        }
@@ -1102,6 +1119,11 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
 }
 
 func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
+       if !t.closed.IsSet() {
+               // It would be possible to filter on pure-priority changes here to avoid churning the piece
+               // request order.
+               t.updatePieceRequestOrder(piece)
+       }
        p := &t.pieces[piece]
        newPrio := p.uncachedPriority()
        // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
@@ -1238,6 +1260,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
        } else {
                t._completedPieces.Remove(x)
        }
+       p.t.updatePieceRequestOrder(piece)
        t.updateComplete()
        if complete && len(p.dirtiers) != 0 {
                t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
@@ -1397,7 +1420,7 @@ func (t *Torrent) assertPendingRequests() {
        //      actual.m = make([]int, t.numRequests())
        // }
        // t.iterPeers(func(p *Peer) {
-       //      p.actualRequestState.Requests.Iterate(func(x uint32) bool {
+       //      p.requestState.Requests.Iterate(func(x uint32) bool {
        //              actual.Inc(x)
        //              return true
        //      })
@@ -2274,3 +2297,16 @@ func (t *Torrent) pieceRequestIndexOffset(piece pieceIndex) RequestIndex {
 func (t *Torrent) updateComplete() {
        t.Complete.SetBool(t.haveAllPieces())
 }
+
+func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
+       p := t.pendingRequests[r]
+       if p != nil {
+               p.cancel(r)
+       }
+       delete(t.pendingRequests, r)
+       return p
+}
+
+func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
+       return t.pendingRequests[r]
+}
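
With pendingRequests now a map from request index to the one peer connection holding that request (and lastRequested presumably recording when each index was last asked for), cancelRequest above cancels via the owning peer, if any, and then forgets the entry. A condensed sketch of that single-owner bookkeeping, using illustrative stub types rather than the real Torrent and Peer:

package main

import (
        "fmt"
        "time"
)

type requestIndex = uint32

type peerStub struct{ name string }

func (p *peerStub) cancel(r requestIndex) { fmt.Printf("%s cancels %d\n", p.name, r) }

type torrentStub struct {
        pendingRequests map[requestIndex]*peerStub
        lastRequested   map[requestIndex]time.Time
}

// noteRequested records that p now owns the outstanding request r.
func (t *torrentStub) noteRequested(r requestIndex, p *peerStub) {
        t.pendingRequests[r] = p
        t.lastRequested[r] = time.Now()
}

// cancelRequest mirrors the shape of Torrent.cancelRequest: cancel via the owning
// peer, if any, then drop the ownership so the request can be reassigned.
func (t *torrentStub) cancelRequest(r requestIndex) *peerStub {
        p := t.pendingRequests[r]
        if p != nil {
                p.cancel(r)
        }
        delete(t.pendingRequests, r)
        return p
}

func main() {
        t := &torrentStub{
                pendingRequests: map[requestIndex]*peerStub{},
                lastRequested:   map[requestIndex]time.Time{},
        }
        t.noteRequested(7, &peerStub{name: "peer A"})
        t.cancelRequest(7) // prints "peer A cancels 7"; request 7 is now free to reassign
}
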
index 85e121194d28c22c6d3180a6d8a45cd79d7a1c80..221aa53fe71cca0253759de957185a5021f43140 100644 (file)
@@ -49,17 +49,13 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
 }
 
 func (ws *webseedPeer) _cancel(r RequestIndex) bool {
-       active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
-       if ok {
+       if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
                active.Cancel()
+               // The requester is running and will handle the result.
+               return true
        }
-       if !ws.peer.deleteRequest(r) {
-               panic("cancelled webseed request should exist")
-       }
-       if ws.peer.isLowOnRequests() {
-               ws.peer.updateRequests("webseedPeer._cancel")
-       }
-       return true
+       // There should be no requester handling this, so no further events will occur.
+       return false
 }
 
 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
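
The boolean returned from _cancel now distinguishes the two cases noted in the comments above: true when an active requester is still running and will surface a result for the cancelled request, false when nothing further will arrive for it. A hedged sketch of how a caller might act on that value (the types and cancelOne below are illustrative, not the Peer implementation):

package main

import "fmt"

type requestIndex = uint32

type requestState struct {
        requests  map[requestIndex]bool
        cancelled map[requestIndex]bool
}

// cancelOne drops r from the outstanding set, and parks it in the cancelled set
// only when impl reports that a follow-up event (a result or rejection) is still
// expected for it.
func cancelOne(state *requestState, r requestIndex, impl func(requestIndex) bool) {
        delete(state.requests, r)
        if impl(r) {
                // Something is still handling r; remember it so it isn't re-requested
                // before that result or rejection arrives.
                state.cancelled[r] = true
        }
        // Not acked: no further event is coming, so there is nothing left to track.
}

func main() {
        state := &requestState{
                requests:  map[requestIndex]bool{3: true},
                cancelled: map[requestIndex]bool{},
        }
        cancelOne(state, 3, func(requestIndex) bool { return true })
        fmt.Println(state.cancelled[3]) // true: wait for the follow-up before re-requesting
}
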
@@ -89,7 +85,7 @@ func (ws *webseedPeer) requester(i int) {
 start:
        for !ws.peer.closed.IsSet() {
                restart := false
-               ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
+               ws.peer.requestState.Requests.Iterate(func(x uint32) bool {
                        r := ws.peer.t.requestIndexToRequest(x)
                        if _, ok := ws.activeRequests[r]; ok {
                                return true
@@ -171,7 +167,9 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                        log.Printf("closing %v", ws)
                        ws.peer.close()
                }
-               ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
+               if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+                       panic("invalid reject")
+               }
                return err
        }
        err = ws.peer.receiveChunk(&pp.Message{
@@ -187,7 +185,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
 }
 
 func (me *webseedPeer) isLowOnRequests() bool {
-       return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
+       return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests)
 }
 
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {