This should bring in significant performance improvements that fix issues with the peer-requesting that existed from v1.34.0.
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/google/btree"
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
torrents map[InfoHash]*Torrent
+ pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
acceptLimiter map[ipStr]int
dialRateLimiter *rate.Limiter
isLowOnRequests() bool
writeInterested(interested bool) bool
- // Neither of these return buffer room anymore, because they're currently both posted. There's
- // also PeerConn.writeBufferFull for when/where it matters.
- _cancel(RequestIndex) bool
+ // _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
+ // handled by a follow-up event.
+ _cancel(RequestIndex) (acked bool)
_request(Request) bool
-
connectionFlags() string
onClose()
onGotInfo(*metainfo.Info)
// indexable with the memory space available.
type (
maxRequests = int
- requestState = request_strategy.PeerNextRequestState
+ requestState = request_strategy.PeerRequestState
)
type Peer struct {
// Stuff controlled by the local peer.
needRequestUpdate string
- actualRequestState requestState
+ requestState requestState
updateRequestsTimer *time.Timer
- cancelledRequests roaring.Bitmap
lastBecameInterested time.Time
priorInterest time.Duration
choking bool
piecesReceivedSinceLastRequestUpdate maxRequests
maxPiecesReceivedBetweenRequestUpdates maxRequests
- // Chunks that we might reasonably expect to receive from the peer. Due to
- // latency, buffering, and implementation differences, we may receive
- // chunks that are no longer in the set of requests actually want.
+ // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
+ // and implementation differences, we may receive chunks that are no longer in the set of
+ // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
validReceiveChunks map[RequestIndex]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
}
func (cn *Peer) expectingChunks() bool {
- if cn.actualRequestState.Requests.IsEmpty() {
+ if cn.requestState.Requests.IsEmpty() {
return false
}
- if !cn.actualRequestState.Interested {
+ if !cn.requestState.Interested {
return false
}
if !cn.peerChoking {
haveAllowedFastRequests := false
cn.peerAllowedFast.Iterate(func(i uint32) bool {
haveAllowedFastRequests = roaringBitmapRangeCardinality(
- &cn.actualRequestState.Requests,
+ &cn.requestState.Requests,
cn.t.pieceRequestIndexOffset(pieceIndex(i)),
cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
) == 0
func (cn *Peer) cumInterest() time.Duration {
ret := cn.priorInterest
- if cn.actualRequestState.Interested {
+ if cn.requestState.Interested {
ret += time.Since(cn.lastBecameInterested)
}
return ret
c := func(b byte) {
ret += string([]byte{b})
}
- if cn.actualRequestState.Interested {
+ if cn.requestState.Interested {
c('i')
}
if cn.choking {
func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
- cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ cn.requestState.Requests.Iterate(func(x uint32) bool {
ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
return true
})
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
- " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+ " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
- cn.actualRequestState.Requests.GetCardinality(),
- cn.cancelledRequests.GetCardinality(),
+ cn.requestState.Requests.GetCardinality(),
+ cn.requestState.Cancelled.GetCardinality(),
cn.nominalMaxRequests(),
cn.PeerMaxRequests,
len(cn.peerRequests),
}
func (cn *Peer) setInterested(interested bool) bool {
- if cn.actualRequestState.Interested == interested {
+ if cn.requestState.Interested == interested {
return true
}
- cn.actualRequestState.Interested = interested
+ cn.requestState.Interested = interested
if interested {
cn.lastBecameInterested = time.Now()
} else if !cn.lastBecameInterested.IsZero() {
if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
// This could occur if we made a request with the fast extension, and then got choked and
// haven't had the request rejected yet.
- if !cn.actualRequestState.Requests.Contains(r) {
+ if !cn.requestState.Requests.Contains(r) {
panic("peer choking and piece not allowed fast")
}
}
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
- if cn.actualRequestState.Requests.Contains(r) {
+ if cn.requestState.Requests.Contains(r) {
return true, nil
}
- if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
+ if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
- cn.actualRequestState.Requests.Add(r)
+ cn.requestState.Requests.Add(r)
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[RequestIndex]int)
}
cn.validReceiveChunks[r]++
- cn.t.pendingRequests.Inc(r)
+ cn.t.pendingRequests[r] = cn
+ cn.t.lastRequested[r] = time.Now()
cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
})
}
-func (me *Peer) cancel(r RequestIndex) bool {
- if !me.actualRequestState.Requests.Contains(r) {
- return true
+func (me *Peer) cancel(r RequestIndex) {
+ if !me.deleteRequest(r) {
+ panic("request not existing should have been guarded")
+ }
+ if me._cancel(r) {
+ if !me.requestState.Cancelled.CheckedAdd(r) {
+ panic("request already cancelled")
+ }
+ }
+ if me.isLowOnRequests() {
+ me.updateRequests("Peer.cancel")
}
- return me._cancel(r)
}
func (me *PeerConn) _cancel(r RequestIndex) bool {
- if me.cancelledRequests.Contains(r) {
- // Already cancelled and waiting for a response.
- return true
- }
+ me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
// Transmission does not send rejects for received cancels. See
// https://github.com/transmission/transmission/pull/2275.
- if me.fastEnabled() && !me.remoteIsTransmission() {
- me.cancelledRequests.Add(r)
- } else {
- if !me.deleteRequest(r) {
- panic("request not existing should have been guarded")
- }
- if me.isLowOnRequests() {
- me.updateRequests("Peer.cancel")
- }
- }
- return me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
+ return me.fastEnabled() && !me.remoteIsTransmission()
}
func (cn *PeerConn) fillWriteBuffer() {
c.deleteAllRequests()
} else {
// We don't decrement pending requests here, let's wait for the peer to either
- // reject or satisfy the outstanding requests. Additionally some peers may unchoke
+ // reject or satisfy the outstanding requests. Additionally, some peers may unchoke
// us and resume where they left off, we don't want to have piled on to those chunks
- // in the meanwhile. I think a peers ability to abuse this should be limited: they
+ // in the meanwhile. I think a peer's ability to abuse this should be limited: they
// could let us request a lot of stuff, then choke us and never reject, but they're
// only a single peer, our chunk balancing should smooth over this abuse.
}
c.peerChoking = true
- // We can now reset our interest. I think we do this after setting the flag in case the
- // peerImpl updates synchronously (webseeds?).
- if !c.actualRequestState.Requests.IsEmpty() {
- c.updateRequests("choked")
- }
c.updateExpectingChunks()
case pp.Unchoke:
if !c.peerChoking {
}
c.peerChoking = false
preservedCount := 0
- c.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ c.requestState.Requests.Iterate(func(x uint32) bool {
if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
preservedCount++
}
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Reject:
- c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
+ req := newRequestFromMessage(&msg)
+ if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
+ log.Printf("received invalid reject [request=%v, peer=%v]", req, c)
+ err = fmt.Errorf("received invalid reject [request=%v]", req)
+ }
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
}
}
-func (c *Peer) remoteRejectedRequest(r RequestIndex) {
- if c.deleteRequest(r) {
- if c.isLowOnRequests() {
- c.updateRequests("Peer.remoteRejectedRequest")
- }
- c.decExpectedChunkReceive(r)
+// Returns true if it was valid to reject the request.
+func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
+ if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
+ return false
}
+ if c.isLowOnRequests() {
+ c.updateRequests("Peer.remoteRejectedRequest")
+ }
+ c.decExpectedChunkReceive(r)
+ return true
}
func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
// The request needs to be deleted immediately to prevent cancels occurring asynchronously when
// have actually already received the piece, while we have the Client unlocked to write the data
// out.
- deletedRequest := false
+ intended := false
{
- if c.actualRequestState.Requests.Contains(req) {
+ if c.requestState.Requests.Contains(req) {
for _, f := range c.callbacks.ReceivedRequested {
f(PeerMessageEvent{c, msg})
}
}
// Request has been satisfied.
- if c.deleteRequest(req) {
- deletedRequest = true
+ if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
+ intended = true
if !c.peerChoking {
c._chunksReceivedWhileExpecting++
}
c.updateRequests("Peer.receiveChunk deleted request")
}
} else {
- chunksReceived.Add("unwanted", 1)
+ chunksReceived.Add("unintended", 1)
}
}
// Do we actually want this chunk?
if t.haveChunk(ppReq) {
// panic(fmt.Sprintf("%+v", ppReq))
- chunksReceived.Add("wasted", 1)
+ chunksReceived.Add("redundant", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil
}
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
- if deletedRequest {
+ if intended {
c.piecesReceivedSinceLastRequestUpdate++
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
}
piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
// Cancel pending requests for this chunk from *other* peers.
- t.iterPeers(func(p *Peer) {
+ if p := t.pendingRequests[req]; p != nil {
if p == c {
- return
+ panic("should not be pending request from conn that just received it")
}
p.cancel(req)
- })
+ }
err := func() error {
cl.unlock()
return c.peerPieces().Intersects(&c.t._pendingPieces)
}
+// Returns true if an outstanding request is removed. Cancelled requests should be handled
+// separately.
func (c *Peer) deleteRequest(r RequestIndex) bool {
- if !c.actualRequestState.Requests.CheckedRemove(r) {
+ if !c.requestState.Requests.CheckedRemove(r) {
return false
}
- c.cancelledRequests.Remove(r)
for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
}
c.updateExpectingChunks()
- c.t.pendingRequests.Dec(r)
+ if c.t.requestingPeer(r) != c {
+ panic("only one peer should have a given request at a time")
+ }
+ delete(c.t.pendingRequests, r)
+ delete(c.t.lastRequested, r)
return true
}
func (c *Peer) deleteAllRequests() {
- c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
- c.deleteRequest(x)
+ c.requestState.Requests.Clone().Iterate(func(x uint32) bool {
+ if !c.deleteRequest(x) {
+ panic("request should exist")
+ }
return true
})
- if !c.actualRequestState.Requests.IsEmpty() {
- panic(c.actualRequestState.Requests.GetCardinality())
+ if !c.requestState.Requests.IsEmpty() {
+ panic(c.requestState.Requests.GetCardinality())
}
}
}
func (pc *PeerConn) isLowOnRequests() bool {
- return pc.actualRequestState.Requests.IsEmpty()
+ return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
+}
+
+func (p *Peer) uncancelledRequests() uint64 {
+ return p.requestState.Requests.GetCardinality()
}
func (pc *PeerConn) remoteIsTransmission() bool {
func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
+ p.t.updatePieceRequestOrder(p.index)
p.readerCond.Broadcast()
}
func (p *Piece) pendChunkIndex(i RequestIndex) {
p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
+ p.t.updatePieceRequestOrder(p.index)
}
func (p *Piece) numChunks() chunkIndexType {
--- /dev/null
+package torrent
+
+import (
+ "github.com/anacrolix/torrent/metainfo"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
+ "github.com/anacrolix/torrent/storage"
+)
+
+type requestStrategyInput struct {
+ cl *Client
+ capFunc storage.TorrentCapacity
+}
+
+func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
+ return requestStrategyTorrent{r.cl.torrents[ih]}
+}
+
+func (r requestStrategyInput) Capacity() (int64, bool) {
+ if r.capFunc == nil {
+ return 0, false
+ }
+ return (*r.capFunc)()
+}
+
+func (r requestStrategyInput) MaxUnverifiedBytes() int64 {
+ return r.cl.config.MaxUnverifiedBytes
+}
+
+var _ request_strategy.Input = requestStrategyInput{}
+
+// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
+func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
+ return requestStrategyInput{
+ cl: cl,
+ capFunc: primaryTorrent.storage.Capacity,
+ }
+}
+
+func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
+ return t.cl.getRequestStrategyInput(t)
+}
+
+type requestStrategyTorrent struct {
+ t *Torrent
+}
+
+func (r requestStrategyTorrent) Piece(i int) request_strategy.Piece {
+ return requestStrategyPiece{r.t, i}
+}
+
+func (r requestStrategyTorrent) ChunksPerPiece() uint32 {
+ return r.t.chunksPerRegularPiece()
+}
+
+func (r requestStrategyTorrent) PieceLength() int64 {
+ return r.t.info.PieceLength
+}
+
+var _ request_strategy.Torrent = requestStrategyTorrent{}
+
+type requestStrategyPiece struct {
+ t *Torrent
+ i pieceIndex
+}
+
+func (r requestStrategyPiece) Request() bool {
+ return !r.t.ignorePieceForRequests(r.i)
+}
+
+func (r requestStrategyPiece) NumPendingChunks() int {
+ return int(r.t.pieceNumPendingChunks(r.i))
+}
+
+var _ request_strategy.Piece = requestStrategyPiece{}
import (
"bytes"
- "sort"
- "sync"
+ "expvar"
"github.com/anacrolix/multiless"
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/google/btree"
"github.com/anacrolix/torrent/types"
)
ChunkSpec = types.ChunkSpec
)
-type ClientPieceOrder struct{}
-
-type filterTorrent struct {
- *Torrent
- unverifiedBytes int64
-}
-
-func sortFilterPieces(pieces []filterPiece) {
- sort.Slice(pieces, func(_i, _j int) bool {
- i := &pieces[_i]
- j := &pieces[_j]
- return multiless.New().Int(
- int(j.Priority), int(i.Priority),
- ).Bool(
- j.Partial, i.Partial,
- ).Int64(
- i.Availability, j.Availability,
- ).Int(
- i.index, j.index,
- ).Lazy(func() multiless.Computation {
- return multiless.New().Cmp(bytes.Compare(
- i.t.InfoHash[:],
- j.t.InfoHash[:],
- ))
- }).MustLess()
+type pieceOrderInput struct {
+ PieceRequestOrderState
+ PieceRequestOrderKey
+}
+
+func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
+ return multiless.New().Int(
+ int(j.Priority), int(i.Priority),
+ ).Bool(
+ j.Partial, i.Partial,
+ ).Int64(
+ i.Availability, j.Availability,
+ ).Int(
+ i.Index, j.Index,
+ ).Lazy(func() multiless.Computation {
+ return multiless.New().Cmp(bytes.Compare(
+ i.InfoHash[:],
+ j.InfoHash[:],
+ ))
})
}
-type requestsPeer struct {
- Peer
- nextState PeerNextRequestState
- requestablePiecesRemaining int
-}
-
-func (rp *requestsPeer) canFitRequest() bool {
- return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
-}
-
-func (rp *requestsPeer) addNextRequest(r RequestIndex) {
- if !rp.nextState.Requests.CheckedAdd(r) {
- panic("should only add once")
- }
-}
-
-type peersForPieceRequests struct {
- requestsInPiece int
- *requestsPeer
-}
-
-func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
- me.requestsPeer.addNextRequest(r)
- me.requestsInPiece++
-}
-
-type requestablePiece struct {
- index pieceIndex
- t *Torrent
- alwaysReallocate bool
- NumPendingChunks int
- IterPendingChunks ChunksIterFunc
-}
-
-func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
- return p.t.ChunksPerPiece*uint32(p.index) + c
-}
-
-type filterPiece struct {
- t *filterTorrent
- index pieceIndex
- *Piece
-}
+var packageExpvarMap = expvar.NewMap("request-strategy")
// Calls f with requestable pieces in order.
-func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
- maxPieces := 0
- for i := range input.Torrents {
- maxPieces += len(input.Torrents[i].Pieces)
- }
- pieces := make([]filterPiece, 0, maxPieces)
+func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainfo.Hash, pieceIndex int)) {
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl. A nil value means no capacity limit.
var storageLeft *int64
- if input.Capacity != nil {
- storageLeft = new(int64)
- *storageLeft = *input.Capacity
- }
- for _t := range input.Torrents {
- // TODO: We could do metainfo requests here.
- t := &filterTorrent{
- Torrent: &input.Torrents[_t],
- unverifiedBytes: 0,
- }
- for i := range t.Pieces {
- pieces = append(pieces, filterPiece{
- t: t,
- index: i,
- Piece: &t.Pieces[i],
- })
- }
+ if cap, ok := input.Capacity(); ok {
+ storageLeft = &cap
}
- sortFilterPieces(pieces)
var allTorrentsUnverifiedBytes int64
- for _, piece := range pieces {
- if left := storageLeft; left != nil {
- if *left < piece.Length {
- continue
+ pro.tree.Ascend(func(i btree.Item) bool {
+ _i := i.(*pieceRequestOrderItem)
+ ih := _i.key.InfoHash
+ var t Torrent = input.Torrent(ih)
+ var piece Piece = t.Piece(_i.key.Index)
+ pieceLength := t.PieceLength()
+ if storageLeft != nil {
+ if *storageLeft < pieceLength {
+ return false
}
- *left -= piece.Length
+ *storageLeft -= pieceLength
}
- if !piece.Request || piece.NumPendingChunks == 0 {
+ if !piece.Request() || piece.NumPendingChunks() == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
- continue
+ return true
}
- if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
- continue
+ if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
+ return true
}
- if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
- continue
- }
- piece.t.unverifiedBytes += piece.Length
- allTorrentsUnverifiedBytes += piece.Length
- f(piece.t.Torrent, piece.Piece, piece.index)
- }
+ allTorrentsUnverifiedBytes += pieceLength
+ f(ih, _i.key.Index)
+ return true
+ })
return
}
-type Input struct {
- // This is all torrents that share the same capacity below (or likely a single torrent if there
- // is infinite capacity, since you could just run it separately for each Torrent if that's the
- // case).
- Torrents []Torrent
- // Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
- // that share the same capacity key must be incorporated in piece ordering.
- Capacity *int64
+type Input interface {
+ Torrent(metainfo.Hash) Torrent
+ // Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in
+ // their storage.Torrent references.
+ Capacity() (cap int64, capped bool)
// Across all the Torrents. This might be partitioned by storage capacity key now.
- MaxUnverifiedBytes int64
-}
-
-// Checks that a sorted peersForPiece slice makes sense.
-func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
- if !sort.IsSorted(peers) {
- panic("not sorted")
- }
- peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
- for _, p := range peers.peersForPiece {
- if _, ok := peerMap[p]; ok {
- panic(p)
- }
- peerMap[p] = struct{}{}
- }
-}
-
-var peersForPiecesPool sync.Pool
-
-func makePeersForPiece(cap int) []*peersForPieceRequests {
- got := peersForPiecesPool.Get()
- if got == nil {
- return make([]*peersForPieceRequests, 0, cap)
- }
- return got.([]*peersForPieceRequests)[:0]
-}
-
-type peersForPieceSorter struct {
- peersForPiece []*peersForPieceRequests
- req *RequestIndex
- p requestablePiece
-}
-
-func (me *peersForPieceSorter) Len() int {
- return len(me.peersForPiece)
-}
-
-func (me *peersForPieceSorter) Swap(i, j int) {
- me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
-}
-
-func (me *peersForPieceSorter) Less(_i, _j int) bool {
- i := me.peersForPiece[_i]
- j := me.peersForPiece[_j]
- req := me.req
- p := &me.p
- byHasRequest := func() multiless.Computation {
- ml := multiless.New()
- if req != nil {
- iHas := i.nextState.Requests.Contains(*req)
- jHas := j.nextState.Requests.Contains(*req)
- ml = ml.Bool(jHas, iHas)
- }
- return ml
- }()
- ml := multiless.New()
- // We always "reallocate", that is force even striping amongst peers that are either on
- // the last piece they can contribute too, or for pieces marked for this behaviour.
- // Striping prevents starving peers of requests, and will always re-balance to the
- // fastest known peers.
- if !p.alwaysReallocate {
- ml = ml.Bool(
- j.requestablePiecesRemaining == 1,
- i.requestablePiecesRemaining == 1)
- }
- if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
- ml = ml.Int(
- i.requestsInPiece,
- j.requestsInPiece)
- } else {
- ml = ml.AndThen(byHasRequest)
- }
- ml = ml.Int(
- i.requestablePiecesRemaining,
- j.requestablePiecesRemaining,
- ).Float64(
- j.DownloadRate,
- i.DownloadRate,
- )
- if ml.Ok() {
- return ml.Less()
- }
- ml = ml.AndThen(byHasRequest)
- return ml.Int64(
- int64(j.Age), int64(i.Age),
- // TODO: Probably peer priority can come next
- ).Uintptr(
- i.Id.Uintptr(),
- j.Id.Uintptr(),
- ).MustLess()
-}
-
-func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
- peersForPiece := makePeersForPiece(len(peers))
- for _, peer := range peers {
- if !peer.canRequestPiece(p.index) {
- continue
- }
- if !peer.canFitRequest() {
- peer.requestablePiecesRemaining--
- continue
- }
- peersForPiece = append(peersForPiece, &peersForPieceRequests{
- requestsInPiece: 0,
- requestsPeer: peer,
- })
- }
- defer func() {
- for _, peer := range peersForPiece {
- peer.requestablePiecesRemaining--
- }
- peersForPiecesPool.Put(peersForPiece)
- }()
- peersForPieceSorter := peersForPieceSorter{
- peersForPiece: peersForPiece,
- p: p,
- }
- sortPeersForPiece := func(req *RequestIndex) {
- peersForPieceSorter.req = req
- sort.Sort(&peersForPieceSorter)
- // ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
- }
- // Chunks can be preassigned several times, if peers haven't been able to update their "actual"
- // with "next" request state before another request strategy run occurs.
- preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
- p.IterPendingChunks(func(spec ChunkIndex) {
- req := p.chunkIndexToRequestIndex(spec)
- for _, peer := range peersForPiece {
- if !peer.ExistingRequests.Contains(req) {
- continue
- }
- if !peer.canFitRequest() {
- continue
- }
- preallocated[spec] = append(preallocated[spec], peer)
- peer.addNextRequest(req)
- }
- })
- pendingChunksRemaining := int(p.NumPendingChunks)
- p.IterPendingChunks(func(chunk ChunkIndex) {
- if len(preallocated[chunk]) != 0 {
- return
- }
- req := p.chunkIndexToRequestIndex(chunk)
- defer func() { pendingChunksRemaining-- }()
- sortPeersForPiece(nil)
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
- continue
- }
- if !peer.PieceAllowedFast.ContainsInt(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
- peer.nextState.Interested = true
- if peer.Choking {
- continue
- }
- }
- peer.addNextRequest(req)
- break
- }
- })
-chunk:
- for chunk, prePeers := range preallocated {
- if len(prePeers) == 0 {
- continue
- }
- pendingChunksRemaining--
- req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
- for _, pp := range prePeers {
- pp.requestsInPiece--
- }
- sortPeersForPiece(&req)
- for _, pp := range prePeers {
- pp.nextState.Requests.Remove(req)
- }
- for _, peer := range peersForPiece {
- if !peer.canFitRequest() {
- continue
- }
- if !peer.PieceAllowedFast.ContainsInt(p.index) {
- // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
- peer.nextState.Interested = true
- if peer.Choking {
- continue
- }
- }
- peer.addNextRequest(req)
- continue chunk
- }
- }
- if pendingChunksRemaining != 0 {
- panic(pendingChunksRemaining)
- }
+ MaxUnverifiedBytes() int64
}
+++ /dev/null
-package request_strategy
-
-import (
- "encoding/gob"
- "testing"
-
- "github.com/RoaringBitmap/roaring"
- qt "github.com/frankban/quicktest"
- "github.com/google/go-cmp/cmp"
-)
-
-func init() {
- gob.Register(chunkIterRange(0))
- gob.Register(sliceChunksIter{})
-}
-
-type chunkIterRange ChunkIndex
-
-func (me chunkIterRange) Iter(f func(ChunkIndex)) {
- for offset := ChunkIndex(0); offset < ChunkIndex(me); offset += 1 {
- f(offset)
- }
-}
-
-type sliceChunksIter []ChunkIndex
-
-func chunkIter(offsets ...ChunkIndex) ChunksIter {
- return sliceChunksIter(offsets)
-}
-
-func (offsets sliceChunksIter) Iter(f func(ChunkIndex)) {
- for _, offset := range offsets {
- f(offset)
- }
-}
-
-func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) {
- ret.AddMany(rs)
- return
-}
-
-func init() {
- gob.Register(intPeerId(0))
-}
-
-type intPeerId int
-
-func (i intPeerId) Uintptr() uintptr {
- return uintptr(i)
-}
-
-var hasAllRequests = func() (all roaring.Bitmap) {
- all.AddRange(0, roaring.MaxRange)
- return
-}()
-
-func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) {
- addressableBm := next.Requests
- c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num)
- c.Check(next.Interested, qt.Equals, interest)
-}
-
-func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
- qt.Check(t, reqs.GetCardinality(), qt.Equals, l)
-}
-
-var peerNextRequestStateChecker = qt.CmpEquals(
- cmp.Transformer(
- "bitmap",
- func(bm roaring.Bitmap) []uint32 {
- return bm.ToArray()
- }))
package request_strategy
import (
- "time"
-
"github.com/RoaringBitmap/roaring"
)
-type PeerNextRequestState struct {
+type PeerRequestState struct {
Interested bool
- Requests roaring.Bitmap
-}
-
-type PeerId interface {
- Uintptr() uintptr
-}
-
-type Peer struct {
- Pieces roaring.Bitmap
- MaxRequests int
- ExistingRequests roaring.Bitmap
- Choking bool
- PieceAllowedFast roaring.Bitmap
- DownloadRate float64
- Age time.Duration
- // This is passed back out at the end, so must support equality. Could be a type-param later.
- Id PeerId
-}
-
-// TODO: This might be used in more places I think.
-func (p *Peer) canRequestPiece(i pieceIndex) bool {
- return (!p.Choking || p.PieceAllowedFast.Contains(uint32(i))) && p.HasPiece(i)
-}
-
-func (p *Peer) HasPiece(i pieceIndex) bool {
- return p.Pieces.Contains(uint32(i))
+ // Expecting
+ Requests roaring.Bitmap
+ // Cancelled and waiting response
+ Cancelled roaring.Bitmap
}
--- /dev/null
+package request_strategy
+
+import (
+ "fmt"
+
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/google/btree"
+)
+
+func NewPieceOrder() *PieceRequestOrder {
+ return &PieceRequestOrder{
+ tree: btree.New(32),
+ keys: make(map[PieceRequestOrderKey]PieceRequestOrderState),
+ }
+}
+
+type PieceRequestOrder struct {
+ tree *btree.BTree
+ keys map[PieceRequestOrderKey]PieceRequestOrderState
+}
+
+type PieceRequestOrderKey struct {
+ InfoHash metainfo.Hash
+ Index int
+}
+
+type PieceRequestOrderState struct {
+ Priority piecePriority
+ Partial bool
+ Availability int64
+}
+
+type pieceRequestOrderItem struct {
+ key PieceRequestOrderKey
+ state PieceRequestOrderState
+}
+
+func (me *pieceRequestOrderItem) Less(other btree.Item) bool {
+ otherConcrete := other.(*pieceRequestOrderItem)
+ return pieceOrderLess(
+ pieceOrderInput{
+ PieceRequestOrderState: me.state,
+ PieceRequestOrderKey: me.key,
+ },
+ pieceOrderInput{
+ PieceRequestOrderState: otherConcrete.state,
+ PieceRequestOrderKey: otherConcrete.key,
+ },
+ ).Less()
+}
+
+func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
+ if _, ok := me.keys[key]; ok {
+ panic(key)
+ }
+ if me.tree.ReplaceOrInsert(&pieceRequestOrderItem{
+ key: key,
+ state: state,
+ }) != nil {
+ panic("shouldn't already have this")
+ }
+ me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) {
+ item := me.existingItemForKey(key)
+ if item.state == state {
+ return
+ }
+ if me.tree.Delete(&item) == nil {
+ panic(fmt.Sprintf("%#v", key))
+ }
+ item.state = state
+ if me.tree.ReplaceOrInsert(&item) != nil {
+ panic(key)
+ }
+ me.keys[key] = state
+}
+
+func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
+ return pieceRequestOrderItem{
+ key: key,
+ state: me.keys[key],
+ }
+}
+
+func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
+ item := me.existingItemForKey(key)
+ if me.tree.Delete(&item) == nil {
+ panic(key)
+ }
+ delete(me.keys, key)
+ // log.Printf("deleting %#v", key)
+}
+
+func (me *PieceRequestOrder) Len() int {
+ return len(me.keys)
+}
Iter(func(ci ChunkIndex))
}
-type Piece struct {
- Request bool
- Priority piecePriority
- Partial bool
- Availability int64
- Length int64
- NumPendingChunks int
- IterPendingChunks ChunksIter
-}
-
-func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
- i := p.IterPendingChunks
- if i != nil {
- i.Iter(f)
- }
+type Piece interface {
+ Request() bool
+ NumPendingChunks() int
}
package request_strategy
-import (
- "github.com/anacrolix/torrent/metainfo"
-)
-
-type Torrent struct {
- Pieces []Piece
- // Some value that's unique and stable between runs.
- InfoHash metainfo.Hash
- ChunksPerPiece uint32
- // TODO: This isn't actually configurable anywhere yet.
- MaxUnverifiedBytes int64
+type Torrent interface {
+ Piece(int) Piece
+ ChunksPerPiece() uint32
+ PieceLength() int64
}
"container/heap"
"context"
"encoding/gob"
- "math/rand"
"reflect"
"runtime/pprof"
"time"
request_strategy "github.com/anacrolix/torrent/request-strategy"
)
-// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
-func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
- input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
- if !primaryTorrent.haveInfo() {
- return
- }
- if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
- if cap, ok := (*capFunc)(); ok {
- input.Capacity = &cap
- }
+func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
+ return request_strategy.PieceRequestOrderState{
+ Priority: t.piece(i).purePriority(),
+ Partial: t.piecePartiallyDownloaded(i),
+ Availability: t.piece(i).availability,
}
- if input.Capacity == nil {
- input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
- return
- }
- input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
- for _, t := range cl.torrents {
- if !t.haveInfo() {
- // This would be removed if metadata is handled here. Determining chunks per piece
- // requires the info. If we have no info, we have no pieces too, so the end result is
- // the same.
- continue
- }
- if t.storage.Capacity != primaryTorrent.storage.Capacity {
- continue
- }
- input.Torrents = append(input.Torrents, t.requestStrategyTorrentInput())
- }
- return
-}
-
-func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
- return t.cl.getRequestStrategyInput(t)
-}
-
-func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
- rst := request_strategy.Torrent{
- InfoHash: t.infoHash,
- ChunksPerPiece: t.chunksPerRegularPiece(),
- }
- rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
- for i := range t.pieces {
- p := &t.pieces[i]
- rst.Pieces = append(rst.Pieces, request_strategy.Piece{
- Request: !t.ignorePieceForRequests(i),
- Priority: p.purePriority(),
- Partial: t.piecePartiallyDownloaded(i),
- Availability: p.availability,
- Length: int64(p.length()),
- NumPendingChunks: int(t.pieceNumPendingChunks(i)),
- IterPendingChunks: &p.undirtiedChunksIter,
- })
- }
- return rst
}
func init() {
)
type peerRequests struct {
- requestIndexes []RequestIndex
- peer *Peer
- torrentStrategyInput *request_strategy.Torrent
+ requestIndexes []RequestIndex
+ peer *Peer
}
func (p *peerRequests) Len() int {
leftRequest := p.requestIndexes[i]
rightRequest := p.requestIndexes[j]
t := p.peer.t
- leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
- rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
- leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
- rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
- pending := func(index RequestIndex, current bool) int {
- ret := t.pendingRequests.Get(index)
- if current {
- ret--
- }
- // See https://github.com/anacrolix/torrent/issues/679 for possible issues. This should be
- // resolved.
- if ret < 0 {
- panic(ret)
- }
- return ret
- }
+ leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
+ rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
ml := multiless.New()
// Push requests that can't be served right now to the end. But we don't throw them away unless
// there's a better alternative. This is for when we're using the fast extension and get choked
!p.peer.peerAllowedFast.Contains(rightPieceIndex),
)
}
+ leftPeer := t.pendingRequests[leftRequest]
+ rightPeer := t.pendingRequests[rightRequest]
+ ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
+ ml = ml.Bool(rightPeer == nil, leftPeer == nil)
+ if ml.Ok() {
+ return ml.MustLess()
+ }
+ if leftPeer != nil {
+ // The right peer should also be set, or we'd have resolved the computation by now.
+ ml = ml.Uint64(
+ rightPeer.requestState.Requests.GetCardinality(),
+ leftPeer.requestState.Requests.GetCardinality(),
+ )
+ // Could either of the lastRequested be Zero? That's what checking an existing peer is for.
+ leftLast := t.lastRequested[leftRequest]
+ rightLast := t.lastRequested[rightRequest]
+ if leftLast.IsZero() || rightLast.IsZero() {
+ panic("expected non-zero last requested times")
+ }
+ // We want the most-recently requested on the left. Clients like Transmission serve requests
+ // in received order, so the most recently-requested is the one that has the longest until
+ // it will be served and therefore is the best candidate to cancel.
+ ml = ml.CmpInt64(rightLast.Sub(leftLast).Nanoseconds())
+ }
+ leftPiece := t.piece(int(leftPieceIndex))
+ rightPiece := t.piece(int(rightPieceIndex))
ml = ml.Int(
- pending(leftRequest, leftCurrent),
- pending(rightRequest, rightCurrent))
- ml = ml.Bool(!leftCurrent, !rightCurrent)
- ml = ml.Int(
- -int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
- -int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
+ // Technically we would be happy with the cached priority here, except we don't actually
+ // cache it anymore, and Torrent.piecePriority just does another lookup of *Piece to resolve
+ // the priority through Piece.purePriority, which is probably slower.
+ -int(leftPiece.purePriority()),
+ -int(rightPiece.purePriority()),
)
ml = ml.Int(
- int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
- int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
- ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
- ml = ml.Uint32(leftRequest, rightRequest)
- return ml.MustLess()
+ int(leftPiece.availability),
+ int(rightPiece.availability))
+ return ml.Less()
}
func (p *peerRequests) Swap(i, j int) {
}
type desiredRequestState struct {
- Requests []RequestIndex
+ Requests peerRequests
Interested bool
}
func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
+ if !p.t.haveInfo() {
+ return
+ }
input := p.t.getRequestStrategyInput()
requestHeap := peerRequests{
peer: p,
}
- for i := range input.Torrents {
- t := &input.Torrents[i]
- if t.InfoHash == p.t.infoHash {
- requestHeap.torrentStrategyInput = t
- break
- }
- }
request_strategy.GetRequestablePieces(
input,
- func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
- if t.InfoHash != p.t.infoHash {
+ p.t.getPieceRequestOrder(),
+ func(ih InfoHash, pieceIndex int) {
+ if ih != p.t.infoHash {
return
}
if !p.peerHasPiece(pieceIndex) {
return
}
allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
- rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
+ p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
- // if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
+ // if p.t.pendingRequests.Get(r) != 0 && !p.requestState.Requests.Contains(r) {
// return
// }
if !allowedFast {
- // We must signal interest to request this
+ // We must signal interest to request this. TODO: We could set interested if the
+ // peers pieces (minus the allowed fast set) overlap with our missing pieces if
+ // there are any readers, or any pending pieces.
desired.Interested = true
// We can make or will allow sustaining a request here if we're not choked, or
// have made the request previously (presumably while unchoked), and haven't had
// the peer respond yet (and the request was retained because we are using the
// fast extension).
- if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
+ if p.peerChoking && !p.requestState.Requests.Contains(r) {
// We can't request this right now.
return
}
}
+ if p.requestState.Cancelled.Contains(r) {
+ // Can't re-request.
+ return
+ }
requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
})
},
)
p.t.assertPendingRequests()
- heap.Init(&requestHeap)
- for requestHeap.Len() != 0 && len(desired.Requests) < p.nominalMaxRequests() {
- requestIndex := heap.Pop(&requestHeap).(RequestIndex)
- desired.Requests = append(desired.Requests, requestIndex)
- }
+ desired.Requests = requestHeap
return
}
// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState) bool {
- current := &p.actualRequestState
+ current := &p.requestState
if !p.setInterested(next.Interested) {
return false
}
more := true
- cancel := current.Requests.Clone()
- for _, ri := range next.Requests {
- cancel.Remove(ri)
- }
- cancel.Iterate(func(req uint32) bool {
- more = p.cancel(req)
- return more
- })
- if !more {
- return false
- }
- shuffled := false
- lastPending := 0
- for i := 0; i < len(next.Requests); i++ {
- req := next.Requests[i]
- if p.cancelledRequests.Contains(req) {
- // Waiting for a reject or piece message, which will suitably trigger us to update our
- // requests, so we can skip this one with no additional consideration.
- continue
- }
- // The cardinality of our desired requests shouldn't exceed the max requests since it's used
- // in the calculation of the requests. However, if we cancelled requests and they haven't
- // been rejected or serviced yet with the fast extension enabled, we can end up with more
- // extra outstanding requests. We could subtract the number of outstanding cancels from the
- // next request cardinality, but peers might not like that.
- if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
- // log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
- // next.Requests.GetCardinality(),
- // p.cancelledRequests.GetCardinality(),
- // current.Requests.GetCardinality(),
- // p.nominalMaxRequests(),
- // )
- break
- }
- otherPending := p.t.pendingRequests.Get(next.Requests[0])
- if p.actualRequestState.Requests.Contains(next.Requests[0]) {
- otherPending--
- }
- if otherPending < lastPending {
- // Pending should only rise. It's supposed to be the strongest ordering criteria. If it
- // doesn't, our shuffling condition could be wrong.
- panic(lastPending)
- }
- // If the request has already been requested by another peer, shuffle this and the rest of
- // the requests (since according to the increasing condition, the rest of the indices
- // already have an outstanding request with another peer).
- if !shuffled && otherPending > 0 {
- shuffleReqs := next.Requests[i:]
- rand.Shuffle(len(shuffleReqs), func(i, j int) {
- shuffleReqs[i], shuffleReqs[j] = shuffleReqs[j], shuffleReqs[i]
- })
- // log.Printf("shuffled reqs [%v:%v]", i, len(next.Requests))
- shuffled = true
- // Repeat this index
- i--
- continue
+ requestHeap := &next.Requests
+ t := p.t
+ heap.Init(requestHeap)
+ for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
+ req := heap.Pop(requestHeap).(RequestIndex)
+ existing := t.requestingPeer(req)
+ if existing != nil && existing != p {
+ // Don't steal from the poor.
+ diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1)
+ // Steal a request that leaves us with one more request than the existing peer
+ // connection if the stealer more recently received a chunk.
+ if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) {
+ continue
+ }
+ t.cancelRequest(req)
}
-
more = p.mustRequest(req)
if !more {
break
}
}
+ // TODO: This may need to change, we might want to update even if there were no requests due to
+ // filtering them for being recently requested already.
p.updateRequestsTimer.Stop()
if more {
p.needRequestUpdate = ""
- if !current.Requests.IsEmpty() {
+ if current.Interested {
p.updateRequestsTimer.Reset(3 * time.Second)
}
}
--- /dev/null
+package torrent
+
+import (
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
+)
+
+func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
+ t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Update(
+ t.pieceRequestOrderKey(pieceIndex),
+ t.requestStrategyPieceOrderState(pieceIndex))
+}
+
+func (t *Torrent) clientPieceRequestOrderKey() interface{} {
+ if t.storage.Capacity == nil {
+ return t
+ }
+ return t.storage.Capacity
+}
+
+func (t *Torrent) deletePieceRequestOrder() {
+ cpro := t.cl.pieceRequestOrder
+ key := t.clientPieceRequestOrderKey()
+ pro := cpro[key]
+ for i := 0; i < t.numPieces(); i++ {
+ pro.Delete(t.pieceRequestOrderKey(i))
+ }
+ if pro.Len() == 0 {
+ delete(cpro, key)
+ }
+}
+
+func (t *Torrent) initPieceRequestOrder() {
+ if t.cl.pieceRequestOrder == nil {
+ t.cl.pieceRequestOrder = make(map[interface{}]*request_strategy.PieceRequestOrder)
+ }
+ key := t.clientPieceRequestOrderKey()
+ cpro := t.cl.pieceRequestOrder
+ if cpro[key] == nil {
+ cpro[key] = request_strategy.NewPieceOrder()
+ }
+}
+
+func (t *Torrent) addRequestOrderPiece(i int) {
+ t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Add(
+ t.pieceRequestOrderKey(i),
+ t.requestStrategyPieceOrderState(i))
+}
+
+func (t *Torrent) getPieceRequestOrder() *request_strategy.PieceRequestOrder {
+ return t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]
+}
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
initialPieceCheckDisabled bool
// Count of each request across active connections.
- pendingRequests pendingRequests
+ pendingRequests map[RequestIndex]*Peer
+ lastRequested map[RequestIndex]time.Time
// Chunks we've written to since the corresponding piece was last checked.
dirtyChunks roaring.Bitmap
panic(p.availability)
}
p.availability--
+ t.updatePieceRequestOrder(i)
}
func (t *Torrent) incPieceAvailability(i pieceIndex) {
if t.haveInfo() {
p := t.piece(i)
p.availability++
+ t.updatePieceRequestOrder(i)
}
}
return nil
}
+func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
+ return request_strategy.PieceRequestOrderKey{
+ InfoHash: t.infoHash,
+ Index: i,
+ }
+}
+
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
+ t.initPieceRequestOrder()
for i := range t.pieces {
p := &t.pieces[i]
// Need to add availability before updating piece completion, as that may result in conns
panic(p.availability)
}
p.availability = int64(t.pieceAvailabilityFromPeers(i))
+ t.addRequestOrderPiece(i)
t.updatePieceCompletion(pieceIndex(i))
if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
// t.logger.Printf("piece %s completion unknown, queueing check", p)
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
- t.pendingRequests.Init(t.numRequests())
+ t.pendingRequests = make(map[RequestIndex]*Peer)
+ t.lastRequested = make(map[RequestIndex]time.Time)
t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
t.iterPeers(func(p *Peer) {
p.close()
})
+ if t.storage != nil {
+ t.deletePieceRequestOrder()
+ }
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
if t._pendingPieces.Contains(uint32(piece)) {
t.iterPeers(func(c *Peer) {
- if c.actualRequestState.Interested {
- return
- }
+ // if c.requestState.Interested {
+ // return
+ // }
if !c.isLowOnRequests() {
return
}
}
func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
+ if !t.closed.IsSet() {
+ // It would be possible to filter on pure-priority changes here to avoid churning the piece
+ // request order.
+ t.updatePieceRequestOrder(piece)
+ }
p := &t.pieces[piece]
newPrio := p.uncachedPriority()
// t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
} else {
t._completedPieces.Remove(x)
}
+ p.t.updatePieceRequestOrder(piece)
t.updateComplete()
if complete && len(p.dirtiers) != 0 {
t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
// actual.m = make([]int, t.numRequests())
// }
// t.iterPeers(func(p *Peer) {
- // p.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ // p.requestState.Requests.Iterate(func(x uint32) bool {
// actual.Inc(x)
// return true
// })
func (t *Torrent) updateComplete() {
t.Complete.SetBool(t.haveAllPieces())
}
+
+func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
+ p := t.pendingRequests[r]
+ if p != nil {
+ p.cancel(r)
+ }
+ delete(t.pendingRequests, r)
+ return p
+}
+
+func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
+ return t.pendingRequests[r]
+}
}
func (ws *webseedPeer) _cancel(r RequestIndex) bool {
- active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
- if ok {
+ if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
active.Cancel()
+ // The requester is running and will handle the result.
+ return true
}
- if !ws.peer.deleteRequest(r) {
- panic("cancelled webseed request should exist")
- }
- if ws.peer.isLowOnRequests() {
- ws.peer.updateRequests("webseedPeer._cancel")
- }
- return true
+ // There should be no requester handling this, so no further events will occur.
+ return false
}
func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
start:
for !ws.peer.closed.IsSet() {
restart := false
- ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
+ ws.peer.requestState.Requests.Iterate(func(x uint32) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
return true
log.Printf("closing %v", ws)
ws.peer.close()
}
- ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
+ if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+ panic("invalid reject")
+ }
return err
}
err = ws.peer.receiveChunk(&pp.Message{
}
func (me *webseedPeer) isLowOnRequests() bool {
- return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
+ return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests)
}
func (me *webseedPeer) peerPieces() *roaring.Bitmap {