8 "go.opentelemetry.io/otel/codes"
9 "go.opentelemetry.io/otel/trace"
13 "github.com/anacrolix/log"
15 "github.com/anacrolix/torrent/tracker"
16 "github.com/gorilla/websocket"
17 "github.com/pion/datachannel"
18 "github.com/pion/webrtc/v3"
// TrackerClientStats holds counters kept by a TrackerClient. It is the type of
// TrackerClient.stats and the return type of TrackerClient.Stats.
21 type TrackerClientStats struct {
// Incremented in the callback handleOffer passes to setDataChannelOnOpen.
23 ConvertedInboundConns int64
// Incremented in the callback handleAnswer passes to setDataChannelOnOpen.
24 ConvertedOutboundConns int64
27 // TrackerClient represents the webtorrent tracker client.
28 type TrackerClient struct {
// Called by Announce to obtain the announce parameters (uploaded, downloaded,
// event, ...) for infoHash.
30 GetAnnounceRequest func(_ tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
// Invoked from the callbacks handleOffer and handleAnswer pass to
// setDataChannelOnOpen, with the data channel and its DataChannelContext.
32 OnConn onDataChannelOpen
// Used by doWebsocket to dial the tracker URL.
34 Dialer *websocket.Dialer
38 outboundOffers map[string]outboundOffer // OfferID to outboundOffer
// The tracker websocket connection. Close and writeMessage both check it
// against nil before use.
39 wsConn *websocket.Conn
// Counters updated when connections are handed to OnConn.
41 stats TrackerClientStats
// Created in Start with a 60 second period; doWebsocket sends a websocket
// ping on each tick.
42 pingTicker *time.Ticker
// Stats returns a TrackerClientStats value.
// NOTE(review): the body is outside this view; presumably it returns a copy of
// me.stats — confirm, including whether a lock is taken.
45 func (me *TrackerClient) Stats() TrackerClientStats {
// peerIdBinary returns me.PeerId converted with binaryToJsonString. The result
// is used as the PeerID field of outgoing announce requests and responses.
51 func (me *TrackerClient) peerIdBinary() string {
52 return binaryToJsonString(me.PeerId[:])
55 // outboundOffer represents an outstanding offer.
56 type outboundOffer struct {
// The offer description as created by Announce; handleAnswer reports it as
// the Local description in the DataChannelContext.
57 originalOffer webrtc.SessionDescription
// The peer connection behind the offer. It is closed when the offer is
// discarded (announceOffers, closeUnusedOffers, CloseOffersForInfohash).
58 peerConnection *wrappedPeerConnection
// DataChannelContext is the second argument of the onDataChannelOpen callback
// (TrackerClient.OnConn). It is built by handleOffer and handleAnswer.
62 type DataChannelContext struct {
63 // Can these be obtained by just calling the relevant methods on peerConnection?
64 Local, Remote webrtc.SessionDescription
68 // This is private as some methods might not be appropriate with data channel context.
69 peerConnection *wrappedPeerConnection
// GetSelectedIceCandidatePair returns the result of GetSelectedCandidatePair
// on the ICE transport reached from the peer connection's SCTP transport.
// The pair and error are passed through unchanged.
// NOTE(review): no nil checks are visible on the intermediate transports —
// confirm they are always non-nil when this is called.
74 func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
75 return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
// onDataChannelOpen is the callback type of TrackerClient.OnConn. It receives
// the data channel as a datachannel.ReadWriteCloser together with the
// DataChannelContext describing it.
78 type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
// doWebsocket dials tc.Url with tc.Dialer and hands tc.wsConn to
// trackerReadLoop. A dial failure is returned wrapped as "dialing tracker: ...".
// Parts of the body are outside this view, so the remaining return paths are
// not documented here.
80 func (tc *TrackerClient) doWebsocket() error {
// Counted before dialing, so failed attempts are included.
81 metrics.Add("websocket dials", 1)
85 c, _, err := tc.Dialer.Dial(tc.Url, nil)
87 return fmt.Errorf("dialing tracker: %w", err)
90 tc.Logger.WithDefaultLevel(log.Info).Printf("connected")
// NOTE(review): presumably used to stop the ping select below; its uses are
// not visible here — confirm.
96 closeChan := make(chan struct{})
// On each pingTicker tick, write an empty websocket ping message on c.
100 case <-tc.pingTicker.C:
102 err := c.WriteMessage(websocket.PingMessage, []byte{})
// Run the read loop on the stored connection; its error is assigned to err.
113 err = tc.trackerReadLoop(tc.wsConn)
121 // Finishes initialization and spawns the run routine, calling onStop when it completes with the
122 // result. We don't let the caller just spawn the runner directly, since then we can race against
123 // .Close to finish initialization.
124 func (tc *TrackerClient) Start(onStop func(error)) {
// The ticker fires every 60 seconds; doWebsocket sends a websocket ping on
// each tick.
125 tc.pingTicker = time.NewTicker(60 * time.Second)
// run calls doWebsocket, logs how that websocket instance ended, and then
// sleeps for one minute.
// NOTE(review): the surrounding loop and exit condition are outside this view;
// presumably this repeats until the client is closed — confirm.
132 func (tc *TrackerClient) run() error {
136 err := tc.doWebsocket()
// level is chosen on lines not visible here.
143 tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
// Fixed one-minute pause after a websocket instance ends.
144 time.Sleep(time.Minute)
// Close shuts the client down. It checks whether a websocket connection
// exists, and it closes all outstanding outbound offers via closeUnusedOffers.
// NOTE(review): what happens inside the wsConn check and what error is
// returned are outside this view.
151 func (tc *TrackerClient) Close() error {
// wsConn is nil until a connection has been stored.
154 if tc.wsConn != nil {
157 tc.closeUnusedOffers()
// announceOffers re-announces every infohash that had an outstanding offer.
// It takes the current outboundOffers map, sets the field to nil, closes each
// old offer's peer connection, and calls Announce with tracker.Started in a new
// goroutine per offer.
164 func (tc *TrackerClient) announceOffers() {
165 // tc.Announce grabs a lock on tc.outboundOffers. It also handles the case where outboundOffers
166 // is nil. Take ownership of outboundOffers here.
168 offers := tc.outboundOffers
169 tc.outboundOffers = nil
176 // Iterate over our locally-owned offers, close any existing "invalid" ones from before the
177 // socket reconnected, reannounce the infohash, adding it back into the tc.outboundOffers.
// NOTE(review): len(offers) counts offers, not distinct infohashes; the two
// differ if several offers share an infohash.
178 tc.Logger.WithDefaultLevel(log.Info).Printf("reannouncing %d infohashes after restart", len(offers))
179 for _, offer := range offers {
180 // TODO: Capture the errors? Are we even in a position to do anything with them?
181 offer.peerConnection.Close()
182 // Use goroutine here to allow read loop to start and ensure the buffer drains.
// The error returned by Announce is discarded by the go statement.
183 go tc.Announce(tracker.Started, offer.infoHash)
// closeUnusedOffers closes the peer connection of every entry in
// tc.outboundOffers and then sets the map to nil. The results of Close are
// ignored.
187 func (tc *TrackerClient) closeUnusedOffers() {
188 for _, offer := range tc.outboundOffers {
189 offer.peerConnection.Close()
// Announce recreates the map when it finds it nil.
191 tc.outboundOffers = nil
// CloseOffersForInfohash closes the peer connection of every outstanding
// outbound offer whose infoHash equals the argument and removes those offers
// from tc.outboundOffers. The results of Close are ignored.
194 func (tc *TrackerClient) CloseOffersForInfohash(infoHash [20]byte) {
197 for key, offer := range tc.outboundOffers {
198 if offer.infoHash == infoHash {
199 offer.peerConnection.Close()
// Deleting the current key while ranging over a map is permitted in Go.
200 delete(tc.outboundOffers, key)
// Announce creates a new offer for infoHash, sends it to the tracker in an
// AnnounceRequest, and records it in tc.outboundOffers under a freshly
// generated offer ID, the key handleAnswer uses to look it up.
//
// Each failing step (random ID generation, offer creation, GetAnnounceRequest,
// JSON marshalling, websocket write) returns its error wrapped with a message
// naming that step.
205 func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte) error {
206 metrics.Add("outbound announces", 1)
// 20 random bytes form the offer ID.
207 var randOfferId [20]byte
208 _, err := rand.Read(randOfferId[:])
210 return fmt.Errorf("generating offer_id bytes: %w", err)
// The converted ID is both the OfferID sent to the tracker and the
// outboundOffers map key.
212 offerIDBinary := binaryToJsonString(randOfferId[:])
214 pc, offer, err := newOffer(tc.Logger)
216 return fmt.Errorf("creating offer: %w", err)
219 request, err := tc.GetAnnounceRequest(event, infoHash)
222 return fmt.Errorf("getting announce parameters: %w", err)
225 req := AnnounceRequest{
226 Numwant: 1, // If higher we need to create equal amount of offers.
227 Uploaded: request.Uploaded,
228 Downloaded: request.Downloaded,
// Taken from the request returned by GetAnnounceRequest, not directly from
// the event parameter.
230 Event: request.Event.String(),
232 InfoHash: binaryToJsonString(infoHash[:]),
233 PeerID: tc.peerIdBinary(),
235 OfferID: offerIDBinary,
240 data, err := json.Marshal(req)
243 return fmt.Errorf("marshalling request: %w", err)
248 err = tc.writeMessage(data)
251 return fmt.Errorf("write AnnounceRequest: %w", err)
// The map may be nil: announceOffers and closeUnusedOffers reset it, so
// create it on demand before inserting.
253 if tc.outboundOffers == nil {
254 tc.outboundOffers = make(map[string]outboundOffer)
256 tc.outboundOffers[offerIDBinary] = outboundOffer{
258 originalOffer: offer,
// writeMessage sends data to the tracker as a websocket text message on
// tc.wsConn. While tc.wsConn is nil it stays in the loop below, which can
// return a "<type> closed" error instead of writing.
// NOTE(review): the condition guarding that return and how the loop waits are
// outside this view — confirm.
264 func (tc *TrackerClient) writeMessage(data []byte) error {
265 for tc.wsConn == nil {
// %T formats the static type of tc, i.e. *TrackerClient with its package.
267 return fmt.Errorf("%T closed", tc)
271 return tc.wsConn.WriteMessage(websocket.TextMessage, data)
// trackerReadLoop reads messages from the tracker connection, decodes each as
// an AnnounceResponse, and dispatches it: a response carrying an Offer goes to
// handleOffer, one carrying an Answer goes to handleAnswer. A read failure is
// returned wrapped as "read message error: ...". Decode failures are logged at
// warning level.
//
// NOTE(review): the parameter name shadows the imported tracker package inside
// this function.
274 func (tc *TrackerClient) trackerReadLoop(tracker *websocket.Conn) error {
276 _, message, err := tracker.ReadMessage()
278 return fmt.Errorf("read message error: %w", err)
280 // tc.Logger.WithDefaultLevel(log.Debug).Printf("received message from tracker: %q", message)
282 var ar AnnounceResponse
283 if err := json.Unmarshal(message, &ar); err != nil {
284 tc.Logger.WithDefaultLevel(log.Warning).Printf("error unmarshalling announce response: %v", err)
288 case ar.Offer != nil:
289 ih, err := jsonStringToInfoHash(ar.InfoHash)
291 tc.Logger.WithDefaultLevel(log.Warning).Printf("error decoding info_hash in offer: %v", err)
// NOTE(review): handleOffer returns an error, which is discarded here.
294 tc.handleOffer(*ar.Offer, ar.OfferID, ih, ar.PeerID)
295 case ar.Answer != nil:
296 tc.handleAnswer(ar.OfferID, *ar.Answer)
// handleOffer responds to an offer relayed by the tracker. It creates an
// answering peer connection for the offer, sends the answer back to the tracker
// in an AnnounceResponse, and registers an OnDataChannel handler whose callback
// hands the data channel to tc.OnConn.
//
// If marshalling or writing the response fails, the new peer connection is
// closed and the wrapped error is returned.
301 func (tc *TrackerClient) handleOffer(
302 offer webrtc.SessionDescription,
307 peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer)
// NOTE(review): this message says "write AnnounceResponse", but the
// preceding visible call is newAnsweringPeerConnection — the text looks
// copied from elsewhere; confirm and correct.
309 return fmt.Errorf("write AnnounceResponse: %w", err)
311 response := AnnounceResponse{
313 InfoHash: binaryToJsonString(infoHash[:]),
314 PeerID: tc.peerIdBinary(),
319 data, err := json.Marshal(response)
321 peerConnection.Close()
322 return fmt.Errorf("marshalling response: %w", err)
326 if err := tc.writeMessage(data); err != nil {
327 peerConnection.Close()
328 return fmt.Errorf("writing response: %w", err)
// After 30 seconds this marks the span as an error, counts the timeout, and
// closes the peer connection.
// NOTE(review): presumably the timer is stopped once a data channel arrives;
// that code is not visible here — confirm.
330 timer := time.AfterFunc(30*time.Second, func() {
331 peerConnection.span.SetStatus(codes.Error, "answer timeout")
332 metrics.Add("answering peer connections timed out", 1)
333 peerConnection.Close()
335 peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
336 ctx, span := dataChannelStarted(peerConnection.ctx, d)
337 setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) {
339 metrics.Add("answering peer connection conversions", 1)
341 tc.stats.ConvertedInboundConns++
// Hand the data channel and its context to the user-supplied callback.
343 tc.OnConn(dc, DataChannelContext{
349 peerConnection: peerConnection,
// handleAnswer processes the tracker's answer to one of our outbound offers.
// It looks the offer up by offerId, creates the "webrtc-datachannel" data
// channel on its peer connection, registers a callback that hands the channel
// to tc.OnConn, and applies the answer as the remote description. The offer is
// removed from tc.outboundOffers and Announce is called again for the same
// infohash.
//
// Failures are logged and recorded on the peer connection's span rather than
// returned; the function has no return value.
358 func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescription) {
361 offer, ok := tc.outboundOffers[offerId]
// Logged when no outstanding offer is stored under offerId.
363 tc.Logger.WithDefaultLevel(log.Warning).Printf("could not find offer for id %+q", offerId)
366 // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
367 metrics.Add("outbound offers answered", 1)
368 // Why do we create the data channel before setting the remote description? Are we trying to avoid the peer
370 dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil)
// Data channel creation failed: log it, record it on the span, and close
// the peer connection.
372 err = fmt.Errorf("creating data channel: %w", err)
373 tc.Logger.LevelPrint(log.Error, err)
374 offer.peerConnection.span.RecordError(err)
375 offer.peerConnection.Close()
379 ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel)
380 setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) {
381 metrics.Add("outbound offers answered with datachannel", 1)
383 tc.stats.ConvertedOutboundConns++
// Hand the data channel and its context to the user-supplied callback.
385 tc.OnConn(dc, DataChannelContext{
// Our side of the exchange is the offer we originally sent.
386 Local: offer.originalOffer,
390 InfoHash: offer.infoHash,
391 peerConnection: offer.peerConnection,
397 err = offer.peerConnection.SetRemoteDescription(answer)
399 err = fmt.Errorf("using outbound offer answer: %w", err)
400 offer.peerConnection.span.RecordError(err)
402 tc.Logger.WithDefaultLevel(log.Error).Print(err)
// The offer has been used: remove it and announce again in the background
// with the tracker.None event. Announce creates and registers a new offer
// for the same infohash; its error is discarded by the go statement.
406 delete(tc.outboundOffers, offerId)
407 go tc.Announce(tracker.None, offer.infoHash)