From: Matt Joiner Date: Tue, 12 Jul 2022 06:05:19 +0000 (+1000) Subject: Add tracing to webtorrent webrtc resources X-Git-Tag: v1.47.0~1^2~8 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=c17018d04e0ce091087943432ac7ea36f27674bc;p=btrtrc.git Add tracing to webtorrent webrtc resources --- diff --git a/go.mod b/go.mod index 8e351f99..9cb0f03e 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/anacrolix/fuse v0.2.0 github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60 github.com/anacrolix/go-libutp v1.2.0 - github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6 + github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 github.com/anacrolix/missinggo v1.3.0 github.com/anacrolix/missinggo/perf v1.0.0 github.com/anacrolix/missinggo/v2 v2.7.0 @@ -58,6 +58,8 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/uuid v1.3.0 // indirect github.com/huandu/xstrings v1.3.2 // indirect @@ -86,6 +88,8 @@ require ( github.com/rogpeppe/go-internal v1.8.1 // indirect github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect + go.opentelemetry.io/otel v1.8.0 // indirect + go.opentelemetry.io/otel/trace v1.8.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d // indirect golang.org/x/net v0.0.0-20220630215102-69896b714898 // indirect diff --git a/go.sum b/go.sum index b936cbe3..548c202a 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/anacrolix/log v0.10.0/go.mod h1:s5yBP/j046fm9odtUTbHOfDUq/zh1W8OkPpJt github.com/anacrolix/log v0.10.1-0.20220123034749-3920702c17f8/go.mod h1:GmnE2c0nvz8pOIPUSC9Rawgefy1sDXqposC2wgtBZE4= github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6 h1:WH/Xcok0GpNID/NUV80CfTwUYXdbhR3pX/DXboxGhNI= github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68= +github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 h1:bAgFzUxN1K3U8KwOzqCOhiygOr5NqYO3kNlV9tvp2Rc= +github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM= github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM= github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s= @@ -202,6 +204,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -474,6 +481,10 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg= +go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM= +go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY= +go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/webtorrent/otel.go b/webtorrent/otel.go new file mode 100644 index 00000000..3c40b32a --- /dev/null +++ b/webtorrent/otel.go @@ -0,0 +1,21 @@ +package webtorrent + +import ( + "context" + "github.com/pion/webrtc/v3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +const ( + tracerName = "anacrolix.torrent.webtorrent" + webrtcConnTypeKey = "webtorrent.webrtc.conn.type" +) + +func dataChannelStarted(peerConnectionCtx context.Context, dc *webrtc.DataChannel) (dataChannelCtx context.Context, span trace.Span) { + dataChannelCtx, span = otel.Tracer(tracerName).Start(peerConnectionCtx, "DataChannel") + dc.OnClose(func() { + span.End() + }) + return +} diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index 3b8c6a6b..95d87ff4 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -1,9 +1,12 @@ package webtorrent import ( + "context" "crypto/rand" "encoding/json" "fmt" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "sync" "time" @@ -53,7 +56,6 @@ func (me *TrackerClient) peerIdBinary() string { type outboundOffer struct { originalOffer webrtc.SessionDescription peerConnection *wrappedPeerConnection - dataChannel *webrtc.DataChannel infoHash [20]byte } @@ -65,6 +67,8 @@ type DataChannelContext struct { InfoHash [20]byte // This is private as some methods might not be appropriate with data channel context. peerConnection *wrappedPeerConnection + span trace.Span + ctx context.Context } func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) { @@ -207,7 +211,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte } offerIDBinary := binaryToJsonString(randOfferId[:]) - pc, dc, offer, err := newOffer() + pc, offer, err := newOffer(tc.Logger) if err != nil { return fmt.Errorf("creating offer: %w", err) } @@ -251,7 +255,6 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte } tc.outboundOffers[offerIDBinary] = outboundOffer{ peerConnection: pc, - dataChannel: dc, originalOffer: offer, infoHash: infoHash, } @@ -301,7 +304,7 @@ func (tc *TrackerClient) handleOffer( infoHash [20]byte, peerId string, ) error { - peerConnection, answer, err := newAnsweringPeerConnection(offer) + peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer) if err != nil { return fmt.Errorf("write AnnounceResponse: %w", err) } @@ -325,11 +328,13 @@ func (tc *TrackerClient) handleOffer( return fmt.Errorf("writing response: %w", err) } timer := time.AfterFunc(30*time.Second, func() { + peerConnection.span.SetStatus(codes.Error, "answer timeout") metrics.Add("answering peer connections timed out", 1) peerConnection.Close() }) peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { - setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) { + ctx, span := dataChannelStarted(peerConnection.ctx, d) + setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) { timer.Stop() metrics.Add("answering peer connection conversions", 1) tc.mu.Lock() @@ -342,6 +347,8 @@ func (tc *TrackerClient) handleOffer( LocalOffered: false, InfoHash: infoHash, peerConnection: peerConnection, + ctx: ctx, + span: span, }) }) }) @@ -358,24 +365,44 @@ func (tc *TrackerClient) handleAnswer(offerId string, answer webrtc.SessionDescr } // tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer) metrics.Add("outbound offers answered", 1) - err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) { - metrics.Add("outbound offers answered with datachannel", 1) - tc.mu.Lock() - tc.stats.ConvertedOutboundConns++ - tc.mu.Unlock() - tc.OnConn(dc, DataChannelContext{ - Local: offer.originalOffer, - Remote: answer, - OfferId: offerId, - LocalOffered: true, - InfoHash: offer.infoHash, - peerConnection: offer.peerConnection, + // Why do we create the data channel before setting the remote description? Are we trying to avoid the peer + // initiating? + dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil) + if err != nil { + err = fmt.Errorf("creating data channel: %w", err) + tc.Logger.LevelPrint(log.Error, err) + offer.peerConnection.span.RecordError(err) + offer.peerConnection.Close() + goto deleteOffer + } + { + ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel) + setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) { + metrics.Add("outbound offers answered with datachannel", 1) + tc.mu.Lock() + tc.stats.ConvertedOutboundConns++ + tc.mu.Unlock() + tc.OnConn(dc, DataChannelContext{ + Local: offer.originalOffer, + Remote: answer, + OfferId: offerId, + LocalOffered: true, + InfoHash: offer.infoHash, + peerConnection: offer.peerConnection, + ctx: ctx, + span: span, + }) }) - }) + } + err = offer.peerConnection.SetRemoteDescription(answer) if err != nil { - tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err) + err = fmt.Errorf("using outbound offer answer: %w", err) + offer.peerConnection.span.RecordError(err) + dataChannel.Close() + tc.Logger.WithDefaultLevel(log.Error).Print(err) return } +deleteOffer: delete(tc.outboundOffers, offerId) go tc.Announce(tracker.None, offer.infoHash) } diff --git a/webtorrent/transport.go b/webtorrent/transport.go index e4c3b73d..9cff003e 100644 --- a/webtorrent/transport.go +++ b/webtorrent/transport.go @@ -1,14 +1,20 @@ package webtorrent import ( + "context" "expvar" "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "io" "sync" + "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/pion/datachannel" "github.com/pion/webrtc/v3" + "go.opentelemetry.io/otel" ) var ( @@ -26,58 +32,81 @@ type wrappedPeerConnection struct { *webrtc.PeerConnection closeMu sync.Mutex pproffd.CloseWrapper + span trace.Span + ctx context.Context } func (me *wrappedPeerConnection) Close() error { me.closeMu.Lock() defer me.closeMu.Unlock() - return me.CloseWrapper.Close() + err := me.CloseWrapper.Close() + me.span.End() + return err } -func newPeerConnection() (*wrappedPeerConnection, error) { +func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) { newPeerConnectionMu.Lock() defer newPeerConnectionMu.Unlock() + ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection") pc, err := api.NewPeerConnection(config) if err != nil { + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + span.End() return nil, err } - return &wrappedPeerConnection{ + wpc := &wrappedPeerConnection{ PeerConnection: pc, CloseWrapper: pproffd.NewCloseWrapper(pc), - }, nil + ctx: ctx, + span: span, + } + // If the state change handler intends to call Close, it should call it on the wrapper. + wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state) + span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String()))) + }) + return wpc, nil +} + +func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) { + gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection) + peerConnection.span.AddEvent("setting local description") + err = peerConnection.SetLocalDescription(sdp) + if err != nil { + err = fmt.Errorf("setting local description: %w", err) + return + } + <-gatherComplete + peerConnection.span.AddEvent("gathering complete") + return *peerConnection.LocalDescription(), nil } // newOffer creates a transport and returns a WebRTC offer to be announced -func newOffer() ( +func newOffer( + logger log.Logger, +) ( peerConnection *wrappedPeerConnection, - dataChannel *webrtc.DataChannel, offer webrtc.SessionDescription, err error, ) { - peerConnection, err = newPeerConnection() - if err != nil { - return - } - dataChannel, err = peerConnection.CreateDataChannel("webrtc-datachannel", nil) + peerConnection, err = newPeerConnection(logger) if err != nil { - peerConnection.Close() return } + + peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer")) + offer, err = peerConnection.CreateOffer(nil) if err != nil { peerConnection.Close() return } - gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection) - err = peerConnection.SetLocalDescription(offer) + offer, err = setAndGatherLocalDescription(peerConnection, offer) if err != nil { peerConnection.Close() - return } - <-gatherComplete - - offer = *peerConnection.LocalDescription() return } @@ -85,6 +114,8 @@ func initAnsweringPeerConnection( peerConnection *wrappedPeerConnection, offer webrtc.SessionDescription, ) (answer webrtc.SessionDescription, err error) { + peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer")) + err = peerConnection.SetRemoteDescription(offer) if err != nil { return @@ -94,40 +125,30 @@ func initAnsweringPeerConnection( return } - gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection) - err = peerConnection.SetLocalDescription(answer) - if err != nil { - return - } - <-gatherComplete - - answer = *peerConnection.LocalDescription() + answer, err = setAndGatherLocalDescription(peerConnection, answer) return } -// newAnsweringPeerConnection creates a transport from a WebRTC offer and and returns a WebRTC answer to be -// announced. -func newAnsweringPeerConnection(offer webrtc.SessionDescription) ( +// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced. +func newAnsweringPeerConnection( + logger log.Logger, + offer webrtc.SessionDescription, +) ( peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error, ) { - peerConn, err = newPeerConnection() + peerConn, err = newPeerConnection(logger) if err != nil { err = fmt.Errorf("failed to create new connection: %w", err) return } answer, err = initAnsweringPeerConnection(peerConn, offer) if err != nil { + peerConn.span.RecordError(err) peerConn.Close() } return } -func (t *outboundOffer) setAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error { - setDataChannelOnOpen(t.dataChannel, t.peerConnection, onOpen) - err := t.peerConnection.SetRemoteDescription(answer) - return err -} - type datachannelReadWriter interface { datachannel.Reader datachannel.Writer @@ -142,16 +163,19 @@ func (me ioCloserFunc) Close() error { } func setDataChannelOnOpen( + ctx context.Context, dc *webrtc.DataChannel, pc *wrappedPeerConnection, onOpen func(closer datachannel.ReadWriteCloser), ) { dc.OnOpen(func() { + trace.SpanFromContext(ctx).AddEvent("opened") raw, err := dc.Detach() if err != nil { // This shouldn't happen if the API is configured correctly, and we call from OnOpen. panic(err) } + //dc.OnClose() onOpen(hookDataChannelCloser(raw, pc)) }) }