X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=webseed-peer.go;h=5b6632badd8153ff0f685e75b4c4452c188ff34d;hb=HEAD;hp=4921ab727e2676c56463207ab34f36f9bc0f3b12;hpb=a2c50ea2bd98144cd8ea37dffec015e745ac3189;p=btrtrc.git diff --git a/webseed-peer.go b/webseed-peer.go index 4921ab72..5b6632ba 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -10,24 +10,33 @@ import ( "github.com/RoaringBitmap/roaring" "github.com/anacrolix/log" + "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/webseed" ) +const ( + webseedPeerUnhandledErrorSleep = 5 * time.Second + webseedPeerCloseOnUnhandledError = false +) + type webseedPeer struct { - client webseed.Client - activeRequests map[Request]webseed.Request - requesterCond sync.Cond - peer Peer - // Number of requester routines. - maxRequests int + // First field for stats alignment. + peer Peer + client webseed.Client + activeRequests map[Request]webseed.Request + requesterCond sync.Cond + lastUnhandledErr time.Time } var _ peerImpl = (*webseedPeer)(nil) -func (me *webseedPeer) connStatusString() string { - return me.client.Url +func (me *webseedPeer) peerImplStatusLines() []string { + return []string{ + me.client.Url, + fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)), + } } func (ws *webseedPeer) String() string { @@ -49,17 +58,13 @@ func (ws *webseedPeer) writeInterested(interested bool) bool { } func (ws *webseedPeer) _cancel(r RequestIndex) bool { - active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)] - if ok { + if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok { active.Cancel() + // The requester is running and will handle the result. + return true } - if !ws.peer.deleteRequest(r) { - panic("cancelled webseed request should exist") - } - if ws.peer.isLowOnRequests() { - ws.peer.updateRequests("webseedPeer._cancel") - } - return true + // There should be no requester handling this, so no further events will occur. + return false } func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { @@ -88,21 +93,27 @@ func (ws *webseedPeer) requester(i int) { defer ws.requesterCond.L.Unlock() start: for !ws.peer.closed.IsSet() { + // Restart is set if we don't need to wait for the requestCond before trying again. restart := false - ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool { + ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { return true } err := ws.doRequest(r) ws.requesterCond.L.Unlock() - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { log.Printf("requester %v: error doing webseed request %v: %v", i, r, err) } restart = true if errors.Is(err, webseed.ErrTooFast) { time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) } + // Demeter is throwing a tantrum on Mount Olympus for this + ws.peer.t.cl.locker().RLock() + duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep)) + ws.peer.t.cl.locker().RUnlock() + time.Sleep(duration) ws.requesterCond.L.Lock() return false }) @@ -117,22 +128,32 @@ func (ws *webseedPeer) connectionFlags() string { return "WS" } -// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could -// return bool if this is even possible, and if it isn't, skip to the next drop candidate. +// Maybe this should drop all existing connections, or something like that. func (ws *webseedPeer) drop() {} +func (cn *webseedPeer) ban() { + cn.peer.close() +} + func (ws *webseedPeer) handleUpdateRequests() { // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized // pieces. - ws.peer.maybeUpdateActualRequestState() + go func() { + ws.peer.t.cl.lock() + defer ws.peer.t.cl.unlock() + ws.peer.maybeUpdateActualRequestState() + }() } func (ws *webseedPeer) onClose() { - ws.peer.logger.WithLevel(log.Debug).Print("closing") - ws.peer.deleteAllRequests() - for _, r := range ws.activeRequests { - r.Cancel() - } + ws.peer.logger.Levelf(log.Debug, "closing") + // Just deleting them means we would have to manually cancel active requests. + ws.peer.cancelAllRequests() + ws.peer.t.iterPeers(func(p *Peer) { + if p.isLowOnRequests() { + p.updateRequests("webseedPeer.onClose") + } + }) ws.requesterCond.Broadcast() } @@ -164,10 +185,17 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re // cfg := spew.NewDefaultConfig() // cfg.DisableMethods = true // cfg.Dump(result.Err) - log.Printf("closing %v", ws) - ws.peer.close() + + if webseedPeerCloseOnUnhandledError { + log.Printf("closing %v", ws) + ws.peer.close() + } else { + ws.lastUnhandledErr = time.Now() + } + } + if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { + panic("invalid reject") } - ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) return err } err = ws.peer.receiveChunk(&pp.Message{ @@ -182,10 +210,6 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re return err } -func (me *webseedPeer) isLowOnRequests() bool { - return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests) -} - func (me *webseedPeer) peerPieces() *roaring.Bitmap { return &me.client.Pieces }