// Rebuke the peer
ban()
String() string
- connStatusString() string
+ peerImplStatusLines() []string
// All if the peer should have everything, known if we know that for a fact. For example, we can
// guess at how many pieces are in a torrent, and assume they have all pieces based on them
peerSentHaveAll bool
}
-func (cn *PeerConn) connStatusString() string {
- return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)
+func (cn *PeerConn) peerImplStatusLines() []string {
+ return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
}
func (cn *Peer) updateExpectingChunks() {
if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ")
}
- fmt.Fprintln(w, cn.connStatusString())
+ fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
prio, err := cn.peerPriority()
prioStr := fmt.Sprintf("%08x", prio)
if err != nil {
prioStr += ": " + err.Error()
}
- fmt.Fprintf(w, " bep40-prio: %v\n", prioStr)
- fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
+ fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
+ fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake),
eventAgeString(cn.lastHelpful()),
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
- " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
+ "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful,
cn.statusFlags(),
cn.downloadRate()/(1<<10),
)
- fmt.Fprintf(w, " requested pieces:")
+ fmt.Fprintf(w, "requested pieces:")
cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
fmt.Fprintf(w, " %v(%v)", piece, count)
})
}
return worseConn(i, j)
})
+ var buf bytes.Buffer
for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
- c.writeStatus(w, t)
+ buf.Reset()
+ c.writeStatus(&buf, t)
+ w.Write(bytes.TrimRight(
+ bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
+ " "))
}
}
},
},
activeRequests: make(map[Request]webseed.Request, maxRequests),
- maxRequests: maxRequests,
}
ws.peer.initRequestState()
for _, opt := range opts {
"github.com/anacrolix/torrent/webseed"
)
+const (
+ webseedPeerUnhandledErrorSleep = 5 * time.Second
+ webseedPeerCloseOnUnhandledError = false
+)
+
type webseedPeer struct {
// First field for stats alignment.
- peer Peer
- client webseed.Client
- activeRequests map[Request]webseed.Request
- requesterCond sync.Cond
- // Number of requester routines.
- maxRequests int
+ peer Peer
+ client webseed.Client
+ activeRequests map[Request]webseed.Request
+ requesterCond sync.Cond
+ lastUnhandledErr time.Time
}
var _ peerImpl = (*webseedPeer)(nil)
-func (me *webseedPeer) connStatusString() string {
- return me.client.Url
+func (me *webseedPeer) peerImplStatusLines() []string {
+ return []string{
+ me.client.Url,
+ fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
+ }
}
func (ws *webseedPeer) String() string {
defer ws.requesterCond.L.Unlock()
start:
for !ws.peer.closed.IsSet() {
+ // Restart is set if we don't need to wait for the requestCond before trying again.
restart := false
ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
r := ws.peer.t.requestIndexToRequest(x)
if errors.Is(err, webseed.ErrTooFast) {
time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
}
+ time.Sleep(time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep)))
ws.requesterCond.L.Lock()
return false
})
// cfg := spew.NewDefaultConfig()
// cfg.DisableMethods = true
// cfg.Dump(result.Err)
- log.Printf("closing %v", ws)
- ws.peer.close()
+
+ if webseedPeerCloseOnUnhandledError {
+ log.Printf("closing %v", ws)
+ ws.peer.close()
+ } else {
+ ws.lastUnhandledErr = time.Now()
+ }
}
if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
panic("invalid reject")