10 "github.com/anacrolix/log"
12 "github.com/anacrolix/torrent/tracker"
13 "github.com/gorilla/websocket"
14 "github.com/pion/datachannel"
15 "github.com/pion/webrtc/v2"
18 // Client represents the webtorrent client
19 type TrackerClient struct {
21 GetAnnounceRequest func(tracker.AnnounceEvent) tracker.AnnounceRequest
24 OnConn onDataChannelOpen
28 outboundOffers map[string]outboundOffer // OfferID to outboundOffer
29 wsConn *websocket.Conn
32 func (me *TrackerClient) peerIdBinary() string {
33 return binaryToJsonString(me.PeerId[:])
36 func (me *TrackerClient) infoHashBinary() string {
37 return binaryToJsonString(me.InfoHash[:])
40 // outboundOffer represents an outstanding offer.
41 type outboundOffer struct {
42 originalOffer webrtc.SessionDescription
43 peerConnection wrappedPeerConnection
44 dataChannel *webrtc.DataChannel
47 type DataChannelContext struct {
48 Local, Remote webrtc.SessionDescription
53 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
55 func (tc *TrackerClient) doWebsocket() error {
56 c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
58 return fmt.Errorf("dialing tracker: %w", err)
61 tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
64 err := tc.announce(tracker.Started)
66 tc.Logger.WithDefaultLevel(log.Error).Printf("error in initial announce: %v", err)
69 err = tc.trackerReadLoop(tc.wsConn)
71 tc.closeUnusedOffers()
76 func (tc *TrackerClient) Run() error {
78 err := tc.doWebsocket()
79 tc.Logger.Printf("websocket instance ended: %v", err)
80 time.Sleep(time.Minute)
84 func (tc *TrackerClient) closeUnusedOffers() {
85 for _, offer := range tc.outboundOffers {
86 offer.peerConnection.Close()
88 tc.outboundOffers = nil
91 func (tc *TrackerClient) announce(event tracker.AnnounceEvent) error {
92 var randOfferId [20]byte
93 _, err := rand.Read(randOfferId[:])
95 return fmt.Errorf("generating offer_id bytes: %w", err)
97 offerIDBinary := binaryToJsonString(randOfferId[:])
99 pc, dc, offer, err := newOffer()
101 return fmt.Errorf("creating offer: %w", err)
104 request := tc.GetAnnounceRequest(event)
106 req := AnnounceRequest{
107 Numwant: 1, // If higher we need to create equal amount of offers.
108 Uploaded: request.Uploaded,
109 Downloaded: request.Downloaded,
111 Event: request.Event.String(),
113 InfoHash: tc.infoHashBinary(),
114 PeerID: tc.peerIdBinary(),
116 OfferID: offerIDBinary,
121 data, err := json.Marshal(req)
123 return fmt.Errorf("marshalling request: %w", err)
127 defer tc.lock.Unlock()
129 err = tc.wsConn.WriteMessage(websocket.TextMessage, data)
132 return fmt.Errorf("write AnnounceRequest: %w", err)
134 if tc.outboundOffers == nil {
135 tc.outboundOffers = make(map[string]outboundOffer)
137 tc.outboundOffers[offerIDBinary] = outboundOffer{
140 originalOffer: offer,
145 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
147 _, message, err := tracker.ReadMessage()
149 return fmt.Errorf("read message error: %w", err)
151 tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
153 var ar AnnounceResponse
154 if err := json.Unmarshal(message, &ar); err != nil {
155 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
158 if ar.InfoHash != tc.infoHashBinary() {
159 tc.Logger.Printf("announce response for different hash: expected %q got %q", tc.infoHashBinary(), ar.InfoHash)
163 case ar.Offer != nil:
164 answer, err := getAnswerForOffer(*ar.Offer, tc.OnConn, ar.OfferID)
166 return fmt.Errorf("write AnnounceResponse: %w", err)
169 req := AnnounceResponse{
171 InfoHash: tc.infoHashBinary(),
172 PeerID: tc.peerIdBinary(),
177 data, err := json.Marshal(req)
179 return fmt.Errorf("failed to marshal request: %w", err)
183 err = tracker.WriteMessage(websocket.TextMessage, data)
185 return fmt.Errorf("write AnnounceResponse: %w", err)
189 case ar.Answer != nil:
190 tc.handleAnswer(ar.OfferID, *ar.Answer)
195 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
197 defer tc.lock.Unlock()
198 offer, ok := tc.outboundOffers[offerId]
200 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
203 tc.Logger.Printf("offer %q got answer %v", offerId, answer)
204 err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
205 tc.OnConn(dc, DataChannelContext{
206 Local: offer.originalOffer,
213 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
216 delete(tc.outboundOffers, offerId)
217 go tc.announce(tracker.None)