+++ /dev/null
-package webtorrent
-
-import (
- "context"
- "expvar"
- "fmt"
- "io"
- "sync"
- "time"
-
- "github.com/anacrolix/log"
- "github.com/anacrolix/missinggo/v2/pproffd"
- "github.com/pion/datachannel"
- "github.com/pion/webrtc/v3"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/codes"
- "go.opentelemetry.io/otel/trace"
-)
-
-const (
- dataChannelLabel = "webrtc-datachannel"
-)
-
-var (
- metrics = expvar.NewMap("webtorrent")
- api = func() *webrtc.API {
- // Enable the detach API (since it's non-standard but more idiomatic).
- s.DetachDataChannels()
- return webrtc.NewAPI(webrtc.WithSettingEngine(s))
- }()
- config = webrtc.Configuration{ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}}
- newPeerConnectionMu sync.Mutex
-)
-
-type wrappedPeerConnection struct {
- *webrtc.PeerConnection
- closeMu sync.Mutex
- pproffd.CloseWrapper
- span trace.Span
- ctx context.Context
-}
-
-func (me *wrappedPeerConnection) Close() error {
- me.closeMu.Lock()
- defer me.closeMu.Unlock()
- err := me.CloseWrapper.Close()
- me.span.End()
- return err
-}
-
-func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
- newPeerConnectionMu.Lock()
- defer newPeerConnectionMu.Unlock()
- ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
- pc, err := api.NewPeerConnection(config)
- if err != nil {
- span.SetStatus(codes.Error, err.Error())
- span.RecordError(err)
- span.End()
- return nil, err
- }
- wpc := &wrappedPeerConnection{
- PeerConnection: pc,
- CloseWrapper: pproffd.NewCloseWrapper(pc),
- ctx: ctx,
- span: span,
- }
- // If the state change handler intends to call Close, it should call it on the wrapper.
- wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
- logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
- span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
- })
- return wpc, nil
-}
-
-func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
- gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
- peerConnection.span.AddEvent("setting local description")
- err = peerConnection.SetLocalDescription(sdp)
- if err != nil {
- err = fmt.Errorf("setting local description: %w", err)
- return
- }
- <-gatherComplete
- peerConnection.span.AddEvent("gathering complete")
- return *peerConnection.LocalDescription(), nil
-}
-
-// newOffer creates a transport and returns a WebRTC offer to be announced. See
-// https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
-func (tc *TrackerClient) newOffer(
- logger log.Logger,
- offerId string,
- infoHash [20]byte,
-) (
- peerConnection *wrappedPeerConnection,
- dataChannel *webrtc.DataChannel,
- offer webrtc.SessionDescription,
- err error,
-) {
- peerConnection, err = newPeerConnection(logger)
- if err != nil {
- return
- }
-
- peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
-
- dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
- if err != nil {
- err = fmt.Errorf("creating data channel: %w", err)
- peerConnection.Close()
- }
- initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
- metrics.Add("outbound offers answered with datachannel", 1)
- tc.mu.Lock()
- tc.stats.ConvertedOutboundConns++
- tc.mu.Unlock()
- tc.OnConn(dc, DataChannelContext{
- OfferId: offerId,
- LocalOffered: true,
- InfoHash: infoHash,
- peerConnection: peerConnection,
- Context: dcCtx,
- Span: dcSpan,
- })
- })
-
- offer, err = peerConnection.CreateOffer(nil)
- if err != nil {
- dataChannel.Close()
- peerConnection.Close()
- return
- }
-
- offer, err = setAndGatherLocalDescription(peerConnection, offer)
- if err != nil {
- dataChannel.Close()
- peerConnection.Close()
- }
- return
-}
-
-type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
-
-func (tc *TrackerClient) initAnsweringPeerConnection(
- peerConn *wrappedPeerConnection,
- offerContext offerContext,
-) (answer webrtc.SessionDescription, err error) {
- peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
-
- timer := time.AfterFunc(30*time.Second, func() {
- peerConn.span.SetStatus(codes.Error, "answer timeout")
- metrics.Add("answering peer connections timed out", 1)
- peerConn.Close()
- })
- peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
- initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
- timer.Stop()
- metrics.Add("answering peer connection conversions", 1)
- tc.mu.Lock()
- tc.stats.ConvertedInboundConns++
- tc.mu.Unlock()
- tc.OnConn(detached, DataChannelContext{
- OfferId: offerContext.Id,
- LocalOffered: false,
- InfoHash: offerContext.InfoHash,
- peerConnection: peerConn,
- Context: ctx,
- Span: span,
- })
- })
- })
-
- err = peerConn.SetRemoteDescription(offerContext.SessDesc)
- if err != nil {
- return
- }
- answer, err = peerConn.CreateAnswer(nil)
- if err != nil {
- return
- }
-
- answer, err = setAndGatherLocalDescription(peerConn, answer)
- return
-}
-
-// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
-func (tc *TrackerClient) newAnsweringPeerConnection(
- offerContext offerContext,
-) (
- peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
-) {
- peerConn, err = newPeerConnection(tc.Logger)
- if err != nil {
- err = fmt.Errorf("failed to create new connection: %w", err)
- return
- }
- answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
- if err != nil {
- peerConn.span.RecordError(err)
- peerConn.Close()
- }
- return
-}
-
-type datachannelReadWriter interface {
- datachannel.Reader
- datachannel.Writer
- io.Reader
- io.Writer
-}
-
-type ioCloserFunc func() error
-
-func (me ioCloserFunc) Close() error {
- return me()
-}
-
-func initDataChannel(
- dc *webrtc.DataChannel,
- pc *wrappedPeerConnection,
- onOpen onDetachedDataChannelFunc,
-) {
- var span trace.Span
- dc.OnClose(func() {
- span.End()
- })
- dc.OnOpen(func() {
- pc.span.AddEvent("data channel opened")
- var ctx context.Context
- ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
- raw, err := dc.Detach()
- if err != nil {
- // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
- panic(err)
- }
- onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
- })
-}
-
-// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
-// and responsibility for the PeerConnection.
-func hookDataChannelCloser(
- dcrwc datachannel.ReadWriteCloser,
- pc *wrappedPeerConnection,
- dataChannelSpan trace.Span,
- originalDataChannel *webrtc.DataChannel,
-) datachannel.ReadWriteCloser {
- return struct {
- datachannelReadWriter
- io.Closer
- }{
- dcrwc,
- ioCloserFunc(func() error {
- dcrwc.Close()
- pc.Close()
- originalDataChannel.Close()
- dataChannelSpan.End()
- return nil
- }),
- }
-}