]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/transport.go
Close detached data channel and end span on webrtc conn close
[btrtrc.git] / webtorrent / transport.go
1 package webtorrent
2
3 import (
4         "context"
5         "expvar"
6         "fmt"
7         "go.opentelemetry.io/otel/attribute"
8         "go.opentelemetry.io/otel/codes"
9         "go.opentelemetry.io/otel/trace"
10         "io"
11         "sync"
12
13         "github.com/anacrolix/log"
14         "github.com/anacrolix/missinggo/v2/pproffd"
15         "github.com/pion/datachannel"
16         "github.com/pion/webrtc/v3"
17         "go.opentelemetry.io/otel"
18 )
19
20 var (
21         metrics = expvar.NewMap("webtorrent")
22         api     = func() *webrtc.API {
23                 // Enable the detach API (since it's non-standard but more idiomatic).
24                 s.DetachDataChannels()
25                 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
26         }()
27         config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
28         newPeerConnectionMu sync.Mutex
29 )
30
31 type wrappedPeerConnection struct {
32         *webrtc.PeerConnection
33         closeMu sync.Mutex
34         pproffd.CloseWrapper
35         span trace.Span
36         ctx  context.Context
37 }
38
39 func (me *wrappedPeerConnection) Close() error {
40         me.closeMu.Lock()
41         defer me.closeMu.Unlock()
42         err := me.CloseWrapper.Close()
43         me.span.End()
44         return err
45 }
46
47 func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
48         newPeerConnectionMu.Lock()
49         defer newPeerConnectionMu.Unlock()
50         ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
51         pc, err := api.NewPeerConnection(config)
52         if err != nil {
53                 span.SetStatus(codes.Error, err.Error())
54                 span.RecordError(err)
55                 span.End()
56                 return nil, err
57         }
58         wpc := &wrappedPeerConnection{
59                 PeerConnection: pc,
60                 CloseWrapper:   pproffd.NewCloseWrapper(pc),
61                 ctx:            ctx,
62                 span:           span,
63         }
64         // If the state change handler intends to call Close, it should call it on the wrapper.
65         wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
66                 logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
67                 span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
68         })
69         return wpc, nil
70 }
71
72 func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
73         gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
74         peerConnection.span.AddEvent("setting local description")
75         err = peerConnection.SetLocalDescription(sdp)
76         if err != nil {
77                 err = fmt.Errorf("setting local description: %w", err)
78                 return
79         }
80         <-gatherComplete
81         peerConnection.span.AddEvent("gathering complete")
82         return *peerConnection.LocalDescription(), nil
83 }
84
85 // newOffer creates a transport and returns a WebRTC offer to be announced
86 func newOffer(
87         logger log.Logger,
88 ) (
89         peerConnection *wrappedPeerConnection,
90         offer webrtc.SessionDescription,
91         err error,
92 ) {
93         peerConnection, err = newPeerConnection(logger)
94         if err != nil {
95                 return
96         }
97
98         peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
99
100         offer, err = peerConnection.CreateOffer(nil)
101         if err != nil {
102                 peerConnection.Close()
103                 return
104         }
105
106         offer, err = setAndGatherLocalDescription(peerConnection, offer)
107         if err != nil {
108                 peerConnection.Close()
109         }
110         return
111 }
112
113 func initAnsweringPeerConnection(
114         peerConnection *wrappedPeerConnection,
115         offer webrtc.SessionDescription,
116 ) (answer webrtc.SessionDescription, err error) {
117         peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
118
119         err = peerConnection.SetRemoteDescription(offer)
120         if err != nil {
121                 return
122         }
123         answer, err = peerConnection.CreateAnswer(nil)
124         if err != nil {
125                 return
126         }
127
128         answer, err = setAndGatherLocalDescription(peerConnection, answer)
129         return
130 }
131
132 // newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
133 func newAnsweringPeerConnection(
134         logger log.Logger,
135         offer webrtc.SessionDescription,
136 ) (
137         peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
138 ) {
139         peerConn, err = newPeerConnection(logger)
140         if err != nil {
141                 err = fmt.Errorf("failed to create new connection: %w", err)
142                 return
143         }
144         answer, err = initAnsweringPeerConnection(peerConn, offer)
145         if err != nil {
146                 peerConn.span.RecordError(err)
147                 peerConn.Close()
148         }
149         return
150 }
151
152 type datachannelReadWriter interface {
153         datachannel.Reader
154         datachannel.Writer
155         io.Reader
156         io.Writer
157 }
158
159 type ioCloserFunc func() error
160
161 func (me ioCloserFunc) Close() error {
162         return me()
163 }
164
165 func setDataChannelOnOpen(
166         ctx context.Context,
167         dc *webrtc.DataChannel,
168         pc *wrappedPeerConnection,
169         onOpen func(closer datachannel.ReadWriteCloser),
170 ) {
171         dc.OnOpen(func() {
172                 dataChannelSpan := trace.SpanFromContext(ctx)
173                 dataChannelSpan.AddEvent("opened")
174                 raw, err := dc.Detach()
175                 if err != nil {
176                         // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
177                         panic(err)
178                 }
179                 //dc.OnClose()
180                 onOpen(hookDataChannelCloser(raw, pc, dataChannelSpan))
181         })
182 }
183
184 // Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
185 // and responsibility for the PeerConnection.
186 func hookDataChannelCloser(dcrwc datachannel.ReadWriteCloser, pc *wrappedPeerConnection, dataChannelSpan trace.Span) datachannel.ReadWriteCloser {
187         return struct {
188                 datachannelReadWriter
189                 io.Closer
190         }{
191                 dcrwc,
192                 ioCloserFunc(func() error {
193                         dcrwc.Close()
194                         pc.Close()
195                         dataChannelSpan.End()
196                         return nil
197                 }),
198         }
199 }