]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Only apply WebRTC data channel write limit to webtorrent peer conns
authorMatt Joiner <anacrolix@gmail.com>
Thu, 17 Oct 2024 23:57:59 +0000 (10:57 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 17 Oct 2024 23:57:59 +0000 (10:57 +1100)
client.go
peer-conn-msg-writer.go
torrent.go
webrtc.go
webtorrent/tracker-client.go
webtorrent/transport.go
wstracker.go

index acf5732ec16a79295e5a4ea29b4e2db48a13f256..fe3af23997e5a6649faf9385e50a7d8bd41f03b0 100644 (file)
--- a/client.go
+++ b/client.go
@@ -33,7 +33,6 @@ import (
        "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        gbtree "github.com/google/btree"
-       "github.com/pion/datachannel"
        "github.com/pion/webrtc/v4"
 
        "github.com/anacrolix/torrent/bencode"
@@ -333,7 +332,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
                ICEServers:                 cl.ICEServers(),
                DialContext:                cl.config.TrackerDialContext,
-               OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
+               OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
                        cl.lock()
                        defer cl.unlock()
                        t, ok := cl.torrentsByShortHash[dcc.InfoHash]
index 4f17e5edec4886560cf56e7b2974723f6cb82ee0..6132460823090e2a64b39d0fb9132e8ca67d929c 100644 (file)
@@ -97,10 +97,10 @@ func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
                }
                var err error
                for frontBuf.Len() != 0 {
-                       // Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
-                       next := frontBuf.Next(1<<16 - 1)
+                       next := frontBuf.Bytes()
                        var n int
                        n, err = cn.w.Write(next)
+                       frontBuf.Next(n)
                        if err == nil && n != len(next) {
                                panic("expected full write")
                        }
index f8115f6562122ad98c918adcf2d6cd6b58fff4a7..cbd2c05006b8ff73cd878afb30200200039b8941 100644 (file)
@@ -32,7 +32,6 @@ import (
        "github.com/anacrolix/missinggo/v2/pubsub"
        "github.com/anacrolix/multiless"
        "github.com/anacrolix/sync"
-       "github.com/pion/datachannel"
        "github.com/pion/webrtc/v4"
        "golang.org/x/sync/errgroup"
 
@@ -1851,7 +1850,7 @@ func (t *Torrent) seeding() bool {
 }
 
 func (t *Torrent) onWebRtcConn(
-       c datachannel.ReadWriteCloser,
+       c webtorrent.DataChannelConn,
        dcc webtorrent.DataChannelContext,
 ) {
        defer c.Close()
index 918159dbb79110ef502dcd5664a153f4ea61027d..02a645eca346b0b5280f2cd79964add136fc47ed 100644 (file)
--- a/webrtc.go
+++ b/webrtc.go
@@ -1,11 +1,11 @@
 package torrent
 
 import (
+       "io"
        "net"
        "strconv"
        "time"
 
-       "github.com/pion/datachannel"
        "github.com/pion/webrtc/v4"
        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
@@ -17,7 +17,7 @@ import (
 const webrtcNetwork = "webrtc"
 
 type webrtcNetConn struct {
-       datachannel.ReadWriteCloser
+       io.ReadWriteCloser
        webtorrent.DataChannelContext
 }
 
index 945097f30dd7fc2d1838eec4b15a221e20e6ab21..ec2232d132cb1d941d03138d62b290f808a8f008 100644 (file)
@@ -12,7 +12,6 @@ import (
        g "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
        "github.com/gorilla/websocket"
-       "github.com/pion/datachannel"
        "github.com/pion/webrtc/v4"
        "go.opentelemetry.io/otel/trace"
 
@@ -85,7 +84,7 @@ func (me *DataChannelContext) GetSelectedIceCandidatePair() (*webrtc.ICECandidat
        return me.peerConnection.SCTP().Transport().ICETransport().GetSelectedCandidatePair()
 }
 
-type onDataChannelOpen func(_ datachannel.ReadWriteCloser, dcc DataChannelContext)
+type onDataChannelOpen func(_ DataChannelConn, dcc DataChannelContext)
 
 func (tc *TrackerClient) doWebsocket() error {
        metrics.Add("websocket dials", 1)
index 3dfc84ab901c2857eb8a994ec86a776d1445f6ee..28f32121a7030abc09f6854f3aa93213806480d5 100644 (file)
@@ -4,11 +4,14 @@ import (
        "context"
        "expvar"
        "fmt"
-       "io"
+       "os"
+       "strconv"
        "sync"
        "time"
 
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
+       "github.com/anacrolix/missinggo/v2/panicif"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/pion/datachannel"
        "github.com/pion/webrtc/v4"
@@ -132,7 +135,7 @@ func (tc *TrackerClient) newOffer(
                err = fmt.Errorf("creating data channel: %w", err)
                peerConnection.Close()
        }
-       initDataChannel(dataChannel, peerConnection, func(dc datachannel.ReadWriteCloser, dcCtx context.Context, dcSpan trace.Span) {
+       initDataChannel(dataChannel, peerConnection, func(dc DataChannelConn, dcCtx context.Context, dcSpan trace.Span) {
                metrics.Add("outbound offers answered with datachannel", 1)
                tc.mu.Lock()
                tc.stats.ConvertedOutboundConns++
@@ -162,7 +165,7 @@ func (tc *TrackerClient) newOffer(
        return
 }
 
-type onDetachedDataChannelFunc func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span)
+type onDetachedDataChannelFunc func(detached DataChannelConn, ctx context.Context, span trace.Span)
 
 func (tc *TrackerClient) initAnsweringPeerConnection(
        peerConn *wrappedPeerConnection,
@@ -176,7 +179,7 @@ func (tc *TrackerClient) initAnsweringPeerConnection(
                peerConn.Close()
        })
        peerConn.OnDataChannel(func(d *webrtc.DataChannel) {
-               initDataChannel(d, peerConn, func(detached datachannel.ReadWriteCloser, ctx context.Context, span trace.Span) {
+               initDataChannel(d, peerConn, func(detached DataChannelConn, ctx context.Context, span trace.Span) {
                        timer.Stop()
                        metrics.Add("answering peer connection conversions", 1)
                        tc.mu.Lock()
@@ -225,13 +228,6 @@ func (tc *TrackerClient) newAnsweringPeerConnection(
        return
 }
 
-type datachannelReadWriter interface {
-       datachannel.Reader
-       datachannel.Writer
-       io.Reader
-       io.Writer
-}
-
 type ioCloserFunc func() error
 
 func (me ioCloserFunc) Close() error {
@@ -256,29 +252,68 @@ func initDataChannel(
                        // This shouldn't happen if the API is configured correctly, and we call from OnOpen.
                        panic(err)
                }
-               onOpen(hookDataChannelCloser(raw, pc, span, dc), ctx, span)
+               onOpen(wrapDataChannel(raw, pc, span, dc), ctx, span)
        })
 }
 
-// Hooks the datachannel's Close to Close the owning PeerConnection. The datachannel takes ownership
-// and responsibility for the PeerConnection.
-func hookDataChannelCloser(
+// WebRTC data channel wrapper that supports operating as a peer conn ReadWriteCloser.
+type DataChannelConn struct {
+       ioCloserFunc
+       rawDataChannel datachannel.ReadWriteCloser
+}
+
+func (d DataChannelConn) Read(p []byte) (int, error) {
+       return d.rawDataChannel.Read(p)
+}
+
+// Limit write size for WebRTC data channels. See https://github.com/pion/datachannel/issues/59. The
+// default used to be (1<<16)-1. This will be set to the new appropriate value if it's discovered to
+// still be a limitation. Set WEBTORRENT_MAX_WRITE_SIZE to experiment with it.
+var maxWriteSize = g.None[int]()
+
+func init() {
+       s, ok := os.LookupEnv("WEBTORRENT_MAX_WRITE_SIZE")
+       if !ok {
+               return
+       }
+       i64, err := strconv.ParseInt(s, 0, 0)
+       panicif.Err(err)
+       maxWriteSize = g.Some(int(i64))
+}
+
+func (d DataChannelConn) Write(p []byte) (n int, err error) {
+       for {
+               p1 := p
+               if maxWriteSize.Ok {
+                       p1 = p1[:min(len(p1), maxWriteSize.Value)]
+               }
+               var n1 int
+               n1, err = d.rawDataChannel.Write(p1)
+               n += n1
+               p = p[n1:]
+               if err != nil {
+                       return
+               }
+               if len(p) == 0 {
+                       return
+               }
+       }
+}
+
+func wrapDataChannel(
        dcrwc datachannel.ReadWriteCloser,
        pc *wrappedPeerConnection,
        dataChannelSpan trace.Span,
        originalDataChannel *webrtc.DataChannel,
-) datachannel.ReadWriteCloser {
-       return struct {
-               datachannelReadWriter
-               io.Closer
-       }{
-               dcrwc,
-               ioCloserFunc(func() error {
+) DataChannelConn {
+       return DataChannelConn{
+               ioCloserFunc: ioCloserFunc(func() error {
                        dcrwc.Close()
                        pc.Close()
                        originalDataChannel.Close()
                        dataChannelSpan.End()
                        return nil
                }),
+               rawDataChannel: dcrwc,
        }
 }
index 7f53987bc93a3cc7366af1c7e0b242a41fa0e712..0e71e4e4a00f4d3f711b3ed87b3daf7eeadf5954 100644 (file)
@@ -10,7 +10,6 @@ import (
 
        "github.com/anacrolix/log"
        "github.com/gorilla/websocket"
-       "github.com/pion/datachannel"
        "github.com/pion/webrtc/v4"
 
        "github.com/anacrolix/torrent/tracker"
@@ -43,7 +42,7 @@ type websocketTrackers struct {
        PeerId                     [20]byte
        Logger                     log.Logger
        GetAnnounceRequest         func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error)
-       OnConn                     func(datachannel.ReadWriteCloser, webtorrent.DataChannelContext)
+       OnConn                     func(webtorrent.DataChannelConn, webtorrent.DataChannelContext)
        mu                         sync.Mutex
        clients                    map[string]*refCountedWebtorrentTrackerClient
        Proxy                      httpTracker.ProxyFunc