12 g "github.com/anacrolix/generics"
13 "github.com/anacrolix/log"
14 "github.com/gorilla/websocket"
15 "github.com/pion/datachannel"
16 "github.com/pion/webrtc/v3"
17 "go.opentelemetry.io/otel/trace"
19 "github.com/anacrolix/torrent/tracker"
22 type TrackerClientStats struct {
24 ConvertedInboundConns int64
25 ConvertedOutboundConns int64
28 // TrackerClient represents the webtorrent client
29 type TrackerClient struct {
31 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
33 OnConn onDataChannelOpen
35 Dialer *websocket.Dialer
39 outboundOffers map[string]outboundOfferValue // OfferID to outboundOfferValue
40 wsConn *websocket.Conn
42 stats TrackerClientStats
43 pingTicker *time.Ticker
45 WebsocketTrackerHttpHeader func() http.Header
49 func (me *TrackerClient) Stats() TrackerClientStats {
55 func (me *TrackerClient) peerIdBinary() string {
56 return binaryToJsonString(me.PeerId[:])
59 type outboundOffer struct {
64 // outboundOfferValue represents an outstanding offer.
65 type outboundOfferValue struct {
66 originalOffer webrtc.SessionDescription
67 peerConnection *wrappedPeerConnection
69 dataChannel *webrtc.DataChannel
72 type DataChannelContext struct {
76 // This is private as some methods might not be appropriate with data channel context.
77 peerConnection *wrappedPeerConnection
79 Context context.Context
82 func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
83 return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
86 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
88 func (tc *TrackerClient) doWebsocket() error {
89 metrics.Add("websocket dials", 1)
94 var header http.Header
95 if tc.WebsocketTrackerHttpHeader != nil {
96 header = tc.WebsocketTrackerHttpHeader()
99 c, _, err := tc.Dialer.Dial(tc.Url, header)
101 return fmt.Errorf("dialing tracker: %w", err)
104 tc.Logger.WithDefaultLevel(log.Info).Printf("connected")
110 closeChan := make(chan struct{})
114 case <-tc.pingTicker.C:
116 err := c.WriteMessage(websocket.PingMessage, []byte{})
127 err = tc.trackerReadLoop(tc.wsConn)
135 // Start finishes initialization and spawns the run routine, calling onStop with the result when
136 // it completes. We don't let the caller spawn the runner directly, since that could race against
137 // Close before initialization has finished.
138 func (tc *TrackerClient) Start(onStop func(error)) {
139 tc.pingTicker = time.NewTicker(60 * time.Second)
146 func (tc *TrackerClient) run() error {
150 err := tc.doWebsocket()
157 tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
158 time.Sleep(time.Minute)
165 func (tc *TrackerClient) Close() error {
168 if tc.wsConn != nil {
171 tc.closeUnusedOffers()
178 func (tc *TrackerClient) announceOffers() {
179 // tc.Announce grabs a lock on tc.outboundOffers. It also handles the case where outboundOffers
180 // is nil. Take ownership of outboundOffers here.
182 offers := tc.outboundOffers
183 tc.outboundOffers = nil
190 // Iterate over our locally-owned offers: close each one, as it predates the socket reconnect and
191 // is no longer valid, then reannounce its infohash so a fresh offer is added back into tc.outboundOffers.
192 tc.Logger.WithDefaultLevel(log.Info).Printf("reannouncing %d infohashes after restart", len(offers))
193 for _, offer := range offers {
194 // TODO: Capture the errors? Are we even in a position to do anything with them?
195 offer.peerConnection.Close()
196 // Use goroutine here to allow read loop to start and ensure the buffer drains.
197 go tc.Announce(tracker.Started, offer.infoHash)
201 func (tc *TrackerClient) closeUnusedOffers() {
202 for _, offer := range tc.outboundOffers {
203 offer.peerConnection.Close()
204 offer.dataChannel.Close()
206 tc.outboundOffers = nil
209 func (tc *TrackerClient) CloseOffersForInfohash(infoHash [20]byte) {
212 for key, offer := range tc.outboundOffers {
213 if offer.infoHash == infoHash {
214 offer.peerConnection.Close()
215 delete(tc.outboundOffers, key)
220 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
221 metrics.Add("outbound announces", 1)
222 if event == tracker.Stopped {
223 return tc.announce(event, infoHash, nil)
225 var randOfferId [20]byte
226 _, err := rand.Read(randOfferId[:])
228 return fmt.Errorf("generating offer_id bytes: %w", err)
230 offerIDBinary := binaryToJsonString(randOfferId[:])
232 pc, dc, offer, err := tc.newOffer(tc.Logger, offerIDBinary, infoHash)
234 return fmt.Errorf("creating offer: %w", err)
237 err = tc.announce(event, infoHash, []outboundOffer{
239 offerId: offerIDBinary,
240 outboundOfferValue: outboundOfferValue{
241 originalOffer: offer,
255 func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error {
256 request, err := tc.GetAnnounceRequest(event, infoHash)
258 return fmt.Errorf("getting announce parameters: %w", err)
261 req := AnnounceRequest{
262 Numwant: len(offers),
263 Uploaded: request.Uploaded,
264 Downloaded: request.Downloaded,
266 Event: request.Event.String(),
268 InfoHash: binaryToJsonString(infoHash[:]),
269 PeerID: tc.peerIdBinary(),
271 for _, offer := range offers {
272 req.Offers = append(req.Offers, Offer{
273 OfferID: offer.offerId,
274 Offer: offer.originalOffer,
278 data, err := json.Marshal(req)
280 return fmt.Errorf("marshalling request: %w", err)
285 err = tc.writeMessage(data)
287 return fmt.Errorf("write AnnounceRequest: %w", err)
289 for _, offer := range offers {
290 g.MakeMapIfNilAndSet(&tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
295 func (tc *TrackerClient) writeMessage(data []byte) error {
296 for tc.wsConn == nil {
298 return fmt.Errorf("%T closed", tc)
302 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
305 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
307 _, message, err := tracker.ReadMessage()
309 return fmt.Errorf("read message error: %w", err)
311 // tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
313 var ar AnnounceResponse
314 if err := json.Unmarshal(message, &ar); err != nil {
315 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
319 case ar.Offer != nil:
320 ih, err := jsonStringToInfoHash(ar.InfoHash)
322 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
325 err = tc.handleOffer(offerContext{
331 tc.Logger.Levelf(log.Error, "handling offer for infohash %x: %v", ih, err)
333 case ar.Answer != nil:
334 tc.handleAnswer(ar.OfferID, *ar.Answer)
336 tc.Logger.Levelf(log.Warning, "unhandled announce response %q", message)
341 type offerContext struct {
342 SessDesc webrtc.SessionDescription
347 func (tc *TrackerClient) handleOffer(
348 offerContext offerContext,
351 peerConnection, answer, err := tc.newAnsweringPeerConnection(offerContext)
353 return fmt.Errorf("creating answering peer connection: %w", err)
355 response := AnnounceResponse{
357 InfoHash: binaryToJsonString(offerContext.InfoHash[:]),
358 PeerID: tc.peerIdBinary(),
361 OfferID: offerContext.Id,
363 data, err := json.Marshal(response)
365 peerConnection.Close()
366 return fmt.Errorf("marshalling response: %w", err)
370 if err := tc.writeMessage(data); err != nil {
371 peerConnection.Close()
372 return fmt.Errorf("writing response: %w", err)
377 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
380 offer, ok := tc.outboundOffers[offerId]
382 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %+q", offerId)
385 // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
386 metrics.Add("outbound offers answered", 1)
387 err := offer.peerConnection.SetRemoteDescription(answer)
389 err = fmt.Errorf("using outbound offer answer: %w", err)
390 offer.peerConnection.span.RecordError(err)
391 tc.Logger.LevelPrint(log.Error, err)
394 delete(tc.outboundOffers, offerId)
395 go tc.Announce(tracker.None, offer.infoHash)