"context"
"expvar"
"fmt"
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/codes"
- "go.opentelemetry.io/otel/trace"
"io"
"sync"
+ "time"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v3"
"go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/trace"
+)
+
+const (
+ dataChannelLabel = "webrtc-datachannel"
)
var (
s.DetachDataChannels()
return webrtc.NewAPI(webrtc.WithSettingEngine(s))
}()
- config = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
newPeerConnectionMu sync.Mutex
)
return err
}
-func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
+func newPeerConnection(logger log.Logger, iceServers []string) (*wrappedPeerConnection, error) {
newPeerConnectionMu.Lock()
defer newPeerConnectionMu.Unlock()
ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
- pc, err := api.NewPeerConnection(config)
+
+ pcConfig := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: iceServers}}}
+
+ pc, err := api.NewPeerConnection(pcConfig)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return *peerConnection.LocalDescription(), nil
}
-// newOffer creates a transport and returns a WebRTC offer to be announced
-func newOffer(
+// newOffer creates a transport and returns a WebRTC offer to be announced. See
+// https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
+func (tc *TrackerClient) newOffer(
logger log.Logger,
+ offerId string,
+ infoHash [20]byte,
) (
peerConnection *wrappedPeerConnection,
+ dataChannel *webrtc.DataChannel,
offer webrtc.SessionDescription,
err error,
) {
- peerConnection, err = newPeerConnection(logger)
+ peerConnection, err = newPeerConnection(logger, tc.ICEServers)
if err != nil {
return
}
peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
+ dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
+ if err != nil {
+ err = fmt.Errorf("creating data channel: %w", err)
+ peerConnection.Close()
+ }
+ initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
+ metrics.Add("outbound offers answered with datachannel", 1)
+ tc.mu.Lock()
+ tc.stats.ConvertedOutboundConns++
+ tc.mu.Unlock()
+ tc.OnConn(dc, DataChannelContext{
+ OfferId: offerId,
+ LocalOffered: true,
+ InfoHash: infoHash,
+ peerConnection: peerConnection,
+ Context: dcCtx,
+ Span: dcSpan,
+ })
+ })
+
offer, err = peerConnection.CreateOffer(nil)
if err != nil {
+ dataChannel.Close()
peerConnection.Close()
return
}
offer, err = setAndGatherLocalDescription(peerConnection, offer)
if err != nil {
+ dataChannel.Close()
peerConnection.Close()
}
return
}
-func initAnsweringPeerConnection(
- peerConnection *wrappedPeerConnection,
- offer webrtc.SessionDescription,
+type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
+
+func (tc *TrackerClient) initAnsweringPeerConnection(
+ peerConn *wrappedPeerConnection,
+ offerContext offerContext,
) (answer webrtc.SessionDescription, err error) {
- peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
+ peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
- err = peerConnection.SetRemoteDescription(offer)
+ timer := time.AfterFunc(30*time.Second, func() {
+ peerConn.span.SetStatus(codes.Error, "answer timeout")
+ metrics.Add("answering peer connections timed out", 1)
+ peerConn.Close()
+ })
+ peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
+ initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
+ timer.Stop()
+ metrics.Add("answering peer connection conversions", 1)
+ tc.mu.Lock()
+ tc.stats.ConvertedInboundConns++
+ tc.mu.Unlock()
+ tc.OnConn(detached, DataChannelContext{
+ OfferId: offerContext.Id,
+ LocalOffered: false,
+ InfoHash: offerContext.InfoHash,
+ peerConnection: peerConn,
+ Context: ctx,
+ Span: span,
+ })
+ })
+ })
+
+ err = peerConn.SetRemoteDescription(offerContext.SessDesc)
if err != nil {
return
}
- answer, err = peerConnection.CreateAnswer(nil)
+ answer, err = peerConn.CreateAnswer(nil)
if err != nil {
return
}
- answer, err = setAndGatherLocalDescription(peerConnection, answer)
+ answer, err = setAndGatherLocalDescription(peerConn, answer)
return
}
// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
-func newAnsweringPeerConnection(
- logger log.Logger,
- offer webrtc.SessionDescription,
+func (tc *TrackerClient) newAnsweringPeerConnection(
+ offerContext offerContext,
) (
peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
) {
- peerConn, err = newPeerConnection(logger)
+ peerConn, err = newPeerConnection(tc.Logger, tc.ICEServers)
if err != nil {
err = fmt.Errorf("failed to create new connection: %w", err)
return
}
- answer, err = initAnsweringPeerConnection(peerConn, offer)
+ answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
if err != nil {
peerConn.span.RecordError(err)
peerConn.Close()
return me()
}
-func setDataChannelOnOpen(
- ctx context.Context,
+func initDataChannel(
dc *webrtc.DataChannel,
pc *wrappedPeerConnection,
- onOpen func(closer datachannel.ReadWriteCloser),
+ onOpen onDetachedDataChannelFunc,
) {
+ var span trace.Span
+ dc.OnClose(func() {
+ span.End()
+ })
dc.OnOpen(func() {
- dataChannelSpan := trace.SpanFromContext(ctx)
- dataChannelSpan.AddEvent("opened")
+ pc.span.AddEvent("data channel opened")
+ var ctx context.Context
+ ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
raw, err := dc.Detach()
if err != nil {
// This shouldn't happen if the API is configured correctly, and we call from OnOpen.
panic(err)
}
- //dc.OnClose()
- onOpen(hookDataChannelCloser(raw, pc, dataChannelSpan))
+ onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
})
}
// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
// and responsibility for the PeerConnection.
-func hookDataChannelCloser(dcrwc datachannel.ReadWriteCloser, pc *wrappedPeerConnection, dataChannelSpan trace.Span) datachannel.ReadWriteCloser {
+func hookDataChannelCloser(
+ dcrwc datachannel.ReadWriteCloser,
+ pc *wrappedPeerConnection,
+ dataChannelSpan trace.Span,
+ originalDataChannel *webrtc.DataChannel,
+) datachannel.ReadWriteCloser {
return struct {
datachannelReadWriter
io.Closer
ioCloserFunc(func() error {
dcrwc.Close()
pc.Close()
+ originalDataChannel.Close()
dataChannelSpan.End()
return nil
}),