10 "github.com/anacrolix/log"
12 "github.com/anacrolix/torrent/tracker"
13 "github.com/gorilla/websocket"
14 "github.com/pion/datachannel"
15 "github.com/pion/webrtc/v2"
18 type TrackerClientStats struct {
20 ConvertedInboundConns int64
21 ConvertedOutboundConns int64
24 // Client represents the webtorrent client
25 type TrackerClient struct {
27 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest
29 OnConn onDataChannelOpen
34 outboundOffers map[string]outboundOffer // OfferID to outboundOffer
35 wsConn *websocket.Conn
37 stats TrackerClientStats
40 func (me *TrackerClient) Stats() TrackerClientStats {
46 func (me *TrackerClient) peerIdBinary() string {
47 return binaryToJsonString(me.PeerId[:])
50 // outboundOffer represents an outstanding offer.
51 type outboundOffer struct {
52 originalOffer webrtc.SessionDescription
53 peerConnection wrappedPeerConnection
54 dataChannel *webrtc.DataChannel
58 type DataChannelContext struct {
59 Local, Remote webrtc.SessionDescription
65 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
67 func (tc *TrackerClient) doWebsocket() error {
68 metrics.Add("websocket dials", 1)
70 c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
72 return fmt.Errorf("dialing tracker: %w", err)
75 tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
80 err = tc.trackerReadLoop(tc.wsConn)
82 tc.closeUnusedOffers()
88 func (tc *TrackerClient) Run() error {
93 err := tc.doWebsocket()
94 tc.Logger.WithDefaultLevel(log.Warning).Printf("websocket instance ended: %v", err)
95 time.Sleep(time.Minute)
102 func (tc *TrackerClient) Close() error {
105 if tc.wsConn != nil {
113 func (tc *TrackerClient) closeUnusedOffers() {
114 for _, offer := range tc.outboundOffers {
115 offer.peerConnection.Close()
117 tc.outboundOffers = nil
120 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
121 metrics.Add("outbound announces", 1)
122 var randOfferId [20]byte
123 _, err := rand.Read(randOfferId[:])
125 return fmt.Errorf("generating offer_id bytes: %w", err)
127 offerIDBinary := binaryToJsonString(randOfferId[:])
129 pc, dc, offer, err := newOffer()
131 return fmt.Errorf("creating offer: %w", err)
134 request := tc.GetAnnounceRequest(event, infoHash)
136 req := AnnounceRequest{
137 Numwant: 1, // If higher we need to create equal amount of offers.
138 Uploaded: request.Uploaded,
139 Downloaded: request.Downloaded,
141 Event: request.Event.String(),
143 InfoHash: binaryToJsonString(infoHash[:]),
144 PeerID: tc.peerIdBinary(),
146 OfferID: offerIDBinary,
151 data, err := json.Marshal(req)
153 return fmt.Errorf("marshalling request: %w", err)
158 err = tc.writeMessage(data)
161 return fmt.Errorf("write AnnounceRequest: %w", err)
163 if tc.outboundOffers == nil {
164 tc.outboundOffers = make(map[string]outboundOffer)
166 tc.outboundOffers[offerIDBinary] = outboundOffer{
169 originalOffer: offer,
175 func (tc *TrackerClient) writeMessage(data []byte) error {
176 for tc.wsConn == nil {
178 return fmt.Errorf("%T closed", tc)
182 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
185 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
187 _, message, err := tracker.ReadMessage()
189 return fmt.Errorf("read message error: %w", err)
191 //tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
193 var ar AnnounceResponse
194 if err := json.Unmarshal(message, &ar); err != nil {
195 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
199 case ar.Offer != nil:
200 ih, err := jsonStringToInfoHash(ar.InfoHash)
202 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
205 tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
206 case ar.Answer != nil:
207 tc.handleAnswer(ar.OfferID, *ar.Answer)
212 func (tc *TrackerClient) handleOffer(
213 offer webrtc.SessionDescription,
218 peerConnection, answer, err := newAnsweringPeerConnection(offer)
220 return fmt.Errorf("write AnnounceResponse: %w", err)
222 response := AnnounceResponse{
224 InfoHash: binaryToJsonString(infoHash[:]),
225 PeerID: tc.peerIdBinary(),
230 data, err := json.Marshal(response)
232 peerConnection.Close()
233 return fmt.Errorf("marshalling response: %w", err)
237 if err := tc.writeMessage(data); err != nil {
238 peerConnection.Close()
239 return fmt.Errorf("writing response: %w", err)
241 timer := time.AfterFunc(30*time.Second, func() {
242 metrics.Add("answering peer connections timed out", 1)
243 peerConnection.Close()
245 peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
246 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
248 metrics.Add("answering peer connection conversions", 1)
250 tc.stats.ConvertedInboundConns++
252 tc.OnConn(dc, DataChannelContext{
264 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
267 offer, ok := tc.outboundOffers[offerId]
269 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
272 //tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
273 metrics.Add("outbound offers answered", 1)
274 err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
275 metrics.Add("outbound offers answered with datachannel", 1)
277 tc.stats.ConvertedOutboundConns++
279 tc.OnConn(dc, DataChannelContext{
280 Local: offer.originalOffer,
284 InfoHash: offer.infoHash,
288 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
291 delete(tc.outboundOffers, offerId)
292 go tc.Announce(tracker.None, offer.infoHash)