From 14cf045b6a440143a1c231ff8d7b555d7b3f74a4 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 31 Dec 2022 11:27:47 +1100 Subject: [PATCH] Sleep webseed peers after unhandled errors --- peer-impl.go | 2 +- peerconn.go | 14 +++++++------- torrent.go | 8 ++++++-- webseed-peer.go | 34 ++++++++++++++++++++++++---------- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/peer-impl.go b/peer-impl.go index b854ffd0..f9f9096b 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -26,7 +26,7 @@ type peerImpl interface { // Rebuke the peer ban() String() string - connStatusString() string + peerImplStatusLines() []string // All if the peer should have everything, known if we know that for a fact. For example, we can // guess at how many pieces are in a torrent, and assume they have all pieces based on them diff --git a/peerconn.go b/peerconn.go index 5ed5bde6..3c515aaf 100644 --- a/peerconn.go +++ b/peerconn.go @@ -171,8 +171,8 @@ type PeerConn struct { peerSentHaveAll bool } -func (cn *PeerConn) connStatusString() string { - return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString) +func (cn *PeerConn) peerImplStatusLines() []string { + return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)} } func (cn *Peer) updateExpectingChunks() { @@ -389,14 +389,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { if cn.closed.IsSet() { fmt.Fprint(w, "CLOSED: ") } - fmt.Fprintln(w, cn.connStatusString()) + fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n")) prio, err := cn.peerPriority() prioStr := fmt.Sprintf("%08x", prio) if err != nil { prioStr += ": " + err.Error() } - fmt.Fprintf(w, " bep40-prio: %v\n", prioStr) - fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n", + fmt.Fprintf(w, "bep40-prio: %v\n", prioStr) + fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n", eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastHelpful()), @@ -404,7 +404,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { cn.totalExpectingTime(), ) fmt.Fprintf(w, - " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", + "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", cn.completedString(), len(cn.peerTouchedPieces), &cn._stats.ChunksReadUseful, @@ -419,7 +419,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { cn.statusFlags(), cn.downloadRate()/(1<<10), ) - fmt.Fprintf(w, " requested pieces:") + fmt.Fprintf(w, "requested pieces:") cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) { fmt.Fprintf(w, " %v(%v)", piece, count) }) diff --git a/torrent.go b/torrent.go index e0bc7246..f19deb75 100644 --- a/torrent.go +++ b/torrent.go @@ -770,9 +770,14 @@ func (t *Torrent) writeStatus(w io.Writer) { } return worseConn(i, j) }) + var buf bytes.Buffer for i, c := range peers { fmt.Fprintf(w, "%2d. ", i+1) - c.writeStatus(w, t) + buf.Reset() + c.writeStatus(&buf, t) + w.Write(bytes.TrimRight( + bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")), + " ")) } } @@ -2425,7 +2430,6 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) { }, }, activeRequests: make(map[Request]webseed.Request, maxRequests), - maxRequests: maxRequests, } ws.peer.initRequestState() for _, opt := range opts { diff --git a/webseed-peer.go b/webseed-peer.go index 52e33c72..678c805b 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -16,20 +16,27 @@ import ( "github.com/anacrolix/torrent/webseed" ) +const ( + webseedPeerUnhandledErrorSleep = 5 * time.Second + webseedPeerCloseOnUnhandledError = false +) + type webseedPeer struct { // First field for stats alignment. - peer Peer - client webseed.Client - activeRequests map[Request]webseed.Request - requesterCond sync.Cond - // Number of requester routines. - maxRequests int + peer Peer + client webseed.Client + activeRequests map[Request]webseed.Request + requesterCond sync.Cond + lastUnhandledErr time.Time } var _ peerImpl = (*webseedPeer)(nil) -func (me *webseedPeer) connStatusString() string { - return me.client.Url +func (me *webseedPeer) peerImplStatusLines() []string { + return []string{ + me.client.Url, + fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)), + } } func (ws *webseedPeer) String() string { @@ -86,6 +93,7 @@ func (ws *webseedPeer) requester(i int) { defer ws.requesterCond.L.Unlock() start: for !ws.peer.closed.IsSet() { + // Restart is set if we don't need to wait for the requestCond before trying again. restart := false ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { r := ws.peer.t.requestIndexToRequest(x) @@ -101,6 +109,7 @@ start: if errors.Is(err, webseed.ErrTooFast) { time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) } + time.Sleep(time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))) ws.requesterCond.L.Lock() return false }) @@ -172,8 +181,13 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re // cfg := spew.NewDefaultConfig() // cfg.DisableMethods = true // cfg.Dump(result.Err) - log.Printf("closing %v", ws) - ws.peer.close() + + if webseedPeerCloseOnUnhandledError { + log.Printf("closing %v", ws) + ws.peer.close() + } else { + ws.lastUnhandledErr = time.Now() + } } if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { panic("invalid reject") -- 2.44.0