package webtorrent
import (
+ "context"
+ "expvar"
"fmt"
- "log"
+ "io"
"sync"
+ "time"
+ "github.com/anacrolix/log"
+ "github.com/anacrolix/missinggo/v2/pproffd"
"github.com/pion/datachannel"
+ "github.com/pion/webrtc/v3"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/codes"
+ "go.opentelemetry.io/otel/trace"
+)
- "github.com/pion/webrtc/v2"
+const (
+ dataChannelLabel = "webrtc-datachannel"
)
var (
- api = func() *webrtc.API {
- // Enable the detach API (since it's non-standard but more idiomatic)
- // (This should be done once globally)
- s := webrtc.SettingEngine{}
+ metrics = expvar.NewMap("webtorrent")
+ api = func() *webrtc.API {
+ // Enable the detach API (since it's non-standard but more idiomatic).
s.DetachDataChannels()
return webrtc.NewAPI(webrtc.WithSettingEngine(s))
}()
newPeerConnectionMu sync.Mutex
)
-func newPeerConnection() (*webrtc.PeerConnection, error) {
- newPeerConnectionMu.Lock()
- defer newPeerConnectionMu.Unlock()
- return api.NewPeerConnection(config)
+type wrappedPeerConnection struct {
+ *webrtc.PeerConnection
+ closeMu sync.Mutex
+ pproffd.CloseWrapper
+ span trace.Span
+ ctx context.Context
}
-type Transport struct {
- pc *webrtc.PeerConnection
- dc *webrtc.DataChannel
-
- lock sync.Mutex
+func (me *wrappedPeerConnection) Close() error {
+ me.closeMu.Lock()
+ defer me.closeMu.Unlock()
+ err := me.CloseWrapper.Close()
+ me.span.End()
+ return err
}
-// NewTransport creates a transport and returns a WebRTC offer to be announced
-func NewTransport() (*Transport, webrtc.SessionDescription, error) {
- peerConnection, err := newPeerConnection()
+func newPeerConnection(logger log.Logger) (*wrappedPeerConnection, error) {
+ newPeerConnectionMu.Lock()
+ defer newPeerConnectionMu.Unlock()
+ ctx, span := otel.Tracer(tracerName).Start(context.Background(), "PeerConnection")
+ pc, err := api.NewPeerConnection(config)
if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v\n", err)
+ span.SetStatus(codes.Error, err.Error())
+ span.RecordError(err)
+ span.End()
+ return nil, err
}
- dataChannel, err := peerConnection.CreateDataChannel("webrtc-datachannel", nil)
- if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to data channel: %v\n", err)
+ wpc := &wrappedPeerConnection{
+ PeerConnection: pc,
+ CloseWrapper: pproffd.NewCloseWrapper(pc),
+ ctx: ctx,
+ span: span,
}
- peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
- fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
+ // If the state change handler intends to call Close, it should call it on the wrapper.
+ wpc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
+ logger.Levelf(log.Warning, "webrtc PeerConnection state changed to %v", state)
+ span.AddEvent("connection state changed", trace.WithAttributes(attribute.String("state", state.String())))
})
+ return wpc, nil
+}
- dataChannel.OnMessage(func(msg webrtc.DataChannelMessage) {
- fmt.Printf("Message from DataChannel '%s': '%s'\n", dataChannel.Label(), string(msg.Data))
- })
- offer, err := peerConnection.CreateOffer(nil)
+func setAndGatherLocalDescription(peerConnection *wrappedPeerConnection, sdp webrtc.SessionDescription) (_ webrtc.SessionDescription, err error) {
+ gatherComplete := webrtc.GatheringCompletePromise(peerConnection.PeerConnection)
+ peerConnection.span.AddEvent("setting local description")
+ err = peerConnection.SetLocalDescription(sdp)
if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to create offer: %v\n", err)
+ err = fmt.Errorf("setting local description: %w", err)
+ return
}
- err = peerConnection.SetLocalDescription(offer)
+ <-gatherComplete
+ peerConnection.span.AddEvent("gathering complete")
+ return *peerConnection.LocalDescription(), nil
+}
+
+// newOffer creates a transport and returns a WebRTC offer to be announced. See
+// https://github.com/pion/webrtc/blob/master/examples/data-channels/jsfiddle/main.go for what this is modelled on.
+func (tc *TrackerClient) newOffer(
+ logger log.Logger,
+ offerId string,
+ infoHash [20]byte,
+) (
+ peerConnection *wrappedPeerConnection,
+ dataChannel *webrtc.DataChannel,
+ offer webrtc.SessionDescription,
+ err error,
+) {
+ peerConnection, err = newPeerConnection(logger)
if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to set local description: %v\n", err)
+ return
}
- t := &Transport{pc: peerConnection, dc: dataChannel}
- return t, offer, nil
-}
+ peerConnection.span.SetAttributes(attribute.String(webrtcConnTypeKey, "offer"))
-// NewTransportFromOffer creates a transport from a WebRTC offer and and returns a WebRTC answer to
-// be announced.
-func NewTransportFromOffer(offer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) (*Transport, webrtc.SessionDescription, error) {
- peerConnection, err := newPeerConnection()
+ dataChannel, err = peerConnection.CreateDataChannel(dataChannelLabel, nil)
if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("failed to peer connection: %v", err)
+ err = fmt.Errorf("creating data channel: %w", err)
+ peerConnection.Close()
}
- peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
- fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
+ initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
+ metrics.Add("outbound offers answered with datachannel", 1)
+ tc.mu.Lock()
+ tc.stats.ConvertedOutboundConns++
+ tc.mu.Unlock()
+ tc.OnConn(dc, DataChannelContext{
+ OfferId: offerId,
+ LocalOffered: true,
+ InfoHash: infoHash,
+ peerConnection: peerConnection,
+ Context: dcCtx,
+ Span: dcSpan,
+ })
})
- t := &Transport{pc: peerConnection}
- peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
- fmt.Printf("New DataChannel %s %d\n", d.Label(), d.ID())
- t.lock.Lock()
- t.dc = d
- t.lock.Unlock()
- t.handleOpen(onOpen)
- })
+ offer, err = peerConnection.CreateOffer(nil)
+ if err != nil {
+ dataChannel.Close()
+ peerConnection.Close()
+ return
+ }
- err = peerConnection.SetRemoteDescription(offer)
+ offer, err = setAndGatherLocalDescription(peerConnection, offer)
if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
+ dataChannel.Close()
+ peerConnection.Close()
}
- answer, err := peerConnection.CreateAnswer(nil)
+ return
+}
+
+type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
+
+func (tc *TrackerClient) initAnsweringPeerConnection(
+ peerConn *wrappedPeerConnection,
+ offerContext offerContext,
+) (answer webrtc.SessionDescription, err error) {
+ peerConn.span.SetAttributes(attribute.String(webrtcConnTypeKey, "answer"))
+
+ timer := time.AfterFunc(30*time.Second, func() {
+ peerConn.span.SetStatus(codes.Error, "answer timeout")
+ metrics.Add("answering peer connections timed out", 1)
+ peerConn.Close()
+ })
+ peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
+ initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
+ timer.Stop()
+ metrics.Add("answering peer connection conversions", 1)
+ tc.mu.Lock()
+ tc.stats.ConvertedInboundConns++
+ tc.mu.Unlock()
+ tc.OnConn(detached, DataChannelContext{
+ OfferId: offerContext.Id,
+ LocalOffered: false,
+ InfoHash: offerContext.InfoHash,
+ peerConnection: peerConn,
+ Context: ctx,
+ Span: span,
+ })
+ })
+ })
+
+ err = peerConn.SetRemoteDescription(offerContext.SessDesc)
if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
+ return
}
- err = peerConnection.SetLocalDescription(answer)
+ answer, err = peerConn.CreateAnswer(nil)
if err != nil {
- return nil, webrtc.SessionDescription{}, fmt.Errorf("%v", err)
+ return
}
- return t, answer, nil
+ answer, err = setAndGatherLocalDescription(peerConn, answer)
+ return
}
-// SetAnswer sets the WebRTC answer
-func (t *Transport) SetAnswer(answer webrtc.SessionDescription, onOpen func(datachannel.ReadWriteCloser)) error {
- t.handleOpen(onOpen)
-
- err := t.pc.SetRemoteDescription(answer)
+// newAnsweringPeerConnection creates a transport from a WebRTC offer and returns a WebRTC answer to be announced.
+func (tc *TrackerClient) newAnsweringPeerConnection(
+ offerContext offerContext,
+) (
+ peerConn *wrappedPeerConnection, answer webrtc.SessionDescription, err error,
+) {
+ peerConn, err = newPeerConnection(tc.Logger)
if err != nil {
- return err
+ err = fmt.Errorf("failed to create new connection: %w", err)
+ return
}
- return nil
+ answer, err = tc.initAnsweringPeerConnection(peerConn, offerContext)
+ if err != nil {
+ peerConn.span.RecordError(err)
+ peerConn.Close()
+ }
+ return
}
-func (t *Transport) handleOpen(onOpen func(datachannel.ReadWriteCloser)) {
- t.lock.Lock()
- dc := t.dc
- t.lock.Unlock()
- dc.OnOpen(func() {
- fmt.Printf("Data channel '%s'-'%d' open.\n", dc.Label(), dc.ID())
+type datachannelReadWriter interface {
+ datachannel.Reader
+ datachannel.Writer
+ io.Reader
+ io.Writer
+}
- // Detach the data channel
+type ioCloserFunc func() error
+
+func (me ioCloserFunc) Close() error {
+ return me()
+}
+
+func initDataChannel(
+ dc *webrtc.DataChannel,
+ pc *wrappedPeerConnection,
+ onOpen onDetachedDataChannelFunc,
+) {
+ var span trace.Span
+ dc.OnClose(func() {
+ span.End()
+ })
+ dc.OnOpen(func() {
+ pc.span.AddEvent("data channel opened")
+ var ctx context.Context
+ ctx, span = otel.Tracer(tracerName).Start(pc.ctx, "DataChannel")
raw, err := dc.Detach()
if err != nil {
- log.Fatalf("failed to detach: %v", err) // TODO: Error handling
+ // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
+ panic(err)
}
-
- onOpen(raw)
+ onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
})
}
+
+// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
+// and responsibility for the PeerConnection.
+func hookDataChannelCloser(
+ dcrwc datachannel.ReadWriteCloser,
+ pc *wrappedPeerConnection,
+ dataChannelSpan trace.Span,
+ originalDataChannel *webrtc.DataChannel,
+) datachannel.ReadWriteCloser {
+ return struct {
+ datachannelReadWriter
+ io.Closer
+ }{
+ dcrwc,
+ ioCloserFunc(func() error {
+ dcrwc.Close()
+ pc.Close()
+ originalDataChannel.Close()
+ dataChannelSpan.End()
+ return nil
+ }),
+ }
+}