From edb5b0983f6a34508fefa53e173dee2d13d20683 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 18 Oct 2024 10:57:59 +1100 Subject: [PATCH] Only apply WebRTC data channel write limit to webtorrent peer conns --- client.go | 3 +- peer-conn-msg-writer.go | 4 +- torrent.go | 3 +- webrtc.go | 4 +- webtorrent/tracker-client.go | 3 +- webtorrent/transport.go | 79 ++++++++++++++++++++++++++---------- wstracker.go | 3 +- 7 files changed, 65 insertions(+), 34 deletions(-) diff --git a/client.go b/client.go index acf5732e..fe3af239 100644 --- a/client.go +++ b/client.go @@ -33,7 +33,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" gbtree "github.com/google/btree" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "github.com/anacrolix/torrent/bencode" @@ -333,7 +332,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, ICEServers: cl.ICEServers(), DialContext: cl.config.TrackerDialContext, - OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) { + OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) { cl.lock() defer cl.unlock() t, ok := cl.torrentsByShortHash[dcc.InfoHash] diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 4f17e5ed..61324608 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -97,10 +97,10 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) { } var err error for frontBuf.Len() != 0 { - // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59. - next := frontBuf.Next(1<<16 - 1) + next := frontBuf.Bytes() var n int n, err = cn.w.Write(next) + frontBuf.Next(n) if err == nil && n != len(next) { panic("expected full write") } diff --git a/torrent.go b/torrent.go index f8115f65..cbd2c050 100644 --- a/torrent.go +++ b/torrent.go @@ -32,7 +32,6 @@ import ( "github.com/anacrolix/missinggo/v2/pubsub" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "golang.org/x/sync/errgroup" @@ -1851,7 +1850,7 @@ func (t *Torrent) seeding() bool { } func (t *Torrent) onWebRtcConn( - c datachannel.ReadWriteCloser, + c webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext, ) { defer c.Close() diff --git a/webrtc.go b/webrtc.go index 918159db..02a645ec 100644 --- a/webrtc.go +++ b/webrtc.go @@ -1,11 +1,11 @@ package torrent import ( + "io" "net" "strconv" "time" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -17,7 +17,7 @@ import ( const webrtcNetwork = "webrtc" type webrtcNetConn struct { - datachannel.ReadWriteCloser + io.ReadWriteCloser webtorrent.DataChannelContext } diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index 945097f3..ec2232d1 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -12,7 +12,6 @@ import ( g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/gorilla/websocket" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "go.opentelemetry.io/otel/trace" @@ -85,7 +84,7 @@ func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidat return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair() } -type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext) +type onDataChannelOpen func(_ DataChannelConn, dcc DataChannelContext) func (tc *TrackerClient) doWebsocket() error { metrics.Add("websocket dials", 1) diff --git a/webtorrent/transport.go b/webtorrent/transport.go index 3dfc84ab..28f32121 100644 --- a/webtorrent/transport.go +++ b/webtorrent/transport.go @@ -4,11 +4,14 @@ import ( "context" "expvar" "fmt" - "io" + "os" + "strconv" "sync" "time" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/pion/datachannel" "github.com/pion/webrtc/v4" @@ -132,7 +135,7 @@ func (tc *TrackerClient) newOffer( err = fmt.Errorf("creating data channel: %w", err) peerConnection.Close() } - initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) { + initDataChannel(dataChannel, peerConnection, func(dc DataChannelConn, dcCtx context.Context, dcSpan trace.Span) { metrics.Add("outbound offers answered with datachannel", 1) tc.mu.Lock() tc.stats.ConvertedOutboundConns++ @@ -162,7 +165,7 @@ func (tc *TrackerClient) newOffer( return } -type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) +type onDetachedDataChannelFunc func(detached DataChannelConn, ctx context.Context, span trace.Span) func (tc *TrackerClient) initAnsweringPeerConnection( peerConn *wrappedPeerConnection, @@ -176,7 +179,7 @@ func (tc *TrackerClient) initAnsweringPeerConnection( peerConn.Close() }) peerConn.OnDataChannel(func(d *webrtc.DataChannel) { - initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) { + initDataChannel(d, peerConn, func(detached DataChannelConn, ctx context.Context, span trace.Span) { timer.Stop() metrics.Add("answering peer connection conversions", 1) tc.mu.Lock() @@ -225,13 +228,6 @@ func (tc *TrackerClient) newAnsweringPeerConnection( return } -type datachannelReadWriter interface { - datachannel.Reader - datachannel.Writer - io.Reader - io.Writer -} - type ioCloserFunc func() error func (me ioCloserFunc) Close() error { @@ -256,29 +252,68 @@ func initDataChannel( // This shouldn't happen if the API is configured correctly, and we call from OnOpen. panic(err) } - onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span) + onOpen(wrapDataChannel(raw, pc, span, dc), ctx, span) }) } -// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership -// and responsibility for the PeerConnection. -func hookDataChannelCloser( +// WebRTC data channel wrapper that supports operating as a peer conn ReadWriteCloser. +type DataChannelConn struct { + ioCloserFunc + rawDataChannel datachannel.ReadWriteCloser +} + +func (d DataChannelConn) Read(p []byte) (int, error) { + return d.rawDataChannel.Read(p) +} + +// Limit write size for WebRTC data channels. See https://github.com/pion/datachannel/issues/59. The +// default used to be (1<<16)-1. This will be set to the new appropriate value if it's discovered to +// still be a limitation. Set WEBTORRENT_MAX_WRITE_SIZE to experiment with it. +var maxWriteSize = g.None[int]() + +func init() { + s, ok := os.LookupEnv("WEBTORRENT_MAX_WRITE_SIZE") + if !ok { + return + } + i64, err := strconv.ParseInt(s, 0, 0) + panicif.Err(err) + maxWriteSize = g.Some(int(i64)) +} + +func (d DataChannelConn) Write(p []byte) (n int, err error) { + for { + p1 := p + if maxWriteSize.Ok { + p1 = p1[:min(len(p1), maxWriteSize.Value)] + } + var n1 int + n1, err = d.rawDataChannel.Write(p1) + n += n1 + p = p[n1:] + if err != nil { + return + } + if len(p) == 0 { + return + } + } +} + +func wrapDataChannel( dcrwc datachannel.ReadWriteCloser, pc *wrappedPeerConnection, dataChannelSpan trace.Span, originalDataChannel *webrtc.DataChannel, -) datachannel.ReadWriteCloser { - return struct { - datachannelReadWriter - io.Closer - }{ - dcrwc, - ioCloserFunc(func() error { +) DataChannelConn { + return DataChannelConn{ + ioCloserFunc: ioCloserFunc(func() error { dcrwc.Close() pc.Close() originalDataChannel.Close() dataChannelSpan.End() return nil }), + rawDataChannel: dcrwc, } } diff --git a/wstracker.go b/wstracker.go index 7f53987b..0e71e4e4 100644 --- a/wstracker.go +++ b/wstracker.go @@ -10,7 +10,6 @@ import ( "github.com/anacrolix/log" "github.com/gorilla/websocket" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "github.com/anacrolix/torrent/tracker" @@ -43,7 +42,7 @@ type websocketTrackers struct { PeerId [20]byte Logger log.Logger GetAnnounceRequest func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) - OnConn func(datachannel.ReadWriteCloser, webtorrent.DataChannelContext) + OnConn func(webtorrent.DataChannelConn, webtorrent.DataChannelContext) mu sync.Mutex clients map[string]*refCountedWebtorrentTrackerClient Proxy httpTracker.ProxyFunc -- 2.48.1