]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/tracker_client.go
0948cdcdeacad63228fcc63296ecaf080d604995
[btrtrc.git] / webtorrent / tracker_client.go
1 package webtorrent
2
3 import (
4         "crypto/rand"
5         "encoding/json"
6         "fmt"
7         "sync"
8         "time"
9
10         "github.com/anacrolix/log"
11
12         "github.com/anacrolix/torrent/tracker"
13         "github.com/gorilla/websocket"
14         "github.com/pion/datachannel"
15         "github.com/pion/webrtc/v2"
16 )
17
// TrackerClientStats holds the counters a TrackerClient maintains. A copy is
// returned by TrackerClient.Stats.
type TrackerClientStats struct {
	// Dials is incremented once for every websocket dial attempt.
	Dials                  int64
	// ConvertedInboundConns counts data channels opened for offers received
	// from the tracker (this client answered).
	ConvertedInboundConns  int64
	// ConvertedOutboundConns counts data channels opened for offers this
	// client sent and that were answered.
	ConvertedOutboundConns int64
}
23
// TrackerClient represents a webtorrent client's connection to a single
// websocket tracker. The exported fields are configuration read by the
// methods; the unexported fields are internal state.
type TrackerClient struct {
	// Url is the address passed to the websocket dialer.
	Url                string
	// GetAnnounceRequest is called by Announce to obtain the Uploaded,
	// Downloaded, Left and Event values for the outgoing announce.
	GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest
	// PeerId is sent as the peer ID in announces and answers.
	PeerId             [20]byte
	// OnConn is invoked whenever a data channel to a peer opens.
	OnConn             onDataChannelOpen
	Logger             log.Logger

	// mu guards the fields below.
	mu             sync.Mutex
	// cond uses mu as its Locker (assigned in Run). It is broadcast when
	// wsConn is set and when the client is closed.
	cond           sync.Cond
	outboundOffers map[string]outboundOffer // OfferID to outboundOffer
	// wsConn is the most recently dialed tracker connection; nil until the
	// first successful dial.
	wsConn         *websocket.Conn
	// closed is set by Close and stops the Run loop.
	closed         bool
	stats          TrackerClientStats
}
39
40 func (me *TrackerClient) Stats() TrackerClientStats {
41         me.mu.Lock()
42         defer me.mu.Unlock()
43         return me.stats
44 }
45
46 func (me *TrackerClient) peerIdBinary() string {
47         return binaryToJsonString(me.PeerId[:])
48 }
49
// outboundOffer represents an outstanding offer: one that has been sent to
// the tracker in an announce and has not yet been answered.
type outboundOffer struct {
	// originalOffer is the session description that was sent to the tracker.
	originalOffer  webrtc.SessionDescription
	// peerConnection is closed by closeUnusedOffers if the offer is never
	// answered.
	peerConnection wrappedPeerConnection
	// dataChannel is the channel returned by newOffer alongside the offer.
	dataChannel    *webrtc.DataChannel
	// infoHash is the torrent the offer was announced for.
	infoHash       [20]byte
}
57
// DataChannelContext describes how a data channel handed to OnConn was
// negotiated.
type DataChannelContext struct {
	// Local is the session description produced by this client and Remote the
	// one received from the peer. Which of them is the offer and which the
	// answer is given by LocalOffered.
	Local, Remote webrtc.SessionDescription
	// OfferId is the offer ID exchanged through the tracker.
	OfferId       string
	// LocalOffered is true when this client created the offer, false when it
	// answered a peer's offer.
	LocalOffered  bool
	// InfoHash is the torrent the connection was negotiated for.
	InfoHash      [20]byte
}
64
// onDataChannelOpen is the callback type invoked with an opened data channel
// and the context describing how it was negotiated.
type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
66
67 func (tc *TrackerClient) doWebsocket() error {
68         metrics.Add("websocket dials", 1)
69         tc.stats.Dials++
70         c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
71         if err != nil {
72                 return fmt.Errorf("dialing tracker: %w", err)
73         }
74         defer c.Close()
75         tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
76         tc.mu.Lock()
77         tc.wsConn = c
78         tc.cond.Broadcast()
79         tc.mu.Unlock()
80         err = tc.trackerReadLoop(tc.wsConn)
81         tc.mu.Lock()
82         tc.closeUnusedOffers()
83         c.Close()
84         tc.mu.Unlock()
85         return err
86 }
87
88 func (tc *TrackerClient) Run() error {
89         tc.cond.L = &tc.mu
90         tc.mu.Lock()
91         for !tc.closed {
92                 tc.mu.Unlock()
93                 err := tc.doWebsocket()
94                 tc.Logger.WithDefaultLevel(log.Warning).Printf("websocket instance ended: %v", err)
95                 time.Sleep(time.Minute)
96                 tc.mu.Lock()
97         }
98         tc.mu.Unlock()
99         return nil
100 }
101
102 func (tc *TrackerClient) Close() error {
103         tc.mu.Lock()
104         tc.closed = true
105         if tc.wsConn != nil {
106                 tc.wsConn.Close()
107         }
108         tc.mu.Unlock()
109         tc.cond.Broadcast()
110         return nil
111 }
112
113 func (tc *TrackerClient) closeUnusedOffers() {
114         for _, offer := range tc.outboundOffers {
115                 offer.peerConnection.Close()
116         }
117         tc.outboundOffers = nil
118 }
119
120 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
121         metrics.Add("outbound announces", 1)
122         var randOfferId [20]byte
123         _, err := rand.Read(randOfferId[:])
124         if err != nil {
125                 return fmt.Errorf("generating offer_id bytes: %w", err)
126         }
127         offerIDBinary := binaryToJsonString(randOfferId[:])
128
129         pc, dc, offer, err := newOffer()
130         if err != nil {
131                 return fmt.Errorf("creating offer: %w", err)
132         }
133
134         request := tc.GetAnnounceRequest(event, infoHash)
135
136         req := AnnounceRequest{
137                 Numwant:    1, // If higher we need to create equal amount of offers.
138                 Uploaded:   request.Uploaded,
139                 Downloaded: request.Downloaded,
140                 Left:       request.Left,
141                 Event:      request.Event.String(),
142                 Action:     "announce",
143                 InfoHash:   binaryToJsonString(infoHash[:]),
144                 PeerID:     tc.peerIdBinary(),
145                 Offers: []Offer{{
146                         OfferID: offerIDBinary,
147                         Offer:   offer,
148                 }},
149         }
150
151         data, err := json.Marshal(req)
152         if err != nil {
153                 return fmt.Errorf("marshalling request: %w", err)
154         }
155
156         tc.mu.Lock()
157         defer tc.mu.Unlock()
158         err = tc.writeMessage(data)
159         if err != nil {
160                 pc.Close()
161                 return fmt.Errorf("write AnnounceRequest: %w", err)
162         }
163         if tc.outboundOffers == nil {
164                 tc.outboundOffers = make(map[string]outboundOffer)
165         }
166         tc.outboundOffers[offerIDBinary] = outboundOffer{
167                 peerConnection: pc,
168                 dataChannel:    dc,
169                 originalOffer:  offer,
170                 infoHash:       infoHash,
171         }
172         return nil
173 }
174
175 func (tc *TrackerClient) writeMessage(data []byte) error {
176         for tc.wsConn == nil {
177                 if tc.closed {
178                         return fmt.Errorf("%T closed", tc)
179                 }
180                 tc.cond.Wait()
181         }
182         return tc.wsConn.WriteMessage(websocket.TextMessage, data)
183 }
184
185 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
186         for {
187                 _, message, err := tracker.ReadMessage()
188                 if err != nil {
189                         return fmt.Errorf("read message error: %w", err)
190                 }
191                 //tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
192
193                 var ar AnnounceResponse
194                 if err := json.Unmarshal(message, &ar); err != nil {
195                         tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
196                         continue
197                 }
198                 switch {
199                 case ar.Offer != nil:
200                         ih, err := jsonStringToInfoHash(ar.InfoHash)
201                         if err != nil {
202                                 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
203                                 break
204                         }
205                         tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
206                 case ar.Answer != nil:
207                         tc.handleAnswer(ar.OfferID, *ar.Answer)
208                 }
209         }
210 }
211
212 func (tc *TrackerClient) handleOffer(
213         offer webrtc.SessionDescription,
214         offerId string,
215         infoHash [20]byte,
216         peerId string,
217 ) error {
218         peerConnection, answer, err := newAnsweringPeerConnection(offer)
219         if err != nil {
220                 return fmt.Errorf("write AnnounceResponse: %w", err)
221         }
222         response := AnnounceResponse{
223                 Action:   "announce",
224                 InfoHash: binaryToJsonString(infoHash[:]),
225                 PeerID:   tc.peerIdBinary(),
226                 ToPeerID: peerId,
227                 Answer:   &answer,
228                 OfferID:  offerId,
229         }
230         data, err := json.Marshal(response)
231         if err != nil {
232                 peerConnection.Close()
233                 return fmt.Errorf("marshalling response: %w", err)
234         }
235         tc.mu.Lock()
236         defer tc.mu.Unlock()
237         if err := tc.writeMessage(data); err != nil {
238                 peerConnection.Close()
239                 return fmt.Errorf("writing response: %w", err)
240         }
241         timer := time.AfterFunc(30*time.Second, func() {
242                 metrics.Add("answering peer connections timed out", 1)
243                 peerConnection.Close()
244         })
245         peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
246                 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
247                         timer.Stop()
248                         metrics.Add("answering peer connection conversions", 1)
249                         tc.mu.Lock()
250                         tc.stats.ConvertedInboundConns++
251                         tc.mu.Unlock()
252                         tc.OnConn(dc, DataChannelContext{
253                                 Local:        answer,
254                                 Remote:       offer,
255                                 OfferId:      offerId,
256                                 LocalOffered: false,
257                                 InfoHash:     infoHash,
258                         })
259                 })
260         })
261         return nil
262 }
263
264 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
265         tc.mu.Lock()
266         defer tc.mu.Unlock()
267         offer, ok := tc.outboundOffers[offerId]
268         if !ok {
269                 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
270                 return
271         }
272         //tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
273         metrics.Add("outbound offers answered", 1)
274         err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
275                 metrics.Add("outbound offers answered with datachannel", 1)
276                 tc.mu.Lock()
277                 tc.stats.ConvertedOutboundConns++
278                 tc.mu.Unlock()
279                 tc.OnConn(dc, DataChannelContext{
280                         Local:        offer.originalOffer,
281                         Remote:       answer,
282                         OfferId:      offerId,
283                         LocalOffered: true,
284                         InfoHash:     offer.infoHash,
285                 })
286         })
287         if err != nil {
288                 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
289                 return
290         }
291         delete(tc.outboundOffers, offerId)
292         go tc.Announce(tracker.None, offer.infoHash)
293 }