8 "github.com/anacrolix/generics"
9 "go.opentelemetry.io/otel/trace"
13 "github.com/anacrolix/log"
15 "github.com/anacrolix/torrent/tracker"
16 "github.com/gorilla/websocket"
17 "github.com/pion/datachannel"
18 "github.com/pion/webrtc/v3"
// TrackerClientStats holds int64 counters kept by a TrackerClient; it is the
// type returned by TrackerClient.Stats.
21 type TrackerClientStats struct {
// NOTE(review): the code that increments these two counters is not visible in
// this excerpt; from the names they presumably count inbound/outbound
// connections that were converted — confirm against the full file.
23 ConvertedInboundConns int64
24 ConvertedOutboundConns int64
27 // TrackerClient represents the webtorrent client. It holds the websocket
// connection to the tracker (wsConn) and the set of outbound offers that are
// still waiting for an answer (outboundOffers).
28 type TrackerClient struct {
// GetAnnounceRequest is called by announce to obtain the Uploaded, Downloaded
// and Event values that go into each outgoing announce for infoHash.
30 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
// OnConn is a callback of type onDataChannelOpen.
32 OnConn onDataChannelOpen
// Dialer is used by doWebsocket to dial the tracker URL.
34 Dialer *websocket.Dialer
38 outboundOffers map[string]outboundOfferValue // OfferID to outboundOfferValue
// wsConn is the connection writeMessage writes to and trackerReadLoop reads
// from; writeMessage and Close treat nil as "no connection".
39 wsConn *websocket.Conn
41 stats TrackerClientStats
// pingTicker is created in Start (60 second period); doWebsocket writes a
// websocket ping when it fires.
42 pingTicker *time.Ticker
// Stats returns a TrackerClientStats value for this client.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// a copy of me.stats — confirm.
45 func (me *TrackerClient) Stats() TrackerClientStats {
// peerIdBinary returns the client's PeerId bytes converted with
// binaryToJsonString. This is the value placed in the PeerID field of outgoing
// announce requests and responses.
51 func (me *TrackerClient) peerIdBinary() string {
52 return binaryToJsonString(me.PeerId[:])
// outboundOffer pairs an offerId with an outboundOfferValue; Announce builds
// one and announce stores its value in tc.outboundOffers under that id.
// NOTE(review): the field list is not visible in this excerpt.
55 type outboundOffer struct {
60 // outboundOfferValue represents an outstanding offer.
61 type outboundOfferValue struct {
// originalOffer is the session description sent to the tracker as the Offer
// of an announce request.
62 originalOffer webrtc.SessionDescription
// peerConnection receives the remote answer in handleAnswer and is closed when
// the offer is discarded.
63 peerConnection *wrappedPeerConnection
// dataChannel is closed together with peerConnection in closeUnusedOffers.
65 dataChannel *webrtc.DataChannel
// DataChannelContext is the second argument handed to an onDataChannelOpen
// callback, alongside the data channel itself.
// NOTE(review): some fields of this struct are not visible in this excerpt.
68 type DataChannelContext struct {
72 // This is private as some methods might not be appropriate with data channel context.
73 peerConnection *wrappedPeerConnection
75 Context context.Context
// GetSelectedIceCandidatePair returns the selected candidate pair of the ICE
// transport that sits under the peer connection's SCTP transport, together
// with any error reported by that lookup.
78 func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
79 return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
// onDataChannelOpen is the callback type of TrackerClient.OnConn. It receives
// a datachannel.ReadWriteCloser and the DataChannelContext describing it.
// NOTE(review): the call site is not visible in this excerpt; per the name it
// is presumably invoked once a data channel has opened — confirm.
82 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
// doWebsocket dials the tracker at tc.Url with tc.Dialer and then runs
// trackerReadLoop on tc.wsConn. A dial error is returned wrapped as
// "dialing tracker: ...". While the connection is up, a ping message is
// written to it each time tc.pingTicker fires.
// NOTE(review): several lines of this function (error checks, locking,
// goroutine setup, cleanup) are not visible in this excerpt — confirm the
// details against the full file.
84 func (tc *TrackerClient) doWebsocket() error {
// Counted before dialing, so failed dial attempts are included.
85 metrics.Add("websocket dials", 1)
89 c, _, err := tc.Dialer.Dial(tc.Url, nil)
91 return fmt.Errorf("dialing tracker: %w", err)
94 tc.Logger.WithDefaultLevel(log.Info).Printf("connected")
// NOTE(review): presumably closed to stop the ping loop below once this
// connection ends — the close site is not visible here.
100 closeChan := make(chan struct{})
104 case <-tc.pingTicker.C:
// Ping with an empty payload.
106 err := c.WriteMessage(websocket.PingMessage, []byte{})
117 err = tc.trackerReadLoop(tc.wsConn)
125 // Finishes initialization and spawns the run routine, calling onStop when it completes with the
126 // result. We don't let the caller just spawn the runner directly, since then we can race against
127 // .Close to finish initialization.
128 func (tc *TrackerClient) Start(onStop func(error)) {
// The ticker that drives the websocket pings in doWebsocket: one tick every 60 seconds.
129 tc.pingTicker = time.NewTicker(60 * time.Second)
// run calls doWebsocket, logs the error that ended that websocket instance,
// and then sleeps for one minute.
// NOTE(review): the loop and locking lines are not visible in this excerpt;
// presumably this repeats until the client is closed — confirm.
136 func (tc *TrackerClient) run() error {
140 err := tc.doWebsocket()
// level is selected on lines that are not visible in this excerpt.
147 tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
// Fixed one-minute pause; time.Sleep itself cannot be cut short.
148 time.Sleep(time.Minute)
// Close shuts the client down. In the visible lines it checks whether a
// websocket connection exists and closes all unused outbound offers.
// NOTE(review): what is done with a non-nil wsConn, the locking, and the
// returned error are on lines not visible in this excerpt.
155 func (tc *TrackerClient) Close() error {
158 if tc.wsConn != nil {
161 tc.closeUnusedOffers()
// announceOffers re-announces every infohash that had an outstanding outbound
// offer. It takes the current tc.outboundOffers map, sets the field to nil,
// closes each old offer's peer connection, and starts a tracker.Started
// announce for that offer's infohash in its own goroutine.
168 func (tc *TrackerClient) announceOffers() {
169 // tc.Announce grabs a lock on tc.outboundOffers. It also handles the case where outboundOffers
170 // is nil. Take ownership of outboundOffers here.
172 offers := tc.outboundOffers
173 tc.outboundOffers = nil
180 // Iterate over our locally-owned offers, close any existing "invalid" ones from before the
181 // socket reconnected, reannounce the infohash, adding it back into the tc.outboundOffers.
182 tc.Logger.WithDefaultLevel(log.Info).Printf("reannouncing %d infohashes after restart", len(offers))
183 for _, offer := range offers {
184 // TODO: Capture the errors? Are we even in a position to do anything with them?
185 offer.peerConnection.Close()
186 // Use goroutine here to allow read loop to start and ensure the buffer drains.
// The error returned by Announce is dropped because it runs in a bare goroutine.
187 go tc.Announce(tracker.Started, offer.infoHash)
// closeUnusedOffers closes the peer connection and the data channel of every
// outstanding outbound offer and then sets tc.outboundOffers to nil. The
// results of the Close calls are ignored.
// NOTE(review): no locking is visible in this excerpt; presumably the caller
// holds the client's lock — confirm.
191 func (tc *TrackerClient) closeUnusedOffers() {
192 for _, offer := range tc.outboundOffers {
193 offer.peerConnection.Close()
194 offer.dataChannel.Close()
196 tc.outboundOffers = nil
// CloseOffersForInfohash closes the peer connection of, and removes from
// tc.outboundOffers, every outstanding outbound offer whose infoHash equals
// the given one.
// NOTE(review): unlike closeUnusedOffers, no dataChannel.Close call is visible
// here — confirm whether that is intentional.
199 func (tc *TrackerClient) CloseOffersForInfohash(infoHash [20]byte) {
202 for key, offer := range tc.outboundOffers {
203 if offer.infoHash == infoHash {
204 offer.peerConnection.Close()
// Deleting the current key while ranging over a map is permitted in Go.
205 delete(tc.outboundOffers, key)
// Announce sends an announce for infoHash to the tracker.
//
// For tracker.Stopped the announce is sent without any offers. For every other
// event a random 20-byte offer ID is generated, a new offer is created with
// tc.newOffer, and the announce carries that single offer. Errors from ID
// generation and offer creation are returned wrapped.
// NOTE(review): the handling of a failed announce and the remaining fields of
// the offer literal are on lines not visible in this excerpt.
210 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
211 metrics.Add("outbound announces", 1)
212 if event == tracker.Stopped {
213 return tc.announce(event, infoHash, nil)
215 var randOfferId [20]byte
216 _, err := rand.Read(randOfferId[:])
218 return fmt.Errorf("generating offer_id bytes: %w", err)
// The offer ID travels as a string, converted the same way as info hashes and peer IDs.
220 offerIDBinary := binaryToJsonString(randOfferId[:])
222 pc, dc, offer, err := tc.newOffer(tc.Logger, offerIDBinary, infoHash)
224 return fmt.Errorf("creating offer: %w", err)
227 err = tc.announce(event, infoHash, []outboundOffer{{
228 offerId: offerIDBinary,
229 outboundOfferValue: outboundOfferValue{
230 originalOffer: offer,
// announce builds an AnnounceRequest for infoHash, marshals it to JSON and
// writes it to the tracker with writeMessage. Uploaded, Downloaded and Event
// come from tc.GetAnnounceRequest; Numwant is the number of offers supplied.
// After the write, each offer is stored in tc.outboundOffers under its offer
// ID so a later answer can be matched to it. Every failure is returned wrapped
// with the step that failed.
// NOTE(review): error checks and locking are on lines not visible in this
// excerpt.
243 func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error {
244 request, err := tc.GetAnnounceRequest(event, infoHash)
246 return fmt.Errorf("getting announce parameters: %w", err)
249 req := AnnounceRequest{
// One peer is requested per offer sent.
250 Numwant: len(offers),
251 Uploaded: request.Uploaded,
252 Downloaded: request.Downloaded,
// The event sent is the one from the GetAnnounceRequest result, not the event parameter.
254 Event: request.Event.String(),
256 InfoHash: binaryToJsonString(infoHash[:]),
257 PeerID: tc.peerIdBinary(),
259 for _, offer := range offers {
260 req.Offers = append(req.Offers, Offer{
261 OfferID: offer.offerId,
262 Offer: offer.originalOffer,
266 data, err := json.Marshal(req)
268 return fmt.Errorf("marshalling request: %w", err)
273 err = tc.writeMessage(data)
275 return fmt.Errorf("write AnnounceRequest: %w", err)
277 for _, offer := range offers {
// Creates the map first if it is nil (for example after announceOffers or closeUnusedOffers reset it).
278 generics.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
// writeMessage writes data to the tracker websocket as a text message. While
// tc.wsConn is nil it stays in a loop; that loop can return an error of the
// form "<type of tc> closed".
// NOTE(review): the condition for the "closed" return and the wait inside the
// loop are on lines not visible in this excerpt.
283 func (tc *TrackerClient) writeMessage(data []byte) error {
284 for tc.wsConn == nil {
286 return fmt.Errorf("%T closed", tc)
290 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
// trackerReadLoop reads messages from the given websocket connection, decodes
// each one as an AnnounceResponse and dispatches it: a response carrying an
// Offer goes to handleOffer, one carrying an Answer goes to handleAnswer, and
// anything else is logged as unhandled. A read error ends the loop and is
// returned wrapped; decode and handler errors are only logged.
// NOTE(review): the parameter name "tracker" shadows the imported tracker
// package inside this function.
293 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
295 _, message, err := tracker.ReadMessage()
297 return fmt.Errorf("read message error: %w", err)
299 // tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
301 var ar AnnounceResponse
302 if err := json.Unmarshal(message, &ar); err != nil {
303 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
307 case ar.Offer != nil:
308 ih, err := jsonStringToInfoHash(ar.InfoHash)
310 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
313 err = tc.handleOffer(offerContext{
319 tc.Logger.Levelf(log.Error, "handling offer for infohash %x: %v", ih, err)
321 case ar.Answer != nil:
322 tc.handleAnswer(ar.OfferID, *ar.Answer)
324 tc.Logger.Levelf(log.Warning, "unhandled announce response %q", message)
// offerContext describes an offer received from the tracker and is the
// argument to handleOffer, which also reads its Id and InfoHash fields.
// NOTE(review): only SessDesc is visible in this excerpt.
329 type offerContext struct {
330 SessDesc webrtc.SessionDescription
// handleOffer responds to an offer received from the tracker. It creates an
// answering peer connection with newAnsweringPeerConnection, builds an
// AnnounceResponse carrying the offer's info hash, this client's peer ID and
// the offer ID, marshals it to JSON and writes it to the tracker. If
// marshalling or writing fails, the new peer connection is closed before the
// wrapped error is returned.
// NOTE(review): some fields of the response literal and the final return are
// on lines not visible in this excerpt.
335 func (tc *TrackerClient) handleOffer(
336 offerContext offerContext,
339 peerConnection, answer, err := tc.newAnsweringPeerConnection(offerContext)
341 return fmt.Errorf("creating answering peer connection: %w", err)
343 response := AnnounceResponse{
345 InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
346 PeerID: tc.peerIdBinary(),
// Echo the offer's ID back in the response.
349 OfferID: offerContext.Id,
351 data, err := json.Marshal(response)
// The answer cannot be delivered, so release the connection that was just created.
353 peerConnection.Close()
354 return fmt.Errorf("marshalling response: %w", err)
358 if err := tc.writeMessage(data); err != nil {
359 peerConnection.Close()
360 return fmt.Errorf("writing response: %w", err)
// handleAnswer applies an answer relayed by the tracker to the outstanding
// outbound offer stored under offerId. An unknown offerId is logged at warning
// level. The answer is handed to the offer's peer connection with
// SetRemoteDescription; a failure there is recorded on the peer connection's
// span and logged at error level. The offer is removed from tc.outboundOffers
// and a fresh tracker.None announce for the same infohash is started in a
// goroutine.
// NOTE(review): early returns and locking are on lines not visible in this
// excerpt, so whether the delete and re-announce also run after a
// SetRemoteDescription failure cannot be confirmed here.
365 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
368 offer, ok := tc.outboundOffers[offerId]
370 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %+q", offerId)
373 // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
374 metrics.Add("outbound offers answered", 1)
375 err := offer.peerConnection.SetRemoteDescription(answer)
377 err = fmt.Errorf("using outbound offer answer: %w", err)
378 offer.peerConnection.span.RecordError(err)
379 tc.Logger.LevelPrint(log.Error, err)
382 delete(tc.outboundOffers, offerId)
// The error returned by Announce is dropped because it runs in a bare goroutine.
383 go tc.Announce(tracker.None, offer.infoHash)