]> Sergey Matveev's repositories - btrtrc.git/blobdiff - webtorrent/transport.go
Add WebRTC ICE servers config (#824)
[btrtrc.git] / webtorrent / transport.go
index 0bb5701dd58031448aabe89a557ff2b34c81ff36..856625874c8becc00be22a6ee600d401c260b13a 100644 (file)
 package webtorrent
 
 import (
+       "context"
+       "expvar"
        "fmt"
-       "log"
+       "io"
        "sync"
+       "time"
 
+       "github.com/anacrolix/log"
+       "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/pion/datachannel"
+       "github.com/pion/webrtc/v3"
+       "go.opentelemetry.io/otel"
+       "go.opentelemetry.io/otel/attribute"
+       "go.opentelemetry.io/otel/codes"
+       "go.opentelemetry.io/otel/trace"
+)
 
-       "github.com/pion/webrtc/v2"
+const (
+       dataChannelLabel = "webrtc-datachannel"
 )
 
 var (
-       api = func() *webrtc.API {
-               // Enable the detach API (since it's non-standard but more idiomatic)
-               // (This should be done once globally)
-               s := webrtc.SettingEngine{}
+       metrics = expvar.NewMap("webtorrent")
+       api     = func() *webrtc.API {
+               // Enable the detach API (since it's non-standard but more idiomatic).
                s.DetachDataChannels()
                return webrtc.NewAPI(webrtc.WithSettingEngine(s))
        }()
-       config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
        newPeerConnectionMu sync.Mutex
 )
 
-func newPeerConnection() (*webrtc.PeerConnection, error) {
-       newPeerConnectionMu.Lock()
-       defer newPeerConnectionMu.Unlock()
-       return api.NewPeerConnection(config)
+type wrappedPeerConnection struct {
+       *webrtc.PeerConnection
+       closeMu sync.Mutex
+       pproffd.CloseWrapper
+       span trace.Span
+       ctx  context.Context
 }
 
-type Transport struct {
-       pc *webrtc.PeerConnection
-       dc *webrtc.DataChannel
-
-       lock sync.Mutex
+func (me *wrappedPeerConnection) Close() error {
+       me.closeMu.Lock()
+       defer me.closeMu.Unlock()
+       err := me.CloseWrapper.Close()
+       me.span.End()
+       return err
 }
 
-// NewTransport creates a transport and returns a WebRTC offer to be announced
-func NewTransport() (*Transport, webrtc.SessionDescription, error) {
-       peerConnection, err := newPeerConnection()
+func newPeerConnection(logger log.Logger, iceServers []string) (*wrappedPeerConnection, error) {
+       newPeerConnectionMu.Lock()
+       defer newPeerConnectionMu.Unlock()
+       ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
+
+       pcConfig := webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: iceServers}}}
+
+       pc, err := api.NewPeerConnection(pcConfig)
        if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v\n", err)
+               span.SetStatus(codes.Error, err.Error())
+               span.RecordError(err)
+               span.End()
+               return nil, err
        }
-       dataChannel, err := peerConnection.CreateDataChannel("webrtc-datachannel", nil)
-       if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to data channel: %v\n", err)
+       wpc := &wrappedPeerConnection{
+               PeerConnection: pc,
+               CloseWrapper:   pproffd.NewCloseWrapper(pc),
+               ctx:            ctx,
+               span:           span,
        }
-       peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
-               fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
+       // If the state change handler intends to call Close, it should call it on the wrapper.
+       wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
+               logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
+               span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
        })
+       return wpc, nil
+}
 
-       dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
-               fmt.Printf("Message from DataChannel '%s': '%s'\n", dataChannel.Label(), string(msg.Data))
-       })
-       offer, err := peerConnection.CreateOffer(nil)
+func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
+       gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
+       peerConnection.span.AddEvent("setting local description")
+       err = peerConnection.SetLocalDescription(sdp)
        if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to create offer: %v\n", err)
+               err = fmt.Errorf("setting local description: %w", err)
+               return
        }
-       err = peerConnection.SetLocalDescription(offer)
+       <-gatherComplete
+       peerConnection.span.AddEvent("gathering complete")
+       return *peerConnection.LocalDescription(), nil
+}
+
+// newOffer creates a transport and returns a WebRTC offer to be announced. See
+// https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
+func (tc *TrackerClient) newOffer(
+       logger log.Logger,
+       offerId string,
+       infoHash [20]byte,
+) (
+       peerConnection *wrappedPeerConnection,
+       dataChannel *webrtc.DataChannel,
+       offer webrtc.SessionDescription,
+       err error,
+) {
+       peerConnection, err = newPeerConnection(logger, tc.ICEServers)
        if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to set local description: %v\n", err)
+               return
        }
 
-       t := &Transport{pc: peerConnection, dc: dataChannel}
-       return t, offer, nil
-}
+       peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
 
-// NewTransportFromOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to
-// be announced.
-func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) (*Transport, webrtc.SessionDescription, error) {
-       peerConnection, err := newPeerConnection()
+       dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
        if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v", err)
+               err = fmt.Errorf("creating data channel: %w", err)
+               peerConnection.Close()
        }
