func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
- p.t.updatePieceRequestOrder(p.index)
+ p.t.updatePieceRequestOrderPiece(p.index)
p.readerCond.Broadcast()
}
func (p *Piece) pendChunkIndex(i RequestIndex) {
p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
- p.t.updatePieceRequestOrder(p.index)
+ p.t.updatePieceRequestOrderPiece(p.index)
}
func (p *Piece) numChunks() chunkIndexType {
import (
g "github.com/anacrolix/generics"
+
"github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
)
-type requestStrategyInput struct {
- cl *Client
- capFunc storage.TorrentCapacity
+type requestStrategyInputCommon struct {
+ maxUnverifiedBytes int64
}
-func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
- return requestStrategyTorrent{g.MapMustGet(r.cl.torrents, ih)}
+func (r requestStrategyInputCommon) MaxUnverifiedBytes() int64 {
+ return r.maxUnverifiedBytes
}
-func (r requestStrategyInput) Capacity() (int64, bool) {
- if r.capFunc == nil {
- return 0, false
- }
+type requestStrategyInputMultiTorrent struct {
+ requestStrategyInputCommon
+ torrents map[metainfo.Hash]*Torrent
+ capFunc storage.TorrentCapacity
+}
+
+func (r requestStrategyInputMultiTorrent) Torrent(ih metainfo.Hash) request_strategy.Torrent {
+ return requestStrategyTorrent{g.MapMustGet(r.torrents, ih)}
+}
+
+func (r requestStrategyInputMultiTorrent) Capacity() (int64, bool) {
return (*r.capFunc)()
}
-func (r requestStrategyInput) MaxUnverifiedBytes() int64 {
- return r.cl.config.MaxUnverifiedBytes
+type requestStrategyInputSingleTorrent struct {
+ requestStrategyInputCommon
+ t *Torrent
}
-var _ request_strategy.Input = requestStrategyInput{}
+func (r requestStrategyInputSingleTorrent) Torrent(_ metainfo.Hash) request_strategy.Torrent {
+ return requestStrategyTorrent{r.t}
+}
+
+func (r requestStrategyInputSingleTorrent) Capacity() (cap int64, capped bool) {
+ return 0, false
+}
+
+var _ request_strategy.Input = requestStrategyInputSingleTorrent{}
+
+func (cl *Client) getRequestStrategyInputCommon() requestStrategyInputCommon {
+ return requestStrategyInputCommon{cl.config.MaxUnverifiedBytes}
+}
// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
- return requestStrategyInput{
- cl: cl,
- capFunc: primaryTorrent.storage.Capacity,
+ if primaryTorrent.storage.Capacity == nil {
+ return requestStrategyInputSingleTorrent{
+ requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
+ t: primaryTorrent,
+ }
+ } else {
+ return requestStrategyInputMultiTorrent{
+ requestStrategyInputCommon: cl.getRequestStrategyInputCommon(),
+ torrents: cl.torrents,
+ capFunc: primaryTorrent.storage.Capacity,
+ }
}
}
"runtime"
"testing"
+ "github.com/anacrolix/missinggo/v2/iter"
"github.com/davecgh/go-spew/spew"
qt "github.com/frankban/quicktest"
+ "github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
+ "github.com/anacrolix/torrent/storage"
)
func makeRequestStrategyPiece(t request_strategy.Torrent) request_strategy.Piece {
// We have to use p, or it gets optimized away.
spew.Fdump(io.Discard, p)
}
+
+type storagePiece struct {
+ complete bool
+}
+
+func (s storagePiece) ReadAt(p []byte, off int64) (n int, err error) {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s storagePiece) WriteAt(p []byte, off int64) (n int, err error) {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s storagePiece) MarkComplete() error {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s storagePiece) MarkNotComplete() error {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s storagePiece) Completion() storage.Completion {
+ return storage.Completion{Ok: true, Complete: s.complete}
+}
+
+var _ storage.PieceImpl = storagePiece{}
+
+type storageClient struct {
+ completed int
+}
+
+func (s *storageClient) OpenTorrent(
+ info *metainfo.Info,
+ infoHash metainfo.Hash,
+) (storage.TorrentImpl, error) {
+ return storage.TorrentImpl{
+ Piece: func(p metainfo.Piece) storage.PieceImpl {
+ return storagePiece{complete: p.Index() < s.completed}
+ },
+ }, nil
+}
+
+func BenchmarkRequestStrategy(b *testing.B) {
+ c := qt.New(b)
+ cl := newTestingClient(b)
+ storageClient := storageClient{}
+ tor, new := cl.AddTorrentOpt(AddTorrentOpts{
+ Storage: &storageClient,
+ })
+ tor.disableTriggers = true
+ c.Assert(new, qt.IsTrue)
+ const pieceLength = 1 << 8 << 10
+ const numPieces = 30_000
+ err := tor.setInfo(&metainfo.Info{
+ Pieces: make([]byte, numPieces*metainfo.HashSize),
+ PieceLength: pieceLength,
+ Length: pieceLength * numPieces,
+ })
+ c.Assert(err, qt.IsNil)
+ tor.onSetInfo()
+ peer := cl.newConnection(nil, newConnectionOpts{
+ network: "test",
+ })
+ peer.setTorrent(tor)
+ c.Assert(tor.storage, qt.IsNotNil)
+ const chunkSize = defaultChunkSize
+ peer.onPeerHasAllPiecesNoTriggers()
+ for i := 0; i < tor.numPieces(); i++ {
+ tor.pieces[i].priority.Raise(PiecePriorityNormal)
+ tor.updatePiecePriorityNoTriggers(i)
+ }
+ peer.peerChoking = false
+ //b.StopTimer()
+ b.ResetTimer()
+ //b.ReportAllocs()
+ for _ = range iter.N(b.N) {
+ storageClient.completed = 0
+ for pieceIndex := range iter.N(numPieces) {
+ tor.updatePieceCompletion(pieceIndex)
+ }
+ for completed := 0; completed <= numPieces; completed += 1 {
+ storageClient.completed = completed
+ if completed > 0 {
+ tor.updatePieceCompletion(completed - 1)
+ }
+ // Starting and stopping timers around this part causes lots of GC overhead.
+ rs := peer.getDesiredRequestState()
+ tor.cacheNextRequestIndexesForReuse(rs.Requests.requestIndexes)
+ // End of part that should be timed.
+ remainingChunks := (numPieces - completed) * (pieceLength / chunkSize)
+ c.Assert(rs.Requests.requestIndexes, qt.HasLen, minInt(
+ remainingChunks,
+ int(cl.config.MaxUnverifiedBytes/chunkSize)))
+ }
+ }
+}
return true
}
if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
- return true
+ return false
}
allTorrentsUnverifiedBytes += pieceLength
f(ih, _i.key.Index, _i.state)
package requestStrategy
-import "github.com/anacrolix/torrent/metainfo"
+import (
+ g "github.com/anacrolix/generics"
+ "github.com/anacrolix/torrent/metainfo"
+)
type Btree interface {
Delete(pieceRequestOrderItem)
}
type PieceRequestOrderKey struct {
- InfoHash metainfo.Hash
Index int
+ InfoHash metainfo.Hash
}
type PieceRequestOrderState struct {
+ Availability int
Priority piecePriority
Partial bool
- Availability int
}
type pieceRequestOrderItem struct {
return pieceOrderLess(me, otherConcrete).Less()
}
-func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
- if _, ok := me.keys[key]; ok {
- panic(key)
+// Returns the old state if the key was already present.
+func (me *PieceRequestOrder) Add(
+ key PieceRequestOrderKey,
+ state PieceRequestOrderState,
+) (old g.Option[PieceRequestOrderState]) {
+ if old.Value, old.Ok = me.keys[key]; old.Ok {
+ if state == old.Value {
+ return
+ }
+ me.tree.Delete(pieceRequestOrderItem{key, old.Value})
}
me.tree.Add(pieceRequestOrderItem{key, state})
me.keys[key] = state
+ return
}
func (me *PieceRequestOrder) Update(
key PieceRequestOrderKey,
state PieceRequestOrderState,
) {
- oldState, ok := me.keys[key]
- if !ok {
+ if !me.Add(key, state).Ok {
panic("key should have been added already")
}
- if state == oldState {
- return
- }
- me.tree.Delete(pieceRequestOrderItem{key, oldState})
- me.tree.Add(pieceRequestOrderItem{key, state})
- me.keys[key] = state
}
func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
}
}
-func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
- me.tree.Delete(pieceRequestOrderItem{key, me.keys[key]})
+func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) bool {
+ state, ok := me.keys[key]
+ if !ok {
+ return false
+ }
+ me.tree.Delete(pieceRequestOrderItem{key, state})
delete(me.keys, key)
+ return true
}
func (me *PieceRequestOrder) Len() int {
for range iter.N(b.N) {
pro := NewPieceOrder(newBtree(), numPieces)
state := PieceRequestOrderState{}
- doPieces := func(m func(PieceRequestOrderKey)) {
+ doPieces := func(m func(PieceRequestOrderKey) bool) {
for i := range iter.N(numPieces) {
key := PieceRequestOrderKey{
Index: i,
m(key)
}
}
- doPieces(func(key PieceRequestOrderKey) {
- pro.Add(key, state)
+ doPieces(func(key PieceRequestOrderKey) bool {
+ return !pro.Add(key, state).Ok
})
state.Availability++
- doPieces(func(key PieceRequestOrderKey) {
+ doPieces(func(key PieceRequestOrderKey) bool {
pro.Update(key, state)
+ return true
})
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
return true
})
- doPieces(func(key PieceRequestOrderKey) {
+ doPieces(func(key PieceRequestOrderKey) bool {
state.Priority = piecePriority(key.Index / 4)
pro.Update(key, state)
+ return true
})
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
return item.key.Index < 1000
})
state.Priority = 0
state.Availability++
- doPieces(func(key PieceRequestOrderKey) {
+ doPieces(func(key PieceRequestOrderKey) bool {
pro.Update(key, state)
+ return true
})
pro.tree.Scan(func(item pieceRequestOrderItem) bool {
return item.key.Index < 1000
})
state.Availability--
- doPieces(func(key PieceRequestOrderKey) {
+ doPieces(func(key PieceRequestOrderKey) bool {
pro.Update(key, state)
+ return true
})
doPieces(pro.Delete)
if pro.Len() != 0 {
return
}
}
- if p.requestState.Cancelled.Contains(r) {
+ cancelled := &p.requestState.Cancelled
+ if !cancelled.IsEmpty() && cancelled.Contains(r) {
// Can't re-request while awaiting acknowledgement.
return
}
func(_ context.Context) {
next := p.getDesiredRequestState()
p.applyRequestState(next)
- p.t.requestIndexes = next.Requests.requestIndexes[:0]
+ p.t.cacheNextRequestIndexesForReuse(next.Requests.requestIndexes)
},
)
}
+func (t *Torrent) cacheNextRequestIndexesForReuse(slice []RequestIndex) {
+ // The incoming slice can be smaller when getDesiredRequestState short circuits on some
+ // conditions.
+ if cap(slice) > cap(t.requestIndexes) {
+ t.requestIndexes = slice[:0]
+ }
+}
+
// Whether we should allow sending not interested ("losing interest") to the peer. I noticed
// qBitTorrent seems to punish us for sending not interested when we're streaming and don't
// currently need anything.
return
}
more := true
- requestHeap := heap.InterfaceForSlice(&next.Requests.requestIndexes, next.Requests.lessByValue)
+ orig := next.Requests.requestIndexes
+ requestHeap := heap.InterfaceForSlice(
+ &next.Requests.requestIndexes,
+ next.Requests.lessByValue,
+ )
heap.Init(requestHeap)
t := p.t
break
}
req := heap.Pop(requestHeap)
+ if cap(next.Requests.requestIndexes) != cap(orig) {
+ panic("changed")
+ }
existing := t.requestingPeer(req)
if existing != nil && existing != p {
// Don't steal from the poor.
package torrent
import (
+ g "github.com/anacrolix/generics"
request_strategy "github.com/anacrolix/torrent/request-strategy"
)
-func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
+func (t *Torrent) updatePieceRequestOrderPiece(pieceIndex int) {
if t.storage == nil {
return
}
- if ro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]; ok {
- ro.Update(
- t.pieceRequestOrderKey(pieceIndex),
- t.requestStrategyPieceOrderState(pieceIndex))
+ pro, ok := t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()]
+ if !ok {
+ return
+ }
+ key := t.pieceRequestOrderKey(pieceIndex)
+ if t.hasStorageCap() {
+ pro.Update(key, t.requestStrategyPieceOrderState(pieceIndex))
+ return
+ }
+ pending := !t.ignorePieceForRequests(pieceIndex)
+ if pending {
+ pro.Add(key, t.requestStrategyPieceOrderState(pieceIndex))
+ } else {
+ pro.Delete(key)
}
}
if t.storage == nil {
return
}
- if t.cl.pieceRequestOrder == nil {
- t.cl.pieceRequestOrder = make(map[interface{}]*request_strategy.PieceRequestOrder)
- }
+ g.MakeMapIfNil(&t.cl.pieceRequestOrder)
key := t.clientPieceRequestOrderKey()
cpro := t.cl.pieceRequestOrder
if cpro[key] == nil {
if t.storage == nil {
return
}
- t.cl.pieceRequestOrder[t.clientPieceRequestOrderKey()].Add(
- t.pieceRequestOrderKey(i),
- t.requestStrategyPieceOrderState(i))
+ pro := t.getPieceRequestOrder()
+ key := t.pieceRequestOrderKey(i)
+ if t.hasStorageCap() || !t.ignorePieceForRequests(i) {
+ pro.Add(key, t.requestStrategyPieceOrderState(i))
+ }
}
func (t *Torrent) getPieceRequestOrder() *request_strategy.PieceRequestOrder {
// Large allocations reused between request state updates.
requestPieceStates []request_strategy.PieceRequestOrderState
requestIndexes []RequestIndex
+
+ disableTriggers bool
}
type outgoingConnAttemptKey = *PeerInfo
panic(p.relativeAvailability)
}
p.relativeAvailability--
- t.updatePieceRequestOrder(i)
+ t.updatePieceRequestOrderPiece(i)
}
func (t *Torrent) incPieceAvailability(i pieceIndex) {
if t.haveInfo() {
p := t.piece(i)
p.relativeAvailability++
- t.updatePieceRequestOrder(i)
+ t.updatePieceRequestOrderPiece(i)
}
}
}
func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
- return t._pendingPieces.Contains(uint32(index))
+ return !t._pendingPieces.IsEmpty() && t._pendingPieces.Contains(uint32(index))
}
// A pool of []*PeerConn, to reduce allocations in functions that need to index or sort Torrent
if !t.closed.IsSet() {
// It would be possible to filter on pure-priority changes here to avoid churning the piece
// request order.
- t.updatePieceRequestOrder(piece)
+ t.updatePieceRequestOrderPiece(piece)
}
p := &t.pieces[piece]
newPrio := p.uncachedPriority()
}
func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
- if t.updatePiecePriorityNoTriggers(piece) {
+ if t.updatePiecePriorityNoTriggers(piece) && !t.disableTriggers {
t.onPiecePendingTriggers(piece, reason)
}
+ t.updatePieceRequestOrderPiece(piece)
}
func (t *Torrent) updateAllPiecePriorities(reason string) {
} else {
t._completedPieces.Remove(x)
}
- p.t.updatePieceRequestOrder(piece)
+ p.t.updatePieceRequestOrderPiece(piece)
t.updateComplete()
if complete && len(p.dirtiers) != 0 {
t.logger.Printf("marked piece %v complete but still has dirtiers", piece)
}
if changed {
- t.logger.Levelf(log.Debug, "piece %d completion changed: %+v -> %+v", piece, cached, uncached)
+ //slog.Debug(
+ // "piece completion changed",
+ // slog.Int("piece", piece),
+ // slog.Any("from", cached),
+ // slog.Any("to", uncached))
t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion")
}
return changed
package torrent
import (
- "github.com/anacrolix/torrent/typed-roaring"
+ typedRoaring "github.com/anacrolix/torrent/typed-roaring"
)
-func iterBitmapUnsetInRange[T typedRoaring.BitConstraint](it *typedRoaring.Iterator[T], start, end T, f func(T)) {
+func iterBitmapUnsetInRange[T typedRoaring.BitConstraint](
+ it *typedRoaring.Iterator[T],
+ start, end T,
+ f func(T),
+) {
it.AdvanceIfNeeded(start)
lastDirty := start - 1
for it.HasNext() {