]> Sergey Matveev's repositories - btrtrc.git/blob - webseed-peer.go
gorond ./...
[btrtrc.git] / webseed-peer.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "math/rand"
8         "sync"
9         "time"
10
11         "github.com/RoaringBitmap/roaring"
12         "github.com/anacrolix/log"
13
14         "github.com/anacrolix/torrent/metainfo"
15         pp "github.com/anacrolix/torrent/peer_protocol"
16         "github.com/anacrolix/torrent/webseed"
17 )
18
// webseedPeer is the webseed-backed implementation of peerImpl: it pairs a Peer with a
// webseed.Client and tracks the requests that requester routines currently have in flight.
type webseedPeer struct {
        // First field for stats alignment.
        peer           Peer
        // client is the webseed client used to create requests and to look up the URL and the
        // piece bitmap.
        client         webseed.Client
        // activeRequests holds the in-flight webseed request for each Request. Entries are added
        // and removed by doRequest and consulted by _cancel and requester.
        activeRequests map[Request]webseed.Request
        // requesterCond wakes requester routines: _request signals it and onClose broadcasts on
        // it. requester holds requesterCond.L while scanning for work.
        requesterCond  sync.Cond
        // Number of requester routines.
        maxRequests int
}
28
// Compile-time assertion that *webseedPeer satisfies peerImpl.
var _ peerImpl = (*webseedPeer)(nil)
30
31 func (me *webseedPeer) connStatusString() string {
32         return me.client.Url
33 }
34
35 func (ws *webseedPeer) String() string {
36         return fmt.Sprintf("webseed peer for %q", ws.client.Url)
37 }
38
39 func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
40         ws.client.SetInfo(info)
41         // There should be probably be a callback in Client instead, so it can remove pieces at its whim
42         // too.
43         ws.client.Pieces.Iterate(func(x uint32) bool {
44                 ws.peer.t.incPieceAvailability(pieceIndex(x))
45                 return true
46         })
47 }
48
// writeInterested ignores its argument and always returns true; nothing is sent for a webseed
// peer here.
func (ws *webseedPeer) writeInterested(interested bool) bool {
        return true
}
52
53 func (ws *webseedPeer) _cancel(r RequestIndex) bool {
54         if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
55                 active.Cancel()
56                 // The requester is running and will handle the result.
57                 return true
58         }
59         // There should be no requester handling this, so no further events will occur.
60         return false
61 }
62
63 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
64         return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
65 }
66
// _request does not issue the request itself: it signals requesterCond to wake one waiting
// requester routine and always returns true. The argument r is not used here.
func (ws *webseedPeer) _request(r Request) bool {
        ws.requesterCond.Signal()
        return true
}
71
// doRequest starts a webseed request for r, records it in activeRequests, and blocks until
// requestResultHandler has processed its result. The caller must hold requesterCond.L: the
// lock is released while waiting on the result and is held again when doRequest returns.
// It returns the error reported by requestResultHandler.
func (ws *webseedPeer) doRequest(r Request) error {
        webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
        // Registered before unlocking so _cancel and other requesters can see the request.
        ws.activeRequests[r] = webseedRequest
        err := func() error {
                // Drop the lock for the duration of the wait; the deferred Lock re-acquires it
                // even if the handler panics.
                ws.requesterCond.L.Unlock()
                defer ws.requesterCond.L.Lock()
                return ws.requestResultHandler(r, webseedRequest)
        }()
        // Back under the lock: the request is no longer in flight.
        delete(ws.activeRequests, r)
        return err
}
83
84 func (ws *webseedPeer) requester(i int) {
85         ws.requesterCond.L.Lock()
86         defer ws.requesterCond.L.Unlock()
87 start:
88         for !ws.peer.closed.IsSet() {
89                 restart := false
90                 ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
91                         r := ws.peer.t.requestIndexToRequest(x)
92                         if _, ok := ws.activeRequests[r]; ok {
93                                 return true
94                         }
95                         err := ws.doRequest(r)
96                         ws.requesterCond.L.Unlock()
97                         if err != nil && !errors.Is(err, context.Canceled) {
98                                 log.Printf("requester %v: error doing webseed request %v: %v", i, r, err)
99                         }
100                         restart = true
101                         if errors.Is(err, webseed.ErrTooFast) {
102                                 time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
103                         }
104                         ws.requesterCond.L.Lock()
105                         return false
106                 })
107                 if restart {
108                         goto start
109                 }
110                 ws.requesterCond.Wait()
111         }
112 }
113
// connectionFlags returns the fixed flag string "WS" for webseed peers.
func (ws *webseedPeer) connectionFlags() string {
        return "WS"
}
117
// drop is currently a no-op for webseed peers.
// Maybe this should drop all existing connections, or something like that.
func (ws *webseedPeer) drop() {}
120
121 func (cn *webseedPeer) ban() {
122         cn.peer.close()
123 }
124
125 func (ws *webseedPeer) handleUpdateRequests() {
126         // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
127         // pieces.
128         go func() {
129                 ws.peer.t.cl.lock()
130                 defer ws.peer.t.cl.unlock()
131                 ws.peer.maybeUpdateActualRequestState()
132         }()
133 }
134
135 func (ws *webseedPeer) onClose() {
136         ws.peer.logger.Levelf(log.Debug, "closing")
137         // Just deleting them means we would have to manually cancel active requests.
138         ws.peer.cancelAllRequests()
139         ws.peer.t.iterPeers(func(p *Peer) {
140                 if p.isLowOnRequests() {
141                         p.updateRequests("webseedPeer.onClose")
142                 }
143         })
144         ws.requesterCond.Broadcast()
145 }
146
// requestResultHandler waits for the result of webseedRequest, records read statistics, and
// then, under the client lock, either treats a failed result as a rejection of r or delivers
// the received bytes as a Piece message via receiveChunk.
//
// It returns nil if the torrent is already closed or the chunk was delivered, and the result's
// error otherwise. It panics if remoteRejectedRequest reports false for r or if receiveChunk
// returns an error.
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
        // Blocks until the request produces its result.
        result := <-webseedRequest.Result
        // NOTE(review): closing from the receiving side presumes the producer sends exactly one
        // result and never touches the channel again; confirm against webseed.Request.
        close(webseedRequest.Result) // one-shot
        // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
        // sure if we can divine which errors indicate cancellation on our end without hitting the
        // network though.
        if len(result.Bytes) != 0 || result.Err == nil {
                // Increment ChunksRead and friends
                ws.peer.doChunkReadStats(int64(len(result.Bytes)))
        }
        // Byte accounting happens regardless of whether the result carries an error.
        ws.peer.readBytes(int64(len(result.Bytes)))
        ws.peer.t.cl.lock()
        defer ws.peer.t.cl.unlock()
        if ws.peer.t.closed.IsSet() {
                // The torrent is closed; drop the result.
                return nil
        }
        err := result.Err
        if err != nil {
                switch {
                // Cancellation, rate limiting, and an already-closed peer are not logged and do
                // not close the peer.
                case errors.Is(err, context.Canceled):
                case errors.Is(err, webseed.ErrTooFast):
                case ws.peer.closed.IsSet():
                default:
                        // Any other error is logged and closes this peer.
                        ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
                        // // Here lies my attempt to extract something concrete from Go's error system. RIP.
                        // cfg := spew.NewDefaultConfig()
                        // cfg.DisableMethods = true
                        // cfg.Dump(result.Err)
                        log.Printf("closing %v", ws)
                        ws.peer.close()
                }
                // Every failed result is reported as a rejection of r.
                if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
                        panic("invalid reject")
                }
                return err
        }
        // Success: deliver the bytes as a Piece message for r's index and offset.
        err = ws.peer.receiveChunk(&pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
                Begin: r.Begin,
                Piece: result.Bytes,
        })
        if err != nil {
                panic(err)
        }
        // err is nil here, since a non-nil error panicked above.
        return err
}
194
195 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
196         return &me.client.Pieces
197 }
198
199 func (cn *webseedPeer) peerHasAllPieces() (all, known bool) {
200         if !cn.peer.t.haveInfo() {
201                 return true, false
202         }
203         return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true
204 }