X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=webseed-peer.go;h=5b6632badd8153ff0f685e75b4c4452c188ff34d;hb=HEAD;hp=2486aba05882f722b38953f9371a16c9057543df;hpb=1bae62fd222402b483f638cdcc27dda5a47a29ed;p=btrtrc.git diff --git a/webseed-peer.go b/webseed-peer.go index 2486aba0..5b6632ba 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -10,24 +10,33 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/anacrolix/log" + "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/webseed" ) +const ( + webseedPeerUnhandledErrorSleep = 5 * time.Second + webseedPeerCloseOnUnhandledError = false +) + type webseedPeer struct { - client webseed.Client - activeRequests map[Request]webseed.Request - requesterCond sync.Cond - peer Peer - // Number of requester routines. - maxRequests int + // First field for stats alignment. + peer Peer + client webseed.Client + activeRequests map[Request]webseed.Request + requesterCond sync.Cond + lastUnhandledErr time.Time } var _ peerImpl = (*webseedPeer)(nil) -func (me *webseedPeer) connStatusString() string { - return me.client.Url +func (me *webseedPeer) peerImplStatusLines() []string { + return []string{ + me.client.Url, + fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)), + } } func (ws *webseedPeer) String() string { @@ -84,8 +93,9 @@ func (ws *webseedPeer) requester(i int) { defer ws.requesterCond.L.Unlock() start: for !ws.peer.closed.IsSet() { + // Restart is set if we don't need to wait for the requestCond before trying again. restart := false - ws.peer.requestState.Requests.Iterate(func(x uint32) bool { + ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { return true @@ -99,6 +109,11 @@ start: if errors.Is(err, webseed.ErrTooFast) { time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) } + // Demeter is throwing a tantrum on Mount Olympus for this + ws.peer.t.cl.locker().RLock() + duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep)) + ws.peer.t.cl.locker().RUnlock() + time.Sleep(duration) ws.requesterCond.L.Lock() return false }) @@ -113,10 +128,13 @@ func (ws *webseedPeer) connectionFlags() string { return "WS" } -// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could -// return bool if this is even possible, and if it isn't, skip to the next drop candidate. +// Maybe this should drop all existing connections, or something like that. func (ws *webseedPeer) drop() {} +func (cn *webseedPeer) ban() { + cn.peer.close() +} + func (ws *webseedPeer) handleUpdateRequests() { // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized // pieces. @@ -128,7 +146,7 @@ func (ws *webseedPeer) handleUpdateRequests() { } func (ws *webseedPeer) onClose() { - ws.peer.logger.WithLevel(log.Debug).Print("closing") + ws.peer.logger.Levelf(log.Debug, "closing") // Just deleting them means we would have to manually cancel active requests. ws.peer.cancelAllRequests() ws.peer.t.iterPeers(func(p *Peer) { @@ -167,8 +185,13 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re // cfg := spew.NewDefaultConfig() // cfg.DisableMethods = true // cfg.Dump(result.Err) - log.Printf("closing %v", ws) - ws.peer.close() + + if webseedPeerCloseOnUnhandledError { + log.Printf("closing %v", ws) + ws.peer.close() + } else { + ws.lastUnhandledErr = time.Now() + } } if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { panic("invalid reject")