defer t.dropConnection(c)
c.startWriter()
cl.sendInitialMessages(c, t)
- c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, c.updateRequestsTimerFunc)
- c.updateRequestsTimer.Stop()
+ c.initUpdateRequestsTimer()
err := c.mainReadLoop()
if err != nil {
return fmt.Errorf("main read loop: %w", err)
}
return nil
-func (c *PeerConn) updateRequestsTimerFunc() {
+const check = false
+
+func (p *Peer) initUpdateRequestsTimer() {
+ if check {
+ if p.updateRequestsTimer != nil {
+ panic(p.updateRequestsTimer)
+ }
+ }
+ p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
+ p.updateRequestsTimer.Stop()
+}
+
+func (c *Peer) updateRequestsTimerFunc() {
c.locker().Lock()
defer c.locker().Unlock()
if c.needRequestUpdate != "" {
// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
// legacy PeerConn methods.
type peerImpl interface {
- updateRequests(reason string)
+ // Trigger the actual request state to get updated
+ handleUpdateRequests()
writeInterested(interested bool) bool
// Neither of these return buffer room anymore, because they're currently both posted. There's
return roaring.Flip(&cn._peerPieces, 0, bitmap.BitRange(cn.t.numPieces())).IsEmpty(), true
}
-func (cn *PeerConn) locker() *lockWithDeferreds {
+func (cn *Peer) locker() *lockWithDeferreds {
return cn.t.cl.locker()
}
if !p.closed.Set() {
return
}
+ if p.updateRequestsTimer != nil {
+ p.updateRequestsTimer.Stop()
+ }
p.peerImpl.onClose()
if p.t != nil {
p.t.decPeerPieceAvailability(p)
if cn.pex.IsEnabled() {
cn.pex.Close()
}
- if cn.updateRequestsTimer != nil {
- cn.updateRequestsTimer.Stop()
- }
cn.tickleWriter()
if cn.conn != nil {
cn.conn.Close()
}
func (cn *PeerConn) fillWriteBuffer() {
- if !cn.applyNextRequestState() {
+ if !cn.maybeUpdateActualRequestState() {
return
}
if cn.pex.IsEnabled() {
cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
}
-func (cn *PeerConn) updateRequests(reason string) {
+// Sets a reason to update requests, and if there wasn't already one, handle it.
+func (cn *Peer) updateRequests(reason string) {
if cn.needRequestUpdate != "" {
return
}
cn.needRequestUpdate = reason
+ cn.handleUpdateRequests()
+}
+
+func (cn *PeerConn) handleUpdateRequests() {
+ // The writer determines the request state as needed when it can write.
cn.tickleWriter()
}
return
}
-func (p *Peer) applyNextRequestState() bool {
+func (p *Peer) maybeUpdateActualRequestState() bool {
if p.needRequestUpdate == "" {
return true
}
return more
}
+// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next requestState) bool {
current := &p.actualRequestState
if !p.setInterested(next.Interested) {
if !more {
return false
}
+ // We randomize the order in which requests are issued, to reduce the overlap with requests to
+ // other peers. Note that although it really depends on what order the peer services the
+ // requests, if we are only able to issue some requests before buffering, or the peer starts
+ // handling our requests before they've all arrived, then this randomization should reduce
+ // overlap. Note however that if we received the desired requests in priority order, then
+ // randomizing would throw away that benefit.
for _, x := range rand.Perm(int(next.Requests.GetCardinality())) {
req, err := next.Requests.Select(uint32(x))
if err != nil {
// requests, so we can skip this one with no additional consideration.
continue
}
+ // The cardinality of our desired requests shouldn't exceed the max requests since it's used
+ // in the calculation of the requests. However if we cancelled requests and they haven't
+ // been rejected or serviced yet with the fast extension enabled, we can end up with more
+ // extra outstanding requests. We could subtract the number of outstanding cancels from the
+ // next request cardinality, but peers might not like that.
if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
//log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
// next.Requests.GetCardinality(),
},
activeRequests: make(map[Request]webseed.Request, maxRequests),
}
+ ws.peer.initUpdateRequestsTimer()
ws.requesterCond.L = t.cl.locker()
for i := 0; i < maxRequests; i += 1 {
go ws.requester()
"strings"
"sync"
+ "github.com/anacrolix/log"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
// return bool if this is even possible, and if it isn't, skip to the next drop candidate.
func (ws *webseedPeer) drop() {}
-func (ws *webseedPeer) updateRequests(reason string) {
+func (ws *webseedPeer) handleUpdateRequests() {
+ ws.peer.maybeUpdateActualRequestState()
}
func (ws *webseedPeer) onClose() {
- ws.peer.logger.Print("closing")
+ ws.peer.logger.WithLevel(log.Debug).Print("closing")
for _, r := range ws.activeRequests {
r.Cancel()
}