]> Sergey Matveev's repositories - btrtrc.git/blobdiff - webseed-peer.go
Drop support for go 1.20
[btrtrc.git] / webseed-peer.go
index 4ed323725d655f5c55156a41eb43ec7e0168bbfa..5b6632badd8153ff0f685e75b4c4452c188ff34d 100644 (file)
@@ -1,27 +1,42 @@
 package torrent
 
 import (
+       "context"
+       "errors"
        "fmt"
-       "strings"
+       "math/rand"
+       "sync"
+       "time"
+
+       "github.com/RoaringBitmap/roaring"
+       "github.com/anacrolix/log"
 
-       "github.com/anacrolix/torrent/common"
        "github.com/anacrolix/torrent/metainfo"
        pp "github.com/anacrolix/torrent/peer_protocol"
-       "github.com/anacrolix/torrent/segments"
        "github.com/anacrolix/torrent/webseed"
-       "github.com/pkg/errors"
+)
+
+const (
+       webseedPeerUnhandledErrorSleep   = 5 * time.Second
+       webseedPeerCloseOnUnhandledError = false
 )
 
 type webseedPeer struct {
-       client   webseed.Client
-       requests map[request]webseed.Request
-       peer     peer
+       // First field for stats alignment.
+       peer             Peer
+       client           webseed.Client
+       activeRequests   map[Request]webseed.Request
+       requesterCond    sync.Cond
+       lastUnhandledErr time.Time
 }
 
 var _ peerImpl = (*webseedPeer)(nil)
 
-func (me *webseedPeer) connStatusString() string {
-       return me.client.Url
+func (me *webseedPeer) peerImplStatusLines() []string {
+       return []string{
+               me.client.Url,
+               fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
+       }
 }
 
 func (ws *webseedPeer) String() string {
@@ -29,71 +44,179 @@ func (ws *webseedPeer) String() string {
 }
 
 func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
-       ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
-       ws.client.Info = info
-}
-
-func (ws *webseedPeer) _postCancel(r request) {
-       ws.cancel(r)
+       ws.client.SetInfo(info)
+       // There should be probably be a callback in Client instead, so it can remove pieces at its whim
+       // too.
+       ws.client.Pieces.Iterate(func(x uint32) bool {
+               ws.peer.t.incPieceAvailability(pieceIndex(x))
+               return true
+       })
 }
 
 func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
 }
 
-func (ws *webseedPeer) cancel(r request) bool {
-       ws.requests[r].Cancel()
-       return true
+func (ws *webseedPeer) _cancel(r RequestIndex) bool {
+       if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
+               active.Cancel()
+               // The requester is running and will handle the result.
+               return true
+       }
+       // There should be no requester handling this, so no further events will occur.
+       return false
 }
 
-func (ws *webseedPeer) intoSpec(r request) webseed.RequestSpec {
+func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
        return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
 }
 
-func (ws *webseedPeer) request(r request) bool {
-       webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
-       ws.requests[r] = webseedRequest
-       go ws.requestResultHandler(r, webseedRequest)
+func (ws *webseedPeer) _request(r Request) bool {
+       ws.requesterCond.Signal()
        return true
 }
 
+func (ws *webseedPeer) doRequest(r Request) error {
+       webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+       ws.activeRequests[r] = webseedRequest
+       err := func() error {
+               ws.requesterCond.L.Unlock()
+               defer ws.requesterCond.L.Lock()
+               return ws.requestResultHandler(r, webseedRequest)
+       }()
+       delete(ws.activeRequests, r)
+       return err
+}
+
+func (ws *webseedPeer) requester(i int) {
+       ws.requesterCond.L.Lock()
+       defer ws.requesterCond.L.Unlock()
+start:
+       for !ws.peer.closed.IsSet() {
+               // Restart is set if we don't need to wait for the requestCond before trying again.
+               restart := false
+               ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
+                       r := ws.peer.t.requestIndexToRequest(x)
+                       if _, ok := ws.activeRequests[r]; ok {
+                               return true
+                       }
+                       err := ws.doRequest(r)
+                       ws.requesterCond.L.Unlock()
+                       if err != nil && !errors.Is(err, context.Canceled) {
+                               log.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
+                       }
+                       restart = true
+                       if errors.Is(err, webseed.ErrTooFast) {
+                               time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
+                       }
+                       // Demeter is throwing a tantrum on Mount Olympus for this
+                       ws.peer.t.cl.locker().RLock()
+                       duration := time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep))
+                       ws.peer.t.cl.locker().RUnlock()
+                       time.Sleep(duration)
+                       ws.requesterCond.L.Lock()
+                       return false
+               })
+               if restart {
+                       goto start
+               }
+               ws.requesterCond.Wait()
+       }
+}
+
 func (ws *webseedPeer) connectionFlags() string {
        return "WS"
 }
 
-// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too.
+// Maybe this should drop all existing connections, or something like that.
 func (ws *webseedPeer) drop() {}
 
-func (ws *webseedPeer) updateRequests() {
-       ws.peer.doRequestState()
+func (cn *webseedPeer) ban() {
+       cn.peer.close()
+}
+
+func (ws *webseedPeer) handleUpdateRequests() {
+       // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
+       // pieces.
+       go func() {
+               ws.peer.t.cl.lock()
+               defer ws.peer.t.cl.unlock()
+               ws.peer.maybeUpdateActualRequestState()
+       }()
 }
 
-func (ws *webseedPeer) _close() {}
+func (ws *webseedPeer) onClose() {
+       ws.peer.logger.Levelf(log.Debug, "closing")
+       // Just deleting them means we would have to manually cancel active requests.
+       ws.peer.cancelAllRequests()
+       ws.peer.t.iterPeers(func(p *Peer) {
+               if p.isLowOnRequests() {
+                       p.updateRequests("webseedPeer.onClose")
+               }
+       })
+       ws.requesterCond.Broadcast()
+}
 
-func (ws *webseedPeer) requestResultHandler(r request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
        result := <-webseedRequest.Result
+       close(webseedRequest.Result) // one-shot
+       // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
+       // sure if we can divine which errors indicate cancellation on our end without hitting the
+       // network though.
+       if len(result.Bytes) != 0 || result.Err == nil {
+               // Increment ChunksRead and friends
+               ws.peer.doChunkReadStats(int64(len(result.Bytes)))
+       }
+       ws.peer.readBytes(int64(len(result.Bytes)))
        ws.peer.t.cl.lock()
        defer ws.peer.t.cl.unlock()
-       if result.Err != nil {
-               ws.peer.logger.Printf("request %v rejected: %v", r, result.Err)
-               // Always close for now. We need to filter out temporary errors, but this is a nightmare in
-               // Go. Currently a bad webseed URL can starve out the good ones due to the chunk selection
-               // algorithm.
-               const closeOnAllErrors = false
-               if closeOnAllErrors || strings.Contains(errors.Cause(result.Err).Error(), "unsupported protocol scheme") {
-                       ws.peer.close()
-               } else {
-                       ws.peer.remoteRejectedRequest(r)
+       if ws.peer.t.closed.IsSet() {
+               return nil
+       }
+       err := result.Err
+       if err != nil {
+               switch {
+               case errors.Is(err, context.Canceled):
+               case errors.Is(err, webseed.ErrTooFast):
+               case ws.peer.closed.IsSet():
+               default:
+                       ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
+                       // // Here lies my attempt to extract something concrete from Go's error system. RIP.
+                       // cfg := spew.NewDefaultConfig()
+                       // cfg.DisableMethods = true
+                       // cfg.Dump(result.Err)
+
+                       if webseedPeerCloseOnUnhandledError {
+                               log.Printf("closing %v", ws)
+                               ws.peer.close()
+                       } else {
+                               ws.lastUnhandledErr = time.Now()
+                       }
                }
-       } else {
-               err := ws.peer.receiveChunk(&pp.Message{
-                       Type:  pp.Piece,
-                       Index: r.Index,
-                       Begin: r.Begin,
-                       Piece: result.Bytes,
-               })
-               if err != nil {
-                       panic(err)
+               if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
+                       panic("invalid reject")
                }
+               return err
+       }
+       err = ws.peer.receiveChunk(&pp.Message{
+               Type:  pp.Piece,
+               Index: r.Index,
+               Begin: r.Begin,
+               Piece: result.Bytes,
+       })
+       if err != nil {
+               panic(err)
+       }
+       return err
+}
+
+func (me *webseedPeer) peerPieces() *roaring.Bitmap {
+       return &me.client.Pieces
+}
+
+func (cn *webseedPeer) peerHasAllPieces() (all, known bool) {
+       if !cn.peer.t.haveInfo() {
+               return true, false
        }
+       return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true
 }