]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/transport.go
610301db2c8a71cef7007d69d7287715c5174f67
[btrtrc.git] / webtorrent / transport.go
1 package webtorrent
2
3 import (
4         "context"
5         "expvar"
6         "fmt"
7         "io"
8         "sync"
9         "time"
10
11         "github.com/anacrolix/log"
12         "github.com/anacrolix/missinggo/v2/pproffd"
13         "github.com/pion/datachannel"
14         "github.com/pion/webrtc/v3"
15         "go.opentelemetry.io/otel"
16         "go.opentelemetry.io/otel/attribute"
17         "go.opentelemetry.io/otel/codes"
18         "go.opentelemetry.io/otel/trace"
19 )
20
21 const (
22         dataChannelLabel = "webrtc-datachannel"
23 )
24
25 var (
26         metrics = expvar.NewMap("webtorrent")
27         api     = func() *webrtc.API {
28                 // Enable the detach API (since it's non-standard but more idiomatic).
29                 s.DetachDataChannels()
30                 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
31         }()
32         config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
33         newPeerConnectionMu sync.Mutex
34 )
35
36 type wrappedPeerConnection struct {
37         *webrtc.PeerConnection
38         closeMu sync.Mutex
39         pproffd.CloseWrapper
40         span trace.Span
41         ctx  context.Context
42 }
43
44 func (me *wrappedPeerConnection) Close() error {
45         me.closeMu.Lock()
46         defer me.closeMu.Unlock()
47         err := me.CloseWrapper.Close()
48         me.span.End()
49         return err
50 }
51
52 func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
53         newPeerConnectionMu.Lock()
54         defer newPeerConnectionMu.Unlock()
55         ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
56         pc, err := api.NewPeerConnection(config)
57         if err != nil {
58                 span.SetStatus(codes.Error, err.Error())
59                 span.RecordError(err)
60                 span.End()
61                 return nil, err
62         }
63         wpc := &wrappedPeerConnection{
64                 PeerConnection: pc,
65                 CloseWrapper:   pproffd.NewCloseWrapper(pc),
66                 ctx:            ctx,
67                 span:           span,
68         }
69         // If the state change handler intends to call Close, it should call it on the wrapper.
70         wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
71                 logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
72                 span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
73         })
74         return wpc, nil
75 }
76
77 func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
78         gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
79         peerConnection.span.AddEvent("setting local description")
80         err = peerConnection.SetLocalDescription(sdp)
81         if err != nil {
82                 err = fmt.Errorf("setting local description: %w", err)
83                 return
84         }
85         <-gatherComplete
86         peerConnection.span.AddEvent("gathering complete")
87         return *peerConnection.LocalDescription(), nil
88 }
89
90 // newOffer creates a transport and returns a WebRTC offer to be announced. See
91 // https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
92 func (tc *TrackerClient) newOffer(
93         logger log.Logger,
94         offerId string,
95         infoHash [20]byte,
96 ) (
97         peerConnection *wrappedPeerConnection,
98         dataChannel *webrtc.DataChannel,
99         offer webrtc.SessionDescription,
100         err error,
101 ) {
102         peerConnection, err = newPeerConnection(logger)
103         if err != nil {
104                 return
105         }
106
107         peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
108
109         dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
110         if err != nil {
111                 err = fmt.Errorf("creating data channel: %w", err)
112                 peerConnection.Close()
113         }
114         initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
115                 metrics.Add("outbound offers answered with datachannel", 1)
116                 tc.mu.Lock()
117                 tc.stats.ConvertedOutboundConns++
118                 tc.mu.Unlock()
119                 tc.OnConn(dc, DataChannelContext{
120                         OfferId:        offerId,
121                         LocalOffered:   true,
122                         InfoHash:       infoHash,
123                         peerConnection: peerConnection,
124                         Context:        dcCtx,
125                         Span:           dcSpan,
126                 })
127         })
128
129         offer, err = peerConnection.CreateOffer(nil)
130         if err != nil {
131                 dataChannel.Close()
132                 peerConnection.Close()
133                 return
134         }
135
136         offer, err = setAndGatherLocalDescription(peerConnection, offer)
137         if err != nil {
138                 dataChannel.Close()
139                 peerConnection.Close()
140         }
141         return
142 }
143
144 type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
145
146 func (tc *TrackerClient) initAnsweringPeerConnection(
147         peerConn *wrappedPeerConnection,
148         offerContext offerContext,
149 ) (answer webrtc.SessionDescription, err error) {
150         peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
151
152         timer := time.AfterFunc(30*time.Second, func() {
153                 peerConn.span.SetStatus(codes.Error, "answer timeout")
154                 metrics.Add("answering peer connections timed out", 1)
155                 peerConn.Close()
156         })
157         peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
158                 initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
159                         timer.Stop()
160                         metrics.Add("answering peer connection conversions", 1)
161                         tc.mu.Lock()
162                         tc.stats.ConvertedInboundConns++
163                         tc.mu.Unlock()
164                         tc.OnConn(detached, DataChannelContext{
165                                 OfferId:        offerContext.Id,
166                                 LocalOffered:   false,
167                                 InfoHash:       offerContext.InfoHash,
168                                 peerConnection: peerConn,
169                                 Context:        ctx,
170                                 Span:           span,
171                         })
172                 })
173         })
174
175         err = peerConn.SetRemoteDescription(offerContext.SessDesc)
176         if err != nil {
177                 return
178         }
179         answer, err = peerConn.CreateAnswer(nil)
180         if err != nil {
181                 return
182         }
183
184         answer, err = setAndGatherLocalDescription(peerConn, answer)
185         return
186 }
187
188 // newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
189 func (tc *TrackerClient) newAnsweringPeerConnection(
190         offerContext offerContext,
191 ) (
192         peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
193 ) {
194         peerConn, err = newPeerConnection(tc.Logger)
195         if err != nil {
196                 err = fmt.Errorf("failed to create new connection: %w", err)
197                 return
198         }
199         answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
200         if err != nil {
201                 peerConn.span.RecordError(err)
202                 peerConn.Close()
203         }
204         return
205 }
206
207 type datachannelReadWriter interface {
208         datachannel.Reader
209         datachannel.Writer
210         io.Reader
211         io.Writer
212 }
213
214 type ioCloserFunc func() error
215
216 func (me ioCloserFunc) Close() error {
217         return me()
218 }
219
220 func initDataChannel(
221         dc *webrtc.DataChannel,
222         pc *wrappedPeerConnection,
223         onOpen onDetachedDataChannelFunc,
224 ) {
225         var span trace.Span
226         dc.OnClose(func() {
227                 span.End()
228         })
229         dc.OnOpen(func() {
230                 pc.span.AddEvent("data channel opened")
231                 var ctx context.Context
232                 ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
233                 raw, err := dc.Detach()
234                 if err != nil {
235                         // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
236                         panic(err)
237                 }
238                 onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
239         })
240 }
241
242 // Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
243 // and responsibility for the PeerConnection.
244 func hookDataChannelCloser(
245         dcrwc datachannel.ReadWriteCloser,
246         pc *wrappedPeerConnection,
247         dataChannelSpan trace.Span,
248         originalDataChannel *webrtc.DataChannel,
249 ) datachannel.ReadWriteCloser {
250         return struct {
251                 datachannelReadWriter
252                 io.Closer
253         }{
254                 dcrwc,
255                 ioCloserFunc(func() error {
256                         dcrwc.Close()
257                         pc.Close()
258                         originalDataChannel.Close()
259                         dataChannelSpan.End()
260                         return nil
261                 }),
262         }
263 }