client.go | 3 +-- peer-conn-msg-writer.go | 4 ++-- torrent.go | 3 +-- webrtc.go | 4 ++-- webtorrent/tracker-client.go | 3 +-- webtorrent/transport.go | 79 ++++++++++++++++++++++++++++++++++++++--------------- wstracker.go | 3 +-- diff --git a/client.go b/client.go index acf5732ec16a79295e5a4ea29b4e2db48a13f256..fe3af23997e5a6649faf9385e50a7d8bd41f03b0 100644 --- a/client.go +++ b/client.go @@ -33,7 +33,6 @@ "github.com/cespare/xxhash" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" gbtree "github.com/google/btree" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "github.com/anacrolix/torrent/bencode" @@ -333,7 +332,7 @@ Proxy: cl.config.HTTPProxy, WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, ICEServers: cl.ICEServers(), DialContext: cl.config.TrackerDialContext, - OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) { + OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) { cl.lock() defer cl.unlock() t, ok := cl.torrentsByShortHash[dcc.InfoHash] diff --git a/peer-conn-msg-writer.go b/peer-conn-msg-writer.go index 4f17e5edec4886560cf56e7b2974723f6cb82ee0..6132460823090e2a64b39d0fb9132e8ca67d929c 100644 --- a/peer-conn-msg-writer.go +++ b/peer-conn-msg-writer.go @@ -97,10 +97,10 @@ panic("expected non-empty front buffer") } var err error for frontBuf.Len() != 0 { - // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59. - next := frontBuf.Next(1<<16 - 1) + next := frontBuf.Bytes() var n int n, err = cn.w.Write(next) + frontBuf.Next(n) if err == nil && n != len(next) { panic("expected full write") } diff --git a/torrent.go b/torrent.go index f8115f6562122ad98c918adcf2d6cd6b58fff4a7..cbd2c05006b8ff73cd878afb30200200039b8941 100644 --- a/torrent.go +++ b/torrent.go @@ -32,7 +32,6 @@ "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/pubsub" "github.com/anacrolix/multiless" "github.com/anacrolix/sync" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "golang.org/x/sync/errgroup" @@ -1851,7 +1850,7 @@ return true } func (t *Torrent) onWebRtcConn( - c datachannel.ReadWriteCloser, + c webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext, ) { defer c.Close() diff --git a/webrtc.go b/webrtc.go index 918159dbb79110ef502dcd5664a153f4ea61027d..02a645eca346b0b5280f2cd79964add136fc47ed 100644 --- a/webrtc.go +++ b/webrtc.go @@ -1,11 +1,11 @@ package torrent import ( + "io" "net" "strconv" "time" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -17,7 +17,7 @@ const webrtcNetwork = "webrtc" type webrtcNetConn struct { - datachannel.ReadWriteCloser + io.ReadWriteCloser webtorrent.DataChannelContext } diff --git a/webtorrent/tracker-client.go b/webtorrent/tracker-client.go index 945097f30dd7fc2d1838eec4b15a221e20e6ab21..ec2232d132cb1d941d03138d62b290f808a8f008 100644 --- a/webtorrent/tracker-client.go +++ b/webtorrent/tracker-client.go @@ -12,7 +12,6 @@ g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/gorilla/websocket" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "go.opentelemetry.io/otel/trace" @@ -85,7 +84,7 @@ func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidatePair, error) { return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair() } -type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext) +type onDataChannelOpen func(_ DataChannelConn, dcc DataChannelContext) func (tc *TrackerClient) doWebsocket() error { metrics.Add("websocket dials", 1) diff --git a/webtorrent/transport.go b/webtorrent/transport.go index 3dfc84ab901c2857eb8a994ec86a776d1445f6ee..28f32121a7030abc09f6854f3aa93213806480d5 100644 --- a/webtorrent/transport.go +++ b/webtorrent/transport.go @@ -4,11 +4,14 @@ import ( "context" "expvar" "fmt" - "io" + "os" + "strconv" "sync" "time" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" + "github.com/anacrolix/missinggo/v2/panicif" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/pion/datachannel" "github.com/pion/webrtc/v4" @@ -132,7 +135,7 @@ if err != nil { err = fmt.Errorf("creating data channel: %w", err) peerConnection.Close() } - initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) { + initDataChannel(dataChannel, peerConnection, func(dc DataChannelConn, dcCtx context.Context, dcSpan trace.Span) { metrics.Add("outbound offers answered with datachannel", 1) tc.mu.Lock() tc.stats.ConvertedOutboundConns++ @@ -162,7 +165,7 @@ } return } -type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) +type onDetachedDataChannelFunc func(detached DataChannelConn, ctx context.Context, span trace.Span) func (tc *TrackerClient) initAnsweringPeerConnection( peerConn *wrappedPeerConnection, @@ -176,7 +179,7 @@ metrics.Add("answering peer connections timed out", 1) peerConn.Close() }) peerConn.OnDataChannel(func(d *webrtc.DataChannel) { - initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) { + initDataChannel(d, peerConn, func(detached DataChannelConn, ctx context.Context, span trace.Span) { timer.Stop() metrics.Add("answering peer connection conversions", 1) tc.mu.Lock() @@ -225,13 +228,6 @@ } return } -type datachannelReadWriter interface { - datachannel.Reader - datachannel.Writer - io.Reader - io.Writer -} - type ioCloserFunc func() error func (me ioCloserFunc) Close() error { @@ -256,29 +252,68 @@ if err != nil { // This shouldn't happen if the API is configured correctly, and we call from OnOpen. panic(err) } - onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span) + onOpen(wrapDataChannel(raw, pc, span, dc), ctx, span) }) } -// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership -// and responsibility for the PeerConnection. -func hookDataChannelCloser( +// WebRTC data channel wrapper that supports operating as a peer conn ReadWriteCloser. +type DataChannelConn struct { + ioCloserFunc + rawDataChannel datachannel.ReadWriteCloser +} + +func (d DataChannelConn) Read(p []byte) (int, error) { + return d.rawDataChannel.Read(p) +} + +// Limit write size for WebRTC data channels. See https://github.com/pion/datachannel/issues/59. The +// default used to be (1<<16)-1. This will be set to the new appropriate value if it's discovered to +// still be a limitation. Set WEBTORRENT_MAX_WRITE_SIZE to experiment with it. +var maxWriteSize = g.None[int]() + +func init() { + s, ok := os.LookupEnv("WEBTORRENT_MAX_WRITE_SIZE") + if !ok { + return + } + i64, err := strconv.ParseInt(s, 0, 0) + panicif.Err(err) + maxWriteSize = g.Some(int(i64)) +} + +func (d DataChannelConn) Write(p []byte) (n int, err error) { + for { + p1 := p + if maxWriteSize.Ok { + p1 = p1[:min(len(p1), maxWriteSize.Value)] + } + var n1 int + n1, err = d.rawDataChannel.Write(p1) + n += n1 + p = p[n1:] + if err != nil { + return + } + if len(p) == 0 { + return + } + } +} + +func wrapDataChannel( dcrwc datachannel.ReadWriteCloser, pc *wrappedPeerConnection, dataChannelSpan trace.Span, originalDataChannel *webrtc.DataChannel, -) datachannel.ReadWriteCloser { - return struct { - datachannelReadWriter - io.Closer - }{ - dcrwc, - ioCloserFunc(func() error { +) DataChannelConn { + return DataChannelConn{ + ioCloserFunc: ioCloserFunc(func() error { dcrwc.Close() pc.Close() originalDataChannel.Close() dataChannelSpan.End() return nil }), + rawDataChannel: dcrwc, } } diff --git a/wstracker.go b/wstracker.go index 7f53987bc93a3cc7366af1c7e0b242a41fa0e712..0e71e4e4a00f4d3f711b3ed87b3daf7eeadf5954 100644 --- a/wstracker.go +++ b/wstracker.go @@ -10,7 +10,6 @@ "sync" "github.com/anacrolix/log" "github.com/gorilla/websocket" - "github.com/pion/datachannel" "github.com/pion/webrtc/v4" "github.com/anacrolix/torrent/tracker" @@ -43,7 +42,7 @@ type websocketTrackers struct { PeerId [20]byte Logger log.Logger GetAnnounceRequest func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) - OnConn func(datachannel.ReadWriteCloser, webtorrent.DataChannelContext) + OnConn func(webtorrent.DataChannelConn, webtorrent.DataChannelContext) mu sync.Mutex clients map[string]*refCountedWebtorrentTrackerClient Proxy httpTracker.ProxyFunc