This is in preparation to support encoding request strategy run inputs for benchmarking.
peerMinPieces pieceIndex
// Pieces we've accepted chunks for from the peer.
peerTouchedPieces map[pieceIndex]struct{}
- peerAllowedFast bitmap.Bitmap
+ peerAllowedFast roaring.Bitmap
PeerMaxRequests maxRequests // Maximum pending requests the peer allows.
PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
if !cn.actualRequestState.Interested {
return false
}
- if cn.peerAllowedFast.IterTyped(func(i int) bool {
- return roaringBitmapRangeCardinality(
- &cn.actualRequestState.Requests,
- cn.t.pieceRequestIndexOffset(i),
- cn.t.pieceRequestIndexOffset(i+1),
- ) == 0
- }) {
+ if !cn.peerChoking {
return true
}
- return !cn.peerChoking
+ haveAllowedFastRequests := false
+ cn.peerAllowedFast.Iterate(func(i uint32) bool {
+ haveAllowedFastRequests = roaringBitmapRangeCardinality(
+ &cn.actualRequestState.Requests,
+ cn.t.pieceRequestIndexOffset(pieceIndex(i)),
+ cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
+ ) == 0
+ return !haveAllowedFastRequests
+ })
+ return haveAllowedFastRequests
}
func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
}
c.decExpectedChunkReceive(req)
- if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) {
+ if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) {
chunksReceived.Add("due to allowed fast", 1)
}
package torrent
import (
+ "encoding/gob"
"fmt"
"sync"
+ "github.com/RoaringBitmap/roaring"
"github.com/anacrolix/chansync"
"github.com/anacrolix/missinggo/v2/bitmap"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
return p.t.PieceState(p.index)
}
-func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) {
- // Use an iterator to jump between dirty bits.
- if true {
- it := p.t.dirtyChunks.Iterator()
- startIndex := p.requestIndexOffset()
- endIndex := startIndex + p.numChunks()
- it.AdvanceIfNeeded(startIndex)
- lastDirty := startIndex - 1
- for it.HasNext() {
- next := it.Next()
- if next >= endIndex {
- break
- }
- for index := lastDirty + 1; index < next; index++ {
- f(index - startIndex)
- }
- lastDirty = next
+func init() {
+ gob.Register(undirtiedChunksIter{})
+}
+
+type undirtiedChunksIter struct {
+ TorrentDirtyChunks *roaring.Bitmap
+ StartRequestIndex RequestIndex
+ EndRequestIndex RequestIndex
+}
+
+func (me undirtiedChunksIter) Iter(f func(chunkIndexType)) {
+ it := me.TorrentDirtyChunks.Iterator()
+ startIndex := me.StartRequestIndex
+ endIndex := me.EndRequestIndex
+ it.AdvanceIfNeeded(startIndex)
+ lastDirty := startIndex - 1
+ for it.HasNext() {
+ next := it.Next()
+ if next >= endIndex {
+ break
}
- for index := lastDirty + 1; index < endIndex; index++ {
+ for index := lastDirty + 1; index < next; index++ {
f(index - startIndex)
}
+ lastDirty = next
+ }
+ for index := lastDirty + 1; index < endIndex; index++ {
+ f(index - startIndex)
+ }
+ return
+}
+
+func (p *Piece) undirtiedChunksIter() request_strategy.ChunksIter {
+ // Use an iterator to jump between dirty bits.
+ return undirtiedChunksIter{
+ TorrentDirtyChunks: &p.t.dirtyChunks,
+ StartRequestIndex: p.requestIndexOffset(),
+ EndRequestIndex: p.requestIndexOffset() + p.numChunks(),
+ }
+}
+
+func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) {
+ if true {
+ p.undirtiedChunksIter().Iter(f)
return
}
// The original implementation.
t *Torrent
alwaysReallocate bool
NumPendingChunks int
- IterPendingChunks ChunksIter
+ IterPendingChunks ChunksIterFunc
}
func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
p.IterPendingChunks(func(spec ChunkIndex) {
req := p.chunkIndexToRequestIndex(spec)
for _, peer := range peersForPiece {
- if h := peer.HasExistingRequest; h == nil || !h(req) {
+ if !peer.ExistingRequests.Contains(req) {
continue
}
if !peer.canFitRequest() {
if !peer.canFitRequest() {
continue
}
- if !peer.pieceAllowedFastOrDefault(p.index) {
+ if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
if !peer.canFitRequest() {
continue
}
- if !peer.pieceAllowedFastOrDefault(p.index) {
+ if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
package request_strategy
import (
+ "encoding/gob"
"math"
"testing"
"github.com/google/go-cmp/cmp"
)
-func chunkIterRange(end ChunkIndex) ChunksIter {
- return func(f func(ChunkIndex)) {
- for offset := ChunkIndex(0); offset < end; offset += 1 {
- f(offset)
- }
+func init() {
+ gob.Register(chunkIterRange(0))
+ gob.Register(sliceChunksIter{})
+}
+
+type chunkIterRange ChunkIndex
+
+func (me chunkIterRange) Iter(f func(ChunkIndex)) {
+ for offset := ChunkIndex(0); offset < ChunkIndex(me); offset += 1 {
+ f(offset)
}
}
+type sliceChunksIter []ChunkIndex
+
func chunkIter(offsets ...ChunkIndex) ChunksIter {
- return func(f func(ChunkIndex)) {
- for _, offset := range offsets {
- f(offset)
- }
+ return sliceChunksIter(offsets)
+}
+
+func (offsets sliceChunksIter) Iter(f func(ChunkIndex)) {
+ for _, offset := range offsets {
+ f(offset)
}
}
return
}
+func init() {
+ gob.Register(intPeerId(0))
+}
+
type intPeerId int
func (i intPeerId) Uintptr() uintptr {
return uintptr(i)
}
-func hasAllRequests(RequestIndex) bool { return true }
+var hasAllRequests = func() (all roaring.Bitmap) {
+ all.AddRange(0, roaring.MaxRange)
+ return
+}()
func TestStealingFromSlowerPeer(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
- HasPiece: func(i pieceIndex) bool {
- return true
- },
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
+ basePeer.Pieces.Add(0)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
- stealee.HasExistingRequest = hasAllRequests
+ stealee.ExistingRequests = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
func TestStealingFromSlowerPeersBasic(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
- HasPiece: func(i pieceIndex) bool {
- return true
- },
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
+ basePeer.Pieces.Add(0)
stealee := basePeer
stealee.DownloadRate = 1
- stealee.HasExistingRequest = hasAllRequests
+ stealee.ExistingRequests = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
func TestPeerKeepsExistingIfReasonable(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
- HasPiece: func(i pieceIndex) bool {
- return true
- },
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
+ basePeer.Pieces.Add(0)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
keepReq := RequestIndex(0)
- stealee.HasExistingRequest = func(r RequestIndex) bool {
- return r == keepReq
- }
+ stealee.ExistingRequests = requestSetFromSlice(keepReq)
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
func TestDontStealUnnecessarily(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
- HasPiece: func(i pieceIndex) bool {
- return true
- },
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
+ basePeer.Pieces.AddRange(0, 5)
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
keepReqs := requestSetFromSlice(
r(3, 2), r(3, 4), r(3, 6), r(3, 8),
r(4, 0), r(4, 1), r(4, 7), r(4, 8))
- stealee.HasExistingRequest = func(r RequestIndex) bool {
- return keepReqs.Contains(r)
- }
+ stealee.ExistingRequests = keepReqs
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
- secondStealer.HasPiece = func(i pieceIndex) bool {
- switch i {
- case 1, 3:
- return true
- default:
- return false
- }
- }
+ secondStealer.Pieces = roaring.Bitmap{}
+ secondStealer.Pieces.Add(1)
+ secondStealer.Pieces.Add(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{
// its actual request state since the last request strategy run.
func TestDuplicatePreallocations(t *testing.T) {
peer := func(id int, downloadRate float64) Peer {
- return Peer{
- HasExistingRequest: hasAllRequests,
- MaxRequests: 2,
- HasPiece: func(i pieceIndex) bool {
- return true
- },
- Id: intPeerId(id),
- DownloadRate: downloadRate,
+ p := Peer{
+ ExistingRequests: hasAllRequests,
+ MaxRequests: 2,
+ Id: intPeerId(id),
+ DownloadRate: downloadRate,
}
+ p.Pieces.AddRange(0, roaring.MaxRange)
+ return p
}
results := Run(Input{
Torrents: []Torrent{{
}
type Peer struct {
- HasPiece func(i pieceIndex) bool
- MaxRequests int
- HasExistingRequest func(r RequestIndex) bool
- Choking bool
- PieceAllowedFast func(pieceIndex) bool
- DownloadRate float64
- Age time.Duration
+ Pieces roaring.Bitmap
+ MaxRequests int
+ ExistingRequests roaring.Bitmap
+ Choking bool
+ PieceAllowedFast roaring.Bitmap
+ DownloadRate float64
+ Age time.Duration
// This is passed back out at the end, so must support equality. Could be a type-param later.
Id PeerId
}
-func (p *Peer) pieceAllowedFastOrDefault(i pieceIndex) bool {
- if f := p.PieceAllowedFast; f != nil {
- return f(i)
- }
- return false
-}
-
// TODO: This might be used in more places I think.
func (p *Peer) canRequestPiece(i pieceIndex) bool {
- return (!p.Choking || p.pieceAllowedFastOrDefault(i)) && p.HasPiece(i)
+ return (!p.Choking || p.PieceAllowedFast.Contains(uint32(i))) && p.HasPiece(i)
+}
+
+func (p *Peer) HasPiece(i pieceIndex) bool {
+ return p.Pieces.Contains(uint32(i))
}
package request_strategy
-type ChunksIter func(func(ChunkIndex))
+type ChunksIterFunc func(func(ChunkIndex))
+
+type ChunksIter interface {
+ Iter(func(ChunkIndex))
+}
type Piece struct {
Request bool
func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
i := p.IterPendingChunks
if i != nil {
- i(f)
+ i.Iter(f)
}
}
package torrent
import (
+ "encoding/gob"
+ "reflect"
"time"
"unsafe"
"github.com/RoaringBitmap/roaring"
+ "github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/chansync"
Availability: p.availability,
Length: int64(p.length()),
NumPendingChunks: int(t.pieceNumPendingChunks(i)),
- IterPendingChunks: p.iterUndirtiedChunks,
+ IterPendingChunks: p.undirtiedChunksIter(),
})
}
t.iterPeers(func(p *Peer) {
}
p.piecesReceivedSinceLastRequestUpdate = 0
rst.Peers = append(rst.Peers, request_strategy.Peer{
- HasPiece: p.peerHasPiece,
- MaxRequests: p.nominalMaxRequests(),
- HasExistingRequest: func(r RequestIndex) bool {
- return p.actualRequestState.Requests.Contains(r)
- },
- Choking: p.peerChoking,
- PieceAllowedFast: func(i pieceIndex) bool {
- return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
- },
- DownloadRate: p.downloadRate(),
- Age: time.Since(p.completedHandshake),
+ Pieces: *p.newPeerPieces(),
+ MaxRequests: p.nominalMaxRequests(),
+ ExistingRequests: p.actualRequestState.Requests,
+ Choking: p.peerChoking,
+ PieceAllowedFast: p.peerAllowedFast,
+ DownloadRate: p.downloadRate(),
+ Age: time.Since(p.completedHandshake),
Id: peerId{
Peer: p,
ptr: uintptr(unsafe.Pointer(p)),
}
func (cl *Client) doRequests() {
- nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
+ input := cl.getRequestStrategyInput()
+ nextPeerStates := request_strategy.Run(input)
for p, state := range nextPeerStates {
setPeerNextRequestState(p, state)
}
}
+func init() {
+ gob.Register(peerId{})
+}
+
type peerId struct {
*Peer
ptr uintptr
return p.ptr
}
+func (p peerId) GobEncode() (b []byte, _ error) {
+ *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
+ Data: uintptr(unsafe.Pointer(&p.ptr)),
+ Len: int(unsafe.Sizeof(p.ptr)),
+ Cap: int(unsafe.Sizeof(p.ptr)),
+ }
+ return
+}
+
+func (p *peerId) GobDecode(b []byte) error {
+ if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
+ panic(len(b))
+ }
+ ptr := unsafe.Pointer(&b[0])
+ p.ptr = *(*uintptr)(ptr)
+ log.Printf("%p", ptr)
+ dst := reflect.SliceHeader{
+ Data: uintptr(unsafe.Pointer(&p.Peer)),
+ Len: int(unsafe.Sizeof(p.Peer)),
+ Cap: int(unsafe.Sizeof(p.Peer)),
+ }
+ copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
+ return nil
+}
+
func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
p := _p.(peerId).Peer
p.nextRequestState = rp