From a54d2d81e48267b37ce21f4dbf0ee6160715cefb Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 6 May 2022 16:24:46 +1000 Subject: [PATCH] Retain peer local request ordering --- go.mod | 1 + go.sum | 2 ++ ordered-bitmap.go | 58 ++++++++++++++++++++++++++++++++++++++++ peerconn.go | 46 ++++++++++++++----------------- request-strategy/peer.go | 4 +-- 5 files changed, 82 insertions(+), 29 deletions(-) create mode 100644 ordered-bitmap.go diff --git a/go.mod b/go.mod index 627cd6de..c77eb3f9 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/alexflint/go-scalar v1.1.0 // indirect github.com/anacrolix/mmsg v1.0.0 // indirect github.com/anacrolix/stm v0.3.0 // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/benbjohnson/immutable v0.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.0 // indirect diff --git a/go.sum b/go.sum index d70c5578..7cf2d2a2 100644 --- a/go.sum +++ b/go.sum @@ -88,6 +88,8 @@ github.com/anacrolix/upnp v0.1.3-0.20220123035249-922794e51c96/go.mod h1:Wa6n8cY github.com/anacrolix/utp v0.1.0 h1:FOpQOmIwYsnENnz7tAGohA+r6iXpRjrq8ssKSre2Cp4= github.com/anacrolix/utp v0.1.0/go.mod h1:MDwc+vsGEq7RMw6lr2GKOEqjWny5hO5OZXRVNaBJ2Dk= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/benbjohnson/immutable v0.2.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= github.com/benbjohnson/immutable v0.3.0 h1:TVRhuZx2wG9SZ0LRdqlbs9S5BZ6Y24hJEHTCgWHZEIw= github.com/benbjohnson/immutable v0.3.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI= diff --git a/ordered-bitmap.go b/ordered-bitmap.go new file mode 100644 index 00000000..685ac62c --- /dev/null +++ b/ordered-bitmap.go @@ -0,0 +1,58 @@ +package torrent + +import ( + "github.com/anacrolix/generics" + "github.com/anacrolix/torrent/typed-roaring" + list "github.com/bahlo/generic-list-go" +) + +type orderedBitmap[T typedRoaring.BitConstraint] struct { + bitmap typedRoaring.Bitmap[T] + // There should be way more efficient ways to do this. + order list.List[T] + elements map[T]*list.Element[T] +} + +func (o *orderedBitmap[T]) IterateSnapshot(f func(T) bool) { + o.bitmap.Clone().Iterate(f) +} + +func (o *orderedBitmap[T]) IsEmpty() bool { + return o.bitmap.IsEmpty() +} + +func (o *orderedBitmap[T]) GetCardinality() uint64 { + return uint64(o.order.Len()) +} + +func (o *orderedBitmap[T]) Contains(index T) bool { + return o.bitmap.Contains(index) +} + +func (o *orderedBitmap[T]) Add(index T) { + o.bitmap.Add(index) + if _, ok := o.elements[index]; !ok { + generics.MakeMapIfNilAndSet(&o.elements, index, o.order.PushBack(index)) + } +} + +func (o *orderedBitmap[T]) Rank(index T) uint64 { + return o.bitmap.Rank(index) +} + +func (o *orderedBitmap[T]) Iterate(f func(T) bool) { + for e := o.order.Front(); e != nil; e = e.Next() { + if !f(e.Value) { + return + } + } +} + +func (o *orderedBitmap[T]) CheckedRemove(index T) bool { + if !o.bitmap.CheckedRemove(index) { + return false + } + o.order.Remove(o.elements[index]) + delete(o.elements, index) + return true +} diff --git a/peerconn.go b/peerconn.go index a344934e..08b1814b 100644 --- a/peerconn.go +++ b/peerconn.go @@ -8,7 +8,6 @@ import ( "io" "math/rand" "net" - "sort" "strconv" "strings" "sync/atomic" @@ -130,13 +129,7 @@ type Peer struct { logger log.Logger } -type peerRequests struct { - typedRoaring.Bitmap[RequestIndex] -} - -func (p *peerRequests) IterateSnapshot(f func(request_strategy.RequestIndex) bool) { - p.Clone().Iterate(f) -} +type peerRequests = orderedBitmap[RequestIndex] func (p *Peer) initRequestState() { p.requestState.Requests = &peerRequests{} @@ -359,13 +352,25 @@ func (cn *Peer) downloadRate() float64 { return float64(num) / cn.totalExpectingTime().Seconds() } -func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { - ret = make(map[pieceIndex]int) - cn.requestState.Requests.Iterate(func(x RequestIndex) bool { - ret[cn.t.pieceIndexOfRequestIndex(x)]++ +func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) { + var last Option[pieceIndex] + var count int + next := func(item Option[pieceIndex]) { + if item == last { + count++ + } else { + if count != 0 { + f(last.Value(), count) + } + last = item + count = 1 + } + } + cn.requestState.Requests.Iterate(func(requestIndex request_strategy.RequestIndex) bool { + next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex))) return true }) - return + next(None[pieceIndex]()) } func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { @@ -404,20 +409,9 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { cn.downloadRate()/(1<<10), ) fmt.Fprintf(w, " requested pieces:") - type pieceNumRequestsType struct { - piece pieceIndex - numRequests int - } - var pieceNumRequests []pieceNumRequestsType - for piece, count := range cn.numRequestsByPiece() { - pieceNumRequests = append(pieceNumRequests, pieceNumRequestsType{piece, count}) - } - sort.Slice(pieceNumRequests, func(i, j int) bool { - return pieceNumRequests[i].piece < pieceNumRequests[j].piece + cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) { + fmt.Fprintf(w, " %v(%v)", piece, count) }) - for _, elem := range pieceNumRequests { - fmt.Fprintf(w, " %v(%v)", elem.piece, elem.numRequests) - } fmt.Fprintf(w, "\n") } diff --git a/request-strategy/peer.go b/request-strategy/peer.go index 8b711f3c..cbdb2b75 100644 --- a/request-strategy/peer.go +++ b/request-strategy/peer.go @@ -6,9 +6,7 @@ import ( type PeerRequestState struct { Interested bool - // Expecting. TODO: This should be ordered so webseed requesters initiate in the same order they - // were assigned. - Requests PeerRequests + Requests PeerRequests // Cancelled and waiting response Cancelled typedRoaring.Bitmap[RequestIndex] } -- 2.44.0