10 "github.com/anacrolix/log"
12 "github.com/anacrolix/torrent/tracker"
13 "github.com/gorilla/websocket"
14 "github.com/pion/datachannel"
15 "github.com/pion/webrtc/v2"
18 // TrackerClient represents the webtorrent client: it owns the websocket
// connection to the tracker and the outbound offers awaiting an answer.
// NOTE(review): tc.Url, tc.PeerId and tc.Logger are used by the methods but
// are presumably declared on lines not visible in this excerpt.
19 type TrackerClient struct {
// GetAnnounceRequest supplies the tracker.AnnounceRequest for an event and
// info hash; Announce copies Uploaded, Downloaded and Event from the result.
21 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) tracker.AnnounceRequest
// OnConn is called by handleOffer and handleAnswer with each data channel
// and the DataChannelContext describing it.
23 OnConn onDataChannelOpen
28 outboundOffers map[string]outboundOffer // OfferID to outboundOffer
// wsConn is the tracker connection that writeMessage writes to and that
// doWebsocket hands to trackerReadLoop; writeMessage treats nil specially.
29 wsConn *websocket.Conn
// peerIdBinary returns the client's PeerId bytes converted by
// binaryToJsonString; Announce and handleOffer use it for the PeerID field
// of the messages they send to the tracker.
33 func (me *TrackerClient) peerIdBinary() string {
34 return binaryToJsonString(me.PeerId[:])
37 // outboundOffer represents an outstanding offer: one that Announce has
// stored in TrackerClient.outboundOffers and that handleAnswer has not yet
// matched to an answer and removed.
// NOTE(review): handleAnswer also reads offer.infoHash; that field is
// presumably declared on a line not visible in this excerpt.
38 type outboundOffer struct {
// originalOffer is the session description produced by newOffer; handleAnswer
// reports it as DataChannelContext.Local.
39 originalOffer webrtc.SessionDescription
// peerConnection is closed by closeUnusedOffers for offers still in the map.
40 peerConnection wrappedPeerConnection
41 dataChannel *webrtc.DataChannel
// DataChannelContext is the metadata handed to the OnConn callback together
// with a data channel.
// NOTE(review): handleAnswer also sets an InfoHash field; it and any other
// fields are on lines not visible in this excerpt.
45 type DataChannelContext struct {
// Local and Remote are the two session descriptions of the connection;
// handleAnswer sets Local to the offer this client originally sent.
46 Local, Remote webrtc.SessionDescription
// onDataChannelOpen is the callback type stored in TrackerClient.OnConn. It
// receives a data channel as a datachannel.ReadWriteCloser plus the
// DataChannelContext for that channel.
52 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
// doWebsocket runs one websocket session against the tracker: it dials
// tc.Url with the default dialer, runs trackerReadLoop on the connection and
// then closes the outbound offers that are still unanswered.
//
// Each call increments the "websocket dials" metric. A dial failure is
// returned wrapped as "dialing tracker: ...".
// NOTE(review): the error guards and the assignment of c to tc.wsConn are on
// lines not visible in this excerpt — confirm before relying on this summary.
54 func (tc *TrackerClient) doWebsocket() error {
55 metrics.Add("websocket dials", 1)
// The HTTP response returned by Dial is discarded.
56 c, _, err := websocket.DefaultDialer.Dial(tc.Url, nil)
58 return fmt.Errorf("dialing tracker: %w", err)
61 tc.Logger.WithDefaultLevel(log.Debug).Printf("dialed tracker %q", tc.Url)
66 err = tc.trackerReadLoop(tc.wsConn)
// Offers still in the map are closed once the read loop has returned.
68 tc.closeUnusedOffers()
// Run calls doWebsocket, logs how that websocket instance ended at Warning
// level, and then sleeps for one minute.
// NOTE(review): presumably these statements sit inside a retry loop whose
// header and exit condition are not visible in this excerpt — confirm.
74 func (tc *TrackerClient) Run() error {
79 err := tc.doWebsocket()
80 tc.Logger.WithDefaultLevel(log.Warning).Printf("websocket instance ended: %v", err)
// NOTE(review): time.Sleep cannot be interrupted, so a shutdown request
// arriving during the pause may go unnoticed for up to a minute — confirm
// this is acceptable.
81 time.Sleep(time.Minute)
// Close presumably shuts the client down; its body is outside this excerpt.
// NOTE(review): confirm what it releases and how it interacts with Run and
// with writeMessage, which can return a "%T closed" error.
88 func (tc *TrackerClient) Close() error {
// closeUnusedOffers closes the peer connection of every offer still held in
// tc.outboundOffers and then drops the whole map.
99 func (tc *TrackerClient) closeUnusedOffers() {
100 for _, offer := range tc.outboundOffers {
101 offer.peerConnection.Close()
// Setting the map to nil is safe for later use: Announce re-creates it
// before inserting.
103 tc.outboundOffers = nil
// Announce sends an announce for infoHash to the tracker and registers a new
// outbound offer so that a later answer can be matched to it.
//
// It generates a random 20-byte offer ID, creates an offer with newOffer,
// builds an AnnounceRequest from tc.GetAnnounceRequest(event, infoHash),
// marshals it to JSON and sends it with writeMessage. Only after a successful
// write is the offer stored in tc.outboundOffers under the offer ID.
//
// Each call increments the "outbound announces" metric. Failures are returned
// wrapped with the step that failed: "generating offer_id bytes",
// "creating offer", "marshalling request" or "write AnnounceRequest".
// NOTE(review): no pc.Close() is visible on the error returns that follow a
// successful newOffer (in particular the marshalling path); confirm the peer
// connection is not leaked. Some lines of this function are not visible here.
106 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
107 metrics.Add("outbound announces", 1)
108 var randOfferId [20]byte
109 _, err := rand.Read(randOfferId[:])
111 return fmt.Errorf("generating offer_id bytes: %w", err)
// The same string form of the ID is sent as OfferID and used as the map key,
// and handleAnswer looks offers up by the OfferID the tracker sends back.
113 offerIDBinary := binaryToJsonString(randOfferId[:])
115 pc, dc, offer, err := newOffer()
117 return fmt.Errorf("creating offer: %w", err)
120 request := tc.GetAnnounceRequest(event, infoHash)
122 req := AnnounceRequest{
123 Numwant: 1, // If higher we need to create equal amount of offers.
124 Uploaded: request.Uploaded,
125 Downloaded: request.Downloaded,
127 Event: request.Event.String(),
129 InfoHash: binaryToJsonString(infoHash[:]),
130 PeerID: tc.peerIdBinary(),
132 OfferID: offerIDBinary,
137 data, err := json.Marshal(req)
139 return fmt.Errorf("marshalling request: %w", err)
144 err = tc.writeMessage(data)
147 return fmt.Errorf("write AnnounceRequest: %w", err)
// closeUnusedOffers resets the map to nil, so it is created lazily here.
149 if tc.outboundOffers == nil {
150 tc.outboundOffers = make(map[string]outboundOffer)
152 tc.outboundOffers[offerIDBinary] = outboundOffer{
155 originalOffer: offer,
// writeMessage sends data to the tracker as a single websocket text message
// on tc.wsConn.
// NOTE(review): while tc.wsConn is nil the function loops; the only visible
// statement in that loop returns a "%T closed" error. The condition guarding
// that return and whatever waits for a connection are on lines not visible in
// this excerpt — confirm.
161 func (tc *TrackerClient) writeMessage(data []byte) error {
162 for tc.wsConn == nil {
164 return fmt.Errorf("%T closed", tc)
168 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
// trackerReadLoop reads messages from the given tracker connection, decodes
// each one as a JSON AnnounceResponse and dispatches it: a response carrying
// an Offer goes to handleOffer, one carrying an Answer goes to handleAnswer.
//
// A read failure is returned wrapped as "read message error: ...". JSON
// decoding failures and undecodable info hashes are logged at Warning level.
// NOTE(review): the loop header and the statements that follow each log call
// are on lines not visible in this excerpt.
//
// The parameter name tracker shadows the imported tracker package inside
// this function.
171 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
173 _, message, err := tracker.ReadMessage()
175 return fmt.Errorf("read message error: %w", err)
177 //tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
179 var ar AnnounceResponse
180 if err := json.Unmarshal(message, &ar); err != nil {
181 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
185 case ar.Offer != nil:
186 ih, err := jsonStringToInfoHash(ar.InfoHash)
188 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
// NOTE(review): handleOffer returns an error, which is discarded here;
// consider logging it.
191 tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
192 case ar.Answer != nil:
193 tc.handleAnswer(ar.OfferID, *ar.Answer)
// handleOffer responds to an offer received from the tracker. It creates an
// answering peer connection for offer, sends an AnnounceResponse back through
// writeMessage, and registers a data channel handler that passes the channel
// to tc.OnConn.
//
// If marshalling or writing the response fails, the peer connection is closed
// and the wrapped error is returned. A 30 second timer closes the peer
// connection and increments "answering peer connections timed out".
// NOTE(review): the timer is presumably stopped when the data channel opens;
// that line, the remaining parameters and the rest of the response fields are
// not visible in this excerpt.
198 func (tc *TrackerClient) handleOffer(
199 offer webrtc.SessionDescription,
204 peerConnection, answer, err := newAnsweringPeerConnection(offer)
// NOTE(review): this wraps a newAnsweringPeerConnection failure with the text
// "write AnnounceResponse", although nothing has been written yet; the
// message looks misleading.
206 return fmt.Errorf("write AnnounceResponse: %w", err)
208 response := AnnounceResponse{
210 InfoHash: binaryToJsonString(infoHash[:]),
211 PeerID: tc.peerIdBinary(),
216 data, err := json.Marshal(response)
218 peerConnection.Close()
219 return fmt.Errorf("marshalling response: %w", err)
223 if err := tc.writeMessage(data); err != nil {
224 peerConnection.Close()
225 return fmt.Errorf("writing response: %w", err)
// Started only after the response has been written successfully.
227 timer := time.AfterFunc(30*time.Second, func() {
228 metrics.Add("answering peer connections timed out", 1)
229 peerConnection.Close()
231 peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
232 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
234 metrics.Add("answering peer connection conversions", 1)
235 tc.OnConn(dc, DataChannelContext{
// handleAnswer applies an answer received from the tracker to the outbound
// offer stored under offerId.
//
// An unknown offerId is logged at Warning level. For a known offer it
// increments "outbound offers answered" and calls offer.setAnswer with a
// callback that increments "outbound offers answered with datachannel" and
// passes the data channel to tc.OnConn, using the original offer as Local and
// the offer's info hash as InfoHash. A setAnswer error is logged at Warning
// level. The offer is then removed from tc.outboundOffers and a fresh
// Announce with tracker.None is started for the same info hash.
// NOTE(review): the guards and any early returns between these statements
// are on lines not visible in this excerpt — confirm which paths reach the
// delete and the re-announce.
247 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
250 offer, ok := tc.outboundOffers[offerId]
252 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %q", offerId)
255 //tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
256 metrics.Add("outbound offers answered", 1)
257 err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
258 metrics.Add("outbound offers answered with datachannel", 1)
259 tc.OnConn(dc, DataChannelContext{
260 Local: offer.originalOffer,
264 InfoHash: offer.infoHash,
268 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
271 delete(tc.outboundOffers, offerId)
// Runs in its own goroutine; the error returned by Announce is discarded.
272 go tc.Announce(tracker.None, offer.infoHash)