12 g "github.com/anacrolix/generics"
13 "github.com/anacrolix/log"
14 "github.com/gorilla/websocket"
15 "github.com/pion/datachannel"
16 "github.com/pion/webrtc/v3"
17 "go.opentelemetry.io/otel/trace"
19 "github.com/anacrolix/torrent/tracker"
// TrackerClientStats is the set of counters returned by TrackerClient.Stats.
22 type TrackerClientStats struct {
// NOTE(review): presumably the number of inbound/outbound peer connections that were converted
// into usable data channels; the increment sites are outside this excerpt, so confirm.
24 ConvertedInboundConns int64
25 ConvertedOutboundConns int64
28 // TrackerClient represents the webtorrent client.
29 type TrackerClient struct {
// GetAnnounceRequest supplies the announce parameters (uploaded, downloaded, event, ...) for an
// infohash; it is invoked once for every announce that is sent.
31 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
// OnConn is the data-channel-open callback. NOTE(review): its call site is outside this excerpt.
33 OnConn onDataChannelOpen
// Dialer is used to dial the tracker's websocket endpoint.
35 Dialer *websocket.Dialer
39 outboundOffers map[string]outboundOfferValue // OfferID to outboundOfferValue
// wsConn is the websocket connection that messages are written to. Close and writeMessage both
// check it against nil.
40 wsConn *websocket.Conn
// stats has the type returned by Stats.
42 stats TrackerClientStats
// pingTicker is created by Start; doWebsocket writes a websocket ping on each tick.
43 pingTicker *time.Ticker
// WebsocketTrackerHttpHeader, when non-nil, is called to produce the HTTP header that is passed
// to Dialer.Dial.
45 WebsocketTrackerHttpHeader func() http.Header
// Stats returns the statistics for this tracker client as a TrackerClientStats value.
48 func (me *TrackerClient) Stats() TrackerClientStats {
// peerIdBinary returns me.PeerId encoded with binaryToJsonString. This is the form used for the
// PeerID field of announce requests and of responses to offers.
54 func (me *TrackerClient) peerIdBinary() string {
55 return binaryToJsonString(me.PeerId[:])
// outboundOffer pairs an offer ID (offerId) with its outboundOfferValue. announce takes a slice
// of these and records each value in tc.outboundOffers under its offerId.
58 type outboundOffer struct {
63 // outboundOfferValue represents an outstanding offer.
64 type outboundOfferValue struct {
// originalOffer is the session description sent to the tracker as the Offer of an announce.
65 originalOffer webrtc.SessionDescription
// peerConnection receives the remote answer (SetRemoteDescription) and is closed when the offer
// is discarded.
66 peerConnection *wrappedPeerConnection
// dataChannel is closed together with peerConnection by closeUnusedOffers.
68 dataChannel *webrtc.DataChannel
// DataChannelContext carries per-connection details that are handed to the onDataChannelOpen
// callback together with the data channel itself.
71 type DataChannelContext struct {
75 // This is private as some methods might not be appropriate with data channel context.
76 peerConnection *wrappedPeerConnection
// NOTE(review): the lifetime/cancellation semantics of Context are decided where this struct is
// constructed, which is outside this excerpt.
78 Context context.Context
// GetSelectedIceCandidatePair returns the ICE candidate pair currently selected for the
// underlying peer connection. It reaches the ICE transport through the connection's SCTP
// transport and that transport's own Transport(), and forwards the result (pair and error) of
// GetSelectedCandidatePair unchanged.
81 func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
82 return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
// onDataChannelOpen is the callback type of TrackerClient.OnConn. It receives a data channel as
// a datachannel.ReadWriteCloser along with the DataChannelContext describing it.
85 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
// doWebsocket runs a single websocket session with the tracker: it dials tc.Url with tc.Dialer,
// writes a ping message on every pingTicker tick, and runs trackerReadLoop on the connection.
// A dial failure is returned wrapped as "dialing tracker: ...".
//
// NOTE(review): several lines of this function are not visible in this excerpt (where tc.wsConn
// is assigned, how closeChan is used, what is returned after the read loop).
87 func (tc *TrackerClient) doWebsocket() error {
// Count every dial attempt, successful or not.
88 metrics.Add("websocket dials", 1)
// header stays nil unless the optional header hook is set.
93 var header http.Header
94 if tc.WebsocketTrackerHttpHeader != nil {
95 header = tc.WebsocketTrackerHttpHeader()
98 c, _, err := tc.Dialer.Dial(tc.Url, header)
100 return fmt.Errorf("dialing tracker: %w", err)
103 tc.Logger.WithDefaultLevel(log.Info).Printf("connected")
109 closeChan := make(chan struct{})
113 case <-tc.pingTicker.C:
// NOTE(review): gorilla/websocket permits only one concurrent writer per connection; confirm
// that this ping write and writeMessage are serialized (the locking is not visible here).
115 err := c.WriteMessage(websocket.PingMessage, []byte{})
126 err = tc.trackerReadLoop(tc.wsConn)
134 // Start finishes initialization and spawns the run routine, calling onStop when it completes with the
135 // result. We don't let the caller just spawn the runner directly, since then we can race against
136 // .Close to finish initialization.
137 func (tc *TrackerClient) Start(onStop func(error)) {
// doWebsocket sends a websocket ping each time this ticker fires, i.e. every 60 seconds.
138 tc.pingTicker = time.NewTicker(60 * time.Second)
// run calls doWebsocket, logs how that websocket instance ended, and then sleeps for a minute.
//
// NOTE(review): the enclosing loop and its exit conditions are not visible in this excerpt;
// presumably this repeats until the client is closed. Also note that time.Sleep does not observe
// Close, so confirm whether a shutdown can be delayed by up to a minute here.
145 func (tc *TrackerClient) run() error {
149 err := tc.doWebsocket()
156 tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
157 time.Sleep(time.Minute)
// Close shuts the tracker client down. If a websocket connection exists (tc.wsConn != nil) it is
// acted on (the action itself is not visible in this excerpt), and any offers still outstanding
// are closed via closeUnusedOffers.
164 func (tc *TrackerClient) Close() error {
167 if tc.wsConn != nil {
170 tc.closeUnusedOffers()
// announceOffers re-announces every infohash that had an outstanding offer. It detaches the
// current tc.outboundOffers map, closes the peer connection of each detached offer, and starts a
// fresh tracker.Started announce per offer in its own goroutine.
177 func (tc *TrackerClient) announceOffers() {
178 // tc.Announce grabs a lock on tc.outboundOffers. It also handles the case where outboundOffers
179 // is nil. Take ownership of outboundOffers here.
181 offers := tc.outboundOffers
182 tc.outboundOffers = nil
189 // Iterate over our locally-owned offers, close any existing "invalid" ones from before the
190 // socket reconnected, reannounce the infohash, adding it back into the tc.outboundOffers.
191 tc.Logger.WithDefaultLevel(log.Info).Printf("reannouncing %d infohashes after restart", len(offers))
192 for _, offer := range offers {
193 // TODO: Capture the errors? Are we even in a position to do anything with them?
194 offer.peerConnection.Close()
195 // Use goroutine here to allow read loop to start and ensure the buffer drains.
// The error returned by Announce is discarded by this goroutine.
196 go tc.Announce(tracker.Started, offer.infoHash)
// closeUnusedOffers closes the peer connection and data channel of every outstanding offer and
// then drops the whole tc.outboundOffers map. Errors returned by the Close calls are ignored.
200 func (tc *TrackerClient) closeUnusedOffers() {
201 for _, offer := range tc.outboundOffers {
202 offer.peerConnection.Close()
203 offer.dataChannel.Close()
// Setting the map to nil is fine: announce recreates it on demand (MakeMapIfNilAndSet).
205 tc.outboundOffers = nil
// CloseOffersForInfohash closes the peer connection of every outstanding offer made for infoHash
// and removes those offers from tc.outboundOffers. Offers for other infohashes are untouched.
208 func (tc *TrackerClient) CloseOffersForInfohash(infoHash [20]byte) {
// Deleting the current key while ranging over a map is permitted in Go.
211 for key, offer := range tc.outboundOffers {
212 if offer.infoHash == infoHash {
213 offer.peerConnection.Close()
214 delete(tc.outboundOffers, key)
// Announce sends an announce for infoHash with the given event to the tracker.
//
// A tracker.Stopped event is announced without any offers. For every other event a random
// 20-byte offer ID is generated, a new WebRTC offer is created with tc.newOffer, and the
// announce carries that single offer. Failures to generate the ID or to create the offer are
// returned wrapped.
//
// NOTE(review): the handling of the result of tc.announce is not visible in this excerpt.
219 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
220 metrics.Add("outbound announces", 1)
221 if event == tracker.Stopped {
222 return tc.announce(event, infoHash, nil)
224 var randOfferId [20]byte
225 _, err := rand.Read(randOfferId[:])
227 return fmt.Errorf("generating offer_id bytes: %w", err)
// The offer ID travels as a string in the JSON request, so encode it the same way as infohashes
// and peer IDs.
229 offerIDBinary := binaryToJsonString(randOfferId[:])
231 pc, dc, offer, err := tc.newOffer(tc.Logger, offerIDBinary, infoHash)
233 return fmt.Errorf("creating offer: %w", err)
236 err = tc.announce(event, infoHash, []outboundOffer{
238 offerId: offerIDBinary,
239 outboundOfferValue: outboundOfferValue{
240 originalOffer: offer,
// announce builds an AnnounceRequest for infoHash from the values returned by
// tc.GetAnnounceRequest, attaches the given offers, marshals it to JSON and writes it to the
// tracker with tc.writeMessage. The offers are then recorded in tc.outboundOffers keyed by offer
// ID so that handleAnswer can find them. Each failing step is returned wrapped with a message
// naming the step.
254 func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error {
255 request, err := tc.GetAnnounceRequest(event, infoHash)
257 return fmt.Errorf("getting announce parameters: %w", err)
260 req := AnnounceRequest{
// Numwant is the number of offers being sent; it is 0 when offers is nil.
261 Numwant: len(offers),
262 Uploaded: request.Uploaded,
263 Downloaded: request.Downloaded,
// The event is taken from the returned request, not from the event parameter.
265 Event: request.Event.String(),
267 InfoHash: binaryToJsonString(infoHash[:]),
268 PeerID: tc.peerIdBinary(),
270 for _, offer := range offers {
271 req.Offers = append(req.Offers, Offer{
272 OfferID: offer.offerId,
273 Offer: offer.originalOffer,
277 data, err := json.Marshal(req)
279 return fmt.Errorf("marshalling request: %w", err)
284 err = tc.writeMessage(data)
286 return fmt.Errorf("write AnnounceRequest: %w", err)
288 for _, offer := range offers {
// Creates tc.outboundOffers first if it is currently nil.
289 g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
// writeMessage sends data to the tracker as a websocket text message on tc.wsConn. While
// tc.wsConn is nil it stays in a loop; inside that loop it can return a "<type> closed" error.
//
// NOTE(review): the loop's wait and closed-check are not visible in this excerpt; presumably it
// waits on a condition variable until the connection is set or the client is closed.
294 func (tc *TrackerClient) writeMessage(data []byte) error {
295 for tc.wsConn == nil {
297 return fmt.Errorf("%T closed", tc)
301 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
// trackerReadLoop reads messages from the given tracker websocket connection and dispatches
// them. Each message is decoded as an AnnounceResponse: one carrying an Offer is passed to
// handleOffer, one carrying an Answer is passed to handleAnswer, and anything else is logged as
// unhandled. A read error is returned wrapped as "read message error: ..."; decode and handling
// problems are only logged.
//
// Note that the parameter named tracker shadows the imported tracker package inside this
// function.
304 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
306 _, message, err := tracker.ReadMessage()
308 return fmt.Errorf("read message error: %w", err)
310 // tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
312 var ar AnnounceResponse
313 if err := json.Unmarshal(message, &ar); err != nil {
314 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
// A remote peer's offer, relayed by the tracker.
318 case ar.Offer != nil:
319 ih, err := jsonStringToInfoHash(ar.InfoHash)
321 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
324 err = tc.handleOffer(offerContext{
330 tc.Logger.Levelf(log.Error, "handling offer for infohash %x: %v", ih, err)
// A remote peer's answer to one of our outstanding offers.
332 case ar.Answer != nil:
333 tc.handleAnswer(ar.OfferID, *ar.Answer)
335 tc.Logger.Levelf(log.Warning, "unhandled announce response %q", message)
// offerContext bundles what handleOffer needs to answer an offer relayed by the tracker.
// handleOffer also reads Id and InfoHash from it; those fields are not visible in this excerpt.
340 type offerContext struct {
// SessDesc is a WebRTC session description. NOTE(review): presumably the remote peer's offer;
// confirm where the struct is populated.
341 SessDesc webrtc.SessionDescription
// handleOffer answers an offer received from the tracker. It creates an answering peer
// connection with tc.newAnsweringPeerConnection, builds an AnnounceResponse carrying our peer
// ID, the offer's infohash and offer ID, and writes it to the tracker as JSON.
//
// If marshalling or writing the response fails, the new peer connection is closed before the
// wrapped error is returned, so it does not leak.
346 func (tc *TrackerClient) handleOffer(
347 offerContext offerContext,
350 peerConnection, answer, err := tc.newAnsweringPeerConnection(offerContext)
352 return fmt.Errorf("creating answering peer connection: %w", err)
354 response := AnnounceResponse{
356 InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
357 PeerID: tc.peerIdBinary(),
// Echo the offer ID back in the response.
360 OfferID: offerContext.Id,
362 data, err := json.Marshal(response)
364 peerConnection.Close()
365 return fmt.Errorf("marshalling response: %w", err)
369 if err := tc.writeMessage(data); err != nil {
370 peerConnection.Close()
371 return fmt.Errorf("writing response: %w", err)
// handleAnswer applies a remote answer to the outstanding offer identified by offerId. An
// unknown offer ID is logged as a warning. Otherwise the answer is set as the remote description
// of the offer's peer connection; a failure there is recorded on the connection's span and
// logged. The offer is removed from tc.outboundOffers and a new tracker.None announce for the
// same infohash is started in a goroutine.
//
// NOTE(review): the returns after the two error paths are not visible in this excerpt, so
// confirm whether the delete and re-announce also run when SetRemoteDescription fails.
376 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
379 offer, ok := tc.outboundOffers[offerId]
381 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %+q", offerId)
384 // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
385 metrics.Add("outbound offers answered", 1)
386 err := offer.peerConnection.SetRemoteDescription(answer)
388 err = fmt.Errorf("using outbound offer answer: %w", err)
389 offer.peerConnection.span.RecordError(err)
390 tc.Logger.LevelPrint(log.Error, err)
393 delete(tc.outboundOffers, offerId)
// The error returned by Announce is discarded by this goroutine.
394 go tc.Announce(tracker.None, offer.infoHash)