]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/transport.go
Merge branch 'webrtc-conn-leak'
[btrtrc.git] / webtorrent / transport.go
1 package webtorrent
2
3 import (
4         "context"
5         "expvar"
6         "fmt"
7         "github.com/anacrolix/log"
8         "github.com/anacrolix/missinggo/v2/pproffd"
9         "github.com/pion/datachannel"
10         "github.com/pion/webrtc/v3"
11         "go.opentelemetry.io/otel"
12         "go.opentelemetry.io/otel/attribute"
13         "go.opentelemetry.io/otel/codes"
14         "go.opentelemetry.io/otel/trace"
15         "io"
16         "sync"
17         "time"
18 )
19
// dataChannelLabel is the label passed to CreateDataChannel when this package
// creates the data channel for an outbound offer.
const dataChannelLabel = "webrtc-datachannel"
23
24 var (
25         metrics = expvar.NewMap("webtorrent")
26         api     = func() *webrtc.API {
27                 // Enable the detach API (since it's non-standard but more idiomatic).
28                 s.DetachDataChannels()
29                 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
30         }()
31         config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
32         newPeerConnectionMu sync.Mutex
33 )
34
35 type wrappedPeerConnection struct {
36         *webrtc.PeerConnection
37         closeMu sync.Mutex
38         pproffd.CloseWrapper
39         span trace.Span
40         ctx  context.Context
41 }
42
43 func (me *wrappedPeerConnection) Close() error {
44         me.closeMu.Lock()
45         defer me.closeMu.Unlock()
46         err := me.CloseWrapper.Close()
47         me.span.End()
48         return err
49 }
50
51 func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
52         newPeerConnectionMu.Lock()
53         defer newPeerConnectionMu.Unlock()
54         ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
55         pc, err := api.NewPeerConnection(config)
56         if err != nil {
57                 span.SetStatus(codes.Error, err.Error())
58                 span.RecordError(err)
59                 span.End()
60                 return nil, err
61         }
62         wpc := &wrappedPeerConnection{
63                 PeerConnection: pc,
64                 CloseWrapper:   pproffd.NewCloseWrapper(pc),
65                 ctx:            ctx,
66                 span:           span,
67         }
68         // If the state change handler intends to call Close, it should call it on the wrapper.
69         wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
70                 logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
71                 span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
72         })
73         return wpc, nil
74 }
75
76 func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
77         gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
78         peerConnection.span.AddEvent("setting local description")
79         err = peerConnection.SetLocalDescription(sdp)
80         if err != nil {
81                 err = fmt.Errorf("setting local description: %w", err)
82                 return
83         }
84         <-gatherComplete
85         peerConnection.span.AddEvent("gathering complete")
86         return *peerConnection.LocalDescription(), nil
87 }
88
89 // newOffer creates a transport and returns a WebRTC offer to be announced. See
90 // https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
91 func (tc *TrackerClient) newOffer(
92         logger log.Logger,
93         offerId string,
94         infoHash [20]byte,
95 ) (
96         peerConnection *wrappedPeerConnection,
97         dataChannel *webrtc.DataChannel,
98         offer webrtc.SessionDescription,
99         err error,
100 ) {
101         peerConnection, err = newPeerConnection(logger)
102         if err != nil {
103                 return
104         }
105
106         peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
107
108         dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
109         if err != nil {
110                 err = fmt.Errorf("creating data channel: %w", err)
111                 peerConnection.Close()
112         }
113         initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
114                 metrics.Add("outbound offers answered with datachannel", 1)
115                 tc.mu.Lock()
116                 tc.stats.ConvertedOutboundConns++
117                 tc.mu.Unlock()
118                 tc.OnConn(dc, DataChannelContext{
119                         OfferId:        offerId,
120                         LocalOffered:   true,
121                         InfoHash:       infoHash,
122                         peerConnection: peerConnection,
123                         Context:        dcCtx,
124                         Span:           dcSpan,
125                 })
126         })
127
128         offer, err = peerConnection.CreateOffer(nil)
129         if err != nil {
130                 dataChannel.Close()
131                 peerConnection.Close()
132                 return
133         }
134
135         offer, err = setAndGatherLocalDescription(peerConnection, offer)
136         if err != nil {
137                 dataChannel.Close()
138                 peerConnection.Close()
139         }
140         return
141 }
142
143 type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
144
145 func (tc *TrackerClient) initAnsweringPeerConnection(
146         peerConn *wrappedPeerConnection,
147         offerContext offerContext,
148 ) (answer webrtc.SessionDescription, err error) {
149         peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
150
151         timer := time.AfterFunc(30*time.Second, func() {
152                 peerConn.span.SetStatus(codes.Error, "answer timeout")
153                 metrics.Add("answering peer connections timed out", 1)
154                 peerConn.Close()
155         })
156         peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
157                 initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
158                         timer.Stop()
159                         metrics.Add("answering peer connection conversions", 1)
160                         tc.mu.Lock()
161                         tc.stats.ConvertedInboundConns++
162                         tc.mu.Unlock()
163                         tc.OnConn(detached, DataChannelContext{
164                                 OfferId:        offerContext.Id,
165                                 LocalOffered:   false,
166                                 InfoHash:       offerContext.InfoHash,
167                                 peerConnection: peerConn,
168                                 Context:        ctx,
169                                 Span:           span,
170                         })
171                 })
172         })
173
174         err = peerConn.SetRemoteDescription(offerContext.SessDesc)
175         if err != nil {
176                 return
177         }
178         answer, err = peerConn.CreateAnswer(nil)
179         if err != nil {
180                 return
181         }
182
183         answer, err = setAndGatherLocalDescription(peerConn, answer)
184         return
185 }
186
187 // newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
188 func (tc *TrackerClient) newAnsweringPeerConnection(
189         offerContext offerContext,
190 ) (
191         peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
192 ) {
193         peerConn, err = newPeerConnection(tc.Logger)
194         if err != nil {
195                 err = fmt.Errorf("failed to create new connection: %w", err)
196                 return
197         }
198         answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
199         if err != nil {
200                 peerConn.span.RecordError(err)
201                 peerConn.Close()
202         }
203         return
204 }
205
206 type datachannelReadWriter interface {
207         datachannel.Reader
208         datachannel.Writer
209         io.Reader
210         io.Writer
211 }
212
// ioCloserFunc adapts a plain func() error so it can be used wherever a value
// with a Close method is required.
type ioCloserFunc func() error

