]> Sergey Matveev's repositories - btrtrc.git/blobdiff - webtorrent/transport.go
No Web*
[btrtrc.git] / webtorrent / transport.go
diff --git a/webtorrent/transport.go b/webtorrent/transport.go
deleted file mode 100644 (file)
index 610301d..0000000
+++ /dev/null
@@ -1,263 +0,0 @@
-package webtorrent
-
-import (
-       "context"
-       "expvar"
-       "fmt"
-       "io"
-       "sync"
-       "time"
-
-       "github.com/anacrolix/log"
-       "github.com/anacrolix/missinggo/v2/pproffd"
-       "github.com/pion/datachannel"
-       "github.com/pion/webrtc/v3"
-       "go.opentelemetry.io/otel"
-       "go.opentelemetry.io/otel/attribute"
-       "go.opentelemetry.io/otel/codes"
-       "go.opentelemetry.io/otel/trace"
-)
-
-const (
-       dataChannelLabel = "webrtc-datachannel"
-)
-
-var (
-       metrics = expvar.NewMap("webtorrent")
-       api     = func() *webrtc.API {
-               // Enable the detach API (since it's non-standard but more idiomatic).
-               s.DetachDataChannels()
-               return webrtc.NewAPI(webrtc.WithSettingEngine(s))
-       }()
-       config              = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
-       newPeerConnectionMu sync.Mutex
-)
-
-type wrappedPeerConnection struct {
-       *webrtc.PeerConnection
-       closeMu sync.Mutex
-       pproffd.CloseWrapper
-       span trace.Span
-       ctx  context.Context
-}
-
-func (me *wrappedPeerConnection) Close() error {
-       me.closeMu.Lock()
-       defer me.closeMu.Unlock()
-       err := me.CloseWrapper.Close()
-       me.span.End()
-       return err
-}
-
-func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
-       newPeerConnectionMu.Lock()
-       defer newPeerConnectionMu.Unlock()
-       ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
-       pc, err := api.NewPeerConnection(config)
-       if err != nil {
-               span.SetStatus(codes.Error, err.Error())
-               span.RecordError(err)
-               span.End()
-               return nil, err
-       }
-       wpc := &wrappedPeerConnection{
-               PeerConnection: pc,
-               CloseWrapper:   pproffd.NewCloseWrapper(pc),
-               ctx:            ctx,
-               span:           span,
-       }
-       // If the state change handler intends to call Close, it should call it on the wrapper.
-       wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
-               logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
-               span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
-       })
-       return wpc, nil
-}
-
-func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
-       gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
-       peerConnection.span.AddEvent("setting local description")
-       err = peerConnection.SetLocalDescription(sdp)
-       if err != nil {
-               err = fmt.Errorf("setting local description: %w", err)
-               return
-       }
-       <-gatherComplete
-       peerConnection.span.AddEvent("gathering complete")
-       return *peerConnection.LocalDescription(), nil
-}
-
-// newOffer creates a transport and returns a WebRTC offer to be announced. See
-// https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
-func (tc *TrackerClient) newOffer(
-       logger log.Logger,
-       offerId string,
-       infoHash [20]byte,
-) (
-       peerConnection *wrappedPeerConnection,
-       dataChannel *webrtc.DataChannel,
-       offer webrtc.SessionDescription,
-       err error,
-) {
-       peerConnection, err = newPeerConnection(logger)
-       if err != nil {
-               return
-       }
-
-       peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
-
-       dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
-       if err != nil {
-               err = fmt.Errorf("creating data channel: %w", err)
-               peerConnection.Close()
-       }
-       initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
-               metrics.Add("outbound offers answered with datachannel", 1)
-               tc.mu.Lock()
-               tc.stats.ConvertedOutboundConns++
-               tc.mu.Unlock()
-               tc.OnConn(dc, DataChannelContext{
-                       OfferId:        offerId,
-                       LocalOffered:   true,
-                       InfoHash:       infoHash,
-                       peerConnection: peerConnection,
-                       Context:        dcCtx,
-                       Span:           dcSpan,
-               })
-       })
-
-       offer, err = peerConnection.CreateOffer(nil)
-       if err != nil {
-               dataChannel.Close()
-               peerConnection.Close()
-               return
-       }
-
-       offer, err = setAndGatherLocalDescription(peerConnection, offer)
-       if err != nil {
-               dataChannel.Close()
-               peerConnection.Close()
-       }
-       return
-}
-
-type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
-
-func (tc *TrackerClient) initAnsweringPeerConnection(
-       peerConn *wrappedPeerConnection,
-       offerContext offerContext,
-) (answer webrtc.SessionDescription, err error) {
-       peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
-
-       timer := time.AfterFunc(30*time.Second, func() {
-               peerConn.span.SetStatus(codes.Error, "answer timeout")
-               metrics.Add("answering peer connections timed out", 1)
-               peerConn.Close()
-       })
-       peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
-               initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
-                       timer.Stop()
-                       metrics.Add("answering peer connection conversions", 1)
-                       tc.mu.Lock()
-                       tc.stats.ConvertedInboundConns++
-                       tc.mu.Unlock()
-                       tc.OnConn(detached, DataChannelContext{
-                               OfferId:        offerContext.Id,
-                               LocalOffered:   false,
-                               InfoHash:       offerContext.InfoHash,
-                               peerConnection: peerConn,
-                               Context:        ctx,
-                               Span:           span,
-                       })
-               })
-       })
-
-       err = peerConn.SetRemoteDescription(offerContext.SessDesc)
-       if err != nil {
-               return
-       }
-       answer, err = peerConn.CreateAnswer(nil)
-       if err != nil {
-               return
-       }
-
-       answer, err = setAndGatherLocalDescription(peerConn, answer)
-       return
-}
-
-// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
-func (tc *TrackerClient) newAnsweringPeerConnection(
-       offerContext offerContext,
-) (
-       peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
-) {
-       peerConn, err = newPeerConnection(tc.Logger)
-       if err != nil {
-               err = fmt.Errorf("failed to create new connection: %w", err)
-               return
-       }
-       answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
-       if err != nil {
-               peerConn.span.RecordError(err)
-               peerConn.Close()
-       }
-       return
-}
-
-type datachannelReadWriter interface {
-       datachannel.Reader
-       datachannel.Writer
-       io.Reader
-       io.Writer
-}
-
-type ioCloserFunc func() error
-
-func (me ioCloserFunc) Close() error {
-       return me()
-}
-
-func initDataChannel(
-       dc *webrtc.DataChannel,
-       pc *wrappedPeerConnection,
-       onOpen onDetachedDataChannelFunc,
-) {
-       var span trace.Span
-       dc.OnClose(func() {
-               span.End()
-       })
-       dc.OnOpen(func() {
-               pc.span.AddEvent("data channel opened")
-               var ctx context.Context
-               ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
-               raw, err := dc.Detach()
-               if err != nil {
-                       // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
-                       panic(err)
-               }
-               onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
-       })
-}
-
-// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
-// and responsibility for the PeerConnection.
-func hookDataChannelCloser(
-       dcrwc datachannel.ReadWriteCloser,
-       pc *wrappedPeerConnection,
-       dataChannelSpan trace.Span,
-       originalDataChannel *webrtc.DataChannel,
-) datachannel.ReadWriteCloser {
-       return struct {
-               datachannelReadWriter
-               io.Closer
-       }{
-               dcrwc,
-               ioCloserFunc(func() error {
-                       dcrwc.Close()
-                       pc.Close()
-                       originalDataChannel.Close()
-                       dataChannelSpan.End()
-                       return nil
-               }),
-       }
-}