]> Sergey Matveev's repositories - btrtrc.git/blobdiff - webseed-peer.go
Drop support for go 1.20
[btrtrc.git] / webseed-peer.go
index 682bef35d3f2aec9c03e8cacc4ce848ada39f378..5b6632badd8153ff0f685e75b4c4452c188ff34d 100644 (file)
@@ -4,28 +4,39 @@ import (
        "context"
        "errors"
        "fmt"
+       "math/rand"
        "sync"
+       "time"
 
        "github.com/RoaringBitmap/roaring"
        "github.com/anacrolix/log"
+
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
        "github.com/anacrolix/torrent/webseed"
 )
 
+const (
+       webseedPeerUnhandledErrorSleep   = 5 * time.Second
+       webseedPeerCloseOnUnhandledError = false
+)
+
 type webseedPeer struct {
-       client         webseed.Client
-       activeRequests map[Request]webseed.Request
-       requesterCond  sync.Cond
-       peer           Peer
-       // Number of requester routines.
-       maxRequests int
+       // First field for stats alignment.
+       peer             Peer
+       client           webseed.Client
+       activeRequests   map[Request]webseed.Request
+       requesterCond    sync.Cond
+       lastUnhandledErr time.Time
 }
 
 var _ peerImpl = (*webseedPeer)(nil)
 
-func (me *webseedPeer) connStatusString() string {
-       return me.client.Url
+func (me *webseedPeer) peerImplStatusLines() []string {
+       return []string{
+               me.client.Url,
+               fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
+       }
 }
 
 func (ws *webseedPeer) String() string {
@@ -47,17 +58,13 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
 }
 
 func (ws *webseedPeer) _cancel(r RequestIndex) bool {
-       active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
-       if ok {
+       if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
                active.Cancel()
-               if !ws.peer.deleteRequest(r) {
-                       panic("cancelled webseed request should exist")
-               }
-               if ws.peer.isLowOnRequests() {
-                       ws.peer.updateRequests("webseedPeer._cancel")
-               }
+               // The requester is running and will handle the result.
+               return true
        }
-       return true
+       // There should be no requester handling this, so no further events will occur.
+       return false
 }
 
 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
@@ -69,30 +76,45 @@ func (ws *webseedPeer) _request(r Request) bool {
        return true
 }
 
-func (ws *webseedPeer) doRequest(r Request) {
+func (ws *webseedPeer) doRequest(r Request) error {
        webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
        ws.activeRequests[r] = webseedRequest
-       func() {
+       err := func() error {
                ws.requesterCond.L.Unlock()
                defer ws.requesterCond.L.Lock()
-               ws.requestResultHandler(r, webseedRequest)
+               return ws.requestResultHandler(r, webseedRequest)
        }()
        delete(ws.activeRequests, r)
+       return err
 }
 
-func (ws *webseedPeer) requester() {
+func (ws *webseedPeer) requester(i int) {
        ws.requesterCond.L.Lock()
        defer ws.requesterCond.L.Unlock()
 start:
        for !ws.peer.closed.IsSet() {
+               // Restart is set if we don't need to wait for the requestCond before trying again.
                restart := false
-               ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
+               ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
                        r := ws.peer.t.requestIndexToRequest(x)
                        if _, ok := ws.activeRequests[r]; ok {
                                return true
                        }
-                       ws.doRequest(r)
+                       err := ws.doRequest(r)
+                       ws.requesterCond.L.Unlock()
+                       if err != nil && !errors.Is(err, context.Canceled) {
+                               log.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
+                       }
                        restart = true
+                       if errors.Is(err, webseed.ErrTooFast) {
+                               time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
+                       }
+                       // Demeter is throwing a tantrum on Mount Olympus for this
+                       ws.peer.t.cl.locker().RLock()
+                       duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))
+                       ws.peer.t.cl.locker().RUnlock()
+                       time.Sleep(duration)
+                       ws.requesterCond.L.Lock()
                        return false
                })
                if restart {
@@ -106,60 +128,86 @@ func (ws *webseedPeer) connectionFlags() string {
        return "WS"
 }
 
-// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
-// return bool if this is even possible, and if it isn't, skip to the next drop candidate.
+// Maybe this should drop all existing connections, or something like that.
 func (ws *webseedPeer) drop() {}
 
+func (cn *webseedPeer) ban() {
+       cn.peer.close()
+}
+
 func (ws *webseedPeer) handleUpdateRequests() {
-       ws.peer.maybeUpdateActualRequestState()
+       // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
+       // pieces.
+       go func() {
+               ws.peer.t.cl.lock()
+               defer ws.peer.t.cl.unlock()
+               ws.peer.maybeUpdateActualRequestState()
+       }()
 }
 
 func (ws *webseedPeer) onClose() {
-       ws.peer.logger.WithLevel(log.Debug).Print("closing")
-       ws.peer.deleteAllRequests()
-       for _, r := range ws.activeRequests {
-               r.Cancel()
-       }
+       ws.peer.logger.Levelf(log.Debug, "closing")
+       // Just deleting them means we would have to manually cancel active requests.
+       ws.peer.cancelAllRequests()
+       ws.peer.t.iterPeers(func(p *Peer) {
+               if p.isLowOnRequests() {
+                       p.updateRequests("webseedPeer.onClose")
+               }
+       })
        ws.requesterCond.Broadcast()
 }
 
-func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
        result := <-webseedRequest.Result
+       close(webseedRequest.Result) // one-shot
        // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
        // sure if we can divine which errors indicate cancellation on our end without hitting the
        // network though.
-       ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+       if len(result.Bytes) != 0 || result.Err == nil {
+               // Increment ChunksRead and friends
+               ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+       }
        ws.peer.readBytes(int64(len(result.Bytes)))
        ws.peer.t.cl.lock()
        defer ws.peer.t.cl.unlock()
        if ws.peer.t.closed.IsSet() {
-               return
+               return nil
        }
-       if result.Err != nil {
-               if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
+       err := result.Err
+       if err != nil {
+               switch {
+               case errors.Is(err, context.Canceled):
+               case errors.Is(err, webseed.ErrTooFast):
+               case ws.peer.closed.IsSet():
+               default:
                        ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
+                       // // Here lies my attempt to extract something concrete from Go's error system. RIP.
                        // cfg := spew.NewDefaultConfig()
                        // cfg.DisableMethods = true
                        // cfg.Dump(result.Err)
-                       log.Printf("closing %v", ws)
-                       ws.peer.close()
+
+                       if webseedPeerCloseOnUnhandledError {
+                               log.Printf("closing %v", ws)
+                               ws.peer.close()
+                       } else {
+                               ws.lastUnhandledErr = time.Now()
+                       }
                }
-               ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
-       } else {
-               err := ws.peer.receiveChunk(&pp.Message{
-                       Type:  pp.Piece,
-                       Index: r.Index,
-                       Begin: r.Begin,
-                       Piece: result.Bytes,
-               })
-               if err != nil {
-                       panic(err)
+               if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+                       panic("invalid reject")
                }
+               return err
        }
-}
-
-func (me *webseedPeer) isLowOnRequests() bool {
-       return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
+       err = ws.peer.receiveChunk(&pp.Message{
+               Type:  pp.Piece,
+               Index: r.Index,
+               Begin: r.Begin,
+               Piece: result.Bytes,
+       })
+       if err != nil {
+               panic(err)
+       }
+       return err
 }
 
 func (me *webseedPeer) peerPieces() *roaring.Bitmap {