11 "github.com/anacrolix/log"
12 "github.com/anacrolix/missinggo/v2/pproffd"
13 "github.com/pion/datachannel"
14 "github.com/pion/webrtc/v3"
15 "go.opentelemetry.io/otel"
16 "go.opentelemetry.io/otel/attribute"
17 "go.opentelemetry.io/otel/codes"
18 "go.opentelemetry.io/otel/trace"
// dataChannelLabel is the label passed to CreateDataChannel when building an offer.
22 dataChannelLabel = "webrtc-datachannel"
// metrics is the expvar map named "webtorrent"; counters in this file are bumped with metrics.Add.
26 metrics = expvar.NewMap("webtorrent")
// api is the webrtc.API that newPeerConnection calls NewPeerConnection on. The function literal
// enables detached data channels on the setting engine s (declared on a line not shown in this
// excerpt) before constructing the API from it.
27 api = func() *webrtc.API {
28 // Enable the detach API (since it's non-standard but more idiomatic).
29 s.DetachDataChannels()
30 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
// newPeerConnectionMu is locked for the duration of each newPeerConnection call.
32 newPeerConnectionMu sync.Mutex
// wrappedPeerConnection embeds *webrtc.PeerConnection. Other code in this file also uses
// closeMu, CloseWrapper, span and ctx on it; those field declarations are not visible in this
// excerpt.
35 type wrappedPeerConnection struct {
36 *webrtc.PeerConnection
// Close closes the connection by calling CloseWrapper.Close.
43 func (me *wrappedPeerConnection) Close() error {
// closeMu is released when Close returns. The matching Lock is not visible in this excerpt
// (presumably on the line just above) — TODO confirm.
45 defer me.closeMu.Unlock()
46 err := me.CloseWrapper.Close()
// newPeerConnection creates a webrtc PeerConnection through api and wraps it in a
// wrappedPeerConnection. iceServers become the URLs of a single ICE server entry in the
// connection configuration. logger is used by the connection state change handler installed
// below.
51 func newPeerConnection(logger log.Logger, iceServers []string) (*wrappedPeerConnection, error) {
// Hold the package-level mutex for the whole call.
52 newPeerConnectionMu.Lock()
53 defer newPeerConnectionMu.Unlock()
// Start the "PeerConnection" span from a background context.
54 ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
56 pcConfig := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: iceServers}}}
58 pc, err := api.NewPeerConnection(pcConfig)
// Mark the span as failed with the error text. The enclosing error check is not visible in
// this excerpt.
60 span.SetStatus(codes.Error, err.Error())
65 wpc := &wrappedPeerConnection{
// CloseWrapper is what wrappedPeerConnection.Close calls.
67 CloseWrapper: pproffd.NewCloseWrapper(pc),
71 // If the state change handler intends to call Close, it should call it on the wrapper.
72 wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
// Each state change is logged at Warning level and recorded as an event on the span.
73 logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
74 span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
// setAndGatherLocalDescription sets sdp as the local description of peerConnection and returns
// the connection's LocalDescription. An error from SetLocalDescription is wrapped with
// "setting local description".
79 func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
// The gathering-complete promise is obtained before the local description is set. The wait on
// gatherComplete is not visible in this excerpt — presumably it precedes the
// "gathering complete" event below; TODO confirm.
80 gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
81 peerConnection.span.AddEvent("setting local description")
82 err = peerConnection.SetLocalDescription(sdp)
84 err = fmt.Errorf("setting local description: %w", err)
88 peerConnection.span.AddEvent("gathering complete")
89 return *peerConnection.LocalDescription(), nil
92 // newOffer creates a transport and returns a WebRTC offer to be announced. See
93 // https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
94 func (tc *TrackerClient) newOffer(
// The three names below are assigned with "=" further down, so they appear to be named results
// (the parameter list and remaining results are not visible in this excerpt).
99 peerConnection *wrappedPeerConnection,
100 dataChannel *webrtc.DataChannel,
101 offer webrtc.SessionDescription,
// Create the peer connection using the tracker client's ICE servers.
104 peerConnection, err = newPeerConnection(logger, tc.ICEServers)
// Tag the connection's span as the offering side.
109 peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
111 dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
113 err = fmt.Errorf("creating data channel: %w", err)
// NOTE(review): the error returned by peerConnection.Close() is discarded here and at the two
// later Close calls in this function.
114 peerConnection.Close()
// The callback passed to initDataChannel bumps the metric and the converted-outbound counter,
// then passes the detached channel to tc.OnConn.
116 initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
117 metrics.Add("outbound offers answered with datachannel", 1)
119 tc.stats.ConvertedOutboundConns++
121 tc.OnConn(dc, DataChannelContext{
125 peerConnection: peerConnection,
131 offer, err = peerConnection.CreateOffer(nil)
134 peerConnection.Close()
// Set the offer as the local description; the value returned here replaces offer.
138 offer, err = setAndGatherLocalDescription(peerConnection, offer)
141 peerConnection.Close()
// onDetachedDataChannelFunc is the callback type taken by initDataChannel. It receives the
// detached data channel together with the context and span that initDataChannel started for it.
146 type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
// initAnsweringPeerConnection prepares peerConn to answer the offer in offerContext and returns
// the answer. It applies the remote description, creates the answer, then sets it as the local
// description through setAndGatherLocalDescription.
148 func (tc *TrackerClient) initAnsweringPeerConnection(
149 peerConn *wrappedPeerConnection,
150 offerContext offerContext,
151 ) (answer webrtc.SessionDescription, err error) {
// Tag the connection's span as the answering side.
152 peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
// After 30 seconds the timer callback marks the span as errored and bumps the timeout metric.
// Any further action in the callback, and where the timer is stopped, are on lines not shown
// in this excerpt.
154 timer := time.AfterFunc(30*time.Second, func() {
155 peerConn.span.SetStatus(codes.Error, "answer timeout")
156 metrics.Add("answering peer connections timed out", 1)
// When the remote side opens a data channel, initDataChannel is given a callback that bumps
// the metric and the converted-inbound counter, then passes the detached channel to tc.OnConn.
159 peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
160 initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
162 metrics.Add("answering peer connection conversions", 1)
164 tc.stats.ConvertedInboundConns++
166 tc.OnConn(detached, DataChannelContext{
167 OfferId: offerContext.Id,
169 InfoHash: offerContext.InfoHash,
170 peerConnection: peerConn,
177 err = peerConn.SetRemoteDescription(offerContext.SessDesc)
181 answer, err = peerConn.CreateAnswer(nil)
186 answer, err = setAndGatherLocalDescription(peerConn, answer)
190 // newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
191 func (tc *TrackerClient) newAnsweringPeerConnection(
192 offerContext offerContext,
194 peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
// Create the peer connection with the tracker client's logger and ICE servers.
196 peerConn, err = newPeerConnection(tc.Logger, tc.ICEServers)
198 err = fmt.Errorf("failed to create new connection: %w", err)
// Delegate the offer/answer exchange to initAnsweringPeerConnection.
201 answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
// The error is recorded on the connection's span. The enclosing error check and any cleanup
// are on lines not shown in this excerpt.
203 peerConn.span.RecordError(err)
// datachannelReadWriter is referenced inside hookDataChannelCloser. Its method set is declared
// on lines not shown in this excerpt.
209 type datachannelReadWriter interface {
// ioCloserFunc is a func() error that has a Close method, so a plain function can be used where
// a value with Close() error is needed.
216 type ioCloserFunc func() error

// Close's body is not visible in this excerpt — presumably it invokes me; TODO confirm.
218 func (me ioCloserFunc) Close() error {
// initDataChannel detaches dc and passes the result, wrapped by hookDataChannelCloser, to onOpen
// along with a new "DataChannel" span and its context.
222 func initDataChannel(
223 dc *webrtc.DataChannel,
224 pc *wrappedPeerConnection,
225 onOpen onDetachedDataChannelFunc,
// The lines below presumably run inside dc's OnOpen handler, judging by the event name and the
// comment further down. The handler registration is not visible in this excerpt.
232 pc.span.AddEvent("data channel opened")
233 var ctx context.Context
// The data channel span is started from the peer connection's stored context.
234 ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
235 raw, err := dc.Detach()
237 // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
// Hand the detached channel, with its Close hooked, to the caller's callback.
240 onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
244 // Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
245 // and responsibility for the PeerConnection.
246 func hookDataChannelCloser(
247 dcrwc datachannel.ReadWriteCloser,
248 pc *wrappedPeerConnection,
249 dataChannelSpan trace.Span,
250 originalDataChannel *webrtc.DataChannel,
251 ) datachannel.ReadWriteCloser {
// The returned value appears to combine datachannelReadWriter with a replacement Close supplied
// as an ioCloserFunc. The enclosing composite type is on lines not shown in this excerpt.
253 datachannelReadWriter
257 ioCloserFunc(func() error {
// The replacement Close closes the original data channel and ends the data channel span. Its
// other steps (such as closing pc, per the comment above) are not visible here.
// NOTE(review): the error returned by originalDataChannel.Close() is discarded.
260 originalDataChannel.Close()
261 dataChannelSpan.End()