]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/transport.go
Make fs a separate module
[btrtrc.git] / webtorrent / transport.go
1 package webtorrent
2
3 import (
4         "context"
5         "expvar"
6         "fmt"
7         "io"
8         "sync"
9         "time"
10
11         "github.com/anacrolix/log"
12         "github.com/anacrolix/missinggo/v2/pproffd"
13         "github.com/pion/datachannel"
14         "github.com/pion/webrtc/v3"
15         "go.opentelemetry.io/otel"
16         "go.opentelemetry.io/otel/attribute"
17         "go.opentelemetry.io/otel/codes"
18         "go.opentelemetry.io/otel/trace"
19 )
20
21 const (
22         dataChannelLabel = "webrtc-datachannel"
23 )
24
25 var (
26         metrics = expvar.NewMap("webtorrent")
27         api     = func() *webrtc.API {
28                 // Enable the detach API (since it's non-standard but more idiomatic).
29                 s.DetachDataChannels()
30                 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
31         }()
32         newPeerConnectionMu sync.Mutex
33 )
34
35 type wrappedPeerConnection struct {
36         *webrtc.PeerConnection
37         closeMu sync.Mutex
38         pproffd.CloseWrapper
39         span trace.Span
40         ctx  context.Context
41 }
42
43 func (me *wrappedPeerConnection) Close() error {
44         me.closeMu.Lock()
45         defer me.closeMu.Unlock()
46         err := me.CloseWrapper.Close()
47         me.span.End()
48         return err
49 }
50
51 func newPeerConnection(logger log.Logger, iceServers []string) (*wrappedPeerConnection, error) {
52         newPeerConnectionMu.Lock()
53         defer newPeerConnectionMu.Unlock()
54         ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
55
56         pcConfig := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: iceServers}}}
57
58         pc, err := api.NewPeerConnection(pcConfig)
59         if err != nil {
60                 span.SetStatus(codes.Error, err.Error())
61                 span.RecordError(err)
62                 span.End()
63                 return nil, err
64         }
65         wpc := &wrappedPeerConnection{
66                 PeerConnection: pc,
67                 CloseWrapper:   pproffd.NewCloseWrapper(pc),
68                 ctx:            ctx,
69                 span:           span,
70         }
71         // If the state change handler intends to call Close, it should call it on the wrapper.
72         wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
73                 logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
74                 span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
75         })
76         return wpc, nil
77 }
78
79 func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
80         gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
81         peerConnection.span.AddEvent("setting local description")
82         err = peerConnection.SetLocalDescription(sdp)
83         if err != nil {
84                 err = fmt.Errorf("setting local description: %w", err)
85                 return
86         }
87         <-gatherComplete
88         peerConnection.span.AddEvent("gathering complete")
89         return *peerConnection.LocalDescription(), nil
90 }
91
92 // newOffer creates a transport and returns a WebRTC offer to be announced. See
93 // https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
94 func (tc *TrackerClient) newOffer(
95         logger log.Logger,
96         offerId string,
97         infoHash [20]byte,
98 ) (
99         peerConnection *wrappedPeerConnection,
100         dataChannel *webrtc.DataChannel,
101         offer webrtc.SessionDescription,
102         err error,
103 ) {
104         peerConnection, err = newPeerConnection(logger, tc.ICEServers)
105         if err != nil {
106                 return
107         }
108
109         peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
110
111         dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
112         if err != nil {
113                 err = fmt.Errorf("creating data channel: %w", err)
114                 peerConnection.Close()
115         }
116         initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
117                 metrics.Add("outbound offers answered with datachannel", 1)
118                 tc.mu.Lock()
119                 tc.stats.ConvertedOutboundConns++
120                 tc.mu.Unlock()
121                 tc.OnConn(dc, DataChannelContext{
122                         OfferId:        offerId,
123                         LocalOffered:   true,
124                         InfoHash:       infoHash,
125                         peerConnection: peerConnection,
126                         Context:        dcCtx,
127                         Span:           dcSpan,
128                 })
129         })
130
131         offer, err = peerConnection.CreateOffer(nil)
132         if err != nil {
133                 dataChannel.Close()
134                 peerConnection.Close()
135                 return
136         }
137
138         offer, err = setAndGatherLocalDescription(peerConnection, offer)
139         if err != nil {
140                 dataChannel.Close()
141                 peerConnection.Close()
142         }
143         return
144 }
145
146 type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
147
148 func (tc *TrackerClient) initAnsweringPeerConnection(
149         peerConn *wrappedPeerConnection,
150         offerContext offerContext,
151 ) (answer webrtc.SessionDescription, err error) {
152         peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
153
154         timer := time.AfterFunc(30*time.Second, func() {
155                 peerConn.span.SetStatus(codes.Error, "answer timeout")
156                 metrics.Add("answering peer connections timed out", 1)
157                 peerConn.Close()
158         })
159         peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
160                 initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
161                         timer.Stop()
162                         metrics.Add("answering peer connection conversions", 1)
163                         tc.mu.Lock()
164                         tc.stats.ConvertedInboundConns++
165                         tc.mu.Unlock()
166                         tc.OnConn(detached, DataChannelContext{
167                                 OfferId:        offerContext.Id,
168                                 LocalOffered:   false,
169                                 InfoHash:       offerContext.InfoHash,
170                                 peerConnection: peerConn,
171                                 Context:        ctx,
172                                 Span:           span,
173                         })
174                 })
175         })
176
177         err = peerConn.SetRemoteDescription(offerContext.SessDesc)
178         if err != nil {
179                 return
180         }
181         answer, err = peerConn.CreateAnswer(nil)
182         if err != nil {
183                 return
184         }
185
186         answer, err = setAndGatherLocalDescription(peerConn, answer)
187         return
188 }
189
190 // newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
191 func (tc *TrackerClient) newAnsweringPeerConnection(
192         offerContext offerContext,
193 ) (
194         peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
195 ) {
196         peerConn, err = newPeerConnection(tc.Logger, tc.ICEServers)
197         if err != nil {
198                 err = fmt.Errorf("failed to create new connection: %w", err)
199                 return
200         }
201         answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
202         if err != nil {
203                 peerConn.span.RecordError(err)
204                 peerConn.Close()
205         }
206         return
207 }
208
209 type datachannelReadWriter interface {
210         datachannel.Reader
211         datachannel.Writer
212         io.Reader
213         io.Writer
214 }
215
216 type ioCloserFunc func() error
217
218 func (me ioCloserFunc) Close() error {
219         return me()
220 }
221
222 func initDataChannel(
223         dc *webrtc.DataChannel,
224         pc *wrappedPeerConnection,
225         onOpen onDetachedDataChannelFunc,
226 ) {
227         var span trace.Span
228         dc.OnClose(func() {
229                 span.End()
230         })
231         dc.OnOpen(func() {
232                 pc.span.AddEvent("data channel opened")
233                 var ctx context.Context
234                 ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
235                 raw, err := dc.Detach()
236                 if err != nil {
237                         // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
238                         panic(err)
239                 }
240                 onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
241         })
242 }
243
244 // Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
245 // and responsibility for the PeerConnection.
246 func hookDataChannelCloser(
247         dcrwc datachannel.ReadWriteCloser,
248         pc *wrappedPeerConnection,
249         dataChannelSpan trace.Span,
250         originalDataChannel *webrtc.DataChannel,
251 ) datachannel.ReadWriteCloser {
252         return struct {
253                 datachannelReadWriter
254                 io.Closer
255         }{
256                 dcrwc,
257                 ioCloserFunc(func() error {
258                         dcrwc.Close()
259                         pc.Close()
260                         originalDataChannel.Close()
261                         dataChannelSpan.End()
262                         return nil
263                 }),
264         }
265 }