Sergey Matveev's repositories - btrtrc.git/blob - webtorrent/transport.go
webtorrent: Synchronize access to PeerConnection.Close
[btrtrc.git] / webtorrent / transport.go
1 package webtorrent
2
3 import (
4         "expvar"
5         "fmt"
6         "io"
7         "sync"
8
9         "github.com/anacrolix/missinggo/v2/pproffd"
10         "github.com/pion/datachannel"
11
12         "github.com/pion/webrtc/v2"
13 )
14
15 var (
16         metrics = expvar.NewMap("webtorrent")
17         api     = func() *webrtc.API {
18                 // Enable the detach API (since it's non-standard but more idiomatic).
19                 s := webrtc.SettingEngine{}
20                 s.DetachDataChannels()
21                 return webrtc.NewAPI(webrtc.WithSettingEngine(s))
22         }()
23         config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
24         newPeerConnectionMu sync.Mutex
25 )
26
27 type wrappedPeerConnection struct {
28         *webrtc.PeerConnection
29         closeMu sync.Mutex
30         pproffd.CloseWrapper
31 }
32
33 func (me *wrappedPeerConnection) Close() error {
34         me.closeMu.Lock()
35         defer me.closeMu.Unlock()
36         return me.CloseWrapper.Close()
37 }
38
39 func newPeerConnection() (*wrappedPeerConnection, error) {
40         newPeerConnectionMu.Lock()
41         defer newPeerConnectionMu.Unlock()
42         pc, err := api.NewPeerConnection(config)
43         if err != nil {
44                 return nil, err
45         }
46         return &wrappedPeerConnection{
47                 PeerConnection: pc,
48                 CloseWrapper:   pproffd.NewCloseWrapper(pc),
49         }, nil
50 }
51
52 // newOffer creates a transport and returns a WebRTC offer to be announced
53 func newOffer() (
54         peerConnection *wrappedPeerConnection,
55         dataChannel *webrtc.DataChannel,
56         offer webrtc.SessionDescription,
57         err error,
58 ) {
59         peerConnection, err = newPeerConnection()
60         if err != nil {
61                 return
62         }
63         dataChannel, err = peerConnection.CreateDataChannel("webrtc-datachannel", nil)
64         if err != nil {
65                 peerConnection.Close()
66                 return
67         }
68         offer, err = peerConnection.CreateOffer(nil)
69         if err != nil {
70                 peerConnection.Close()
71                 return
72         }
73         err = peerConnection.SetLocalDescription(offer)
74         if err != nil {
75                 peerConnection.Close()
76                 return
77         }
78         return
79 }
80
81 func initAnsweringPeerConnection(
82         peerConnection *wrappedPeerConnection,
83         offer webrtc.SessionDescription,
84 ) (answer webrtc.SessionDescription, err error) {
85         err = peerConnection.SetRemoteDescription(offer)
86         if err != nil {
87                 return
88         }
89         answer, err = peerConnection.CreateAnswer(nil)
90         if err != nil {
91                 return
92         }
93         err = peerConnection.SetLocalDescription(answer)
94         if err != nil {
95                 return
96         }
97         return
98 }
99
100 // newAnsweringPeerConnection creates a transport from a WebRTC offer and and returns a WebRTC answer to be
101 // announced.
102 func newAnsweringPeerConnection(offer webrtc.SessionDescription) (
103         peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
104 ) {
105         peerConn, err = newPeerConnection()
106         if err != nil {
107                 err = fmt.Errorf("failed to create new connection: %w", err)
108                 return
109         }
110         answer, err = initAnsweringPeerConnection(peerConn, offer)
111         if err != nil {
112                 peerConn.Close()
113         }
114         return
115 }
116
117 func (t *outboundOffer) setAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
118         setDataChannelOnOpen(t.dataChannel, t.peerConnection, onOpen)
119         err := t.peerConnection.SetRemoteDescription(answer)
120         return err
121 }
122
123 type datachannelReadWriter interface {
124         datachannel.Reader
125         datachannel.Writer
126         io.Reader
127         io.Writer
128 }
129
// ioCloserFunc adapts a plain func() error so that it provides a Close
// method.
type ioCloserFunc func() error

// Close invokes the underlying function and returns its result.
func (f ioCloserFunc) Close() error {
	return f()
}
135
136 func setDataChannelOnOpen(
137         dc *webrtc.DataChannel,
138         pc *wrappedPeerConnection,
139         onOpen func(closer datachannel.ReadWriteCloser),
140 ) {
141         dc.OnOpen(func() {
142                 raw, err := dc.Detach()
143                 if err != nil {
144                         // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
145                         panic(err)
146                 }
147                 onOpen(hookDataChannelCloser(raw, pc))
148         })
149 }
150
151 func hookDataChannelCloser(dcrwc datachannel.ReadWriteCloser, pc *wrappedPeerConnection) datachannel.ReadWriteCloser {
152         return struct {
153                 datachannelReadWriter
154                 io.Closer
155         }{
156                 dcrwc,
157                 ioCloserFunc(pc.Close),
158         }
159 }