// Close calls the underlying function and returns its result.
func (f ioCloserFunc) Close() error {
	return f()
}
218
219 func initDataChannel(
220         dc *webrtc.DataChannel,
221         pc *wrappedPeerConnection,
222         onOpen onDetachedDataChannelFunc,
223 ) {
224         var span trace.Span
225         dc.OnClose(func() {
226                 span.End()
227         })
228         dc.OnOpen(func() {
229                 pc.span.AddEvent("data channel opened")
230                 var ctx context.Context
231                 ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
232                 raw, err := dc.Detach()
233                 if err != nil {
234                         // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
235                         panic(err)
236                 }
237                 onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
238         })
239 }
240
241 // Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
242 // and responsibility for the PeerConnection.
243 func hookDataChannelCloser(
244         dcrwc datachannel.ReadWriteCloser,
245         pc *wrappedPeerConnection,
246         dataChannelSpan trace.Span,
247         originalDataChannel *webrtc.DataChannel,
248 ) datachannel.ReadWriteCloser {
249         return struct {
250                 datachannelReadWriter
251                 io.Closer
252         }{
253                 dcrwc,
254                 ioCloserFunc(func() error {
255                         dcrwc.Close()
256                         pc.Close()
257                         originalDataChannel.Close()
258                         dataChannelSpan.End()
259                         return nil
260                 }),
261         }
262 }