]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Revert "Switch requestState to be a slice"
[btrtrc.git] / peerconn.go
index a344934ebae9cf6119e7788423c16e350bd6cc8d..2eb15bdaa1483da5977142e1d197ed4b7f66232c 100644 (file)
@@ -8,7 +8,6 @@ import (
        "io"
        "math/rand"
        "net"
-       "sort"
        "strconv"
        "strings"
        "sync/atomic"
@@ -49,11 +48,10 @@ type PeerRemoteAddr interface {
        String() string
 }
 
-// Since we have to store all the requests in memory, we can't reasonably exceed what would be
-// indexable with the memory space available.
 type (
-       maxRequests  = int
-       requestState = request_strategy.PeerRequestState
+       // Since we have to store all the requests in memory, we can't reasonably exceed what could be
+       // indexed with the memory space available.
+       maxRequests = int
 )
 
 type Peer struct {
@@ -86,7 +84,7 @@ type Peer struct {
 
        // Stuff controlled by the local peer.
        needRequestUpdate    string
-       requestState         requestState
+       requestState         request_strategy.PeerRequestState
        updateRequestsTimer  *time.Timer
        lastRequestUpdate    time.Time
        peakRequests         maxRequests
@@ -130,13 +128,7 @@ type Peer struct {
        logger log.Logger
 }
 
-type peerRequests struct {
-       typedRoaring.Bitmap[RequestIndex]
-}
-
-func (p *peerRequests) IterateSnapshot(f func(request_strategy.RequestIndex) bool) {
-       p.Clone().Iterate(f)
-}
+type peerRequests = orderedBitmap[RequestIndex]
 
 func (p *Peer) initRequestState() {
        p.requestState.Requests = &peerRequests{}
@@ -359,13 +351,32 @@ func (cn *Peer) downloadRate() float64 {
        return float64(num) / cn.totalExpectingTime().Seconds()
 }
 
-func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
-       ret = make(map[pieceIndex]int)
-       cn.requestState.Requests.Iterate(func(x RequestIndex) bool {
-               ret[cn.t.pieceIndexOfRequestIndex(x)]++
+func (cn *Peer) DownloadRate() float64 {
+       cn.locker().Lock()
+       defer cn.locker().Unlock()
+
+       return cn.downloadRate()
+}
+
+func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
+       var last Option[pieceIndex]
+       var count int
+       next := func(item Option[pieceIndex]) {
+               if item == last {
+                       count++
+               } else {
+                       if count != 0 {
+                               f(last.Value(), count)
+                       }
+                       last = item
+                       count = 1
+               }
+       }
+       cn.requestState.Requests.Iterate(func(requestIndex request_strategy.RequestIndex) bool {
+               next(Some(cn.t.pieceIndexOfRequestIndex(requestIndex)))
                return true
        })
-       return
+       next(None[pieceIndex]())
 }
 
 func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
@@ -404,20 +415,9 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.downloadRate()/(1<<10),
        )
        fmt.Fprintf(w, "    requested pieces:")
-       type pieceNumRequestsType struct {
-               piece       pieceIndex
-               numRequests int
-       }
-       var pieceNumRequests []pieceNumRequestsType
-       for piece, count := range cn.numRequestsByPiece() {
-               pieceNumRequests = append(pieceNumRequests, pieceNumRequestsType{piece, count})
-       }
-       sort.Slice(pieceNumRequests, func(i, j int) bool {
-               return pieceNumRequests[i].piece < pieceNumRequests[j].piece
+       cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
+               fmt.Fprintf(w, " %v(%v)", piece, count)
        })
-       for _, elem := range pieceNumRequests {
-               fmt.Fprintf(w, " %v(%v)", elem.piece, elem.numRequests)
-       }
        fmt.Fprintf(w, "\n")
 }
 
@@ -510,7 +510,7 @@ var (
 
 // The actual value to use as the maximum outbound requests.
 func (cn *Peer) nominalMaxRequests() maxRequests {
-       return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests)))
+       return maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -643,8 +643,10 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
                cn.validReceiveChunks = make(map[RequestIndex]int)
        }
        cn.validReceiveChunks[r]++
-       cn.t.pendingRequests[r] = cn
-       cn.t.lastRequested[r] = time.Now()
+       cn.t.requestState[r] = requestState{
+               peer: cn,
+               when: time.Now(),
+       }
        cn.updateExpectingChunks()
        ppReq := cn.t.requestIndexToRequest(r)
        for _, f := range cn.callbacks.SentRequest {
@@ -1470,7 +1472,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
 
        // Cancel pending requests for this chunk from *other* peers.
-       if p := t.pendingRequests[req]; p != nil {
+       if p := t.requestingPeer(req); p != nil {
                if p == c {
                        panic("should not be pending request from conn that just received it")
                }
@@ -1633,8 +1635,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
        if c.t.requestingPeer(r) != c {
                panic("only one peer should have a given request at a time")
        }
-       delete(c.t.pendingRequests, r)
-       delete(c.t.lastRequested, r)
+       delete(c.t.requestState, r)
        // c.t.iterPeers(func(p *Peer) {
        //      if p.isLowOnRequests() {
        //              p.updateRequests("Peer.deleteRequest")