]> Sergey Matveev's repositories - btrtrc.git/blob - webseed-peer.go
Check if Torrent is closed before receiving webseed chunks
[btrtrc.git] / webseed-peer.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "sync"
8
9         "github.com/RoaringBitmap/roaring"
10         "github.com/anacrolix/log"
11         "github.com/anacrolix/torrent/metainfo"
12         pp "github.com/anacrolix/torrent/peer_protocol"
13         "github.com/anacrolix/torrent/webseed"
14 )
15
16 type webseedPeer struct {
17         client         webseed.Client
18         activeRequests map[Request]webseed.Request
19         requesterCond  sync.Cond
20         peer           Peer
21         // Number of requester routines.
22         maxRequests int
23 }
24
25 var _ peerImpl = (*webseedPeer)(nil)
26
27 func (me *webseedPeer) connStatusString() string {
28         return me.client.Url
29 }
30
31 func (ws *webseedPeer) String() string {
32         return fmt.Sprintf("webseed peer for %q", ws.client.Url)
33 }
34
35 func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
36         ws.client.SetInfo(info)
37         // There should be probably be a callback in Client instead, so it can remove pieces at its whim
38         // too.
39         ws.client.Pieces.Iterate(func(x uint32) bool {
40                 ws.peer.t.incPieceAvailability(pieceIndex(x))
41                 return true
42         })
43 }
44
45 func (ws *webseedPeer) writeInterested(interested bool) bool {
46         return true
47 }
48
49 func (ws *webseedPeer) _cancel(r RequestIndex) bool {
50         active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
51         if ok {
52                 active.Cancel()
53                 if !ws.peer.deleteRequest(r) {
54                         panic("cancelled webseed request should exist")
55                 }
56                 if ws.peer.isLowOnRequests() {
57                         ws.peer.updateRequests("webseedPeer._cancel")
58                 }
59         }
60         return true
61 }
62
63 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
64         return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
65 }
66
67 func (ws *webseedPeer) _request(r Request) bool {
68         ws.requesterCond.Signal()
69         return true
70 }
71
72 func (ws *webseedPeer) doRequest(r Request) {
73         webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
74         ws.activeRequests[r] = webseedRequest
75         func() {
76                 ws.requesterCond.L.Unlock()
77                 defer ws.requesterCond.L.Lock()
78                 ws.requestResultHandler(r, webseedRequest)
79         }()
80         delete(ws.activeRequests, r)
81 }
82
83 func (ws *webseedPeer) requester() {
84         ws.requesterCond.L.Lock()
85         defer ws.requesterCond.L.Unlock()
86 start:
87         for !ws.peer.closed.IsSet() {
88                 restart := false
89                 ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
90                         r := ws.peer.t.requestIndexToRequest(x)
91                         if _, ok := ws.activeRequests[r]; ok {
92                                 return true
93                         }
94                         ws.doRequest(r)
95                         restart = true
96                         return false
97                 })
98                 if restart {
99                         goto start
100                 }
101                 ws.requesterCond.Wait()
102         }
103 }
104
105 func (ws *webseedPeer) connectionFlags() string {
106         return "WS"
107 }
108
109 // TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
110 // return bool if this is even possible, and if it isn't, skip to the next drop candidate.
111 func (ws *webseedPeer) drop() {}
112
113 func (ws *webseedPeer) handleUpdateRequests() {
114         ws.peer.maybeUpdateActualRequestState()
115 }
116
117 func (ws *webseedPeer) onClose() {
118         ws.peer.logger.WithLevel(log.Debug).Print("closing")
119         ws.peer.deleteAllRequests()
120         for _, r := range ws.activeRequests {
121                 r.Cancel()
122         }
123         ws.requesterCond.Broadcast()
124 }
125
126 func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
127         result := <-webseedRequest.Result
128         // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
129         // sure if we can divine which errors indicate cancellation on our end without hitting the
130         // network though.
131         ws.peer.doChunkReadStats(int64(len(result.Bytes)))
132         ws.peer.readBytes(int64(len(result.Bytes)))
133         ws.peer.t.cl.lock()
134         defer ws.peer.t.cl.unlock()
135         if ws.peer.t.closed.IsSet() {
136                 return
137         }
138         if result.Err != nil {
139                 if !errors.Is(result.Err, context.Canceled) && !ws.peer.closed.IsSet() {
140                         ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
141                         // cfg := spew.NewDefaultConfig()
142                         // cfg.DisableMethods = true
143                         // cfg.Dump(result.Err)
144                         log.Printf("closing %v", ws)
145                         ws.peer.close()
146                 }
147                 ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
148         } else {
149                 err := ws.peer.receiveChunk(&pp.Message{
150                         Type:  pp.Piece,
151                         Index: r.Index,
152                         Begin: r.Begin,
153                         Piece: result.Bytes,
154                 })
155                 if err != nil {
156                         panic(err)
157                 }
158         }
159 }
160
161 func (me *webseedPeer) isLowOnRequests() bool {
162         return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
163 }
164
165 func (me *webseedPeer) peerPieces() *roaring.Bitmap {
166         return &me.client.Pieces
167 }
168
169 func (cn *webseedPeer) peerHasAllPieces() (all, known bool) {
170         if !cn.peer.t.haveInfo() {
171                 return true, false
172         }
173         return cn.client.Pieces.GetCardinality() == uint64(cn.peer.t.numPieces()), true
174 }