github.com/anacrolix/fuse v0.2.0
github.com/anacrolix/generics v0.0.0-20220618083756-f99e35403a60
github.com/anacrolix/go-libutp v1.2.0
- github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6
+ github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30
github.com/anacrolix/missinggo v1.3.0
github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.7.0
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
+ github.com/go-logr/logr v1.2.3 // indirect
+ github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect
github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect
+ go.opentelemetry.io/otel v1.8.0 // indirect
+ go.opentelemetry.io/otel/trace v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d // indirect
golang.org/x/net v0.0.0-20220630215102-69896b714898 // indirect
github.com/anacrolix/log v0.10.1-0.20220123034749-3920702c17f8/go.mod h1:GmnE2c0nvz8pOIPUSC9Rawgefy1sDXqposC2wgtBZE4=
github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6 h1:WH/Xcok0GpNID/NUV80CfTwUYXdbhR3pX/DXboxGhNI=
github.com/anacrolix/log v0.13.2-0.20220427063716-a4894bb521c6/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68=
+github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30 h1:bAgFzUxN1K3U8KwOzqCOhiygOr5NqYO3kNlV9tvp2Rc=
+github.com/anacrolix/log v0.13.2-0.20220711050817-613cb738ef30/go.mod h1:D4+CvN8SnruK6zIFS/xPoRJmtvtnxs+CSfDQ+BFxZ68=
github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62 h1:P04VG6Td13FHMgS5ZBcJX23NPC/fiC4cp9bXwYujdYM=
github.com/anacrolix/lsan v0.0.0-20211126052245-807000409a62/go.mod h1:66cFKPCO7Sl4vbFnAaSq7e4OXtdMhRSBagJGWgmpJbM=
github.com/anacrolix/missinggo v0.0.0-20180725070939-60ef2fbf63df/go.mod h1:kwGiTUTZ0+p4vAz3VbAI5a30t2YbvemcmspjKwrAz5s=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
+github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
+go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg=
+go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM=
+go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY=
+go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
--- /dev/null
+package webtorrent
+
+import (
+ "context"
+ "github.com/pion/webrtc/v3"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/trace"
+)
+
+const (
+ tracerName = "anacrolix.torrent.webtorrent"
+ webrtcConnTypeKey = "webtorrent.webrtc.conn.type"
+)
+
+func dataChannelStarted(peerConnectionCtx context.Context, dc *webrtc.DataChannel) (dataChannelCtx context.Context, span trace.Span) {
+ dataChannelCtx, span = otel.Tracer(tracerName).Start(peerConnectionCtx, "DataChannel")
+ dc.OnClose(func() {
+ span.End()
+ })
+ return
+}
package webtorrent
import (
+ "context"
"crypto/rand"
"encoding/json"
"fmt"
+ "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/trace"
"sync"
"time"
type outboundOffer struct {
originalOffer webrtc.SessionDescription
peerConnection *wrappedPeerConnection
- dataChannel *webrtc.DataChannel
infoHash [20]byte
}
InfoHash [20]byte
// This is private as some methods might not be appropriate with data channel context.
peerConnection *wrappedPeerConnection
+ span trace.Span
+ ctx context.Context
}
func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) {
}
offerIDBinary := binaryToJsonString(randOfferId[:])
- pc, dc, offer, err := newOffer()
+ pc, offer, err := newOffer(tc.Logger)
if err != nil {
return fmt.Errorf("creating offer: %w", err)
}
}
tc.outboundOffers[offerIDBinary] = outboundOffer{
peerConnection: pc,
- dataChannel: dc,
originalOffer: offer,
infoHash: infoHash,
}
infoHash [20]byte,
peerId string,
) error {
- peerConnection, answer, err := newAnsweringPeerConnection(offer)
+ peerConnection, answer, err := newAnsweringPeerConnection(tc.Logger, offer)
if err != nil {
return fmt.Errorf("write AnnounceResponse: %w", err)
}
return fmt.Errorf("writing response: %w", err)
}
timer := time.AfterFunc(30*time.Second, func() {
+ peerConnection.span.SetStatus(codes.Error, "answer timeout")
metrics.Add("answering peer connections timed out", 1)
peerConnection.Close()
})
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
- setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
+ ctx, span := dataChannelStarted(peerConnection.ctx, d)
+ setDataChannelOnOpen(ctx, d, peerConnection, func(dc datachannel.ReadWriteCloser) {
timer.Stop()
metrics.Add("answering peer connection conversions", 1)
tc.mu.Lock()
LocalOffered: false,
InfoHash: infoHash,
peerConnection: peerConnection,
+ ctx: ctx,
+ span: span,
})
})
})
}
// tc.Logger.WithDefaultLevel(log.Debug).Printf("offer %q got answer %v", offerId, answer)
metrics.Add("outbound offers answered", 1)
- err := offer.setAnswer(answer, func(dc datachannel.ReadWriteCloser) {
- metrics.Add("outbound offers answered with datachannel", 1)
- tc.mu.Lock()
- tc.stats.ConvertedOutboundConns++
- tc.mu.Unlock()
- tc.OnConn(dc, DataChannelContext{
- Local: offer.originalOffer,
- Remote: answer,
- OfferId: offerId,
- LocalOffered: true,
- InfoHash: offer.infoHash,
- peerConnection: offer.peerConnection,
+ // Why do we create the data channel before setting the remote description? Are we trying to avoid the peer
+ // initiating?
+ dataChannel, err := offer.peerConnection.CreateDataChannel("webrtc-datachannel", nil)
+ if err != nil {
+ err = fmt.Errorf("creating data channel: %w", err)
+ tc.Logger.LevelPrint(log.Error, err)
+ offer.peerConnection.span.RecordError(err)
+ offer.peerConnection.Close()
+ goto deleteOffer
+ }
+ {
+ ctx, span := dataChannelStarted(offer.peerConnection.ctx, dataChannel)
+ setDataChannelOnOpen(ctx, dataChannel, offer.peerConnection, func(dc datachannel.ReadWriteCloser) {
+ metrics.Add("outbound offers answered with datachannel", 1)
+ tc.mu.Lock()
+ tc.stats.ConvertedOutboundConns++
+ tc.mu.Unlock()
+ tc.OnConn(dc, DataChannelContext{
+ Local: offer.originalOffer,
+ Remote: answer,
+ OfferId: offerId,
+ LocalOffered: true,
+ InfoHash: offer.infoHash,
+ peerConnection: offer.peerConnection,
+ ctx: ctx,
+ span: span,
+ })
})
- })
+ }
+ err = offer.peerConnection.SetRemoteDescription(answer)
if err != nil {
- tc.Logger.WithDefaultLevel(log.Warning).Printf("error using outbound offer answer: %v", err)
+ err = fmt.Errorf("using outbound offer answer: %w", err)
+ offer.peerConnection.span.RecordError(err)
+ dataChannel.Close()
+ tc.Logger.WithDefaultLevel(log.Error).Print(err)
return
}
+deleteOffer:
delete(tc.outboundOffers, offerId)
go tc.Announce(tracker.None, offer.infoHash)
}
package webtorrent
import (
+ "context"
"expvar"
"fmt"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/trace"
"io"
"sync"
+ "github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v3"
+ "go.opentelemetry.io/otel"
)
var (
*webrtc.PeerConnection
closeMu sync.Mutex
pproffd.CloseWrapper
+ span trace.Span
+ ctx context.Context
}
func (me *wrappedPeerConnection) Close() error {
me.closeMu.Lock()
defer me.closeMu.Unlock()
- return me.CloseWrapper.Close()
+ err := me.CloseWrapper.Close()
+ me.span.End()
+ return err
}
-func newPeerConnection() (*wrappedPeerConnection, error) {
+func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
newPeerConnectionMu.Lock()
defer newPeerConnectionMu.Unlock()
+ ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
pc, err := api.NewPeerConnection(config)
if err != nil {
+ span.SetStatus(codes.Error, err.Error())
+ span.RecordError(err)
+ span.End()
return nil, err
}
- return &wrappedPeerConnection{
+ wpc := &wrappedPeerConnection{
PeerConnection: pc,
CloseWrapper: pproffd.NewCloseWrapper(pc),
- }, nil
+ ctx: ctx,
+ span: span,
+ }
+ // If the state change handler intends to call Close, it should call it on the wrapper.
+ wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
+ logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
+ span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
+ })
+ return wpc, nil
+}
+
+func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
+ gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
+ peerConnection.span.AddEvent("setting local description")
+ err = peerConnection.SetLocalDescription(sdp)
+ if err != nil {
+ err = fmt.Errorf("setting local description: %w", err)
+ return
+ }
+ <-gatherComplete
+ peerConnection.span.AddEvent("gathering complete")
+ return *peerConnection.LocalDescription(), nil
}
// newOffer creates a transport and returns a WebRTC offer to be announced
-func newOffer() (
+func newOffer(
+ logger log.Logger,
+) (
peerConnection *wrappedPeerConnection,
- dataChannel *webrtc.DataChannel,
offer webrtc.SessionDescription,
err error,
) {
- peerConnection, err = newPeerConnection()
- if err != nil {
- return
- }
- dataChannel, err = peerConnection.CreateDataChannel("webrtc-datachannel", nil)
+ peerConnection, err = newPeerConnection(logger)
if err != nil {
- peerConnection.Close()
return
}
+
+ peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
+
offer, err = peerConnection.CreateOffer(nil)
if err != nil {
peerConnection.Close()
return
}
- gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
- err = peerConnection.SetLocalDescription(offer)
+ offer, err = setAndGatherLocalDescription(peerConnection, offer)
if err != nil {
peerConnection.Close()
- return
}
- <-gatherComplete
-
- offer = *peerConnection.LocalDescription()
return
}
peerConnection *wrappedPeerConnection,
offer webrtc.SessionDescription,
) (answer webrtc.SessionDescription, err error) {
+ peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
+
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
return
return
}
- gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
- err = peerConnection.SetLocalDescription(answer)
- if err != nil {
- return
- }
- <-gatherComplete
-
- answer = *peerConnection.LocalDescription()
+ answer, err = setAndGatherLocalDescription(peerConnection, answer)
return
}
-// newAnsweringPeerConnection creates a transport from a WebRTC offer and and returns a WebRTC answer to be
-// announced.
-func newAnsweringPeerConnection(offer webrtc.SessionDescription) (
+// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
+func newAnsweringPeerConnection(
+ logger log.Logger,
+ offer webrtc.SessionDescription,
+) (
peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
) {
- peerConn, err = newPeerConnection()
+ peerConn, err = newPeerConnection(logger)
if err != nil {
err = fmt.Errorf("failed to create new connection: %w", err)
return
}
answer, err = initAnsweringPeerConnection(peerConn, offer)
if err != nil {
+ peerConn.span.RecordError(err)
peerConn.Close()
}
return
}
-func (t *outboundOffer) setAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
- setDataChannelOnOpen(t.dataChannel, t.peerConnection, onOpen)
- err := t.peerConnection.SetRemoteDescription(answer)
- return err
-}
-
type datachannelReadWriter interface {
datachannel.Reader
datachannel.Writer
}
func setDataChannelOnOpen(
+ ctx context.Context,
dc *webrtc.DataChannel,
pc *wrappedPeerConnection,
onOpen func(closer datachannel.ReadWriteCloser),
) {
dc.OnOpen(func() {
+ trace.SpanFromContext(ctx).AddEvent("opened")
raw, err := dc.Detach()
if err != nil {
// This shouldn't happen if the API is configured correctly, and we call from OnOpen.
panic(err)
}
+ //dc.OnClose()
onOpen(hookDataChannelCloser(raw, pc))
})
}