Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/tracker_client.go
Merge pull request #410 from anacrolix/webseeds
[btrtrc.git] / webtorrent / tracker_client.go
1 package webtorrent
2
3 import (
4         "crypto/rand"
5         "encoding/json"
6         "fmt"
7         "sync"
8         "time"
9
10         "github.com/anacrolix/log"
11
12         "github.com/anacrolix/torrent/tracker"
13         "github.com/gorilla/websocket"
14         "github.com/pion/datachannel"
15         "github.com/pion/webrtc/v2"
16 )
17
// TrackerClientStats is a snapshot of the counters kept by a TrackerClient.
type TrackerClientStats struct {
	// Dials counts websocket dial attempts made to the tracker.
	Dials int64
	// Number of answering (inbound) and offering (outbound) peer connections
	// whose data channel opened and was handed to OnConn.
	ConvertedInboundConns, ConvertedOutboundConns int64
}
23
// TrackerClient is a client for a single webtorrent (websocket) tracker. It
// exchanges WebRTC offers and answers with the tracker and reports opened data
// channels through OnConn.
type TrackerClient struct {
	// Url is the tracker address dialed for each websocket instance.
	Url string
	// GetAnnounceRequest supplies the transfer statistics and event that are
	// copied into each announce sent for infoHash.
	GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
	// PeerId is the local peer ID included in announces and answers.
	PeerId [20]byte
	// OnConn is called with every data channel that opens, for both locally
	// offered and remotely offered connections.
	OnConn onDataChannelOpen
	// Logger receives diagnostic output from the client.
	Logger log.Logger

	// mu guards the fields below.
	mu sync.Mutex
	// cond is broadcast when wsConn is set and when the client is closed; its
	// locker is assigned in Run.
	cond           sync.Cond
	outboundOffers map[string]outboundOffer // OfferID to outboundOffer
	// wsConn is the most recently dialed tracker connection, nil before the
	// first successful dial.
	wsConn *websocket.Conn
	// closed is set by Close and stops the Run loop.
	closed bool
	stats  TrackerClientStats
}
39
40 func (me *TrackerClient) Stats() TrackerClientStats {
41         me.mu.Lock()
42         defer me.mu.Unlock()
43         return me.stats
44 }
45
46 func (me *TrackerClient) peerIdBinary() string {
47         return binaryToJsonString(me.PeerId[:])
48 }
49
// outboundOffer represents an outstanding offer: one that has been announced
// to the tracker and is still awaiting an answer.
type outboundOffer struct {
	// originalOffer is the local session description that was announced.
	originalOffer webrtc.SessionDescription
	// peerConnection is closed by closeUnusedOffers if the offer is never used.
	peerConnection *wrappedPeerConnection
	dataChannel    *webrtc.DataChannel
	// infoHash is the torrent the offer was announced for.
	infoHash [20]byte
}
57
// DataChannelContext describes how a data channel passed to OnConn came about.
type DataChannelContext struct {
	// Local and Remote are the session descriptions of the two ends.
	Local, Remote webrtc.SessionDescription
	// OfferId is the tracker offer ID the connection was negotiated under.
	OfferId string
	// LocalOffered is true when this client made the offer, false when it
	// answered a remote offer.
	LocalOffered bool
	// InfoHash is the torrent the connection relates to.
	InfoHash [20]byte
}
64
// onDataChannelOpen is the callback signature used to deliver an opened data
// channel together with the context it was established in.
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
66
67 func (tc *TrackerClient) doWebsocket() error {
68         metrics.Add("websocket dials", 1)
69         tc.stats.Dials++
70         c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
71         if err != nil {
72                 return fmt.Errorf("dialing tracker: %w", err)
73         }
74         defer c.Close()
75         tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
76         tc.mu.Lock()
77         tc.wsConn = c
78         tc.cond.Broadcast()
79         tc.mu.Unlock()
80         err = tc.trackerReadLoop(tc.wsConn)
81         tc.mu.Lock()
82         tc.closeUnusedOffers()
83         c.Close()
84         tc.mu.Unlock()
85         return err
86 }
87
88 func (tc *TrackerClient) Run() error {
89         tc.cond.L = &tc.mu
90         tc.mu.Lock()
91         for !tc.closed {
92                 tc.mu.Unlock()
93                 err := tc.doWebsocket()
94                 level := log.Info
95                 tc.mu.Lock()
96                 if tc.closed {
97                         level = log.Debug
98                 }
99                 tc.mu.Unlock()
100                 tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
101                 time.Sleep(time.Minute)
102                 tc.mu.Lock()
103         }
104         tc.mu.Unlock()
105         return nil
106 }
107
108 func (tc *TrackerClient) Close() error {
109         tc.mu.Lock()
110         tc.closed = true
111         if tc.wsConn != nil {
112                 tc.wsConn.Close()
113         }
114         tc.mu.Unlock()
115         tc.cond.Broadcast()
116         return nil
117 }
118
119 func (tc *TrackerClient) closeUnusedOffers() {
120         for _, offer := range tc.outboundOffers {
121                 offer.peerConnection.Close()
122         }
123         tc.outboundOffers = nil
124 }
125
126 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
127         metrics.Add("outbound announces", 1)
128         var randOfferId [20]byte
129         _, err := rand.Read(randOfferId[:])
130         if err != nil {
131                 return fmt.Errorf("generating offer_id bytes: %w", err)
132         }
133         offerIDBinary := binaryToJsonString(randOfferId[:])
134
135         pc, dc, offer, err := newOffer()
136         if err != nil {
137                 return fmt.Errorf("creating offer: %w", err)
138         }
139
140         request, err := tc.GetAnnounceRequest(event, infoHash)
141         if err != nil {
142                 return fmt.Errorf("getting announce parameters: %w", err)
143         }
144
145         req := AnnounceRequest{
146                 Numwant:    1, // If higher we need to create equal amount of offers.
147                 Uploaded:   request.Uploaded,
148                 Downloaded: request.Downloaded,
149                 Left:       request.Left,
150                 Event:      request.Event.String(),
151                 Action:     "announce",
152                 InfoHash:   binaryToJsonString(infoHash[:]),
153                 PeerID:     tc.peerIdBinary(),
154                 Offers: []Offer{{
155                         OfferID: offerIDBinary,
156                         Offer:   offer,
157                 }},
158         }
159
160         data, err := json.Marshal(req)
161         if err != nil {
162                 return fmt.Errorf("marshalling request: %w", err)
163         }
164
165         tc.mu.Lock()
166         defer tc.mu.Unlock()
167         err = tc.writeMessage(data)
168         if err != nil {
169                 pc.Close()
170                 return fmt.Errorf("write AnnounceRequest: %w", err)
171         }
172         if tc.outboundOffers == nil {
173                 tc.outboundOffers = make(map[string]outboundOffer)
174         }
175         tc.outboundOffers[offerIDBinary] = outboundOffer{
176                 peerConnection: pc,
177                 dataChannel:    dc,
178                 originalOffer:  offer,
179                 infoHash:       infoHash,
180         }
181         return nil
182 }
183
184 func (tc *TrackerClient) writeMessage(data []byte) error {
185         for tc.wsConn == nil {
186                 if tc.closed {
187                         return fmt.Errorf("%T closed", tc)
188                 }
189                 tc.cond.Wait()
190         }
191         return tc.wsConn.WriteMessage(websocket.TextMessage, data)
192 }
193
194 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
195         for {
196                 _, message, err := tracker.ReadMessage()
197                 if err != nil {
198                         return fmt.Errorf("read message error: %w", err)
199                 }
200                 //tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
201
202                 var ar AnnounceResponse
203                 if err := json.Unmarshal(message, &ar); err != nil {
204                         tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
205                         continue
206                 }
207                 switch {
208                 case ar.Offer != nil:
209                         ih, err := jsonStringToInfoHash(ar.InfoHash)
210                         if err != nil {
211                                 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
212                                 break
213                         }
214                         tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
215                 case ar.Answer != nil:
216                         tc.handleAnswer(ar.OfferID, *ar.Answer)
217                 }
218         }
219 }
220
221 func (tc *TrackerClient) handleOffer(
222         offer webrtc.SessionDescription,
223         offerId string,
224         infoHash [20]byte,
225         peerId string,
226 ) error {
227         peerConnection, answer, err := newAnsweringPeerConnection(offer)
228         if err != nil {
229                 return fmt.Errorf("write AnnounceResponse: %w", err)
230         }
231         response := AnnounceResponse{
232                 Action:   "announce",
233                 InfoHash: binaryToJsonString(infoHash[:]),
234                 PeerID:   tc.peerIdBinary(),
235                 ToPeerID: peerId,
236                 Answer:   &answer,
237                 OfferID:  offerId,
238         }
239         data, err := json.Marshal(response)
240         if err != nil {
241                 peerConnection.Close()
242                 return fmt.Errorf("marshalling response: %w", err)
243         }
244         tc.mu.Lock()
245         defer tc.mu.Unlock()
246         if err := tc.writeMessage(data); err != nil {
247                 peerConnection.Close()
248                 return fmt.Errorf("writing response: %w", err)
249         }
250         timer := time.AfterFunc(30*time.Second, func() {
251                 metrics.Add("answering peer connections timed out", 1)
252                 peerConnection.Close()
253         })
254         peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
255                 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
256                         timer.Stop()
257                         metrics.Add("answering peer connection conversions", 1)
258                         tc.mu.Lock()
259                         tc.stats.ConvertedInboundConns++
260                         tc.mu.Unlock()
261                         tc.OnConn(dc, DataChannelContext{
262                                 Local:        answer,
263                                 Remote:       offer,
264                                 OfferId:      offerId,
265                                 LocalOffered: false,
266                                 InfoHash:     infoHash,
267                         })
268                 })
269         })
270         return nil
271 }
272
273 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
274         tc.mu.Lock()
275         defer tc.mu.Unlock()
276         offer, ok := tc.outboundOffers[offerId]
277         if !ok {
278                 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
279                 return
280         }
281         //tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
282         metrics.Add("outbound offers answered", 1)
283         err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
284                 metrics.Add("outbound offers answered with datachannel", 1)
285                 tc.mu.Lock()
286                 tc.stats.ConvertedOutboundConns++
287                 tc.mu.Unlock()
288                 tc.OnConn(dc, DataChannelContext{
289                         Local:        offer.originalOffer,
290                         Remote:       answer,
291                         OfferId:      offerId,
292                         LocalOffered: true,
293                         InfoHash:     offer.infoHash,
294                 })
295         })
296         if err != nil {
297                 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
298                 return
299         }
300         delete(tc.outboundOffers, offerId)
301         go tc.Announce(tracker.None, offer.infoHash)
302 }