Sergey Matveev's repositories - btrtrc.git/blob - webseed-peer.go
Remove unused peerImpl methods
[btrtrc.git] / webseed-peer.go
1 package torrent
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "net/http"
8         "strings"
9         "sync"
10
11         "github.com/anacrolix/torrent/common"
12         "github.com/anacrolix/torrent/metainfo"
13         pp "github.com/anacrolix/torrent/peer_protocol"
14         "github.com/anacrolix/torrent/segments"
15         "github.com/anacrolix/torrent/webseed"
16 )
17
// webseedPeer is the peerImpl backing a webseed source.
//
// Fields:
//   - client: the webseed client; its Url identifies the peer and it creates
//     the requests issued by doRequest.
//   - activeRequests: requests currently in flight, keyed by the torrent
//     Request. Entries are added in doRequest and removed once the result
//     has been handled.
//   - requesterCond: signalled by _request and broadcast by onClose; the
//     requester loop waits on it and holds its lock while running.
//   - peer: the generic Peer state shared with the rest of the package.
//
// The struct contains a sync.Cond by value and so must not be copied.
type webseedPeer struct {
	client         webseed.Client
	activeRequests map[Request]webseed.Request
	requesterCond  sync.Cond
	peer           Peer
}

// Compile-time check that *webseedPeer implements peerImpl.
var _ peerImpl = (*webseedPeer)(nil)
26
27 func (me *webseedPeer) connStatusString() string {
28         return me.client.Url
29 }
30
31 func (ws *webseedPeer) String() string {
32         return fmt.Sprintf("webseed peer for %q", ws.client.Url)
33 }
34
35 func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
36         ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
37         ws.client.Info = info
38 }
39
// writeInterested does nothing for webseed peers: the interested argument is
// ignored and true is always returned.
func (ws *webseedPeer) writeInterested(interested bool) bool {
	return true
}
43
44 func (ws *webseedPeer) _cancel(r RequestIndex) bool {
45         active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
46         if ok {
47                 active.Cancel()
48                 if !ws.peer.deleteRequest(r) {
49                         panic("cancelled webseed request should exist")
50                 }
51                 if ws.peer.actualRequestState.Requests.IsEmpty() {
52                         ws.peer.updateRequests("webseedPeer._cancel")
53                 }
54         }
55         return true
56 }
57
58 func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
59         return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
60 }
61
// _request wakes one waiter on requesterCond and always returns true. The
// request r is not used here; the requester loop finds outstanding requests
// itself by iterating the peer's request state.
func (ws *webseedPeer) _request(r Request) bool {
	ws.requesterCond.Signal()
	return true
}
66
67 func (ws *webseedPeer) doRequest(r Request) {
68         webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
69         ws.activeRequests[r] = webseedRequest
70         func() {
71                 ws.requesterCond.L.Unlock()
72                 defer ws.requesterCond.L.Lock()
73                 ws.requestResultHandler(r, webseedRequest)
74         }()
75         delete(ws.activeRequests, r)
76 }
77
78 func (ws *webseedPeer) requester() {
79         ws.requesterCond.L.Lock()
80         defer ws.requesterCond.L.Unlock()
81 start:
82         for !ws.peer.closed.IsSet() {
83                 restart := false
84                 ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
85                         r := ws.peer.t.requestIndexToRequest(x)
86                         if _, ok := ws.activeRequests[r]; ok {
87                                 return true
88                         }
89                         ws.doRequest(r)
90                         restart = true
91                         return false
92                 })
93                 if restart {
94                         goto start
95                 }
96                 ws.requesterCond.Wait()
97         }
98 }
99
// connectionFlags returns the fixed flag string "WS" for webseed peers.
func (ws *webseedPeer) connectionFlags() string {
	return "WS"
}
103
// drop does nothing for webseed peers.
//
// TODO: This is called when banning peers. Perhaps we want to be able to ban webseeds too. We could
// return bool if this is even possible, and if it isn't, skip to the next drop candidate.
func (ws *webseedPeer) drop() {}
107
// updateRequests does nothing for webseed peers; reason is ignored.
func (ws *webseedPeer) updateRequests(reason string) {
}
110
111 func (ws *webseedPeer) onClose() {
112         ws.peer.logger.Print("closing")
113         for _, r := range ws.activeRequests {
114                 r.Cancel()
115         }
116         ws.requesterCond.Broadcast()
117 }
118
119 func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
120         result := <-webseedRequest.Result
121         // We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
122         // sure if we can divine which errors indicate cancellation on our end without hitting the
123         // network though.
124         ws.peer.doChunkReadStats(int64(len(result.Bytes)))
125         ws.peer.t.cl.lock()
126         defer ws.peer.t.cl.unlock()
127         if result.Err != nil {
128                 if !errors.Is(result.Err, context.Canceled) {
129                         ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
130                 }
131                 // We need to filter out temporary errors, but this is a nightmare in Go. Currently a bad
132                 // webseed URL can starve out the good ones due to the chunk selection algorithm.
133                 const closeOnAllErrors = false
134                 if closeOnAllErrors ||
135                         strings.Contains(result.Err.Error(), "unsupported protocol scheme") ||
136                         func() bool {
137                                 var err webseed.ErrBadResponse
138                                 if !errors.As(result.Err, &err) {
139                                         return false
140                                 }
141                                 return err.Response.StatusCode == http.StatusNotFound
142                         }() {
143                         ws.peer.close()
144                 } else {
145                         ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
146                 }
147         } else {
148                 err := ws.peer.receiveChunk(&pp.Message{
149                         Type:  pp.Piece,
150                         Index: r.Index,
151                         Begin: r.Begin,
152                         Piece: result.Bytes,
153                 })
154                 if err != nil {
155                         panic(err)
156                 }
157         }
158 }