]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Tidy up request and cancel
authorMatt Joiner <anacrolix@gmail.com>
Sun, 9 May 2021 13:38:38 +0000 (23:38 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
peer-impl.go
peerconn.go
webseed-peer.go

index f4ad12a29dbd7f81079fe66df5052ca15c8ed416..880b8f3543fe885d0c95a190f3d84e6127d737c0 100644 (file)
@@ -10,12 +10,14 @@ import (
 type peerImpl interface {
        updateRequests()
        writeInterested(interested bool) bool
-       cancel(Request) bool
-       // Return true if there's room for more activity.
-       request(Request) bool
+
+       // Neither of these return buffer room anymore, because they're currently both posted. There's
+       // also PeerConn.writeBufferFull for when/where it matters.
+       _cancel(Request)
+       _request(Request)
+
        connectionFlags() string
        onClose()
-       _postCancel(Request)
        onGotInfo(*metainfo.Info)
        drop()
        String() string
index 1ce06994d228996184ae750766e350706bb1cbdd..aa31eec80840e636620deb56ed83a599a17c2bcb 100644 (file)
@@ -529,15 +529,15 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
 // are okay.
 type messageWriter func(pp.Message) bool
 
-func (cn *Peer) request(r Request) (more bool, err error) {
+func (cn *Peer) request(r Request) error {
        if _, ok := cn.requests[r]; ok {
-               return true, nil
+               return nil
        }
        if cn.numLocalRequests() >= cn.nominalMaxRequests() {
-               return true, errors.New("too many outstanding requests")
+               return errors.New("too many outstanding requests")
        }
        if !cn.peerHasPiece(pieceIndex(r.Index)) {
-               return true, errors.New("requesting piece peer doesn't have")
+               return errors.New("requesting piece peer doesn't have")
        }
        if !cn.t.peerIsActive(cn) {
                panic("requesting but not in active conns")
@@ -545,19 +545,22 @@ func (cn *Peer) request(r Request) (more bool, err error) {
        if cn.closed.IsSet() {
                panic("requesting when connection is closed")
        }
-       if cn.peerChoking {
-               if cn.peerAllowedFast.Get(int(r.Index)) {
-                       torrent.Add("allowed fast requests sent", 1)
-               } else {
-                       return cn.setInterested(true), errors.New("requesting while choked and not allowed fast")
-               }
-       }
        if cn.t.hashingPiece(pieceIndex(r.Index)) {
                panic("piece is being hashed")
        }
        if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
                panic("piece is queued for hash")
        }
+       if !cn.setInterested(true) {
+               return errors.New("write buffer full after expressing interest")
+       }
+       if cn.peerChoking {
+               if cn.peerAllowedFast.Get(int(r.Index)) {
+                       torrent.Add("allowed fast requests sent", 1)
+               } else {
+                       errors.New("peer choking and piece not in allowed fast set")
+               }
+       }
        if cn.requests == nil {
                cn.requests = make(map[Request]struct{})
        }
@@ -571,11 +574,12 @@ func (cn *Peer) request(r Request) (more bool, err error) {
        for _, f := range cn.callbacks.SentRequest {
                f(PeerRequestEvent{cn, r})
        }
-       return cn.peerImpl.request(r), nil
+       cn.peerImpl._request(r)
+       return nil
 }
 
-func (me *PeerConn) request(r Request) bool {
-       return me.write(pp.Message{
+func (me *PeerConn) _request(r Request) {
+       me.write(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
@@ -583,8 +587,14 @@ func (me *PeerConn) request(r Request) bool {
        })
 }
 
-func (me *PeerConn) cancel(r Request) bool {
-       return me.write(makeCancelMessage(r))
+func (me *Peer) cancel(r Request) {
+       if me.deleteRequest(r) {
+               me.peerImpl._cancel(r)
+       }
+}
+
+func (me *PeerConn) _cancel(r Request) {
+       me.write(makeCancelMessage(r))
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
@@ -1317,7 +1327,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                if p == c {
                        return
                }
-               p.postCancel(req)
+               p.cancel(req)
        })
 
        err := func() error {
@@ -1472,26 +1482,6 @@ func (c *Peer) deleteRequest(r Request) bool {
        if n < 0 {
                panic(n)
        }
-       // If a request fails, updating the requests for the current peer first may miss the opportunity
-       // to try other peers for that request instead, depending on the request strategy. This might
-       // only affect webseed peers though, since they synchronously issue new requests: PeerConns do
-       // it in the writer routine.
-       const updateCurrentConnRequestsFirst = false
-       if updateCurrentConnRequestsFirst {
-               c.updateRequests()
-       }
-       // Give other conns a chance to pick up the request.
-       c.t.iterPeers(func(_c *Peer) {
-               // We previously checked that the peer wasn't interested to to only wake connections that
-               // were unable to issue requests due to starvation by the request strategy. There could be
-               // performance ramifications.
-               if _c != c && c.peerHasPiece(pieceIndex(r.Index)) {
-                       _c.updateRequests()
-               }
-       })
-       if !updateCurrentConnRequestsFirst {
-               c.updateRequests()
-       }
        return true
 }
 
@@ -1513,18 +1503,6 @@ func (c *PeerConn) tickleWriter() {
        c.writerCond.Broadcast()
 }
 
-func (c *Peer) postCancel(r Request) bool {
-       if !c.deleteRequest(r) {
-               return false
-       }
-       c.peerImpl._postCancel(r)
-       return true
-}
-
-func (c *PeerConn) _postCancel(r Request) {
-       c.post(makeCancelMessage(r))
-}
-
 func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
        c.lastChunkSent = time.Now()
        return msg(pp.Message{
index 9fa77a2857a685a8dda4b98643eaaaaf04010d58..5f2980c3e7ce56dda1ea8c82ebb38df38fefa0b5 100644 (file)
@@ -41,30 +41,23 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
        ws.client.Info = info
 }
 
-func (ws *webseedPeer) _postCancel(r Request) {
-       ws.cancel(r)
-}
-
 func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) cancel(r Request) bool {
+func (ws *webseedPeer) _cancel(r Request) {
        active, ok := ws.activeRequests[r]
-       if !ok {
-               return false
+       if ok {
+               active.Cancel()
        }
-       active.Cancel()
-       return true
 }
 
 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
        return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
 }
 
-func (ws *webseedPeer) request(r Request) bool {
+func (ws *webseedPeer) _request(r Request) {
        ws.requesterCond.Signal()
-       return true
 }
 
 func (ws *webseedPeer) doRequest(r Request) {