]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Switch Peer.PieceAllowedFast and several request strategy inputs to raw roaring.Bitmaps
authorMatt Joiner <anacrolix@gmail.com>
Tue, 5 Oct 2021 09:06:23 +0000 (20:06 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 5 Oct 2021 09:06:23 +0000 (20:06 +1100)
This is in preparation to support encoding request strategy run inputs for benchmarking.

peerconn.go
piece.go
request-strategy/order.go
request-strategy/order_test.go
request-strategy/peer.go
request-strategy/piece.go
requesting.go

index 2c309bd7bb9bbd3b523a0895a8a464b0ed8ca6f2..0cb127fa684a5b70ecae117765d255b367a2232d 100644 (file)
@@ -121,7 +121,7 @@ type Peer struct {
        peerMinPieces pieceIndex
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[pieceIndex]struct{}
-       peerAllowedFast   bitmap.Bitmap
+       peerAllowedFast   roaring.Bitmap
 
        PeerMaxRequests  maxRequests // Maximum pending requests the peer allows.
        PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
@@ -182,16 +182,19 @@ func (cn *Peer) expectingChunks() bool {
        if !cn.actualRequestState.Interested {
                return false
        }
-       if cn.peerAllowedFast.IterTyped(func(i int) bool {
-               return roaringBitmapRangeCardinality(
-                       &cn.actualRequestState.Requests,
-                       cn.t.pieceRequestIndexOffset(i),
-                       cn.t.pieceRequestIndexOffset(i+1),
-               ) == 0
-       }) {
+       if !cn.peerChoking {
                return true
        }
-       return !cn.peerChoking
+       haveAllowedFastRequests := false
+       cn.peerAllowedFast.Iterate(func(i uint32) bool {
+               haveAllowedFastRequests = roaringBitmapRangeCardinality(
+                       &cn.actualRequestState.Requests,
+                       cn.t.pieceRequestIndexOffset(pieceIndex(i)),
+                       cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
+               ) == 0
+               return !haveAllowedFastRequests
+       })
+       return haveAllowedFastRequests
 }
 
 func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
@@ -1276,7 +1279,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        }
        c.decExpectedChunkReceive(req)
 
-       if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) {
+       if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) {
                chunksReceived.Add("due to allowed fast", 1)
        }
 
index bd107144ec91e8e9533a591416f41036da2ef848..6376c8c1c4d2b279f8befed52fe59b89b6c0e7df 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -1,11 +1,14 @@
 package torrent
 
 import (
+       "encoding/gob"
        "fmt"
        "sync"
 
+       "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/chansync"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
 
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -237,27 +240,50 @@ func (p *Piece) State() PieceState {
        return p.t.PieceState(p.index)
 }
 
-func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) {
-       // Use an iterator to jump between dirty bits.
-       if true {
-               it := p.t.dirtyChunks.Iterator()
-               startIndex := p.requestIndexOffset()
-               endIndex := startIndex + p.numChunks()
-               it.AdvanceIfNeeded(startIndex)
-               lastDirty := startIndex - 1
-               for it.HasNext() {
-                       next := it.Next()
-                       if next >= endIndex {
-                               break
-                       }
-                       for index := lastDirty + 1; index < next; index++ {
-                               f(index - startIndex)
-                       }
-                       lastDirty = next
+func init() {
+       gob.Register(undirtiedChunksIter{})
+}
+
+type undirtiedChunksIter struct {
+       TorrentDirtyChunks *roaring.Bitmap
+       StartRequestIndex  RequestIndex
+       EndRequestIndex    RequestIndex
+}
+
+func (me undirtiedChunksIter) Iter(f func(chunkIndexType)) {
+       it := me.TorrentDirtyChunks.Iterator()
+       startIndex := me.StartRequestIndex
+       endIndex := me.EndRequestIndex
+       it.AdvanceIfNeeded(startIndex)
+       lastDirty := startIndex - 1
+       for it.HasNext() {
+               next := it.Next()
+               if next >= endIndex {
+                       break
                }
-               for index := lastDirty + 1; index < endIndex; index++ {
+               for index := lastDirty + 1; index < next; index++ {
                        f(index - startIndex)
                }
+               lastDirty = next
+       }
+       for index := lastDirty + 1; index < endIndex; index++ {
+               f(index - startIndex)
+       }
+       return
+}
+
+func (p *Piece) undirtiedChunksIter() request_strategy.ChunksIter {
+       // Use an iterator to jump between dirty bits.
+       return undirtiedChunksIter{
+               TorrentDirtyChunks: &p.t.dirtyChunks,
+               StartRequestIndex:  p.requestIndexOffset(),
+               EndRequestIndex:    p.requestIndexOffset() + p.numChunks(),
+       }
+}
+
+func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) {
+       if true {
+               p.undirtiedChunksIter().Iter(f)
                return
        }
        // The original implementation.
index 752198a6496e63f3c03d58329bbbb4a55ebbd398..82cb504872d69596e65529c7edc7541f3c815fc6 100644 (file)
@@ -84,7 +84,7 @@ type requestablePiece struct {
        t                 *Torrent
        alwaysReallocate  bool
        NumPendingChunks  int
-       IterPendingChunks ChunksIter
+       IterPendingChunks ChunksIterFunc
 }
 
 func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
@@ -338,7 +338,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
        p.IterPendingChunks(func(spec ChunkIndex) {
                req := p.chunkIndexToRequestIndex(spec)
                for _, peer := range peersForPiece {
-                       if h := peer.HasExistingRequest; h == nil || !h(req) {
+                       if !peer.ExistingRequests.Contains(req) {
                                continue
                        }
                        if !peer.canFitRequest() {
@@ -360,7 +360,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
                        if !peer.canFitRequest() {
                                continue
                        }
-                       if !peer.pieceAllowedFastOrDefault(p.index) {
+                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
                                // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
                                peer.nextState.Interested = true
                                if peer.Choking {
@@ -389,7 +389,7 @@ chunk:
                        if !peer.canFitRequest() {
                                continue
                        }
-                       if !peer.pieceAllowedFastOrDefault(p.index) {
+                       if !peer.PieceAllowedFast.ContainsInt(p.index) {
                                // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
                                peer.nextState.Interested = true
                                if peer.Choking {
index d15988f74c7bb2ff5bc9743f85cd8a41dea59411..ba86f0cd1482d4b62863a7c255d156ff159e25a2 100644 (file)
@@ -1,6 +1,7 @@
 package request_strategy
 
 import (
+       "encoding/gob"
        "math"
        "testing"
 
@@ -9,19 +10,28 @@ import (
        "github.com/google/go-cmp/cmp"
 )
 
-func chunkIterRange(end ChunkIndex) ChunksIter {
-       return func(f func(ChunkIndex)) {
-               for offset := ChunkIndex(0); offset < end; offset += 1 {
-                       f(offset)
-               }
+func init() {
+       gob.Register(chunkIterRange(0))
+       gob.Register(sliceChunksIter{})
+}
+
+type chunkIterRange ChunkIndex
+
+func (me chunkIterRange) Iter(f func(ChunkIndex)) {
+       for offset := ChunkIndex(0); offset < ChunkIndex(me); offset += 1 {
+               f(offset)
        }
 }
 
+type sliceChunksIter []ChunkIndex
+
 func chunkIter(offsets ...ChunkIndex) ChunksIter {
-       return func(f func(ChunkIndex)) {
-               for _, offset := range offsets {
-                       f(offset)
-               }
+       return sliceChunksIter(offsets)
+}
+
+func (offsets sliceChunksIter) Iter(f func(ChunkIndex)) {
+       for _, offset := range offsets {
+               f(offset)
        }
 }
 
@@ -30,27 +40,32 @@ func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) {
        return
 }
 
+func init() {
+       gob.Register(intPeerId(0))
+}
+
 type intPeerId int
 
 func (i intPeerId) Uintptr() uintptr {
        return uintptr(i)
 }
 
-func hasAllRequests(RequestIndex) bool { return true }
+var hasAllRequests = func() (all roaring.Bitmap) {
+       all.AddRange(0, roaring.MaxRange)
+       return
+}()
 
 func TestStealingFromSlowerPeer(t *testing.T) {
        c := qt.New(t)
        basePeer := Peer{
-               HasPiece: func(i pieceIndex) bool {
-                       return true
-               },
                MaxRequests:  math.MaxInt16,
                DownloadRate: 2,
        }
+       basePeer.Pieces.Add(0)
        // Slower than the stealers, but has all requests already.
        stealee := basePeer
        stealee.DownloadRate = 1
-       stealee.HasExistingRequest = hasAllRequests
+       stealee.ExistingRequests = hasAllRequests
        stealee.Id = intPeerId(1)
        firstStealer := basePeer
        firstStealer.Id = intPeerId(2)
@@ -90,15 +105,13 @@ func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64,
 func TestStealingFromSlowerPeersBasic(t *testing.T) {
        c := qt.New(t)
        basePeer := Peer{
-               HasPiece: func(i pieceIndex) bool {
-                       return true
-               },
                MaxRequests:  math.MaxInt16,
                DownloadRate: 2,
        }
+       basePeer.Pieces.Add(0)
        stealee := basePeer
        stealee.DownloadRate = 1
-       stealee.HasExistingRequest = hasAllRequests
+       stealee.ExistingRequests = hasAllRequests
        stealee.Id = intPeerId(1)
        firstStealer := basePeer
        firstStealer.Id = intPeerId(2)
@@ -130,19 +143,15 @@ func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
 func TestPeerKeepsExistingIfReasonable(t *testing.T) {
        c := qt.New(t)
        basePeer := Peer{
-               HasPiece: func(i pieceIndex) bool {
-                       return true
-               },
                MaxRequests:  math.MaxInt16,
                DownloadRate: 2,
        }
+       basePeer.Pieces.Add(0)
        // Slower than the stealers, but has all requests already.
        stealee := basePeer
        stealee.DownloadRate = 1
        keepReq := RequestIndex(0)
-       stealee.HasExistingRequest = func(r RequestIndex) bool {
-               return r == keepReq
-       }
+       stealee.ExistingRequests = requestSetFromSlice(keepReq)
        stealee.Id = intPeerId(1)
        firstStealer := basePeer
        firstStealer.Id = intPeerId(2)
@@ -189,12 +198,10 @@ var peerNextRequestStateChecker = qt.CmpEquals(
 func TestDontStealUnnecessarily(t *testing.T) {
        c := qt.New(t)
        basePeer := Peer{
-               HasPiece: func(i pieceIndex) bool {
-                       return true
-               },
                MaxRequests:  math.MaxInt16,
                DownloadRate: 2,
        }
+       basePeer.Pieces.AddRange(0, 5)
        // Slower than the stealers, but has all requests already.
        stealee := basePeer
        stealee.DownloadRate = 1
@@ -204,22 +211,15 @@ func TestDontStealUnnecessarily(t *testing.T) {
        keepReqs := requestSetFromSlice(
                r(3, 2), r(3, 4), r(3, 6), r(3, 8),
                r(4, 0), r(4, 1), r(4, 7), r(4, 8))
-       stealee.HasExistingRequest = func(r RequestIndex) bool {
-               return keepReqs.Contains(r)
-       }
+       stealee.ExistingRequests = keepReqs
        stealee.Id = intPeerId(1)
        firstStealer := basePeer
        firstStealer.Id = intPeerId(2)
        secondStealer := basePeer
        secondStealer.Id = intPeerId(3)
-       secondStealer.HasPiece = func(i pieceIndex) bool {
-               switch i {
-               case 1, 3:
-                       return true
-               default:
-                       return false
-               }
-       }
+       secondStealer.Pieces = roaring.Bitmap{}
+       secondStealer.Pieces.Add(1)
+       secondStealer.Pieces.Add(3)
        results := Run(Input{Torrents: []Torrent{{
                ChunksPerPiece: 9,
                Pieces: []Piece{
@@ -277,15 +277,14 @@ func TestDontStealUnnecessarily(t *testing.T) {
 // its actual request state since the last request strategy run.
 func TestDuplicatePreallocations(t *testing.T) {
        peer := func(id int, downloadRate float64) Peer {
-               return Peer{
-                       HasExistingRequest: hasAllRequests,
-                       MaxRequests:        2,
-                       HasPiece: func(i pieceIndex) bool {
-                               return true
-                       },
-                       Id:           intPeerId(id),
-                       DownloadRate: downloadRate,
+               p := Peer{
+                       ExistingRequests: hasAllRequests,
+                       MaxRequests:      2,
+                       Id:               intPeerId(id),
+                       DownloadRate:     downloadRate,
                }
+               p.Pieces.AddRange(0, roaring.MaxRange)
+               return p
        }
        results := Run(Input{
                Torrents: []Torrent{{
index ece8ea427adce35c2c4961294ad97cf56367d157..b031d28e7fc1b2bdee3717f3a618366b3a456202 100644 (file)
@@ -16,25 +16,22 @@ type PeerId interface {
 }
 
 type Peer struct {
-       HasPiece           func(i pieceIndex) bool
-       MaxRequests        int
-       HasExistingRequest func(r RequestIndex) bool
-       Choking            bool
-       PieceAllowedFast   func(pieceIndex) bool
-       DownloadRate       float64
-       Age                time.Duration
+       Pieces           roaring.Bitmap
+       MaxRequests      int
+       ExistingRequests roaring.Bitmap
+       Choking          bool
+       PieceAllowedFast roaring.Bitmap
+       DownloadRate     float64
+       Age              time.Duration
        // This is passed back out at the end, so must support equality. Could be a type-param later.
        Id PeerId
 }
 
-func (p *Peer) pieceAllowedFastOrDefault(i pieceIndex) bool {
-       if f := p.PieceAllowedFast; f != nil {
-               return f(i)
-       }
-       return false
-}
-
 // TODO: This might be used in more places I think.
 func (p *Peer) canRequestPiece(i pieceIndex) bool {
-       return (!p.Choking || p.pieceAllowedFastOrDefault(i)) && p.HasPiece(i)
+       return (!p.Choking || p.PieceAllowedFast.Contains(uint32(i))) && p.HasPiece(i)
+}
+
+func (p *Peer) HasPiece(i pieceIndex) bool {
+       return p.Pieces.Contains(uint32(i))
 }
index 2dbe0bcf40f820e46a103a2f2dde1a6ba2d40ad2..dfc2d928a323710add9c010bbc66858a19246e24 100644 (file)
@@ -1,6 +1,10 @@
 package request_strategy
 
-type ChunksIter func(func(ChunkIndex))
+type ChunksIterFunc func(func(ChunkIndex))
+
+type ChunksIter interface {
+       Iter(func(ChunkIndex))
+}
 
 type Piece struct {
        Request           bool
@@ -15,6 +19,6 @@ type Piece struct {
 func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
        i := p.IterPendingChunks
        if i != nil {
-               i(f)
+               i.Iter(f)
        }
 }
index 01814ce829d43bb97fdca3ea1768be1ae68f20b0..a76d7dcc79df107d45ab3d43d0468e60a43a458e 100644 (file)
@@ -1,10 +1,13 @@
 package torrent
 
 import (
+       "encoding/gob"
+       "reflect"
        "time"
        "unsafe"
 
        "github.com/RoaringBitmap/roaring"
+       "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/bitmap"
 
        "github.com/anacrolix/chansync"
@@ -69,7 +72,7 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
                                Availability:      p.availability,
                                Length:            int64(p.length()),
                                NumPendingChunks:  int(t.pieceNumPendingChunks(i)),
-                               IterPendingChunks: p.iterUndirtiedChunks,
+                               IterPendingChunks: p.undirtiedChunksIter(),
                        })
                }
                t.iterPeers(func(p *Peer) {
@@ -81,17 +84,13 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
                        }
                        p.piecesReceivedSinceLastRequestUpdate = 0
                        rst.Peers = append(rst.Peers, request_strategy.Peer{
-                               HasPiece:    p.peerHasPiece,
-                               MaxRequests: p.nominalMaxRequests(),
-                               HasExistingRequest: func(r RequestIndex) bool {
-                                       return p.actualRequestState.Requests.Contains(r)
-                               },
-                               Choking: p.peerChoking,
-                               PieceAllowedFast: func(i pieceIndex) bool {
-                                       return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
-                               },
-                               DownloadRate: p.downloadRate(),
-                               Age:          time.Since(p.completedHandshake),
+                               Pieces:           *p.newPeerPieces(),
+                               MaxRequests:      p.nominalMaxRequests(),
+                               ExistingRequests: p.actualRequestState.Requests,
+                               Choking:          p.peerChoking,
+                               PieceAllowedFast: p.peerAllowedFast,
+                               DownloadRate:     p.downloadRate(),
+                               Age:              time.Since(p.completedHandshake),
                                Id: peerId{
                                        Peer: p,
                                        ptr:  uintptr(unsafe.Pointer(p)),
@@ -107,12 +106,17 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
 }
 
 func (cl *Client) doRequests() {
-       nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput())
+       input := cl.getRequestStrategyInput()
+       nextPeerStates := request_strategy.Run(input)
        for p, state := range nextPeerStates {
                setPeerNextRequestState(p, state)
        }
 }
 
+func init() {
+       gob.Register(peerId{})
+}
+
 type peerId struct {
        *Peer
        ptr uintptr
@@ -122,6 +126,31 @@ func (p peerId) Uintptr() uintptr {
        return p.ptr
 }
 
+func (p peerId) GobEncode() (b []byte, _ error) {
+       *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{
+               Data: uintptr(unsafe.Pointer(&p.ptr)),
+               Len:  int(unsafe.Sizeof(p.ptr)),
+               Cap:  int(unsafe.Sizeof(p.ptr)),
+       }
+       return
+}
+
+func (p *peerId) GobDecode(b []byte) error {
+       if uintptr(len(b)) != unsafe.Sizeof(p.ptr) {
+               panic(len(b))
+       }
+       ptr := unsafe.Pointer(&b[0])
+       p.ptr = *(*uintptr)(ptr)
+       log.Printf("%p", ptr)
+       dst := reflect.SliceHeader{
+               Data: uintptr(unsafe.Pointer(&p.Peer)),
+               Len:  int(unsafe.Sizeof(p.Peer)),
+               Cap:  int(unsafe.Sizeof(p.Peer)),
+       }
+       copy(*(*[]byte)(unsafe.Pointer(&dst)), b)
+       return nil
+}
+
 func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
        p := _p.(peerId).Peer
        p.nextRequestState = rp