]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Do peer requests separately for each peer
authorMatt Joiner <anacrolix@gmail.com>
Sat, 18 Sep 2021 10:34:14 +0000 (20:34 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 18 Sep 2021 10:34:14 +0000 (20:34 +1000)
client.go
peerconn.go
request-strategy/order.go
request-strategy/torrent.go
requesting.go

index 9aa99b070650341748e71c588d21cf2b14fd5bf5..1a4c1a50ecc1e438a2d30ab74d5a42f2300768a0 100644 (file)
--- a/client.go
+++ b/client.go
@@ -305,7 +305,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                },
        }
 
-       go cl.requester()
+       //go cl.requester()
 
        return
 }
index 2174b0f620e677c68812abd010b95b3a711b68e4..4eca560e35dd96bd933c77dda716bd05c82201da 100644 (file)
@@ -365,7 +365,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                &cn._stats.ChunksReadUseful,
                &cn._stats.ChunksRead,
                &cn._stats.ChunksWritten,
-               cn.numLocalRequests(),
+               len(cn.actualRequestState.Requests),
                cn.nominalMaxRequests(),
                cn.PeerMaxRequests,
                len(cn.peerRequests),
@@ -463,7 +463,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *Peer) nominalMaxRequests() (ret maxRequests) {
-       return int(clamp(1, 2*int64(cn.maxPiecesReceivedBetweenRequestUpdates), int64(cn.PeerMaxRequests)))
+       return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 128))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -576,7 +576,7 @@ func (cn *Peer) request(r Request) (more bool, err error) {
        if _, ok := cn.actualRequestState.Requests[r]; ok {
                return true, nil
        }
-       if cn.numLocalRequests() >= cn.nominalMaxRequests() {
+       if len(cn.actualRequestState.Requests) >= cn.nominalMaxRequests() {
                return true, errors.New("too many outstanding requests")
        }
        if cn.actualRequestState.Requests == nil {
@@ -653,7 +653,7 @@ func (cn *PeerConn) postBitfield() {
 }
 
 func (cn *PeerConn) updateRequests() {
-       cn.t.cl.tickleRequester()
+       cn.tickleWriter()
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -1453,10 +1453,6 @@ func (c *Peer) peerHasWantedPieces() bool {
        return !c._pieceRequestOrder.IsEmpty()
 }
 
-func (c *Peer) numLocalRequests() int {
-       return len(c.actualRequestState.Requests)
-}
-
 func (c *Peer) deleteRequest(r Request) bool {
        delete(c.nextRequestState.Requests, r)
        if _, ok := c.actualRequestState.Requests[r]; !ok {
index 0a32ae9a40fa47ae8b2b85dc79a33682fd8383a0..473b02b3fffc4468ea5f3a53539bb2406079dd53 100644 (file)
@@ -1,11 +1,13 @@
 package request_strategy
 
 import (
+       "bytes"
        "fmt"
        "sort"
        "sync"
 
        "github.com/anacrolix/multiless"
+       "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/storage"
 
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -41,9 +43,12 @@ func sortFilterPieces(pieces []filterPiece) {
                        i.Availability, j.Availability,
                ).Int(
                        i.index, j.index,
-               ).Uintptr(
-                       i.t.StableId, j.t.StableId,
-               ).MustLess()
+               ).Lazy(func() multiless.Computation {
+                       return multiless.New().Cmp(bytes.Compare(
+                               i.t.InfoHash[:],
+                               j.t.InfoHash[:],
+                       ))
+               }).MustLess()
        })
 }
 
@@ -170,7 +175,7 @@ func Run(input Input) map[PeerId]PeerNextRequestState {
                })
        })
        torrents := input.Torrents
-       allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
+       allPeers := make(map[metainfo.Hash][]*requestsPeer, len(torrents))
        for _, t := range torrents {
                peers := make([]*requestsPeer, 0, len(t.Peers))
                for _, p := range t.Peers {
@@ -181,17 +186,17 @@ func Run(input Input) map[PeerId]PeerNextRequestState {
                                },
                        })
                }
