]> Sergey Matveev's repositories - btrtrc.git/blobdiff - webseed-peer.go
Drop support for go 1.20
[btrtrc.git] / webseed-peer.go
index 4f4ca1f60eeadf9759862bf5c4b7f6f640d0f722..5b6632badd8153ff0f685e75b4c4452c188ff34d 100644 (file)
@@ -10,24 +10,33 @@ import (
 
        "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/log"
+
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/webseed"
 )
 
+const (
+       webseedPeerUnhandledErrorSleep   = 5 * time.Second
+       webseedPeerCloseOnUnhandledError = false
+)
+
 type webseedPeer struct {
-       client         webseed.Client
-       activeRequests map[Request]webseed.Request
-       requesterCond  sync.Cond
-       peer           Peer
-       // Number of requester routines.
-       maxRequests int
+       // First field for stats alignment.
+       peer             Peer
+       client           webseed.Client
+       activeRequests   map[Request]webseed.Request
+       requesterCond    sync.Cond
+       lastUnhandledErr time.Time
 }
 
 var _ peerImpl = (*webseedPeer)(nil)
 
-func (me *webseedPeer) connStatusString() string {
-       return me.client.Url
+func (me *webseedPeer) peerImplStatusLines() []string {
+       return []string{
+               me.client.Url,
+               fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
+       }
 }
 
 func (ws *webseedPeer) String() string {
@@ -84,8 +93,9 @@ func (ws *webseedPeer) requester(i int) {
        defer ws.requesterCond.L.Unlock()
 start:
        for !ws.peer.closed.IsSet() {
+               // Restart is set if we don't need to wait for the requestCond before trying again.
                restart := false
-               ws.peer.requestState.Requests.Iterate(func(x uint32) bool {
+               ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
                        r := ws.peer.t.requestIndexToRequest(x)
                        if _, ok := ws.activeRequests[r]; ok {
                                return true
@@ -99,6 +109,11 @@ start:
                        if errors.Is(err, webseed.ErrTooFast) {
                                time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
                        }
+                       // Demeter is throwing a tantrum on Mount Olympus for this
+                       ws.peer.t.cl.locker().RLock()
+                       duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))
+                       ws.peer.t.cl.locker().RUnlock()
+                       time.Sleep(duration)
                        ws.requesterCond.L.Lock()
                        return false
                })
@@ -113,10 +128,13 @@ func (ws *webseedPeer) connectionFlags() string {
        return "WS"
 }
 
-// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
-// return bool if this is even possible, and if it isn't, skip to the next drop candidate.
+// Maybe this should drop all existing connections, or something like that.
 func (ws *webseedPeer) drop() {}
 
+func (cn *webseedPeer) ban() {
+       cn.peer.close()
+}
+
 func (ws *webseedPeer) handleUpdateRequests() {
        // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
        // pieces.
@@ -167,8 +185,13 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                        // cfg := spew.NewDefaultConfig()
                        // cfg.DisableMethods = true
                        // cfg.Dump(result.Err)
-                       log.Printf("closing %v", ws)
-                       ws.peer.close()
+
+                       if webseedPeerCloseOnUnhandledError {
+                               log.Printf("closing %v", ws)
+                               ws.peer.close()
+                       } else {
+                               ws.lastUnhandledErr = time.Now()
+                       }
                }
                if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
                        panic("invalid reject")