From 21358ba458da49e9b143dde4ca244e0931b64adc Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 18 Sep 2021 20:34:14 +1000 Subject: [PATCH] Do peer requests separately for each peer --- client.go | 2 +- peerconn.go | 12 ++---- request-strategy/order.go | 19 +++++---- request-strategy/torrent.go | 3 +- requesting.go | 83 ++++++++++++++++++++++++++++--------- 5 files changed, 83 insertions(+), 36 deletions(-) diff --git a/client.go b/client.go index 9aa99b07..1a4c1a50 100644 --- a/client.go +++ b/client.go @@ -305,7 +305,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { }, } - go cl.requester() + //go cl.requester() return } diff --git a/peerconn.go b/peerconn.go index 2174b0f6..4eca560e 100644 --- a/peerconn.go +++ b/peerconn.go @@ -365,7 +365,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { &cn._stats.ChunksReadUseful, &cn._stats.ChunksRead, &cn._stats.ChunksWritten, - cn.numLocalRequests(), + len(cn.actualRequestState.Requests), cn.nominalMaxRequests(), cn.PeerMaxRequests, len(cn.peerRequests), @@ -463,7 +463,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool { // The actual value to use as the maximum outbound requests. func (cn *Peer) nominalMaxRequests() (ret maxRequests) { - return int(clamp(1, 2*int64(cn.maxPiecesReceivedBetweenRequestUpdates), int64(cn.PeerMaxRequests))) + return maxRequests(clamp(1, int64(cn.PeerMaxRequests), 128)) } func (cn *Peer) totalExpectingTime() (ret time.Duration) { @@ -576,7 +576,7 @@ func (cn *Peer) request(r Request) (more bool, err error) { if _, ok := cn.actualRequestState.Requests[r]; ok { return true, nil } - if cn.numLocalRequests() >= cn.nominalMaxRequests() { + if len(cn.actualRequestState.Requests) >= cn.nominalMaxRequests() { return true, errors.New("too many outstanding requests") } if cn.actualRequestState.Requests == nil { @@ -653,7 +653,7 @@ func (cn *PeerConn) postBitfield() { } func (cn *PeerConn) updateRequests() { - cn.t.cl.tickleRequester() + cn.tickleWriter() } // Emits the indices in the Bitmaps bms in order, never repeating any index. @@ -1453,10 +1453,6 @@ func (c *Peer) peerHasWantedPieces() bool { return !c._pieceRequestOrder.IsEmpty() } -func (c *Peer) numLocalRequests() int { - return len(c.actualRequestState.Requests) -} - func (c *Peer) deleteRequest(r Request) bool { delete(c.nextRequestState.Requests, r) if _, ok := c.actualRequestState.Requests[r]; !ok { diff --git a/request-strategy/order.go b/request-strategy/order.go index 0a32ae9a..473b02b3 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -1,11 +1,13 @@ package request_strategy import ( + "bytes" "fmt" "sort" "sync" "github.com/anacrolix/multiless" + "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" pp "github.com/anacrolix/torrent/peer_protocol" @@ -41,9 +43,12 @@ func sortFilterPieces(pieces []filterPiece) { i.Availability, j.Availability, ).Int( i.index, j.index, - ).Uintptr( - i.t.StableId, j.t.StableId, - ).MustLess() + ).Lazy(func() multiless.Computation { + return multiless.New().Cmp(bytes.Compare( + i.t.InfoHash[:], + j.t.InfoHash[:], + )) + }).MustLess() }) } @@ -170,7 +175,7 @@ func Run(input Input) map[PeerId]PeerNextRequestState { }) }) torrents := input.Torrents - allPeers := make(map[uintptr][]*requestsPeer, len(torrents)) + allPeers := make(map[metainfo.Hash][]*requestsPeer, len(torrents)) for _, t := range torrents { peers := make([]*requestsPeer, 0, len(t.Peers)) for _, p := range t.Peers { @@ -181,17 +186,17 @@ func Run(input Input) map[PeerId]PeerNextRequestState { }, }) } - allPeers[t.StableId] = peers + allPeers[t.InfoHash] = peers } for _, piece := range requestPieces { - for _, peer := range allPeers[piece.t.StableId] { + for _, peer := range allPeers[piece.t.InfoHash] { if peer.canRequestPiece(piece.index) { peer.requestablePiecesRemaining++ } } } for _, piece := range requestPieces { - allocatePendingChunks(piece, allPeers[piece.t.StableId]) + allocatePendingChunks(piece, allPeers[piece.t.InfoHash]) } ret := make(map[PeerId]PeerNextRequestState) for _, peers := range allPeers { diff --git a/request-strategy/torrent.go b/request-strategy/torrent.go index a31ec772..a92bffc0 100644 --- a/request-strategy/torrent.go +++ b/request-strategy/torrent.go @@ -1,6 +1,7 @@ package request_strategy import ( + "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/storage" ) @@ -10,7 +11,7 @@ type Torrent struct { // Unclosed Peers. Not necessary for getting requestable piece ordering. Peers []Peer // Some value that's unique and stable between runs. Could even use the infohash? - StableId uintptr + InfoHash metainfo.Hash MaxUnverifiedBytes int64 } diff --git a/requesting.go b/requesting.go index 7e42baba..29a6dd96 100644 --- a/requesting.go +++ b/requesting.go @@ -5,6 +5,7 @@ import ( "unsafe" "github.com/anacrolix/missinggo/v2/bitmap" + pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/chansync" request_strategy "github.com/anacrolix/torrent/request-strategy" @@ -43,7 +44,7 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input { ts := make([]request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { rst := request_strategy.Torrent{ - StableId: uintptr(unsafe.Pointer(t)), + InfoHash: t.infoHash, } if t.storage != nil { rst.Capacity = t.storage.Capacity @@ -119,28 +120,72 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee } func (p *Peer) applyNextRequestState() bool { - next := p.nextRequestState - current := p.actualRequestState - if !p.setInterested(next.Interested) { - return false + if len(p.actualRequestState.Requests) > p.nominalMaxRequests()/2 { + return true + } + type piece struct { + index int + endGame bool } - for req := range current.Requests { - if _, ok := next.Requests[req]; !ok { - if !p.cancel(req) { - return false + var pieceOrder []piece + request_strategy.GetRequestablePieces( + p.t.cl.getRequestStrategyInput(), + func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) { + if t.InfoHash != p.t.infoHash { + return + } + if !p.peerHasPiece(pieceIndex) { + return + } + pieceOrder = append(pieceOrder, piece{ + index: pieceIndex, + endGame: rsp.Priority == PiecePriorityNow, + }) + }, + ) + more := true + interested := false + for _, endGameIter := range []bool{false, true} { + for _, piece := range pieceOrder { + tp := p.t.piece(piece.index) + tp.iterUndirtiedChunks(func(cs ChunkSpec) { + req := Request{pp.Integer(piece.index), cs} + if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 { + return + } + interested = true + more = p.setInterested(true) + if !more { + return + } + if len(p.actualRequestState.Requests) >= p.nominalMaxRequests() { + return + } + if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req.Index)) { + return + } + var err error + more, err = p.request(req) + if err != nil { + panic(err) + } + }) + if interested && len(p.actualRequestState.Requests) >= p.nominalMaxRequests() { + break + } + if !more { + break } } - } - for req := range next.Requests { - more, err := p.request(req) - if err != nil { - panic(err) - } /* else { - log.Print(req) - } */ if !more { - return false + break } } - return true + if !more { + return false + } + if !interested { + p.setInterested(false) + } + return more } -- 2.44.0