10 "github.com/anacrolix/log"
12 "github.com/anacrolix/torrent/tracker"
13 "github.com/gorilla/websocket"
14 "github.com/pion/datachannel"
15 "github.com/pion/webrtc/v3"
// TrackerClientStats holds counters describing a TrackerClient's activity. It is the type
// returned by TrackerClient.Stats and stored in the client's stats field.
18 type TrackerClientStats struct {
// Incremented in handleOffer's data-channel-open callback, i.e. once per answering peer
// connection whose data channel opened.
20 ConvertedInboundConns int64
// Incremented in handleAnswer's callback, i.e. once per outbound offer whose answer
// produced a data channel.
21 ConvertedOutboundConns int64
24 // Client represents the webtorrent client
// TrackerClient talks to a tracker over a websocket connection (see Dialer and wsConn),
// announcing WebRTC offers and handling the offers and answers the tracker relays back.
25 type TrackerClient struct {
// Called by Announce to obtain the announce parameters (Uploaded, Downloaded and Event are
// read from the result) for the given event and info hash.
27 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
// Invoked with the data channel and its context from the callbacks registered in handleOffer
// and handleAnswer.
29 OnConn onDataChannelOpen
// Used by doWebsocket to dial the tracker URL.
31 Dialer *websocket.Dialer
35 outboundOffers map[string]outboundOffer // OfferID to outboundOffer
// Websocket connection to the tracker; Close and writeMessage both check it against nil.
36 wsConn *websocket.Conn
// Counters updated by the handleOffer and handleAnswer callbacks.
38 stats TrackerClientStats
// Created in Start with a 60 second period; doWebsocket writes a websocket ping on each tick.
39 pingTicker *time.Ticker
// Stats returns a TrackerClientStats value for this client.
// NOTE(review): the body is not visible here; presumably it returns a copy of me.stats — confirm.
42 func (me *TrackerClient) Stats() TrackerClientStats {
// peerIdBinary returns the client's PeerId bytes converted with binaryToJsonString. The result
// is used as the PeerID field of the announce request and of the response to an offer.
48 func (me *TrackerClient) peerIdBinary() string {
49 return binaryToJsonString(me.PeerId[:])
52 // outboundOffer represents an outstanding offer.
// Values are stored in TrackerClient.outboundOffers keyed by offer ID, and looked up again in
// handleAnswer when the tracker delivers an answer.
53 type outboundOffer struct {
// The offer's session description; handleAnswer passes it as the Local description of the
// DataChannelContext.
54 originalOffer webrtc.SessionDescription
// Closed whenever the offer is discarded (announceOffers, closeUnusedOffers,
// CloseOffersForInfohash).
55 peerConnection *wrappedPeerConnection
// NOTE(review): no use of this field is visible in this view; presumably the data channel
// created together with the offer — confirm.
56 dataChannel *webrtc.DataChannel
// DataChannelContext carries information about the peer connection behind a data channel. It
// is passed, together with the data channel, to the onDataChannelOpen callback.
60 type DataChannelContext struct {
61 // Can these be obtained by just calling the relevant methods on peerConnection?
62 Local, Remote webrtc.SessionDescription
66 // This is private as some methods might not be appropriate with data channel context.
// Used by GetSelectedIceCandidatePair to reach the underlying ICE transport.
67 peerConnection *wrappedPeerConnection
// GetSelectedIceCandidatePair returns the selected ICE candidate pair of the underlying peer
// connection, reached through its SCTP transport, that transport's Transport(), and its ICE
// transport. The result and error come directly from GetSelectedCandidatePair.
70 func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
71 return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
// onDataChannelOpen is the callback type of TrackerClient.OnConn. It receives the data channel
// as a datachannel.ReadWriteCloser together with the DataChannelContext describing it.
74 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
// doWebsocket dials the tracker, sends websocket pings on each pingTicker tick, and runs
// trackerReadLoop on the connection. A dial failure is returned wrapped as
// "dialing tracker: ...".
// NOTE(review): several lines are not visible in this view (storing the connection in
// tc.wsConn, the ping goroutine and its shutdown via closeChan, cleanup) — confirm before
// relying on details.
76 func (tc *TrackerClient) doWebsocket() error {
77 metrics.Add("websocket dials", 1)
81 c, _, err := tc.Dialer.Dial(tc.Url, nil)
83 return fmt.Errorf("dialing tracker: %w", err)
86 tc.Logger.WithDefaultLevel(log.Info).Printf("connected")
// Presumably used to stop the ping loop once the read loop returns — confirm.
92 closeChan := make(chan struct{})
96 case <-tc.pingTicker.C:
// Ping message with an empty payload.
98 err := c.WriteMessage(websocket.PingMessage, []byte{})
109 err = tc.trackerReadLoop(tc.wsConn)
117 // Finishes initialization and spawns the run routine, calling onStop when it completes with the
118 // result. We don't let the caller just spawn the runner directly, since then we can race against
119 // .Close to finish initialization.
120 func (tc *TrackerClient) Start(onStop func(error)) {
// The ticker that drives the websocket pings sent in doWebsocket; one tick every 60 seconds.
121 tc.pingTicker = time.NewTicker(60 * time.Second)
// run executes doWebsocket, logs how the websocket instance ended, and sleeps for a minute.
// NOTE(review): the surrounding loop and exit conditions are not visible here; presumably this
// repeats until the client is closed — confirm.
128 func (tc *TrackerClient) run() error {
132 err := tc.doWebsocket()
// The log level is chosen on lines not visible in this view.
139 tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
// Pause before the next step; presumably a reconnect back-off — confirm.
140 time.Sleep(time.Minute)
// Close shuts the client down: it acts on the websocket connection when one exists and closes
// every outstanding outbound offer via closeUnusedOffers.
// NOTE(review): what is done with tc.wsConn, and the returned error, are on lines not visible
// here — confirm.
147 func (tc *TrackerClient) Close() error {
150 if tc.wsConn != nil {
153 tc.closeUnusedOffers()
// announceOffers takes over the current outboundOffers map (leaving tc.outboundOffers nil),
// closes the peer connection of every offer in it, and starts a fresh Started announce for each
// offer's info hash in its own goroutine.
160 func (tc *TrackerClient) announceOffers() {
161 // tc.Announce grabs a lock on tc.outboundOffers. It also handles the case where outboundOffers
162 // is nil. Take ownership of outboundOffers here.
164 offers := tc.outboundOffers
165 tc.outboundOffers = nil
172 // Iterate over our locally-owned offers, close any existing "invalid" ones from before the
173 // socket reconnected, reannounce the infohash, adding it back into the tc.outboundOffers.
174 tc.Logger.WithDefaultLevel(log.Info).Printf("reannouncing %d infohashes after restart", len(offers))
175 for _, offer := range offers {
176 // TODO: Capture the errors? Are we even in a position to do anything with them?
177 offer.peerConnection.Close()
178 // Use goroutine here to allow read loop to start and ensure the buffer drains.
// The error returned by Announce is not observed here.
179 go tc.Announce(tracker.Started, offer.infoHash)
// closeUnusedOffers closes the peer connection of every outstanding outbound offer and then
// drops the whole map. Errors returned by the individual Close calls are not checked.
183 func (tc *TrackerClient) closeUnusedOffers() {
184 for _, offer := range tc.outboundOffers {
185 offer.peerConnection.Close()
187 tc.outboundOffers = nil
// CloseOffersForInfohash closes the peer connection of every outstanding outbound offer whose
// info hash equals infoHash and removes those offers from tc.outboundOffers. Offers for other
// info hashes are left untouched.
190 func (tc *TrackerClient) CloseOffersForInfohash(infoHash [20]byte) {
193 for key, offer := range tc.outboundOffers {
194 if offer.infoHash == infoHash {
195 offer.peerConnection.Close()
// Deleting the current key while ranging over a map is permitted in Go.
196 delete(tc.outboundOffers, key)
// Announce sends an announce for infoHash to the tracker, attaching a newly created offer under
// a random 20-byte offer ID, and records the offer in tc.outboundOffers under that ID so that
// handleAnswer can find it later.
//
// Each failing step returns a wrapped error: generating the offer ID, creating the offer,
// getting the announce parameters from tc.GetAnnounceRequest, marshalling the request, and
// writing it to the tracker.
// NOTE(review): cleanup of the new peer connection on the error paths is on lines not visible
// in this view — confirm.
201 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
202 metrics.Add("outbound announces", 1)
203 var randOfferId [20]byte
204 _, err := rand.Read(randOfferId[:])
206 return fmt.Errorf("generating offer_id bytes: %w", err)
// Same string encoding as used for the info hash and peer ID in the request below.
208 offerIDBinary := binaryToJsonString(randOfferId[:])
210 pc, dc, offer, err := newOffer()
212 return fmt.Errorf("creating offer: %w", err)
215 request, err := tc.GetAnnounceRequest(event, infoHash)
218 return fmt.Errorf("getting announce parameters: %w", err)
221 req := AnnounceRequest{
222 Numwant: 1, // If higher we need to create equal amount of offers.
223 Uploaded: request.Uploaded,
224 Downloaded: request.Downloaded,
// The event sent is the one from the GetAnnounceRequest result, in its string form.
226 Event: request.Event.String(),
228 InfoHash: binaryToJsonString(infoHash[:]),
229 PeerID: tc.peerIdBinary(),
231 OfferID: offerIDBinary,
236 data, err := json.Marshal(req)
239 return fmt.Errorf("marshalling request: %w", err)
244 err = tc.writeMessage(data)
247 return fmt.Errorf("write AnnounceRequest: %w", err)
// The map is created lazily; announceOffers and closeUnusedOffers reset it to nil.
249 if tc.outboundOffers == nil {
250 tc.outboundOffers = make(map[string]outboundOffer)
252 tc.outboundOffers[offerIDBinary] = outboundOffer{
255 originalOffer: offer,
// writeMessage writes data to the tracker websocket as a text message and returns the result
// of that write.
// NOTE(review): the body of the `for tc.wsConn == nil` loop is only partly visible; it can
// return a "<type> closed" error, and presumably waits for a connection otherwise — confirm.
261 func (tc *TrackerClient) writeMessage(data []byte) error {
262 for tc.wsConn == nil {
// %T prints the type of tc, so the message reads "*<pkg>.TrackerClient closed".
264 return fmt.Errorf("%T closed", tc)
268 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
// trackerReadLoop reads messages from the given websocket connection, decodes each one as an
// AnnounceResponse, and dispatches it: a message carrying an Offer goes to handleOffer, one
// carrying an Answer goes to handleAnswer. A read failure is returned wrapped as
// "read message error: ..."; decode failures are only logged at warning level.
// NOTE(review): the parameter name `tracker` shadows the imported tracker package inside this
// function.
271 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
273 _, message, err := tracker.ReadMessage()
275 return fmt.Errorf("read message error: %w", err)
277 // tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
279 var ar AnnounceResponse
280 if err := json.Unmarshal(message, &ar); err != nil {
281 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
285 case ar.Offer != nil:
286 ih, err := jsonStringToInfoHash(ar.InfoHash)
288 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
// NOTE(review): handleOffer returns an error, which is not used at this call site.
291 tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
292 case ar.Answer != nil:
293 tc.handleAnswer(ar.OfferID, *ar.Answer)
// handleOffer responds to an offer relayed by the tracker: it creates an answering peer
// connection for the offer, sends the response back through the tracker, and registers a
// callback that hands the data channel to tc.OnConn once it opens. If marshalling or writing
// the response fails, the new peer connection is closed and a wrapped error is returned.
298 func (tc *TrackerClient) handleOffer(
299 offer webrtc.SessionDescription,
304 peerConnection, answer, err := newAnsweringPeerConnection(offer)
// NOTE(review): this error comes from newAnsweringPeerConnection, so the
// "write AnnounceResponse" label is misleading; nothing has been written at this point.
306 return fmt.Errorf("write AnnounceResponse: %w", err)
308 response := AnnounceResponse{
310 InfoHash: binaryToJsonString(infoHash[:]),
311 PeerID: tc.peerIdBinary(),
316 data, err := json.Marshal(response)
318 peerConnection.Close()
319 return fmt.Errorf("marshalling response: %w", err)
323 if err := tc.writeMessage(data); err != nil {
324 peerConnection.Close()
325 return fmt.Errorf("writing response: %w", err)
// After 30 seconds the timer function closes the peer connection and bumps the timeout metric.
// Presumably the timer is stopped when the data channel opens; that line is not visible —
// confirm.
327 timer := time.AfterFunc(30*time.Second, func() {
328 metrics.Add("answering peer connections timed out", 1)
329 peerConnection.Close()
331 peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
332 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
334 metrics.Add("answering peer connection conversions", 1)
336 tc.stats.ConvertedInboundConns++
338 tc.OnConn(dc, DataChannelContext{
344 peerConnection: peerConnection,
// handleAnswer matches an answer from the tracker to the outstanding outbound offer with the
// same offer ID and applies it with offer.setAnswer. The callback given to setAnswer counts the
// conversion and hands the data channel to tc.OnConn. An unknown offer ID and a setAnswer
// failure are logged at warning level. The offer is then removed from tc.outboundOffers and a
// new announce for the same info hash is started in a goroutine.
// NOTE(review): early returns after the warnings are on lines not visible here — confirm which
// paths reach the delete and re-announce.
351 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
354 offer, ok := tc.outboundOffers[offerId]
356 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %+q", offerId)
359 // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
360 metrics.Add("outbound offers answered", 1)
361 err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
362 metrics.Add("outbound offers answered with datachannel", 1)
364 tc.stats.ConvertedOutboundConns++
366 tc.OnConn(dc, DataChannelContext{
// Our own offer is the local description on this side of the connection.
367 Local: offer.originalOffer,
371 InfoHash: offer.infoHash,
372 peerConnection: offer.peerConnection,
376 tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
379 delete(tc.outboundOffers, offerId)
// Announce again with the None event; the returned error is not observed. Presumably this
// replaces the offer that was just consumed — confirm.
380 go tc.Announce(tracker.None, offer.infoHash)