]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/client.go
d4dd604954caf6fc690cd8e0163196bb1c814ef8
[btrtrc.git] / webtorrent / client.go
1 package webtorrent
2
3 import (
4         "encoding/json"
5         "fmt"
6         "sync"
7
8         "github.com/anacrolix/log"
9
10         "github.com/anacrolix/torrent/tracker"
11         "github.com/anacrolix/torrent/webtorrent/buffer"
12         "github.com/gorilla/websocket"
13         "github.com/pion/datachannel"
14         "github.com/pion/webrtc/v2"
15 )
16
17 const (
18         trackerURL = `wss://tracker.openwebtorrent.com/` // For simplicity
19 )
20
21 // Client represents the webtorrent client
22 type Client struct {
23         lock           sync.Mutex
24         peerIDBinary   string
25         infoHashBinary string
26         offeredPeers   map[string]Peer // OfferID to Peer
27         tracker        *websocket.Conn
28         onConn         func(_ datachannel.ReadWriteCloser, initiatedLocally bool)
29 }
30
31 // Peer represents a remote peer
32 type Peer struct {
33         peerID    string
34         transport *Transport
35 }
36
37 func binaryToJsonString(b []byte) string {
38         var seq []rune
39         for _, v := range b {
40                 seq = append(seq, rune(v))
41         }
42         return string(seq)
43 }
44
45 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, initiatedLocally bool)
46
47 func NewClient(peerId, infoHash [20]byte, onConn onDataChannelOpen) *Client {
48         return &Client{
49                 offeredPeers:   make(map[string]Peer),
50                 peerIDBinary:   binaryToJsonString(peerId[:]),
51                 infoHashBinary: binaryToJsonString(infoHash[:]),
52                 onConn:         onConn,
53         }
54 }
55
56 func (c *Client) Run(ar tracker.AnnounceRequest) error {
57         t, _, err := websocket.DefaultDialer.Dial(trackerURL, nil)
58         if err != nil {
59                 return fmt.Errorf("failed to dial tracker: %v", err)
60         }
61         defer t.Close()
62         c.tracker = t
63
64         go c.announce(ar)
65         c.trackerReadLoop()
66
67         return nil
68 }
69
70 func (c *Client) announce(request tracker.AnnounceRequest) error {
71         transpot, offer, err := NewTransport()
72         if err != nil {
73                 return fmt.Errorf("failed to create transport: %w", err)
74         }
75
76         randOfferID, err := buffer.RandomBytes(20)
77         if err != nil {
78                 return fmt.Errorf("failed to generate bytes: %w", err)
79         }
80         // OfferID := randOfferID.ToStringHex()
81         offerIDBinary := randOfferID.ToStringLatin1()
82
83         c.lock.Lock()
84         c.offeredPeers[offerIDBinary] = Peer{transport: transpot}
85         c.lock.Unlock()
86
87         req := AnnounceRequest{
88                 Numwant:    1, // If higher we need to create equal amount of offers
89                 Uploaded:   0,
90                 Downloaded: 0,
91                 Left:       request.Left,
92                 Event:      "started",
93                 Action:     "announce",
94                 InfoHash:   c.infoHashBinary,
95                 PeerID:     c.peerIDBinary,
96                 Offers: []Offer{{
97                         OfferID: offerIDBinary,
98                         Offer:   offer,
99                 }},
100         }
101
102         data, err := json.Marshal(req)
103         if err != nil {
104                 return fmt.Errorf("failed to marshal request: %w", err)
105         }
106         c.lock.Lock()
107         tracker := c.tracker
108         err = tracker.WriteMessage(websocket.TextMessage, data)
109         if err != nil {
110                 return fmt.Errorf("write AnnounceRequest: %w", err)
111                 c.lock.Unlock()
112         }
113         c.lock.Unlock()
114         return nil
115 }
116
117 func (c *Client) trackerReadLoop() error {
118
119         c.lock.Lock()
120         tracker := c.tracker
121         c.lock.Unlock()
122         for {
123                 _, message, err := tracker.ReadMessage()
124                 if err != nil {
125                         return fmt.Errorf("read error: %w", err)
126                 }
127                 log.Printf("recv: %q", message)
128
129                 var ar AnnounceResponse
130                 if err := json.Unmarshal(message, &ar); err != nil {
131                         log.Printf("error unmarshaling announce response: %v", err)
132                         continue
133                 }
134                 if ar.InfoHash != c.infoHashBinary {
135                         log.Printf("announce response for different hash: expected %q got %q", c.infoHashBinary, ar.InfoHash)
136                         continue
137                 }
138                 switch {
139                 case ar.Offer != nil:
140                         t, answer, err := NewTransportFromOffer(*ar.Offer, func(dc datachannel.ReadWriteCloser) {
141                                 c.onConn(dc, false)
142                         })
143                         if err != nil {
144                                 return fmt.Errorf("write AnnounceResponse: %w", err)
145                         }
146
147                         req := AnnounceResponse{
148                                 Action:   "announce",
149                                 InfoHash: c.infoHashBinary,
150                                 PeerID:   c.peerIDBinary,
151                                 ToPeerID: ar.PeerID,
152                                 Answer:   &answer,
153                                 OfferID:  ar.OfferID,
154                         }
155                         data, err := json.Marshal(req)
156                         if err != nil {
157                                 return fmt.Errorf("failed to marshal request: %w", err)
158                         }
159
160                         c.lock.Lock()
161                         err = tracker.WriteMessage(websocket.TextMessage, data)
162                         if err != nil {
163                                 return fmt.Errorf("write AnnounceResponse: %w", err)
164                                 c.lock.Unlock()
165                         }
166                         c.lock.Unlock()
167
168                         // Do something with the peer
169                         _ = Peer{peerID: ar.PeerID, transport: t}
170                 case ar.Answer != nil:
171                         c.lock.Lock()
172                         peer, ok := c.offeredPeers[ar.OfferID]
173                         c.lock.Unlock()
174                         if !ok {
175                                 log.Printf("could not find peer for offer %q", ar.OfferID)
176                                 continue
177                         }
178                         log.Printf("offer %q got answer %v", ar.OfferID, *ar.Answer)
179                         err = peer.transport.SetAnswer(*ar.Answer, func(dc datachannel.ReadWriteCloser) {
180                                 c.onConn(dc, true)
181                         })
182                         if err != nil {
183                                 return fmt.Errorf("failed to sent answer: %v", err)
184                         }
185                 }
186         }
187 }
188
189 type AnnounceRequest struct {
190         Numwant    int     `json:"numwant"`
191         Uploaded   int     `json:"uploaded"`
192         Downloaded int     `json:"downloaded"`
193         Left       int64   `json:"left"`
194         Event      string  `json:"event"`
195         Action     string  `json:"action"`
196         InfoHash   string  `json:"info_hash"`
197         PeerID     string  `json:"peer_id"`
198         Offers     []Offer `json:"offers"`
199 }
200
201 type Offer struct {
202         OfferID string                    `json:"offer_id"`
203         Offer   webrtc.SessionDescription `json:"offer"`
204 }
205
206 type AnnounceResponse struct {
207         InfoHash   string                     `json:"info_hash"`
208         Action     string                     `json:"action"`
209         Interval   *int                       `json:"interval,omitempty"`
210         Complete   *int                       `json:"complete,omitempty"`
211         Incomplete *int                       `json:"incomplete,omitempty"`
212         PeerID     string                     `json:"peer_id,omitempty"`
213         ToPeerID   string                     `json:"to_peer_id,omitempty"`
214         Answer     *webrtc.SessionDescription `json:"answer,omitempty"`
215         Offer      *webrtc.SessionDescription `json:"offer,omitempty"`
216         OfferID    string                     `json:"offer_id,omitempty"`
217 }