]> Sergey Matveev's repositories - btrtrc.git/blob - webseed-peer.go
Record an observation about why webseed peers are doing most of the work in recent...
[btrtrc.git] / webseed-peer.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "sync"
8
9         "github.com/RoaringBitmap/roaring"
10         "github.com/anacrolix/log"
11         "github.com/anacrolix/torrent/metainfo"
12         pp "github.com/anacrolix/torrent/peer_protocol"
13         "github.com/anacrolix/torrent/webseed"
14 )
15
16 type webseedPeer struct {
17         client         webseed.Client
18         activeRequests map[Request]webseed.Request
19         requesterCond  sync.Cond
20         peer           Peer
21         // Number of requester routines.
22         maxRequests int
23 }
24
25 var _ peerImpl = (*webseedPeer)(nil)
26
27 func (me *webseedPeer) connStatusString() string {
28         return me.client.Url
29 }
30
31 func (ws *webseedPeer) String() string {
32         return fmt.Sprintf("webseed peer for %q", ws.client.Url)
33 }
34
35 func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
36         ws.client.SetInfo(info)
37         // There should be probably be a callback in Client instead, so it can remove pieces at its whim
38         // too.
39         ws.client.Pieces.Iterate(func(x uint32) bool {
40                 ws.peer.t.incPieceAvailability(pieceIndex(x))
41                 return true
42         })
43 }
44
45 func (ws *webseedPeer) writeInterested(interested bool) bool {
46         return true
47 }
48
49 func (ws *webseedPeer) _cancel(r RequestIndex) bool {
50         active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
51         if ok {
52                 active.Cancel()
53         }
54         if !ws.peer.deleteRequest(r) {
55                 panic("cancelled webseed request should exist")
56         }
57         if ws.peer.isLowOnRequests() {
58                 ws.peer.updateRequests("webseedPeer._cancel")
59         }
60         return true
61 }
62
63 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
64         return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
65 }
66
67 func (ws *webseedPeer) _request(r Request) bool {
68         ws.requesterCond.Signal()
69         return true
70 }
71
72 func (ws *webseedPeer) doRequest(r Request) {
73         webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
74         ws.activeRequests[r] = webseedRequest
75         func() {
76                 ws.requesterCond.L.Unlock()
77                 defer ws.requesterCond.L.Lock()
78                 ws.requestResultHandler(r, webseedRequest)
79         }()
80         delete(ws.activeRequests, r)
81 }
82
83 func (ws *webseedPeer) requester() {
84         ws.requesterCond.L.Lock()
85         defer ws.requesterCond.L.Unlock()
86 start:
87         for !ws.peer.closed.IsSet() {
88                 restart := false
89                 ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
90                         r := ws.peer.t.requestIndexToRequest(x)
91                         if _, ok := ws.activeRequests[r]; ok {
92                                 return true
93                         }
94                         ws.doRequest(r)
95                         restart = true
96                         return false
97                 })
98                 if restart {
99                         goto start
100                 }
101                 ws.requesterCond.Wait()
102         }
103 }
104
105 func (ws *webseedPeer) connectionFlags() string {
106         return "WS"
107 }
108
109 // TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
110 // return bool if this is even possible, and if it isn't, skip to the next drop candidate.
111 func (ws *webseedPeer) drop() {}
112
113 func (ws *webseedPeer) handleUpdateRequests() {
114         // Because this is synchronous, webseed peers seem to get first dibs on newly prioritized
115         // pieces.
116         ws.peer.maybeUpdateActualRequestState()
117 }
118
119 func (ws *webseedPeer) onClose() {
120         ws.peer.logger.WithLevel(log.Debug).Print("closing")
121         ws.peer.deleteAllRequests()
122         for _, r := range ws.activeRequests {
123                 r.Cancel()
124         }
125         ws.requesterCond.Broadcast()
126 }
127
128 func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
129         result := <-webseedRequest.Result
130         close(webseedRequest.Result) // one-shot
131         // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
132         // sure if we can divine which errors indicate cancellation on our end without hitting the
133         // network though.
134         if len(result.Bytes) != 0 || result.Err == nil {
135                 // Increment ChunksRead and friends
136                 ws.peer.doChunkReadStats(int64(len(result.Bytes)))
137         }
138         ws.peer.readBytes(int64(len(result.Bytes)))
139         ws.peer.t.cl.lock()
140         defer ws.peer.t.cl.unlock()
141         if ws.peer.t.closed.IsSet() {
142                 return
143         }
144         if result.Err != nil {
145                 if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
146                         ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
147                         // // Here lies my attempt to extract something concrete from Go's error system. RIP.
148                         // cfg := spew.NewDefaultConfig()
149                         // cfg.DisableMethods = true
150                         // cfg.Dump(result.Err)
151                         log.Printf("closing %v", ws)
152                         ws.peer.close()
153                 }
154                 ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
155         } else {
156                 err := ws.peer.receiveChunk(&pp.Message{
157                         Type:  pp.Piece,
158                         Index: r.Index,
159                         Begin: r.Begin,
160                         Piece: result.Bytes,
161                 })
162                 if err != nil {
163                         panic(err)
164                 }
165         }
166 }
167
168 func (me *webseedPeer) isLowOnRequests() bool {
169         return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
170 }
171
172 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
173         return &me.client.Pieces
174 }
175
176 func (cn *webseedPeer) peerHasAllPieces() (all, known bool) {
177         if !cn.peer.t.haveInfo() {
178                 return true, false
179         }
180         return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true
181 }