]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Revert "Switch requestState to be a slice"
[btrtrc.git] / peerconn.go
index ed77f44442f6ce36aae35dc75e99d12e97c51a39..2eb15bdaa1483da5977142e1d197ed4b7f66232c 100644 (file)
@@ -8,7 +8,6 @@ import (
        "io"
        "math/rand"
        "net"
-       "sort"
        "strconv"
        "strings"
        "sync/atomic"
@@ -26,6 +25,7 @@ import (
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
+       "github.com/anacrolix/torrent/typed-roaring"
 )
 
 type PeerSource string
@@ -48,11 +48,10 @@ type PeerRemoteAddr interface {
        String() string
 }
 
-// Since we have to store all the requests in memory, we can't reasonably exceed what would be
-// indexable with the memory space available.
 type (
-       maxRequests  = int
-       requestState = request_strategy.PeerRequestState
+       // Since we have to store all the requests in memory, we can't reasonably exceed what could be
+       // indexed with the memory space available.
+       maxRequests = int
 )
 
 type Peer struct {
@@ -85,7 +84,7 @@ type Peer struct {
 
        // Stuff controlled by the local peer.
        needRequestUpdate    string
-       requestState         requestState
+       requestState         request_strategy.PeerRequestState
        updateRequestsTimer  *time.Timer
        lastRequestUpdate    time.Time
        peakRequests         maxRequests
@@ -120,7 +119,7 @@ type Peer struct {
        peerMinPieces pieceIndex
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[pieceIndex]struct{}
-       peerAllowedFast   roaring.Bitmap
+       peerAllowedFast   typedRoaring.Bitmap[pieceIndex]
 
        PeerMaxRequests  maxRequests // Maximum pending requests the peer allows.
        PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
@@ -129,6 +128,12 @@ type Peer struct {
        logger log.Logger
 }
 
+type peerRequests = orderedBitmap[RequestIndex]
+
+func (p *Peer) initRequestState() {
+       p.requestState.Requests = &peerRequests{}
+}
+
 // Maintains the state of a BitTorrent-protocol based connection with a peer.
 type PeerConn struct {
        Peer
@@ -189,11 +194,11 @@ func (cn *Peer) expectingChunks() bool {
                return true
        }
        haveAllowedFastRequests := false
-       cn.peerAllowedFast.Iterate(func(i uint32) bool {
-               haveAllowedFastRequests = roaringBitmapRangeCardinality(
-                       &cn.requestState.Requests,
-                       cn.t.pieceRequestIndexOffset(pieceIndex(i)),
-                       cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
+       cn.peerAllowedFast.Iterate(func(i pieceIndex) bool {
+               haveAllowedFastRequests = roaringBitmapRangeCardinality[RequestIndex](
+                       cn.requestState.Requests,
+                       cn.t.pieceRequestIndexOffset(i),
+                       cn.t.pieceRequestIndexOffset(i+1),
                ) == 0
                return !haveAllowedFastRequests
        })
@@ -201,7 +206,7 @@ func (cn *Peer) expectingChunks() bool {
 }
 
 func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
-       return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece))
+       return cn.peerChoking && !cn.peerAllowedFast.Contains(piece)
 }
 
 // Returns true if the connection is over IPv6.
@@ -346,13 +351,32 @@ func (cn *Peer) downloadRate() float64 {
        return float64(num) / cn.totalExpectingTime().Seconds()
 }
 
-func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
-       ret = make(map[pieceIndex]int)
-       cn.requestState.Requests.Iterate(func(x uint32) bool {
-               ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
+func (cn *Peer) DownloadRate() float64 {
+       cn.locker().Lock()
+       defer cn.locker().Unlock()
+
+       return cn.downloadRate()
+}
+
+func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
+       var last Option[pieceIndex]
+       var count int
+       next := func(item Option[pieceIndex]) {
+               if item == last {
+                       count++
+               } else {
+                       if count != 0 {
+                               f(last.Value(), count)
+                       }
+                       last = item
+                       count = 1
+               }
+       }
+       cn.requestState.Requests.Iterate(func(requestIndex request_strategy.RequestIndex) bool {
+               next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
                return true
        })
-       return
+       next(None[pieceIndex]())
 }
 
 func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
@@ -391,20 +415,9 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.downloadRate()/(1<<10),
        )
        fmt.Fprintf(w, "    requested pieces:")
-       type pieceNumRequestsType struct {
-               piece       pieceIndex
-               numRequests int
-       }
-       var pieceNumRequests []pieceNumRequestsType
-       for piece, count := range cn.numRequestsByPiece() {
-               pieceNumRequests = append(pieceNumRequests, pieceNumRequestsType{piece, count})
-       }
-       sort.Slice(pieceNumRequests, func(i, j int) bool {
-               return pieceNumRequests[i].piece < pieceNumRequests[j].piece
+       cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
+               fmt.Fprintf(w, " %v(%v)", piece, count)
        })
-       for _, elem := range pieceNumRequests {
-               fmt.Fprintf(w, " %v(%v)", elem.piece, elem.numRequests)
-       }
        fmt.Fprintf(w, "\n")
 }
 
@@ -497,7 +510,7 @@ var (
 
 // The actual value to use as the maximum outbound requests.
 func (cn *Peer) nominalMaxRequests() maxRequests {
-       return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
+       return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -578,7 +591,7 @@ type messageWriter func(pp.Message) bool
 // This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it
 // when we want to go fast.
 func (cn *Peer) shouldRequest(r RequestIndex) error {
-       pi := pieceIndex(r / cn.t.chunksPerRegularPiece())
+       pi := cn.t.pieceIndexOfRequestIndex(r)
        if cn.requestState.Cancelled.Contains(r) {
                return errors.New("request is cancelled and waiting acknowledgement")
        }
@@ -597,7 +610,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
        if cn.t.pieceQueuedForHash(pi) {
                panic("piece is queued for hash")
        }
-       if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
+       if cn.peerChoking && !cn.peerAllowedFast.Contains(pi) {
                // This could occur if we made a request with the fast extension, and then got choked and
                // haven't had the request rejected yet.
                if !cn.requestState.Requests.Contains(r) {
@@ -630,8 +643,10 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
                cn.validReceiveChunks = make(map[RequestIndex]int)
        }
        cn.validReceiveChunks[r]++
-       cn.t.pendingRequests[r] = cn
-       cn.t.lastRequested[r] = time.Now()
+       cn.t.requestState[r] = requestState{
+               peer: cn,
+               when: time.Now(),
+       }
        cn.updateExpectingChunks()
        ppReq := cn.t.requestIndexToRequest(r)
        for _, f := range cn.callbacks.SentRequest {
@@ -1117,7 +1132,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
 
        decoder := pp.Decoder{
                R:         bufio.NewReaderSize(c.r, 1<<17),
-               MaxLength: 256 * 1024,
+               MaxLength: 4 * pp.Integer(max(int64(t.chunkSize), defaultChunkSize)),
                Pool:      &t.chunkPool,
        }
        for {
@@ -1152,13 +1167,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                                break
                        }
                        if !c.fastEnabled() {
-                               if !c.deleteAllRequests().IsEmpty() {
-                                       c.t.iterPeers(func(p *Peer) {
-                                               if p.isLowOnRequests() {
-                                                       p.updateRequests("choked by non-fast PeerConn")
-                                               }
-                                       })
-                               }
+                               c.deleteAllRequests("choked by non-fast PeerConn")
                        } else {
                                // We don't decrement pending requests here, let's wait for the peer to either
                                // reject or satisfy the outstanding requests. Additionally, some peers may unchoke
@@ -1178,8 +1187,8 @@ func (c *PeerConn) mainReadLoop() (err error) {
                        }
                        c.peerChoking = false
                        preservedCount := 0
-                       c.requestState.Requests.Iterate(func(x uint32) bool {
-                               if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
+                       c.requestState.Requests.Iterate(func(x RequestIndex) bool {
+                               if !c.peerAllowedFast.Contains(c.t.pieceIndexOfRequestIndex(x)) {
                                        preservedCount++
                                }
                                return true
@@ -1251,7 +1260,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
                case pp.Reject:
                        req := newRequestFromMessage(&msg)
                        if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
-                               log.Printf("received invalid reject [request=%v, peer=%v]", req, c)
+                               c.logger.Printf("received invalid reject [request=%v, peer=%v]", req, c)
                                err = fmt.Errorf("received invalid reject [request=%v]", req)
                        }
                case pp.AllowedFast:
@@ -1404,7 +1413,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        }
        c.decExpectedChunkReceive(req)
 
-       if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) {
+       if c.peerChoking && c.peerAllowedFast.Contains(pieceIndex(ppReq.Index)) {
                chunksReceived.Add("due to allowed fast", 1)
        }
 
@@ -1463,7 +1472,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
 
        // Cancel pending requests for this chunk from *other* peers.
-       if p := t.pendingRequests[req]; p != nil {
+       if p := t.requestingPeer(req); p != nil {
                if p == c {
                        panic("should not be pending request from conn that just received it")
                }
@@ -1595,6 +1604,10 @@ func (cn *PeerConn) drop() {
        cn.t.dropConnection(cn)
 }
 
+func (cn *PeerConn) ban() {
+       cn.t.cl.banPeerIP(cn.remoteIp())
+}
+
 func (cn *Peer) netGoodPiecesDirtied() int64 {
        return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
 }
@@ -1622,8 +1635,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        if c.t.requestingPeer(r) != c {
                panic("only one peer should have a given request at a time")
        }
-       delete(c.t.pendingRequests, r)
-       delete(c.t.lastRequested, r)
+       delete(c.t.requestState, r)
        // c.t.iterPeers(func(p *Peer) {
        //      if p.isLowOnRequests() {
        //              p.updateRequests("Peer.deleteRequest")
@@ -1632,15 +1644,22 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        return true
 }
 
-func (c *Peer) deleteAllRequests() (deleted *roaring.Bitmap) {
-       deleted = c.requestState.Requests.Clone()
-       deleted.Iterate(func(x uint32) bool {
+func (c *Peer) deleteAllRequests(reason string) {
+       if c.requestState.Requests.IsEmpty() {
+               return
+       }
+       c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
                if !c.deleteRequest(x) {
                        panic("request should exist")
                }
                return true
        })
        c.assertNoRequests()
+       c.t.iterPeers(func(p *Peer) {
+               if p.isLowOnRequests() {
+                       p.updateRequests(reason)
+               }
+       })
        return
 }
 
@@ -1650,9 +1669,8 @@ func (c *Peer) assertNoRequests() {
        }
 }
 
-func (c *Peer) cancelAllRequests() (cancelled *roaring.Bitmap) {
-       cancelled = c.requestState.Requests.Clone()
-       cancelled.Iterate(func(x uint32) bool {
+func (c *Peer) cancelAllRequests() {
+       c.requestState.Requests.IterateSnapshot(func(x RequestIndex) bool {
                c.cancel(x)
                return true
        })