]> Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/transport.go
Fix some webtorrent PeerConnection leaks
[btrtrc.git] / webtorrent / transport.go
1 package webtorrent
2
3 import (
4         "expvar"
5         "fmt"
6         "io"
7         "sync"
8         "time"
9
10         "github.com/anacrolix/missinggo/v2/pproffd"
11         "github.com/pion/datachannel"
12
13         "github.com/pion/webrtc/v2"
14 )
15
16 var (
17         metrics = expvar.NewMap("webtorrent")
18         api     = func() *webrtc.API {
19                 // Enable the detach API (since it's non-standard but more idiomatic).
20                 s := webrtc.SettingEngine{}
21                 s.DetachDataChannels()
22                 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
23         }()
24         config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
25         newPeerConnectionMu sync.Mutex
26 )
27
28 type wrappedPeerConnection struct {
29         *webrtc.PeerConnection
30         pproffd.CloseWrapper
31 }
32
33 func (me wrappedPeerConnection) Close() error {
34         return me.CloseWrapper.Close()
35 }
36
37 func newPeerConnection() (wrappedPeerConnection, error) {
38         newPeerConnectionMu.Lock()
39         defer newPeerConnectionMu.Unlock()
40         pc, err := api.NewPeerConnection(config)
41         if err != nil {
42                 return wrappedPeerConnection{}, err
43         }
44         return wrappedPeerConnection{
45                 pc,
46                 pproffd.NewCloseWrapper(pc),
47         }, nil
48 }
49
50 // newOffer creates a transport and returns a WebRTC offer to be announced
51 func newOffer() (
52         peerConnection wrappedPeerConnection,
53         dataChannel *webrtc.DataChannel,
54         offer webrtc.SessionDescription,
55         err error,
56 ) {
57         peerConnection, err = newPeerConnection()
58         if err != nil {
59                 return
60         }
61         dataChannel, err = peerConnection.CreateDataChannel("webrtc-datachannel", nil)
62         if err != nil {
63                 peerConnection.Close()
64                 return
65         }
66         offer, err = peerConnection.CreateOffer(nil)
67         if err != nil {
68                 peerConnection.Close()
69                 return
70         }
71         err = peerConnection.SetLocalDescription(offer)
72         if err != nil {
73                 peerConnection.Close()
74                 return
75         }
76         return
77 }
78
79 func initAnsweringPeerConnection(
80         peerConnection wrappedPeerConnection,
81         offerId string,
82         offer webrtc.SessionDescription,
83         onOpen onDataChannelOpen,
84 ) (answer webrtc.SessionDescription, err error) {
85         err = peerConnection.SetRemoteDescription(offer)
86         if err != nil {
87                 return
88         }
89         answer, err = peerConnection.CreateAnswer(nil)
90         if err != nil {
91                 return
92         }
93         err = peerConnection.SetLocalDescription(answer)
94         if err != nil {
95                 return
96         }
97         timer := time.AfterFunc(30*time.Second, func() {
98                 metrics.Add("answering peer connections timed out", 1)
99                 peerConnection.Close()
100         })
101         peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
102                 setDataChannelOnOpen(d, peerConnection, func(dc datachannel.ReadWriteCloser) {
103                         timer.Stop()
104                         metrics.Add("answering peer connection conversions", 1)
105                         onOpen(dc, DataChannelContext{answer, offer, offerId, false})
106                 })
107         })
108         return
109 }
110
111 // getAnswerForOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to be
112 // announced.
113 func getAnswerForOffer(
114         offer webrtc.SessionDescription, onOpen onDataChannelOpen, offerId string,
115 ) (
116         answer webrtc.SessionDescription, err error,
117 ) {
118         peerConnection, err := newPeerConnection()
119         if err != nil {
120                 err = fmt.Errorf("failed to peer connection: %w", err)
121                 return
122         }
123         answer, err = initAnsweringPeerConnection(peerConnection, offerId, offer, onOpen)
124         if err != nil {
125                 peerConnection.Close()
126         }
127         return
128 }
129
130 func (t *outboundOffer) setAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
131         setDataChannelOnOpen(t.dataChannel, t.peerConnection, onOpen)
132         err := t.peerConnection.SetRemoteDescription(answer)
133         if err == nil {
134                 // TODO: Maybe grab this inside the onOpen callback and mark the offer used there.
135                 t.answered = true
136         }
137         return err
138 }
139
140 type datachannelReadWriter interface {
141         datachannel.Reader
142         datachannel.Writer
143         io.Reader
144         io.Writer
145 }
146
147 type ioCloserFunc func() error
148
149 func (me ioCloserFunc) Close() error {
150         return me()
151 }
152
153 func setDataChannelOnOpen(
154         dc *webrtc.DataChannel,
155         pc wrappedPeerConnection,
156         onOpen func(closer datachannel.ReadWriteCloser),
157 ) {
158         dc.OnOpen(func() {
159                 raw, err := dc.Detach()
160                 if err != nil {
161                         // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
162                         panic(err)
163                 }
164                 onOpen(hookDataChannelCloser(raw, pc))
165         })
166 }
167
168 func hookDataChannelCloser(dcrwc datachannel.ReadWriteCloser, pc wrappedPeerConnection) datachannel.ReadWriteCloser {
169         return struct {
170                 datachannelReadWriter
171                 io.Closer
172         }{
173                 dcrwc,
174                 ioCloserFunc(pc.Close),
175         }
176 }