]> Sergey Matveev's repositories - btrtrc.git/blobdiff - webseed-peer.go
Drop support for go 1.20
[btrtrc.git] / webseed-peer.go
index 0cf6c7daa364e560db79a5ddb65da2d35f6ec7c7..5b6632badd8153ff0f685e75b4c4452c188ff34d 100644 (file)
@@ -10,24 +10,33 @@ import (
 
        "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/log"
+
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/webseed"
 )
 
+const (
+       webseedPeerUnhandledErrorSleep   = 5 * time.Second
+       webseedPeerCloseOnUnhandledError = false
+)
+
 type webseedPeer struct {
-       client         webseed.Client
-       activeRequests map[Request]webseed.Request
-       requesterCond  sync.Cond
-       peer           Peer
-       // Number of requester routines.
-       maxRequests int
+       // First field for stats alignment.
+       peer             Peer
+       client           webseed.Client
+       activeRequests   map[Request]webseed.Request
+       requesterCond    sync.Cond
+       lastUnhandledErr time.Time
 }
 
 var _ peerImpl = (*webseedPeer)(nil)
 
-func (me *webseedPeer) connStatusString() string {
-       return me.client.Url
+func (me *webseedPeer) peerImplStatusLines() []string {
+       return []string{
+               me.client.Url,
+               fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
+       }
 }
 
 func (ws *webseedPeer) String() string {
@@ -48,17 +57,14 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) _cancel(r RequestIndex) {
-       active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
-       if ok {
+func (ws *webseedPeer) _cancel(r RequestIndex) bool {
+       if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
                active.Cancel()
+               // The requester is running and will handle the result.
+               return true
        }
-       if !ws.peer.deleteRequest(r) {
-               panic("cancelled webseed request should exist")
-       }
-       if ws.peer.isLowOnRequests() {
-               ws.peer.updateRequests("webseedPeer._cancel")
-       }
+       // There should be no requester handling this, so no further events will occur.
+       return false
 }
 
 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
@@ -87,8 +93,9 @@ func (ws *webseedPeer) requester(i int) {
        defer ws.requesterCond.L.Unlock()
 start:
        for !ws.peer.closed.IsSet() {
+               // Restart is set if we don't need to wait for the requestCond before trying again.
                restart := false
-               ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
+               ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
                        r := ws.peer.t.requestIndexToRequest(x)
                        if _, ok := ws.activeRequests[r]; ok {
                                return true
@@ -102,6 +109,11 @@ start:
                        if errors.Is(err, webseed.ErrTooFast) {
                                time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
                        }
+                       // Demeter is throwing a tantrum on Mount Olympus for this
+                       ws.peer.t.cl.locker().RLock()
+                       duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))
+                       ws.peer.t.cl.locker().RUnlock()
+                       time.Sleep(duration)
                        ws.requesterCond.L.Lock()
                        return false
                })
@@ -116,10 +128,13 @@ func (ws *webseedPeer) connectionFlags() string {
        return "WS"
 }
 
-// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
-// return bool if this is even possible, and if it isn't, skip to the next drop candidate.
+// Maybe this should drop all existing connections, or something like that.
 func (ws *webseedPeer) drop() {}
 
+func (cn *webseedPeer) ban() {
+       cn.peer.close()
+}
+
 func (ws *webseedPeer) handleUpdateRequests() {
        // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
        // pieces.
@@ -131,11 +146,14 @@ func (ws *webseedPeer) handleUpdateRequests() {
 }
 
 func (ws *webseedPeer) onClose() {
-       ws.peer.logger.WithLevel(log.Debug).Print("closing")
-       ws.peer.deleteAllRequests()
-       for _, r := range ws.activeRequests {
-               r.Cancel()
-       }
+       ws.peer.logger.Levelf(log.Debug, "closing")
+       // Just deleting them means we would have to manually cancel active requests.
+       ws.peer.cancelAllRequests()
+       ws.peer.t.iterPeers(func(p *Peer) {
+               if p.isLowOnRequests() {
+                       p.updateRequests("webseedPeer.onClose")
+               }
+       })
        ws.requesterCond.Broadcast()
 }
 
@@ -167,10 +185,17 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                        // cfg := spew.NewDefaultConfig()
                        // cfg.DisableMethods = true
                        // cfg.Dump(result.Err)
-                       log.Printf("closing %v", ws)
-                       ws.peer.close()
+
+                       if webseedPeerCloseOnUnhandledError {
+                               log.Printf("closing %v", ws)
+                               ws.peer.close()
+                       } else {
+                               ws.lastUnhandledErr = time.Now()
+                       }
+               }
+               if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+                       panic("invalid reject")
                }
-               ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
                return err
        }
        err = ws.peer.receiveChunk(&pp.Message{
@@ -185,10 +210,6 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
        return err
 }
 
-func (me *webseedPeer) isLowOnRequests() bool {
-       return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
-}
-
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
        return &me.client.Pieces
 }