-               allPeers[t.StableId] = peers
+               allPeers[t.InfoHash] = peers
        }
        for _, piece := range requestPieces {
-               for _, peer := range allPeers[piece.t.StableId] {
+               for _, peer := range allPeers[piece.t.InfoHash] {
                        if peer.canRequestPiece(piece.index) {
                                peer.requestablePiecesRemaining++
                        }
                }
        }
        for _, piece := range requestPieces {
-               allocatePendingChunks(piece, allPeers[piece.t.StableId])
+               allocatePendingChunks(piece, allPeers[piece.t.InfoHash])
        }
        ret := make(map[PeerId]PeerNextRequestState)
        for _, peers := range allPeers {
index a31ec772aa9c125c9b7e85fc594c6bc6784d9a0f..a92bffc0bda8199c15c5a2b16a34eb57466de578 100644 (file)
@@ -1,6 +1,7 @@
 package request_strategy
 
 import (
+       "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/storage"
 )
 
@@ -10,7 +11,7 @@ type Torrent struct {
        // Unclosed Peers. Not necessary for getting requestable piece ordering.
        Peers []Peer
        // Some value that's unique and stable between runs. Could even use the infohash?
-       StableId uintptr
+       InfoHash metainfo.Hash
 
        MaxUnverifiedBytes int64
 }
index 7e42babacf5ccf087b28bcc70b067aab72eebe54..29a6dd96dfdcbecb3ec4747b98c0e89a431f1e2c 100644 (file)
@@ -5,6 +5,7 @@ import (
        "unsafe"
 
        "github.com/anacrolix/missinggo/v2/bitmap"
+       pp "github.com/anacrolix/torrent/peer_protocol"
 
        "github.com/anacrolix/chansync"
        request_strategy "github.com/anacrolix/torrent/request-strategy"
@@ -43,7 +44,7 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
        ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
        for _, t := range cl.torrents {
                rst := request_strategy.Torrent{
-                       StableId: uintptr(unsafe.Pointer(t)),
+                       InfoHash: t.infoHash,
                }
                if t.storage != nil {
                        rst.Capacity = t.storage.Capacity
@@ -119,28 +120,72 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee
 }
 
 func (p *Peer) applyNextRequestState() bool {
-       next := p.nextRequestState
-       current := p.actualRequestState
-       if !p.setInterested(next.Interested) {
-               return false
+       if len(p.actualRequestState.Requests) > p.nominalMaxRequests()/2 {
+               return true
+       }
+       type piece struct {
+               index   int
+               endGame bool
        }
-       for req := range current.Requests {
-               if _, ok := next.Requests[req]; !ok {
-                       if !p.cancel(req) {
-                               return false
+       var pieceOrder []piece
+       request_strategy.GetRequestablePieces(
+               p.t.cl.getRequestStrategyInput(),
+               func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
+                       if t.InfoHash != p.t.infoHash {
+                               return
+                       }
+                       if !p.peerHasPiece(pieceIndex) {
+                               return
+                       }
+                       pieceOrder = append(pieceOrder, piece{
+                               index:   pieceIndex,
+                               endGame: rsp.Priority == PiecePriorityNow,
+                       })
+               },
+       )
+       more := true
+       interested := false
+       for _, endGameIter := range []bool{false, true} {
+               for _, piece := range pieceOrder {
+                       tp := p.t.piece(piece.index)
+                       tp.iterUndirtiedChunks(func(cs ChunkSpec) {
+                               req := Request{pp.Integer(piece.index), cs}
+                               if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
+                                       return
+                               }
+                               interested = true
+                               more = p.setInterested(true)
+                               if !more {
+                                       return
+                               }
+                               if len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
+                                       return
+                               }
+                               if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req.Index)) {
+                                       return
+                               }
+                               var err error
+                               more, err = p.request(req)
+                               if err != nil {
+                                       panic(err)
+                               }
+                       })
+                       if interested && len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
+                               break
+                       }
+                       if !more {
+                               break
                        }
                }
-       }
-       for req := range next.Requests {
-               more, err := p.request(req)
-               if err != nil {
-                       panic(err)
-               } /* else {
-                       log.Print(req)
-               } */
                if !more {
-                       return false
+                       break
                }
        }
-       return true
+       if !more {
+               return false
+       }
+       if !interested {
+               p.setInterested(false)
+       }
+       return more
 }