Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/client.go
Improve webtorrent tracker client logging
[btrtrc.git] / webtorrent / client.go
1 package webtorrent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "sync"
7
8         "github.com/anacrolix/log"
9
10         "github.com/anacrolix/torrent/tracker"
11         "github.com/anacrolix/torrent/webtorrent/buffer"
12         "github.com/gorilla/websocket"
13         "github.com/pion/datachannel"
14         "github.com/pion/webrtc/v2"
15 )
16
17 // Client is a WebTorrent tracker client bound to one peer ID and one info hash.
18 type Client struct {
19         lock           sync.Mutex // held while accessing outboundOffers and while writing to tracker
20         peerIDBinary   string // peer ID converted by binaryToJsonString; sent as peer_id
21         infoHashBinary string // info hash converted by binaryToJsonString; responses carrying another hash are skipped
22         outboundOffers map[string]outboundOffer // OfferID to outboundOffer, consulted when an answer arrives
23         tracker        *websocket.Conn // tracker websocket; assigned by Run after a successful dial
24         onConn         onDataChannelOpen // callback that receives a data channel and its DataChannelContext
25         logger         log.Logger // destination for the client's log output
26 }
27
28 // outboundOffer represents an outstanding offer: one that was announced and may still be answered.
29 type outboundOffer struct {
30         originalOffer webrtc.SessionDescription // the local offer as it was sent in the announce
31         transport     *Transport // transport created together with the offer; the answer is applied to it via SetAnswer
32 }
33
// binaryToJsonString maps every byte of b to the Unicode code point with the
// same numeric value (a Latin-1 style decoding) and returns the resulting
// string. Bytes below 0x80 are therefore kept as-is, while bytes 0x80-0xFF
// become two-byte UTF-8 sequences. An empty or nil b yields "".
func binaryToJsonString(b []byte) string {
	// The output has exactly one rune per input byte, so size the slice once
	// instead of growing it with repeated appends.
	seq := make([]rune, len(b))
	for i, v := range b {
		seq[i] = rune(v)
	}
	return string(seq)
}
41
// DataChannelContext describes the signalling exchange behind a data channel
// and is passed to the onDataChannelOpen callback together with the channel.
42 type DataChannelContext struct {
43         Local, Remote webrtc.SessionDescription // session descriptions of this side and of the remote peer
44         OfferId       string // ID of the offer the channel belongs to
45         LocalOffered  bool // true when this client made the offer and the remote side answered
46 }
47
// onDataChannelOpen is the callback signature used to hand a data channel,
// plus the DataChannelContext describing how it was negotiated, to the owner
// of the Client.
48 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
49
50 func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen, logger log.Logger) *Client {
51         return &Client{
52                 outboundOffers: make(map[string]outboundOffer),
53                 peerIDBinary:   binaryToJsonString(peerId[:]),
54                 infoHashBinary: binaryToJsonString(infoHash[:]),
55                 onConn:         onConn,
56                 logger:         logger,
57         }
58 }
59
60 func (c *Client) Run(ar tracker.AnnounceRequest, url string) error {
61         t, _, err := websocket.DefaultDialer.Dial(url, nil)
62         if err != nil {
63                 return fmt.Errorf("failed to dial tracker: %w", err)
64         }
65         defer t.Close()
66         c.logger.WithValues(log.Info).Printf("dialed tracker %q", url)
67         c.tracker = t
68
69         go func() {
70                 err := c.announce(ar)
71                 if err != nil {
72                         c.logger.WithValues(log.Error).Printf("error announcing: %v", err)
73                 }
74         }()
75         return c.trackerReadLoop()
76 }
77
78 func (c *Client) announce(request tracker.AnnounceRequest) error {
79         transport, offer, err := NewTransport()
80         if err != nil {
81                 return fmt.Errorf("failed to create transport: %w", err)
82         }
83
84         randOfferID, err := buffer.RandomBytes(20)
85         if err != nil {
86                 return fmt.Errorf("failed to generate bytes: %w", err)
87         }
88         offerIDBinary := randOfferID.ToStringLatin1()
89
90         c.lock.Lock()
91         c.outboundOffers[offerIDBinary] = outboundOffer{
92                 transport:     transport,
93                 originalOffer: offer,
94         }
95         c.lock.Unlock()
96
97         req := AnnounceRequest{
98                 Numwant:    1, // If higher we need to create equal amount of offers
99                 Uploaded:   0,
100                 Downloaded: 0,
101                 Left:       request.Left,
102                 Event:      "started",
103                 Action:     "announce",
104                 InfoHash:   c.infoHashBinary,
105                 PeerID:     c.peerIDBinary,
106                 Offers: []Offer{{
107                         OfferID: offerIDBinary,
108                         Offer:   offer,
109                 }},
110         }
111
112         data, err := json.Marshal(req)
113         if err != nil {
114                 return fmt.Errorf("failed to marshal request: %w", err)
115         }
116         c.lock.Lock()
117         tracker := c.tracker
118         err = tracker.WriteMessage(websocket.TextMessage, data)
119         if err != nil {
120                 return fmt.Errorf("write AnnounceRequest: %w", err)
121                 c.lock.Unlock()
122         }
123         c.lock.Unlock()
124         return nil
125 }
126
127 func (c *Client) trackerReadLoop() error {
128
129         c.lock.Lock()
130         tracker := c.tracker
131         c.lock.Unlock()
132         for {
133                 _, message, err := tracker.ReadMessage()
134                 if err != nil {
135                         return fmt.Errorf("read error: %w", err)
136                 }
137                 c.logger.WithValues(log.Debug).Printf("received message from tracker: %q", message)
138
139                 var ar AnnounceResponse
140                 if err := json.Unmarshal(message, &ar); err != nil {
141                         log.Printf("error unmarshaling announce response: %v", err)
142                         continue
143                 }
144                 if ar.InfoHash != c.infoHashBinary {
145                         log.Printf("announce response for different hash: expected %q got %q", c.infoHashBinary, ar.InfoHash)
146                         continue
147                 }
148                 switch {
149                 case ar.Offer != nil:
150                         _, answer, err := NewTransportFromOffer(*ar.Offer, c.onConn, ar.OfferID)
151                         if err != nil {
152                                 return fmt.Errorf("write AnnounceResponse: %w", err)
153                         }
154
155                         req := AnnounceResponse{
156                                 Action:   "announce",
157                                 InfoHash: c.infoHashBinary,
158                                 PeerID:   c.peerIDBinary,
159                                 ToPeerID: ar.PeerID,
160                                 Answer:   &answer,
161                                 OfferID:  ar.OfferID,
162                         }
163                         data, err := json.Marshal(req)
164                         if err != nil {
165                                 return fmt.Errorf("failed to marshal request: %w", err)
166                         }
167
168                         c.lock.Lock()
169                         err = tracker.WriteMessage(websocket.TextMessage, data)
170                         if err != nil {
171                                 return fmt.Errorf("write AnnounceResponse: %w", err)
172                                 c.lock.Unlock()
173                         }
174                         c.lock.Unlock()
175                 case ar.Answer != nil:
176                         c.lock.Lock()
177                         offer, ok := c.outboundOffers[ar.OfferID]
178                         c.lock.Unlock()
179                         if !ok {
180                                 c.logger.WithValues(log.Warning).Printf("could not find offer for id %q", ar.OfferID)
181                                 continue
182                         }
183                         c.logger.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer)
184                         err = offer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
185                                 c.onConn(dc, DataChannelContext{
186                                         Local:        offer.originalOffer,
187                                         Remote:       *ar.Answer,
188                                         OfferId:      ar.OfferID,
189                                         LocalOffered: true,
190                                 })
191                         })
192                         if err != nil {
193                                 return fmt.Errorf("failed to sent answer: %w", err)
194                         }
195                 }
196         }
197 }
198
// AnnounceRequest is the JSON message this client sends to the tracker to
// announce itself together with its WebRTC offers.
199 type AnnounceRequest struct {
200         Numwant    int     `json:"numwant"` // announce sends 1, matching the single offer it includes
201         Uploaded   int     `json:"uploaded"` // announce always sends 0
202         Downloaded int     `json:"downloaded"` // announce always sends 0
203         Left       int64   `json:"left"` // copied from the caller's tracker.AnnounceRequest
204         Event      string  `json:"event"` // announce sends "started"
205         Action     string  `json:"action"` // announce sends "announce"
206         InfoHash   string  `json:"info_hash"` // info hash in binaryToJsonString form
207         PeerID     string  `json:"peer_id"` // peer ID in binaryToJsonString form
208         Offers     []Offer `json:"offers"` // offers to be relayed by the tracker
209 }
210
// Offer pairs a WebRTC session description with the ID under which the
// client tracks it, as carried inside an AnnounceRequest.
211 type Offer struct {
212         OfferID string                    `json:"offer_id"` // key used to match a later answer to this offer
213         Offer   webrtc.SessionDescription `json:"offer"` // the local session description being offered
214 }
215
// AnnounceResponse is the JSON message received from the tracker. The same
// struct is also used to send this client's answer to a remote offer back
// through the tracker. Optional fields are omitted from the JSON when empty.
216 type AnnounceResponse struct {
217         InfoHash   string                     `json:"info_hash"` // compared against the client's info hash on receipt
218         Action     string                     `json:"action"` // set to "announce" when sending an answer
219         Interval   *int                       `json:"interval,omitempty"` // not read by the code in this file
220         Complete   *int                       `json:"complete,omitempty"` // not read by the code in this file
221         Incomplete *int                       `json:"incomplete,omitempty"` // not read by the code in this file
222         PeerID     string                     `json:"peer_id,omitempty"` // peer ID of the sender of the message
223         ToPeerID   string                     `json:"to_peer_id,omitempty"` // when answering: the PeerID taken from the received offer
224         Answer     *webrtc.SessionDescription `json:"answer,omitempty"` // non-nil when the message answers one of our offers
225         Offer      *webrtc.SessionDescription `json:"offer,omitempty"` // non-nil when the message relays a remote peer's offer
226         OfferID    string                     `json:"offer_id,omitempty"` // ID tying an offer to its answer
227 }