]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/tracker_client.go
14b62ed28a5d6faca5542b627cab64b8df090d3d
[btrtrc.git] / webtorrent / tracker_client.go
1 package webtorrent
2
3 import (
4         "crypto/rand"
5         "encoding/json"
6         "fmt"
7         "sync"
8         "time"
9
10         "github.com/anacrolix/log"
11
12         "github.com/anacrolix/torrent/tracker"
13         "github.com/gorilla/websocket"
14         "github.com/pion/datachannel"
15         "github.com/pion/webrtc/v2"
16 )
17
18 // Client represents the webtorrent client
19 type TrackerClient struct {
20         Url                string
21         GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest
22         PeerId             [20]byte
23         OnConn             onDataChannelOpen
24         Logger             log.Logger
25
26         mu             sync.Mutex
27         cond           sync.Cond
28         outboundOffers map[string]outboundOffer // OfferID to outboundOffer
29         wsConn         *websocket.Conn
30         closed         bool
31 }
32
33 func (me *TrackerClient) peerIdBinary() string {
34         return binaryToJsonString(me.PeerId[:])
35 }
36
37 // outboundOffer represents an outstanding offer.
38 type outboundOffer struct {
39         originalOffer  webrtc.SessionDescription
40         peerConnection wrappedPeerConnection
41         dataChannel    *webrtc.DataChannel
42         infoHash       [20]byte
43 }
44
45 type DataChannelContext struct {
46         Local, Remote webrtc.SessionDescription
47         OfferId       string
48         LocalOffered  bool
49         InfoHash      [20]byte
50 }
51
52 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
53
54 func (tc *TrackerClient) doWebsocket() error {
55         metrics.Add("websocket dials", 1)
56         c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
57         if err != nil {
58                 return fmt.Errorf("dialing tracker: %w", err)
59         }
60         defer c.Close()
61         tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
62         tc.mu.Lock()
63         tc.wsConn = c
64         tc.cond.Broadcast()
65         tc.mu.Unlock()
66         err = tc.trackerReadLoop(tc.wsConn)
67         tc.mu.Lock()
68         tc.closeUnusedOffers()
69         c.Close()
70         tc.mu.Unlock()
71         return err
72 }
73
74 func (tc *TrackerClient) Run() error {
75         tc.cond.L = &tc.mu
76         tc.mu.Lock()
77         for !tc.closed {
78                 tc.mu.Unlock()
79                 err := tc.doWebsocket()
80                 tc.Logger.WithDefaultLevel(log.Warning).Printf("websocket instance ended: %v", err)
81                 time.Sleep(time.Minute)
82                 tc.mu.Lock()
83         }
84         tc.mu.Unlock()
85         return nil
86 }
87
88 func (tc *TrackerClient) Close() error {
89         tc.mu.Lock()
90         tc.closed = true
91         if tc.wsConn != nil {
92                 tc.wsConn.Close()
93         }
94         tc.mu.Unlock()
95         tc.cond.Broadcast()
96         return nil
97 }
98
99 func (tc *TrackerClient) closeUnusedOffers() {
100         for _, offer := range tc.outboundOffers {
101                 offer.peerConnection.Close()
102         }
103         tc.outboundOffers = nil
104 }
105
106 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
107         metrics.Add("outbound announces", 1)
108         var randOfferId [20]byte
109         _, err := rand.Read(randOfferId[:])
110         if err != nil {
111                 return fmt.Errorf("generating offer_id bytes: %w", err)
112         }
113         offerIDBinary := binaryToJsonString(randOfferId[:])
114
115         pc, dc, offer, err := newOffer()
116         if err != nil {
117                 return fmt.Errorf("creating offer: %w", err)
118         }
119
120         request := tc.GetAnnounceRequest(event, infoHash)
121
122         req := AnnounceRequest{
123                 Numwant:    1, // If higher we need to create equal amount of offers.
124                 Uploaded:   request.Uploaded,
125                 Downloaded: request.Downloaded,
126                 Left:       request.Left,
127                 Event:      request.Event.String(),
128                 Action:     "announce",
129                 InfoHash:   binaryToJsonString(infoHash[:]),
130                 PeerID:     tc.peerIdBinary(),
131                 Offers: []Offer{{
132                         OfferID: offerIDBinary,
133                         Offer:   offer,
134                 }},
135         }
136
137         data, err := json.Marshal(req)
138         if err != nil {
139                 return fmt.Errorf("marshalling request: %w", err)
140         }
141
142         tc.mu.Lock()
143         defer tc.mu.Unlock()
144         err = tc.writeMessage(data)
145         if err != nil {
146                 pc.Close()
147                 return fmt.Errorf("write AnnounceRequest: %w", err)
148         }
149         if tc.outboundOffers == nil {
150                 tc.outboundOffers = make(map[string]outboundOffer)
151         }
152         tc.outboundOffers[offerIDBinary] = outboundOffer{
153                 peerConnection: pc,
154                 dataChannel:    dc,
155                 originalOffer:  offer,
156                 infoHash:       infoHash,
157         }
158         return nil
159 }
160
161 func (tc *TrackerClient) writeMessage(data []byte) error {
162         for tc.wsConn == nil {
163                 if tc.closed {
164                         return fmt.Errorf("%T closed", tc)
165                 }
166                 tc.cond.Wait()
167         }
168         return tc.wsConn.WriteMessage(websocket.TextMessage, data)
169 }
170
171 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
172         for {
173                 _, message, err := tracker.ReadMessage()
174                 if err != nil {
175                         return fmt.Errorf("read message error: %w", err)
176                 }
177                 //tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
178
179                 var ar AnnounceResponse
180                 if err := json.Unmarshal(message, &ar); err != nil {
181                         tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
182                         continue
183                 }
184                 switch {
185                 case ar.Offer != nil:
186                         ih, err := jsonStringToInfoHash(ar.InfoHash)
187                         if err != nil {
188                                 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
189                                 break
190                         }
191                         tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
192                 case ar.Answer != nil:
193                         tc.handleAnswer(ar.OfferID, *ar.Answer)
194                 }
195         }
196 }
197
198 func (tc *TrackerClient) handleOffer(
199         offer webrtc.SessionDescription,
200         offerId string,
201         infoHash [20]byte,
202         peerId string,
203 ) error {
204         peerConnection, answer, err := newAnsweringPeerConnection(offer)
205         if err != nil {
206                 return fmt.Errorf("write AnnounceResponse: %w", err)
207         }
208         response := AnnounceResponse{
209                 Action:   "announce",
210                 InfoHash: binaryToJsonString(infoHash[:]),
211                 PeerID:   tc.peerIdBinary(),
212                 ToPeerID: peerId,
213                 Answer:   &answer,
214                 OfferID:  offerId,
215         }
216         data, err := json.Marshal(response)
217         if err != nil {
218                 peerConnection.Close()
219                 return fmt.Errorf("marshalling response: %w", err)
220         }
221         tc.mu.Lock()
222         defer tc.mu.Unlock()
223         if err := tc.writeMessage(data); err != nil {
224                 peerConnection.Close()
225                 return fmt.Errorf("writing response: %w", err)
226         }
227         timer := time.AfterFunc(30*time.Second, func() {
228                 metrics.Add("answering peer connections timed out", 1)
229                 peerConnection.Close()
230         })
231         peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
232                 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
233                         timer.Stop()
234                         metrics.Add("answering peer connection conversions", 1)
235                         tc.OnConn(dc, DataChannelContext{
236                                 Local:        answer,
237                                 Remote:       offer,
238                                 OfferId:      offerId,
239                                 LocalOffered: false,
240                                 InfoHash:     infoHash,
241                         })
242                 })
243         })
244         return nil
245 }
246
247 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
248         tc.mu.Lock()
249         defer tc.mu.Unlock()
250         offer, ok := tc.outboundOffers[offerId]
251         if !ok {
252                 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
253                 return
254         }
255         //tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
256         metrics.Add("outbound offers answered", 1)
257         err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
258                 metrics.Add("outbound offers answered with datachannel", 1)
259                 tc.OnConn(dc, DataChannelContext{
260                         Local:        offer.originalOffer,
261                         Remote:       answer,
262                         OfferId:      offerId,
263                         LocalOffered: true,
264                         InfoHash:     offer.infoHash,
265                 })
266         })
267         if err != nil {
268                 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
269                 return
270         }
271         delete(tc.outboundOffers, offerId)
272         go tc.Announce(tracker.None, offer.infoHash)
273 }