-       peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
-               fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
+       initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
+               metrics.Add("outbound offers answered with datachannel", 1)
+               tc.mu.Lock()
+               tc.stats.ConvertedOutboundConns++
+               tc.mu.Unlock()
+               tc.OnConn(dc, DataChannelContext{
+                       OfferId:        offerId,
+                       LocalOffered:   true,
+                       InfoHash:       infoHash,
+                       peerConnection: peerConnection,
+                       Context:        dcCtx,
+                       Span:           dcSpan,
+               })
        })
 
-       t := &Transport{pc: peerConnection}
-       peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
-               fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
-               t.lock.Lock()
-               t.dc = d
-               t.lock.Unlock()
-               t.handleOpen(onOpen)
-       })
+       offer, err = peerConnection.CreateOffer(nil)
+       if err != nil {
+               dataChannel.Close()
+               peerConnection.Close()
+               return
+       }
 
-       err = peerConnection.SetRemoteDescription(offer)
+       offer, err = setAndGatherLocalDescription(peerConnection, offer)
        if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
+               dataChannel.Close()
+               peerConnection.Close()
        }
-       answer, err := peerConnection.CreateAnswer(nil)
+       return
+}
+
+type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
+
+func (tc *TrackerClient) initAnsweringPeerConnection(
+       peerConn *wrappedPeerConnection,
+       offerContext offerContext,
+) (answer webrtc.SessionDescription, err error) {
+       peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
+
+       timer := time.AfterFunc(30*time.Second, func() {
+               peerConn.span.SetStatus(codes.Error, "answer timeout")
+               metrics.Add("answering peer connections timed out", 1)
+               peerConn.Close()
+       })
+       peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
+               initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
+                       timer.Stop()
+                       metrics.Add("answering peer connection conversions", 1)
+                       tc.mu.Lock()
+                       tc.stats.ConvertedInboundConns++
+                       tc.mu.Unlock()
+                       tc.OnConn(detached, DataChannelContext{
+                               OfferId:        offerContext.Id,
+                               LocalOffered:   false,
+                               InfoHash:       offerContext.InfoHash,
+                               peerConnection: peerConn,
+                               Context:        ctx,
+                               Span:           span,
+                       })
+               })
+       })
+
+       err = peerConn.SetRemoteDescription(offerContext.SessDesc)
        if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
+               return
        }
-       err = peerConnection.SetLocalDescription(answer)
+       answer, err = peerConn.CreateAnswer(nil)
        if err != nil {
-               return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
+               return
        }
 
-       return t, answer, nil
+       answer, err = setAndGatherLocalDescription(peerConn, answer)
+       return
 }
 
-// SetAnswer sets the WebRTC answer
-func (t *Transport) SetAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
-       t.handleOpen(onOpen)
-
-       err := t.pc.SetRemoteDescription(answer)
+// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
+func (tc *TrackerClient) newAnsweringPeerConnection(
+       offerContext offerContext,
+) (
+       peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
+) {
+       peerConn, err = newPeerConnection(tc.Logger, tc.ICEServers)
+       if err != nil {
+               err = fmt.Errorf("failed to create new connection: %w", err)
+               return
+       }
+       answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
        if err != nil {
-               return err
+               peerConn.span.RecordError(err)
+               peerConn.Close()
        }
-       return nil
+       return
 }
 
-func (t *Transport) handleOpen(onOpen func(datachannel.ReadWriteCloser)) {
-       t.lock.Lock()
-       dc := t.dc
-       t.lock.Unlock()
-       dc.OnOpen(func() {
-               fmt.Printf("Data channel '%s'-'%d' open.\n", dc.Label(), dc.ID())
+type datachannelReadWriter interface {
+       datachannel.Reader
+       datachannel.Writer
+       io.Reader
+       io.Writer
+}
 
-               // Detach the data channel
+type ioCloserFunc func() error
+
+func (me ioCloserFunc) Close() error {
+       return me()
+}
+
+func initDataChannel(
+       dc *webrtc.DataChannel,
+       pc *wrappedPeerConnection,
+       onOpen onDetachedDataChannelFunc,
+) {
+       var span trace.Span
+       dc.OnClose(func() {
+               span.End()
+       })
+       dc.OnOpen(func() {
+               pc.span.AddEvent("data channel opened")
+               var ctx context.Context
+               ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
                raw, err := dc.Detach()
                if err != nil {
-                       log.Fatalf("failed to detach: %v", err) // TODO: Error handling
+                       // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
+                       panic(err)
                }
-
-               onOpen(raw)
+               onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
        })
 }
+
+// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
+// and responsibility for the PeerConnection.
+func hookDataChannelCloser(
+       dcrwc datachannel.ReadWriteCloser,
+       pc *wrappedPeerConnection,
+       dataChannelSpan trace.Span,
+       originalDataChannel *webrtc.DataChannel,
+) datachannel.ReadWriteCloser {
+       return struct {
+               datachannelReadWriter
+               io.Closer
+       }{
+               dcrwc,
+               ioCloserFunc(func() error {
+                       dcrwc.Close()
+                       pc.Close()
+                       originalDataChannel.Close()
+                       dataChannelSpan.End()
+                       return nil
+               }),
+       }
+}