]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/transport.go
Also close created data channels when cleaning up webrtc conns
[btrtrc.git] / webtorrent / transport.go
1 package webtorrent
2
3 import (
4         "context"
5         "expvar"
6         "fmt"
7         "github.com/anacrolix/log"
8         "github.com/anacrolix/missinggo/v2/pproffd"
9         "github.com/pion/datachannel"
10         "github.com/pion/webrtc/v3"
11         "go.opentelemetry.io/otel"
12         "go.opentelemetry.io/otel/attribute"
13         "go.opentelemetry.io/otel/codes"
14         "go.opentelemetry.io/otel/trace"
15         "io"
16         "sync"
17         "time"
18 )
19
20 const (
21         dataChannelLabel = "webrtc-datachannel"
22 )
23
24 var (
25         metrics = expvar.NewMap("webtorrent")
26         api     = func() *webrtc.API {
27                 // Enable the detach API (since it's non-standard but more idiomatic).
28                 s.DetachDataChannels()
29                 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
30         }()
31         config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
32         newPeerConnectionMu sync.Mutex
33 )
34
35 type wrappedPeerConnection struct {
36         *webrtc.PeerConnection
37         closeMu sync.Mutex
38         pproffd.CloseWrapper
39         span trace.Span
40         ctx  context.Context
41 }
42
43 func (me *wrappedPeerConnection) Close() error {
44         me.closeMu.Lock()
45         defer me.closeMu.Unlock()
46         err := me.CloseWrapper.Close()
47         me.span.End()
48         return err
49 }
50
51 func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
52         newPeerConnectionMu.Lock()
53         defer newPeerConnectionMu.Unlock()
54         ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
55         pc, err := api.NewPeerConnection(config)
56         if err != nil {
57                 span.SetStatus(codes.Error, err.Error())
58                 span.RecordError(err)
59                 span.End()
60                 return nil, err
61         }
62         wpc := &wrappedPeerConnection{
63                 PeerConnection: pc,
64                 CloseWrapper:   pproffd.NewCloseWrapper(pc),
65                 ctx:            ctx,
66                 span:           span,
67         }
68         // If the state change handler intends to call Close, it should call it on the wrapper.
69         wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
70                 logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
71                 span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
72         })
73         return wpc, nil
74 }
75
76 func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
77         gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
78         peerConnection.span.AddEvent("setting local description")
79         err = peerConnection.SetLocalDescription(sdp)
80         if err != nil {
81                 err = fmt.Errorf("setting local description: %w", err)
82                 return
83         }
84         <-gatherComplete
85         peerConnection.span.AddEvent("gathering complete")
86         return *peerConnection.LocalDescription(), nil
87 }
88
89 // newOffer creates a transport and returns a WebRTC offer to be announced. See
90 // https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
91 func (tc *TrackerClient) newOffer(
92         logger log.Logger,
93         offerId string,
94         infoHash [20]byte,
95 ) (
96         peerConnection *wrappedPeerConnection,
97         dataChannel *webrtc.DataChannel,
98         offer webrtc.SessionDescription,
99         err error,
100 ) {
101         peerConnection, err = newPeerConnection(logger)
102         if err != nil {
103                 return
104         }
105
106         peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
107
108         dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
109         if err != nil {
110                 err = fmt.Errorf("creating data channel: %w", err)
111                 peerConnection.Close()
112         }
113         initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
114                 metrics.Add("outbound offers answered with datachannel", 1)
115                 tc.mu.Lock()
116                 tc.stats.ConvertedOutboundConns++
117                 tc.mu.Unlock()
118                 tc.OnConn(dc, DataChannelContext{
119                         OfferId:        offerId,
120                         LocalOffered:   true,
121                         InfoHash:       infoHash,
122                         peerConnection: peerConnection,
123                         Context:        dcCtx,
124                         Span:           dcSpan,
125                 })
126         })
127
128         offer, err = peerConnection.CreateOffer(nil)
129         if err != nil {
130                 peerConnection.Close()
131                 return
132         }
133
134         offer, err = setAndGatherLocalDescription(peerConnection, offer)
135         if err != nil {
136                 peerConnection.Close()
137         }
138         return
139 }
140
141 type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
142
143 func (tc *TrackerClient) initAnsweringPeerConnection(
144         peerConn *wrappedPeerConnection,
145         offerContext offerContext,
146 ) (answer webrtc.SessionDescription, err error) {
147         peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
148
149         timer := time.AfterFunc(30*time.Second, func() {
150                 peerConn.span.SetStatus(codes.Error, "answer timeout")
151                 metrics.Add("answering peer connections timed out", 1)
152                 peerConn.Close()
153         })
154         peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
155                 initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
156                         timer.Stop()
157                         metrics.Add("answering peer connection conversions", 1)
158                         tc.mu.Lock()
159                         tc.stats.ConvertedInboundConns++
160                         tc.mu.Unlock()
161                         tc.OnConn(detached, DataChannelContext{
162                                 OfferId:        offerContext.Id,
163                                 LocalOffered:   false,
164                                 InfoHash:       offerContext.InfoHash,
165                                 peerConnection: peerConn,
166                                 Context:        ctx,
167                                 Span:           span,
168                         })
169                 })
170         })
171
172         err = peerConn.SetRemoteDescription(offerContext.SessDesc)
173         if err != nil {
174                 return
175         }
176         answer, err = peerConn.CreateAnswer(nil)
177         if err != nil {
178                 return
179         }
180
181         answer, err = setAndGatherLocalDescription(peerConn, answer)
182         return
183 }
184
185 // newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
186 func (tc *TrackerClient) newAnsweringPeerConnection(
187         offerContext offerContext,
188 ) (
189         peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
190 ) {
191         peerConn, err = newPeerConnection(tc.Logger)
192         if err != nil {
193                 err = fmt.Errorf("failed to create new connection: %w", err)
194                 return
195         }
196         answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
197         if err != nil {
198                 peerConn.span.RecordError(err)
199                 peerConn.Close()
200         }
201         return
202 }
203
204 type datachannelReadWriter interface {
205         datachannel.Reader
206         datachannel.Writer
207         io.Reader
208         io.Writer
209 }
210
211 type ioCloserFunc func() error
212
213 func (me ioCloserFunc) Close() error {
214         return me()
215 }
216
217 func initDataChannel(
218         dc *webrtc.DataChannel,
219         pc *wrappedPeerConnection,
220         onOpen onDetachedDataChannelFunc,
221 ) {
222         var span trace.Span
223         dc.OnClose(func() {
224                 span.End()
225         })
226         dc.OnOpen(func() {
227                 pc.span.AddEvent("data channel opened")
228                 var ctx context.Context
229                 ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
230                 raw, err := dc.Detach()
231                 if err != nil {
232                         // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
233                         panic(err)
234                 }
235                 onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
236         })
237 }
238
239 // Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
240 // and responsibility for the PeerConnection.
241 func hookDataChannelCloser(
242         dcrwc datachannel.ReadWriteCloser,
243         pc *wrappedPeerConnection,
244         dataChannelSpan trace.Span,
245         originalDataChannel *webrtc.DataChannel,
246 ) datachannel.ReadWriteCloser {
247         return struct {
248                 datachannelReadWriter
249                 io.Closer
250         }{
251                 dcrwc,
252                 ioCloserFunc(func() error {
253                         dcrwc.Close()
254                         pc.Close()
255                         originalDataChannel.Close()
256                         dataChannelSpan.End()
257                         return nil
258                 }),
259         }
260 }