]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Sleep webseed peers after unhandled errors
authorMatt Joiner <anacrolix@gmail.com>
Sat, 31 Dec 2022 00:27:47 +0000 (11:27 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 31 Dec 2022 00:27:47 +0000 (11:27 +1100)
peer-impl.go
peerconn.go
torrent.go
webseed-peer.go

index b854ffd0ff5f096ab486b81a3353269eb0042794..f9f9096b198ea5d247d157fb940fbc9172ace1ac 100644 (file)
@@ -26,7 +26,7 @@ type peerImpl interface {
        // Rebuke the peer
        ban()
        String() string
-       connStatusString() string
+       peerImplStatusLines() []string
 
        // All if the peer should have everything, known if we know that for a fact. For example, we can
        // guess at how many pieces are in a torrent, and assume they have all pieces based on them
index 5ed5bde69eff88dcf9fe9e16b682bba7bc80c1b4..3c515aaf989d130e88670a89215abb14b1364e53 100644 (file)
@@ -171,8 +171,8 @@ type PeerConn struct {
        peerSentHaveAll bool
 }
 
-func (cn *PeerConn) connStatusString() string {
-       return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)
+func (cn *PeerConn) peerImplStatusLines() []string {
+       return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
 }
 
 func (cn *Peer) updateExpectingChunks() {
@@ -389,14 +389,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
        if cn.closed.IsSet() {
                fmt.Fprint(w, "CLOSED: ")
        }
-       fmt.Fprintln(w, cn.connStatusString())
+       fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
        prio, err := cn.peerPriority()
        prioStr := fmt.Sprintf("%08x", prio)
        if err != nil {
                prioStr += ": " + err.Error()
        }
-       fmt.Fprintf(w, "    bep40-prio: %v\n", prioStr)
-       fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
+       fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
+       fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastHelpful()),
@@ -404,7 +404,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
-               "    %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+               "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn._stats.ChunksReadUseful,
@@ -419,7 +419,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
-       fmt.Fprintf(w, "    requested pieces:")
+       fmt.Fprintf(w, "requested pieces:")
        cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
                fmt.Fprintf(w, " %v(%v)", piece, count)
        })
index e0bc7246320ca7880dbee91bb06c685e03f5d519..f19deb7587aa07b26c18d779025779e16c2e88b7 100644 (file)
@@ -770,9 +770,14 @@ func (t *Torrent) writeStatus(w io.Writer) {
                }
                return worseConn(i, j)
        })
+       var buf bytes.Buffer
        for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)
-               c.writeStatus(w, t)
+               buf.Reset()
+               c.writeStatus(&buf, t)
+               w.Write(bytes.TrimRight(
+                       bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n    ")),
+                       " "))
        }
 }
 
@@ -2425,7 +2430,6 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
                        },
                },
                activeRequests: make(map[Request]webseed.Request, maxRequests),
-               maxRequests:    maxRequests,
        }
        ws.peer.initRequestState()
        for _, opt := range opts {
index 52e33c721217b51096a5d6e99c6f179233c5e23c..678c805bd9b3b4590aae11f5d92e0be86661d0ab 100644 (file)
@@ -16,20 +16,27 @@ import (
        "github.com/anacrolix/torrent/webseed"
 )
 
+const (
+       webseedPeerUnhandledErrorSleep   = 5 * time.Second
+       webseedPeerCloseOnUnhandledError = false
+)
+
 type webseedPeer struct {
        // First field for stats alignment.
-       peer           Peer
-       client         webseed.Client
-       activeRequests map[Request]webseed.Request
-       requesterCond  sync.Cond
-       // Number of requester routines.
-       maxRequests int
+       peer             Peer
+       client           webseed.Client
+       activeRequests   map[Request]webseed.Request
+       requesterCond    sync.Cond
+       lastUnhandledErr time.Time
 }
 
 var _ peerImpl = (*webseedPeer)(nil)
 
-func (me *webseedPeer) connStatusString() string {
-       return me.client.Url
+func (me *webseedPeer) peerImplStatusLines() []string {
+       return []string{
+               me.client.Url,
+               fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
+       }
 }
 
 func (ws *webseedPeer) String() string {
@@ -86,6 +93,7 @@ func (ws *webseedPeer) requester(i int) {
        defer ws.requesterCond.L.Unlock()
 start:
        for !ws.peer.closed.IsSet() {
+               // Restart is set if we don't need to wait for the requestCond before trying again.
                restart := false
                ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
                        r := ws.peer.t.requestIndexToRequest(x)
@@ -101,6 +109,7 @@ start:
                        if errors.Is(err, webseed.ErrTooFast) {
                                time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
                        }
+                       time.Sleep(time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep)))
                        ws.requesterCond.L.Lock()
                        return false
                })
@@ -172,8 +181,13 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
                        // cfg := spew.NewDefaultConfig()
                        // cfg.DisableMethods = true
                        // cfg.Dump(result.Err)
-                       log.Printf("closing %v", ws)
-                       ws.peer.close()
+
+                       if webseedPeerCloseOnUnhandledError {
+                               log.Printf("closing %v", ws)
+                               ws.peer.close()
+                       } else {
+                               ws.lastUnhandledErr = time.Now()
+                       }
                }
                if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
                        panic("invalid reject")