]> Sergey Matveev's repositories - btrtrc.git/blobdiff - webtorrent/tracker-client.go
Add tracing to webtorrent webrtc resources
[btrtrc.git] / webtorrent / tracker-client.go
index 3b8c6a6bbcdfb2a4cbe08a9a78fce75eace7cc7d..95d87ff45af7d3855441bc4e53efe47aba55d785 100644 (file)
@@ -1,9 +1,12 @@
 package webtorrent
 
 import (
+       "context"
        "crypto/rand"
        "encoding/json"
        "fmt"
+       "go.opentelemetry.io/otel/codes"
+       "go.opentelemetry.io/otel/trace"
        "sync"
        "time"
 
@@ -53,7 +56,6 @@ func (me *TrackerClient) peerIdBinary() string {
 type outboundOffer struct {
        originalOffer  webrtc.SessionDescription
        peerConnection *wrappedPeerConnection
-       dataChannel    *webrtc.DataChannel
        infoHash       [20]byte
 }
 
@@ -65,6 +67,8 @@ type DataChannelContext struct {
        InfoHash      [20]byte
        // This is private as some methods might not be appropriate with data channel context.
        peerConnection *wrappedPeerConnection
+       span           trace.Span
+       ctx            context.Context
 }
 
 func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
@@ -207,7 +211,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
        }
        offerIDBinary := binaryToJsonString(randOfferId[:])
 
-       pc, dc, offer, err := newOffer()
+       pc, offer, err := newOffer(tc.Logger)
        if err != nil {
                return fmt.Errorf("creating offer: %w", err)
        }
@@ -251,7 +255,6 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
        }
        tc.outboundOffers[offerIDBinary] = outboundOffer{
                peerConnection: pc,
-               dataChannel:    dc,
                originalOffer:  offer,
                infoHash:       infoHash,
        }
@@ -301,7 +304,7 @@ func (tc *TrackerClient) handleOffer(
        infoHash [20]byte,
        peerId string,
 ) error {
-       peerConnection, answer, err := newAnsweringPeerConnection(offer)
+       peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer)
        if err != nil {
                return fmt.Errorf("write AnnounceResponse: %w", err)
        }
@@ -325,11 +328,13 @@ func (tc *TrackerClient) handleOffer(
                return fmt.Errorf("writing response: %w", err)
        }
        timer := time.AfterFunc(30*time.Second, func() {
+               peerConnection.span.SetStatus(codes.Error, "answer timeout")
                metrics.Add("answering peer connections timed out", 1)
                peerConnection.Close()
        })
        peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
-               setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
+               ctx, span := dataChannelStarted(peerConnection.ctx, d)
+               setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) {
                        timer.Stop()
                        metrics.Add("answering peer connection conversions", 1)
                        tc.mu.Lock()
@@ -342,6 +347,8 @@ func (tc *TrackerClient) handleOffer(
                                LocalOffered:   false,
                                InfoHash:       infoHash,
                                peerConnection: peerConnection,
+                               ctx:            ctx,
+                               span:           span,
                        })
                })
        })
@@ -358,24 +365,44 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
        }
        // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
        metrics.Add("outbound offers answered", 1)
-       err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
-               metrics.Add("outbound offers answered with datachannel", 1)
-               tc.mu.Lock()
-               tc.stats.ConvertedOutboundConns++
-               tc.mu.Unlock()
-               tc.OnConn(dc, DataChannelContext{
-                       Local:          offer.originalOffer,
-                       Remote:         answer,
-                       OfferId:        offerId,
-                       LocalOffered:   true,
-                       InfoHash:       offer.infoHash,
-                       peerConnection: offer.peerConnection,
+       // Why do we create the data channel before setting the remote description? Are we trying to avoid the peer
+       // initiating?
+       dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil)
+       if err != nil {
+               err = fmt.Errorf("creating data channel: %w", err)
+               tc.Logger.LevelPrint(log.Error, err)
+               offer.peerConnection.span.RecordError(err)
+               offer.peerConnection.Close()
+               goto deleteOffer
+       }
+       {
+               ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel)
+               setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) {
+                       metrics.Add("outbound offers answered with datachannel", 1)
+                       tc.mu.Lock()
+                       tc.stats.ConvertedOutboundConns++
+                       tc.mu.Unlock()
+                       tc.OnConn(dc, DataChannelContext{
+                               Local:          offer.originalOffer,
+                               Remote:         answer,
+                               OfferId:        offerId,
+                               LocalOffered:   true,
+                               InfoHash:       offer.infoHash,
+                               peerConnection: offer.peerConnection,
+                               ctx:            ctx,
+                               span:           span,
+                       })
                })
-       })
+       }
+       err = offer.peerConnection.SetRemoteDescription(answer)
        if err != nil {
-               tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
+               err = fmt.Errorf("using outbound offer answer: %w", err)
+               offer.peerConnection.span.RecordError(err)
+               dataChannel.Close()
+               tc.Logger.WithDefaultLevel(log.Error).Print(err)
                return
        }
+deleteOffer:
        delete(tc.outboundOffers, offerId)
        go tc.Announce(tracker.None, offer.infoHash)
 }