Sergey Matveev's repositories - btrtrc.git/commitdiff
Add tracing to webtorrent webrtc resources
author Matt Joiner <anacrolix@gmail.com>
Tue, 12 Jul 2022 06:05:19 +0000 (16:05 +1000)
committer Matt Joiner <anacrolix@gmail.com>
Tue, 12 Jul 2022 06:15:49 +0000 (16:15 +1000)
go.mod
go.sum
webtorrent/otel.go [new file with mode: 0644]
webtorrent/tracker-client.go
webtorrent/transport.go

diff --git a/go.mod b/go.mod
index 8e351f99c4ae461e6286125d6e9816bb5a9b6e87..9cb0f03ed6284b22ed4d76b17c98a6f9a9715015 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
        github.com/anacrolix/fuse v0.2.0
        github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60
        github.com/anacrolix/go-libutp v1.2.0
-       github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6
+       github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30
        github.com/anacrolix/missinggo v1.3.0
        github.com/anacrolix/missinggo/perf v1.0.0
        github.com/anacrolix/missinggo/v2 v2.7.0
@@ -58,6 +58,8 @@ require (
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/bits-and-blooms/bitset v1.2.2 // indirect
        github.com/cespare/xxhash/v2 v2.1.2 // indirect
+       github.com/go-logr/logr v1.2.3 // indirect
+       github.com/go-logr/stdr v1.2.2 // indirect
        github.com/golang/protobuf v1.5.2 // indirect
        github.com/google/uuid v1.3.0 // indirect
        github.com/huandu/xstrings v1.3.2 // indirect
@@ -86,6 +88,8 @@ require (
        github.com/rogpeppe/go-internal v1.8.1 // indirect
        github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect
        github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect
+       go.opentelemetry.io/otel v1.8.0 // indirect
+       go.opentelemetry.io/otel/trace v1.8.0 // indirect
        golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
        golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d // indirect
        golang.org/x/net v0.0.0-20220630215102-69896b714898 // indirect
diff --git a/go.sum b/go.sum
index b936cbe34d1cc4de00469e23c1cb9140d18a32b9..548c202a79b3c9385f1521f9e8e390ab666093b1 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -93,6 +93,8 @@ github.com/anacrolix/log v0.10.0/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJt
 github.com/anacrolix/log v0.10.1-0.20220123034749-3920702c17f8/go.mod h1:GmnE2c0nvz8pOIPUSC9Rawgefy1sDXqposC2wgtBZE4=
 github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6 h1:WH/Xcok0GpNID/NUV80CfTwUYXdbhR3pX/DXboxGhNI=
 github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68=
+github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 h1:bAgFzUxN1K3U8KwOzqCOhiygOr5NqYO3kNlV9tvp2Rc=
+github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68=
 github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM=
 github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM=
 github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s=
@@ -202,6 +204,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
 github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
+github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -474,6 +481,10 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg=
+go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM=
+go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY=
+go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
diff --git a/webtorrent/otel.go b/webtorrent/otel.go
new file mode 100644 (file)
index 0000000..3c40b32
--- /dev/null
+++ b/webtorrent/otel.go
@@ -0,0 +1,21 @@
+package webtorrent
+
+import (
+       "context"
+       "github.com/pion/webrtc/v3"
+       "go.opentelemetry.io/otel"
+       "go.opentelemetry.io/otel/trace"
+)
+
+const (
+       tracerName        = "anacrolix.torrent.webtorrent"
+       webrtcConnTypeKey = "webtorrent.webrtc.conn.type"
+)
+
+func dataChannelStarted(peerConnectionCtx context.Context, dc *webrtc.DataChannel) (dataChannelCtx context.Context, span trace.Span) {
+       dataChannelCtx, span = otel.Tracer(tracerName).Start(peerConnectionCtx, "DataChannel")
+       dc.OnClose(func() {
+               span.End()
+       })
+       return
+}
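diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go
index 3b8c6a6bbcdfb2a4cbe08a9a78fce75eace7cc7d..95d87ff45af7d3855441bc4e53efe47aba55d785 100644 (file)
--- a/webtorrent/tracker-client.go
+++ b/webtorrent/tracker-client.go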
@@ -1,9 +1,12 @@
 package webtorrent
 
 import (
+       "context"
        "crypto/rand"
        "encoding/json"
        "fmt"
+       "go.opentelemetry.io/otel/codes"
+       "go.opentelemetry.io/otel/trace"
        "sync"
        "time"
 
@@ -53,7 +56,6 @@ func (me *TrackerClient) peerIdBinary() string {
 type outboundOffer struct {
        originalOffer  webrtc.SessionDescription
        peerConnection *wrappedPeerConnection
-       dataChannel    *webrtc.DataChannel
        infoHash       [20]byte
 }
 
@@ -65,6 +67,8 @@ type DataChannelContext struct {
        InfoHash      [20]byte
        // This is private as some methods might not be appropriate with data channel context.
        peerConnection *wrappedPeerConnection
+       span           trace.Span
+       ctx            context.Context
 }
 
 func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
@@ -207,7 +211,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
        }
        offerIDBinary := binaryToJsonString(randOfferId[:])
 
-       pc, dc, offer, err := newOffer()
+       pc, offer, err := newOffer(tc.Logger)
        if err != nil {
                return fmt.Errorf("creating offer: %w", err)
        }
@@ -251,7 +255,6 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
        }
        tc.outboundOffers[offerIDBinary] = outboundOffer{
                peerConnection: pc,
-               dataChannel:    dc,
                originalOffer:  offer,
                infoHash:       infoHash,
        }
@@ -301,7 +304,7 @@ func (tc *TrackerClient) handleOffer(
        infoHash [20]byte,
        peerId string,
 ) error {
-       peerConnection, answer, err := newAnsweringPeerConnection(offer)
+       peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer)
        if err != nil {
                return fmt.Errorf("write AnnounceResponse: %w", err)
        }
@@ -325,11 +328,13 @@ func (tc *TrackerClient) handleOffer(
                return fmt.Errorf("writing response: %w", err)
        }
        timer := time.AfterFunc(30*time.Second, func() {
+               peerConnection.span.SetStatus(codes.Error, "answer timeout")
                metrics.Add("answering peer connections timed out", 1)
                peerConnection.Close()
        })
        peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
-               setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
+               ctx, span := dataChannelStarted(peerConnection.ctx, d)
+               setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) {
                        timer.Stop()
                        metrics.Add("answering peer connection conversions", 1)
                        tc.mu.Lock()
@@ -342,6 +347,8 @@ func (tc *TrackerClient) handleOffer(
                                LocalOffered:   false,
                                InfoHash:       infoHash,
                                peerConnection: peerConnection,
+                               ctx:            ctx,
+                               span:           span,
                        })
                })
        })
@@ -358,24 +365,44 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr
        }
        // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
        metrics.Add("outbound offers answered", 1)
-       err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
-               metrics.Add("outbound offers answered with datachannel", 1)
-               tc.mu.Lock()
-               tc.stats.ConvertedOutboundConns++
-               tc.mu.Unlock()
-               tc.OnConn(dc, DataChannelContext{
-                       Local:          offer.originalOffer,
-                       Remote:         answer,
-                       OfferId:        offerId,
-                       LocalOffered:   true,
-                       InfoHash:       offer.infoHash,
-                       peerConnection: offer.peerConnection,
+       // Why do we create the data channel before setting the remote description? Are we trying to avoid the peer
+       // initiating?
+       dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil)
+       if err != nil {
+               err = fmt.Errorf("creating data channel: %w", err)
+               tc.Logger.LevelPrint(log.Error, err)
+               offer.peerConnection.span.RecordError(err)
+               offer.peerConnection.Close()
+               goto deleteOffer
+       }
+       {
+               ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel)
+               setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) {
+                       metrics.Add("outbound offers answered with datachannel", 1)
+                       tc.mu.Lock()
+                       tc.stats.ConvertedOutboundConns++
+                       tc.mu.Unlock()
+                       tc.OnConn(dc, DataChannelContext{
+                               Local:          offer.originalOffer,
+                               Remote:         answer,
+                               OfferId:        offerId,
+                               LocalOffered:   true,
+                               InfoHash:       offer.infoHash,
+                               peerConnection: offer.peerConnection,
+                               ctx:            ctx,
+                               span:           span,
+                       })
                })
-       })
+       }
+       err = offer.peerConnection.SetRemoteDescription(answer)
        if err != nil {
-               tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
+               err = fmt.Errorf("using outbound offer answer: %w", err)
+               offer.peerConnection.span.RecordError(err)
+               dataChannel.Close()
+               tc.Logger.WithDefaultLevel(log.Error).Print(err)
                return
        }
+deleteOffer:
        delete(tc.outboundOffers, offerId)
        go tc.Announce(tracker.None, offer.infoHash)
 }
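diff --git a/webtorrent/transport.go b/webtorrent/transport.go
index e4c3b73d88dd9d55094523b8992c44d7613a93d9..9cff003e21166f58b098acb1278bca517581c6be 100644 (file)
--- a/webtorrent/transport.go
+++ b/webtorrent/transport.go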
@@ -1,14 +1,20 @@
 package webtorrent
 
 import (
+       "context"
        "expvar"
        "fmt"
+       "go.opentelemetry.io/otel/attribute"
+       "go.opentelemetry.io/otel/codes"
+       "go.opentelemetry.io/otel/trace"
        "io"
        "sync"
 
+       "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/pion/datachannel"
        "github.com/pion/webrtc/v3"
+       "go.opentelemetry.io/otel"
 )
 
 var (
@@ -26,58 +32,81 @@ type wrappedPeerConnection struct {
        *webrtc.PeerConnection
        closeMu sync.Mutex
        pproffd.CloseWrapper
+       span trace.Span
+       ctx  context.Context
 }
 
 func (me *wrappedPeerConnection) Close() error {
        me.closeMu.Lock()
        defer me.closeMu.Unlock()
-       return me.CloseWrapper.Close()
+       err := me.CloseWrapper.Close()
+       me.span.End()
+       return err
 }
 
-func newPeerConnection() (*wrappedPeerConnection, error) {
+func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
        newPeerConnectionMu.Lock()
        defer newPeerConnectionMu.Unlock()
+       ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
        pc, err := api.NewPeerConnection(config)
        if err != nil {
+               span.SetStatus(codes.Error, err.Error())
+               span.RecordError(err)
+               span.End()
                return nil, err
        }
-       return &wrappedPeerConnection{
+       wpc := &wrappedPeerConnection{
                PeerConnection: pc,
                CloseWrapper:   pproffd.NewCloseWrapper(pc),
-       }, nil
+               ctx:            ctx,
+               span:           span,
+       }
+       // If the state change handler intends to call Close, it should call it on the wrapper.
+       wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
+               logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
+               span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
+       })
+       return wpc, nil
+}
+
+func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
+       gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
+       peerConnection.span.AddEvent("setting local description")
+       err = peerConnection.SetLocalDescription(sdp)
+       if err != nil {
+               err = fmt.Errorf("setting local description: %w", err)
+               return
+       }
+       <-gatherComplete
+       peerConnection.span.AddEvent("gathering complete")
+       return *peerConnection.LocalDescription(), nil
 }
 
 // newOffer creates a transport and returns a WebRTC offer to be announced
-func newOffer() (
+func newOffer(
+       logger log.Logger,
+) (
        peerConnection *wrappedPeerConnection,
-       dataChannel *webrtc.DataChannel,
        offer webrtc.SessionDescription,
        err error,
 ) {
-       peerConnection, err = newPeerConnection()
-       if err != nil {
-               return
-       }
-       dataChannel, err = peerConnection.CreateDataChannel("webrtc-datachannel", nil)
+       peerConnection, err = newPeerConnection(logger)
        if err != nil {
-               peerConnection.Close()
                return
        }
+
+       peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
+
        offer, err = peerConnection.CreateOffer(nil)
        if err != nil {
                peerConnection.Close()
                return
        }
 
-       gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
-       err = peerConnection.SetLocalDescription(offer)
+       offer, err = setAndGatherLocalDescription(peerConnection, offer)
        if err != nil {
                peerConnection.Close()
-               return
        }
-       <-gatherComplete
-
-       offer = *peerConnection.LocalDescription()
        return
 }
 
@@ -85,6 +114,8 @@ func initAnsweringPeerConnection(
        peerConnection *wrappedPeerConnection,
        offer webrtc.SessionDescription,
 ) (answer webrtc.SessionDescription, err error) {
+       peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
+
        err = peerConnection.SetRemoteDescription(offer)
        if err != nil {
                return
@@ -94,40 +125,30 @@ func initAnsweringPeerConnection(
                return
        }
 
-       gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
-       err = peerConnection.SetLocalDescription(answer)
-       if err != nil {
-               return
-       }
-       <-gatherComplete
-
-       answer = *peerConnection.LocalDescription()
+       answer, err = setAndGatherLocalDescription(peerConnection, answer)
        return
 }
 
-// newAnsweringPeerConnection creates a transport from a WebRTC offer and and returns a WebRTC answer to be
-// announced.
-func newAnsweringPeerConnection(offer webrtc.SessionDescription) (
+// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
+func newAnsweringPeerConnection(
+       logger log.Logger,
+       offer webrtc.SessionDescription,
+) (
        peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
 ) {
-       peerConn, err = newPeerConnection()
+       peerConn, err = newPeerConnection(logger)
        if err != nil {
                err = fmt.Errorf("failed to create new connection: %w", err)
                return
        }
        answer, err = initAnsweringPeerConnection(peerConn, offer)
        if err != nil {
+               peerConn.span.RecordError(err)
                peerConn.Close()
        }
        return
 }
 
-func (t *outboundOffer) setAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
-       setDataChannelOnOpen(t.dataChannel, t.peerConnection, onOpen)
-       err := t.peerConnection.SetRemoteDescription(answer)
-       return err
-}
-
 type datachannelReadWriter interface {
        datachannel.Reader
        datachannel.Writer
@@ -142,16 +163,19 @@ func (me ioCloserFunc) Close() error {
 }
 
 func setDataChannelOnOpen(
+       ctx context.Context,
        dc *webrtc.DataChannel,
        pc *wrappedPeerConnection,
        onOpen func(closer datachannel.ReadWriteCloser),
 ) {
        dc.OnOpen(func() {
+               trace.SpanFromContext(ctx).AddEvent("opened")
                raw, err := dc.Detach()
                if err != nil {
                        // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
                        panic(err)
                }
+               //dc.OnClose()
                onOpen(hookDataChannelCloser(raw, pc))
        })
 }