From eab111dd84dfc19ac3dd1f86c194204bc47e4a6b Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 5 Oct 2021 20:06:23 +1100 Subject: [PATCH] Switch Peer.PieceAllowedFast and several request strategy inputs to raw roaring.Bitmaps This is in preparation to support encoding request strategy run inputs for benchmarking. --- peerconn.go | 23 +++++---- piece.go | 62 ++++++++++++++++------- request-strategy/order.go | 8 +-- request-strategy/order_test.go | 91 +++++++++++++++++----------------- request-strategy/peer.go | 27 +++++----- request-strategy/piece.go | 8 ++- requesting.go | 55 +++++++++++++++----- 7 files changed, 166 insertions(+), 108 deletions(-) diff --git a/peerconn.go b/peerconn.go index 2c309bd7..0cb127fa 100644 --- a/peerconn.go +++ b/peerconn.go @@ -121,7 +121,7 @@ type Peer struct { peerMinPieces pieceIndex // Pieces we've accepted chunks for from the peer. peerTouchedPieces map[pieceIndex]struct{} - peerAllowedFast bitmap.Bitmap + peerAllowedFast roaring.Bitmap PeerMaxRequests maxRequests // Maximum pending requests the peer allows. PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber @@ -182,16 +182,19 @@ func (cn *Peer) expectingChunks() bool { if !cn.actualRequestState.Interested { return false } - if cn.peerAllowedFast.IterTyped(func(i int) bool { - return roaringBitmapRangeCardinality( - &cn.actualRequestState.Requests, - cn.t.pieceRequestIndexOffset(i), - cn.t.pieceRequestIndexOffset(i+1), - ) == 0 - }) { + if !cn.peerChoking { return true } - return !cn.peerChoking + haveAllowedFastRequests := false + cn.peerAllowedFast.Iterate(func(i uint32) bool { + haveAllowedFastRequests = roaringBitmapRangeCardinality( + &cn.actualRequestState.Requests, + cn.t.pieceRequestIndexOffset(pieceIndex(i)), + cn.t.pieceRequestIndexOffset(pieceIndex(i+1)), + ) == 0 + return !haveAllowedFastRequests + }) + return haveAllowedFastRequests } func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool { @@ -1276,7 +1279,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { } c.decExpectedChunkReceive(req) - if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) { + if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) { chunksReceived.Add("due to allowed fast", 1) } diff --git a/piece.go b/piece.go index bd107144..6376c8c1 100644 --- a/piece.go +++ b/piece.go @@ -1,11 +1,14 @@ package torrent import ( + "encoding/gob" "fmt" "sync" + "github.com/RoaringBitmap/roaring" "github.com/anacrolix/chansync" "github.com/anacrolix/missinggo/v2/bitmap" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" @@ -237,27 +240,50 @@ func (p *Piece) State() PieceState { return p.t.PieceState(p.index) } -func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) { - // Use an iterator to jump between dirty bits. - if true { - it := p.t.dirtyChunks.Iterator() - startIndex := p.requestIndexOffset() - endIndex := startIndex + p.numChunks() - it.AdvanceIfNeeded(startIndex) - lastDirty := startIndex - 1 - for it.HasNext() { - next := it.Next() - if next >= endIndex { - break - } - for index := lastDirty + 1; index < next; index++ { - f(index - startIndex) - } - lastDirty = next +func init() { + gob.Register(undirtiedChunksIter{}) +} + +type undirtiedChunksIter struct { + TorrentDirtyChunks *roaring.Bitmap + StartRequestIndex RequestIndex + EndRequestIndex RequestIndex +} + +func (me undirtiedChunksIter) Iter(f func(chunkIndexType)) { + it := me.TorrentDirtyChunks.Iterator() + startIndex := me.StartRequestIndex + endIndex := me.EndRequestIndex + it.AdvanceIfNeeded(startIndex) + lastDirty := startIndex - 1 + for it.HasNext() { + next := it.Next() + if next >= endIndex { + break } - for index := lastDirty + 1; index < endIndex; index++ { + for index := lastDirty + 1; index < next; index++ { f(index - startIndex) } + lastDirty = next + } + for index := lastDirty + 1; index < endIndex; index++ { + f(index - startIndex) + } + return +} + +func (p *Piece) undirtiedChunksIter() request_strategy.ChunksIter { + // Use an iterator to jump between dirty bits. + return undirtiedChunksIter{ + TorrentDirtyChunks: &p.t.dirtyChunks, + StartRequestIndex: p.requestIndexOffset(), + EndRequestIndex: p.requestIndexOffset() + p.numChunks(), + } +} + +func (p *Piece) iterUndirtiedChunks(f func(chunkIndexType)) { + if true { + p.undirtiedChunksIter().Iter(f) return } // The original implementation. diff --git a/request-strategy/order.go b/request-strategy/order.go index 752198a6..82cb5048 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -84,7 +84,7 @@ type requestablePiece struct { t *Torrent alwaysReallocate bool NumPendingChunks int - IterPendingChunks ChunksIter + IterPendingChunks ChunksIterFunc } func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex { @@ -338,7 +338,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { p.IterPendingChunks(func(spec ChunkIndex) { req := p.chunkIndexToRequestIndex(spec) for _, peer := range peersForPiece { - if h := peer.HasExistingRequest; h == nil || !h(req) { + if !peer.ExistingRequests.Contains(req) { continue } if !peer.canFitRequest() { @@ -360,7 +360,7 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) { if !peer.canFitRequest() { continue } - if !peer.pieceAllowedFastOrDefault(p.index) { + if !peer.PieceAllowedFast.ContainsInt(p.index) { // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. peer.nextState.Interested = true if peer.Choking { @@ -389,7 +389,7 @@ chunk: if !peer.canFitRequest() { continue } - if !peer.pieceAllowedFastOrDefault(p.index) { + if !peer.PieceAllowedFast.ContainsInt(p.index) { // TODO: Verify that's okay to stay uninterested if we request allowed fast pieces. peer.nextState.Interested = true if peer.Choking { diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go index d15988f7..ba86f0cd 100644 --- a/request-strategy/order_test.go +++ b/request-strategy/order_test.go @@ -1,6 +1,7 @@ package request_strategy import ( + "encoding/gob" "math" "testing" @@ -9,19 +10,28 @@ import ( "github.com/google/go-cmp/cmp" ) -func chunkIterRange(end ChunkIndex) ChunksIter { - return func(f func(ChunkIndex)) { - for offset := ChunkIndex(0); offset < end; offset += 1 { - f(offset) - } +func init() { + gob.Register(chunkIterRange(0)) + gob.Register(sliceChunksIter{}) +} + +type chunkIterRange ChunkIndex + +func (me chunkIterRange) Iter(f func(ChunkIndex)) { + for offset := ChunkIndex(0); offset < ChunkIndex(me); offset += 1 { + f(offset) } } +type sliceChunksIter []ChunkIndex + func chunkIter(offsets ...ChunkIndex) ChunksIter { - return func(f func(ChunkIndex)) { - for _, offset := range offsets { - f(offset) - } + return sliceChunksIter(offsets) +} + +func (offsets sliceChunksIter) Iter(f func(ChunkIndex)) { + for _, offset := range offsets { + f(offset) } } @@ -30,27 +40,32 @@ func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) { return } +func init() { + gob.Register(intPeerId(0)) +} + type intPeerId int func (i intPeerId) Uintptr() uintptr { return uintptr(i) } -func hasAllRequests(RequestIndex) bool { return true } +var hasAllRequests = func() (all roaring.Bitmap) { + all.AddRange(0, roaring.MaxRange) + return +}() func TestStealingFromSlowerPeer(t *testing.T) { c := qt.New(t) basePeer := Peer{ - HasPiece: func(i pieceIndex) bool { - return true - }, MaxRequests: math.MaxInt16, DownloadRate: 2, } + basePeer.Pieces.Add(0) // Slower than the stealers, but has all requests already. stealee := basePeer stealee.DownloadRate = 1 - stealee.HasExistingRequest = hasAllRequests + stealee.ExistingRequests = hasAllRequests stealee.Id = intPeerId(1) firstStealer := basePeer firstStealer.Id = intPeerId(2) @@ -90,15 +105,13 @@ func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, func TestStealingFromSlowerPeersBasic(t *testing.T) { c := qt.New(t) basePeer := Peer{ - HasPiece: func(i pieceIndex) bool { - return true - }, MaxRequests: math.MaxInt16, DownloadRate: 2, } + basePeer.Pieces.Add(0) stealee := basePeer stealee.DownloadRate = 1 - stealee.HasExistingRequest = hasAllRequests + stealee.ExistingRequests = hasAllRequests stealee.Id = intPeerId(1) firstStealer := basePeer firstStealer.Id = intPeerId(2) @@ -130,19 +143,15 @@ func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) { func TestPeerKeepsExistingIfReasonable(t *testing.T) { c := qt.New(t) basePeer := Peer{ - HasPiece: func(i pieceIndex) bool { - return true - }, MaxRequests: math.MaxInt16, DownloadRate: 2, } + basePeer.Pieces.Add(0) // Slower than the stealers, but has all requests already. stealee := basePeer stealee.DownloadRate = 1 keepReq := RequestIndex(0) - stealee.HasExistingRequest = func(r RequestIndex) bool { - return r == keepReq - } + stealee.ExistingRequests = requestSetFromSlice(keepReq) stealee.Id = intPeerId(1) firstStealer := basePeer firstStealer.Id = intPeerId(2) @@ -189,12 +198,10 @@ var peerNextRequestStateChecker = qt.CmpEquals( func TestDontStealUnnecessarily(t *testing.T) { c := qt.New(t) basePeer := Peer{ - HasPiece: func(i pieceIndex) bool { - return true - }, MaxRequests: math.MaxInt16, DownloadRate: 2, } + basePeer.Pieces.AddRange(0, 5) // Slower than the stealers, but has all requests already. stealee := basePeer stealee.DownloadRate = 1 @@ -204,22 +211,15 @@ func TestDontStealUnnecessarily(t *testing.T) { keepReqs := requestSetFromSlice( r(3, 2), r(3, 4), r(3, 6), r(3, 8), r(4, 0), r(4, 1), r(4, 7), r(4, 8)) - stealee.HasExistingRequest = func(r RequestIndex) bool { - return keepReqs.Contains(r) - } + stealee.ExistingRequests = keepReqs stealee.Id = intPeerId(1) firstStealer := basePeer firstStealer.Id = intPeerId(2) secondStealer := basePeer secondStealer.Id = intPeerId(3) - secondStealer.HasPiece = func(i pieceIndex) bool { - switch i { - case 1, 3: - return true - default: - return false - } - } + secondStealer.Pieces = roaring.Bitmap{} + secondStealer.Pieces.Add(1) + secondStealer.Pieces.Add(3) results := Run(Input{Torrents: []Torrent{{ ChunksPerPiece: 9, Pieces: []Piece{ @@ -277,15 +277,14 @@ func TestDontStealUnnecessarily(t *testing.T) { // its actual request state since the last request strategy run. func TestDuplicatePreallocations(t *testing.T) { peer := func(id int, downloadRate float64) Peer { - return Peer{ - HasExistingRequest: hasAllRequests, - MaxRequests: 2, - HasPiece: func(i pieceIndex) bool { - return true - }, - Id: intPeerId(id), - DownloadRate: downloadRate, + p := Peer{ + ExistingRequests: hasAllRequests, + MaxRequests: 2, + Id: intPeerId(id), + DownloadRate: downloadRate, } + p.Pieces.AddRange(0, roaring.MaxRange) + return p } results := Run(Input{ Torrents: []Torrent{{ diff --git a/request-strategy/peer.go b/request-strategy/peer.go index ece8ea42..b031d28e 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -16,25 +16,22 @@ type PeerId interface { } type Peer struct { - HasPiece func(i pieceIndex) bool - MaxRequests int - HasExistingRequest func(r RequestIndex) bool - Choking bool - PieceAllowedFast func(pieceIndex) bool - DownloadRate float64 - Age time.Duration + Pieces roaring.Bitmap + MaxRequests int + ExistingRequests roaring.Bitmap + Choking bool + PieceAllowedFast roaring.Bitmap + DownloadRate float64 + Age time.Duration // This is passed back out at the end, so must support equality. Could be a type-param later. Id PeerId } -func (p *Peer) pieceAllowedFastOrDefault(i pieceIndex) bool { - if f := p.PieceAllowedFast; f != nil { - return f(i) - } - return false -} - // TODO: This might be used in more places I think. func (p *Peer) canRequestPiece(i pieceIndex) bool { - return (!p.Choking || p.pieceAllowedFastOrDefault(i)) && p.HasPiece(i) + return (!p.Choking || p.PieceAllowedFast.Contains(uint32(i))) && p.HasPiece(i) +} + +func (p *Peer) HasPiece(i pieceIndex) bool { + return p.Pieces.Contains(uint32(i)) } diff --git a/request-strategy/piece.go b/request-strategy/piece.go index 2dbe0bcf..dfc2d928 100644 --- a/request-strategy/piece.go +++ b/request-strategy/piece.go @@ -1,6 +1,10 @@ package request_strategy -type ChunksIter func(func(ChunkIndex)) +type ChunksIterFunc func(func(ChunkIndex)) + +type ChunksIter interface { + Iter(func(ChunkIndex)) +} type Piece struct { Request bool @@ -15,6 +19,6 @@ type Piece struct { func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) { i := p.IterPendingChunks if i != nil { - i(f) + i.Iter(f) } } diff --git a/requesting.go b/requesting.go index 01814ce8..a76d7dcc 100644 --- a/requesting.go +++ b/requesting.go @@ -1,10 +1,13 @@ package torrent import ( + "encoding/gob" + "reflect" "time" "unsafe" "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/chansync" @@ -69,7 +72,7 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input { Availability: p.availability, Length: int64(p.length()), NumPendingChunks: int(t.pieceNumPendingChunks(i)), - IterPendingChunks: p.iterUndirtiedChunks, + IterPendingChunks: p.undirtiedChunksIter(), }) } t.iterPeers(func(p *Peer) { @@ -81,17 +84,13 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input { } p.piecesReceivedSinceLastRequestUpdate = 0 rst.Peers = append(rst.Peers, request_strategy.Peer{ - HasPiece: p.peerHasPiece, - MaxRequests: p.nominalMaxRequests(), - HasExistingRequest: func(r RequestIndex) bool { - return p.actualRequestState.Requests.Contains(r) - }, - Choking: p.peerChoking, - PieceAllowedFast: func(i pieceIndex) bool { - return p.peerAllowedFast.Contains(bitmap.BitIndex(i)) - }, - DownloadRate: p.downloadRate(), - Age: time.Since(p.completedHandshake), + Pieces: *p.newPeerPieces(), + MaxRequests: p.nominalMaxRequests(), + ExistingRequests: p.actualRequestState.Requests, + Choking: p.peerChoking, + PieceAllowedFast: p.peerAllowedFast, + DownloadRate: p.downloadRate(), + Age: time.Since(p.completedHandshake), Id: peerId{ Peer: p, ptr: uintptr(unsafe.Pointer(p)), @@ -107,12 +106,17 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input { } func (cl *Client) doRequests() { - nextPeerStates := request_strategy.Run(cl.getRequestStrategyInput()) + input := cl.getRequestStrategyInput() + nextPeerStates := request_strategy.Run(input) for p, state := range nextPeerStates { setPeerNextRequestState(p, state) } } +func init() { + gob.Register(peerId{}) +} + type peerId struct { *Peer ptr uintptr @@ -122,6 +126,31 @@ func (p peerId) Uintptr() uintptr { return p.ptr } +func (p peerId) GobEncode() (b []byte, _ error) { + *(*reflect.SliceHeader)(unsafe.Pointer(&b)) = reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&p.ptr)), + Len: int(unsafe.Sizeof(p.ptr)), + Cap: int(unsafe.Sizeof(p.ptr)), + } + return +} + +func (p *peerId) GobDecode(b []byte) error { + if uintptr(len(b)) != unsafe.Sizeof(p.ptr) { + panic(len(b)) + } + ptr := unsafe.Pointer(&b[0]) + p.ptr = *(*uintptr)(ptr) + log.Printf("%p", ptr) + dst := reflect.SliceHeader{ + Data: uintptr(unsafe.Pointer(&p.Peer)), + Len: int(unsafe.Sizeof(p.Peer)), + Cap: int(unsafe.Sizeof(p.Peer)), + } + copy(*(*[]byte)(unsafe.Pointer(&dst)), b) + return nil +} + func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) { p := _p.(peerId).Peer p.nextRequestState = rp -- 2.44.0