]> Sergey Matveev's repositories - btrtrc.git/blobdiff - client.go
Add WebRTC ICE servers config (#824)
[btrtrc.git] / client.go
index 95c21aabb7576333b80cdbd9b4d3a2cef71b0460..a87e44391f657508406c243b74df06602c81b030 100644 (file)
--- a/client.go
+++ b/client.go
@@ -35,7 +35,6 @@ import (
        "github.com/dustin/go-humanize"
        gbtree "github.com/google/btree"
        "github.com/pion/datachannel"
-       "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/internal/check"
@@ -82,18 +81,15 @@ type Client struct {
        torrents          map[InfoHash]*Torrent
        pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
 
-       acceptLimiter   map[ipStr]int
-       dialRateLimiter *rate.Limiter
-       numHalfOpen     int
+       acceptLimiter map[ipStr]int
+       numHalfOpen   int
 
        websocketTrackers websocketTrackers
 
        activeAnnounceLimiter limiter.Instance
        httpClient            *http.Client
 
-       undialableWithoutHolepunch                                   map[netip.AddrPort]struct{}
-       undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{}
-       dialableOnlyAfterHolepunch                                   map[netip.AddrPort]struct{}
+       clientHolepunchAddrSets
 }
 
 type ipStr string
@@ -203,7 +199,6 @@ func (cl *Client) init(cfg *ClientConfig) {
        cl.config = cfg
        g.MakeMap(&cl.dopplegangerAddrs)
        cl.torrents = make(map[metainfo.Hash]*Torrent)
-       cl.dialRateLimiter = rate.NewLimiter(10, 10)
        cl.activeAnnounceLimiter.SlotsPerKey = 2
        cl.event.L = cl.locker()
        cl.ipBlockList = cfg.IPBlocklist
@@ -306,6 +301,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                },
                Proxy:                      cl.config.HTTPProxy,
                WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
+               ICEServers:                 cl.config.ICEServers,
                DialContext:                cl.config.TrackerDialContext,
                OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
                        cl.lock()
@@ -520,6 +516,20 @@ func (cl *Client) acceptConnections(l Listener) {
                        log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
                        continue
                }
+               {
+                       holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
+                       if holepunchErr == nil {
+                               cl.lock()
+                               if g.MapContains(
+                                       cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+                                       holepunchAddr,
+                               ) {
+                                       g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch)
+                                       g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{})
+                               }
+                               cl.unlock()
+                       }
+               }
                go func() {
                        if reject != nil {
                                torrent.Add("rejected accepted connections", 1)
@@ -722,6 +732,19 @@ func doProtocolHandshakeOnDialResult(
 
 // Returns nil connection and nil error if no connection could be established for valid reasons.
 func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
+       // It would be better if dial rate limiting could be tested when considering to open connections
+       // instead. Doing it here means if the limit is low, and the half-open limit is high, we could
+       // end up with lots of outgoing connection attempts pending that were initiated on stale data.
+       {
+               dialReservation := cl.config.DialRateLimiter.Reserve()
+               if !opts.receivedHolepunchConnect {
+                       if !dialReservation.OK() {
+                               err = errors.New("can't make dial limit reservation")
+                               return
+                       }
+                       time.Sleep(dialReservation.Delay())
+               }
+       }
        torrent.Add("establish outgoing connection", 1)
        addr := opts.peerInfo.Addr
        dialPool := dialPool{
@@ -738,12 +761,16 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
                }
        }
        holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
-       if holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) && opts.receivedHolepunchConnect {
-               g.MakeMapIfNilAndSet(
-                       &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
-                       holepunchAddr,
-                       struct{}{},
-               )
+       if holepunchAddrErr == nil && opts.receivedHolepunchConnect {
+               cl.lock()
+               if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+                       g.MakeMapIfNilAndSet(
+                               &cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+                               holepunchAddr,
+                               struct{}{},
+                       )
+               }
+               cl.unlock()
        }
        headerObfuscationPolicy := opts.HeaderObfuscationPolicy
        obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
@@ -751,18 +778,26 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
        if firstDialResult.Conn == nil {
                // No dialers worked. Try to initiate a holepunching rendezvous.
                if holepunchAddrErr == nil {
+                       cl.lock()
                        if !opts.receivedHolepunchConnect {
-                               cl.lock()
                                g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
-                               cl.unlock()
                        }
-                       opts.t.startHolepunchRendezvous(holepunchAddr)
+                       if !opts.skipHolepunchRendezvous {
+                               opts.t.trySendHolepunchRendezvous(holepunchAddr)
+                       }
+                       cl.unlock()
                }
                err = fmt.Errorf("all initial dials failed")
                return
        }
-       if opts.receivedHolepunchConnect && holepunchAddrErr == nil && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
-               g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+       if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
+               cl.lock()
+               if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+                       g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+               }
+               g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
+               g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
+               cl.unlock()
        }
        c, err = doProtocolHandshakeOnDialResult(
                opts.t,
@@ -774,6 +809,12 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
                torrent.Add("initiated conn with preferred header obfuscation", 1)
                return
        }
+       c.logger.Levelf(
+               log.Debug,
+               "error doing protocol handshake with header obfuscation %v",
+               obfuscatedHeaderFirst,
+       )
+       firstDialResult.Conn.Close()
        // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
        if headerObfuscationPolicy.RequirePreferred {
                return
@@ -798,6 +839,12 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn,
                torrent.Add("initiated conn with fallback header obfuscation", 1)
                return
        }
+       c.logger.Levelf(
+               log.Debug,
+               "error doing protocol handshake with header obfuscation %v",
+               !obfuscatedHeaderFirst,
+       )
+       secondDialResult.Conn.Close()
        return
 }
 
@@ -819,7 +866,6 @@ func (cl *Client) outgoingConnection(
        opts outgoingConnOpts,
        attemptKey outgoingConnAttemptKey,
 ) {
-       cl.dialRateLimiter.Wait(context.Background())
        c, err := cl.dialAndCompleteHandshake(opts)
        if err == nil {
                c.conn.SetWriteDeadline(time.Time{})
@@ -1261,7 +1307,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
        t.smartBanCache.Hash = sha1.Sum
        t.smartBanCache.Init()
        t.networkingEnabled.Set()
-       t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug)
+       t.logger = cl.logger.WithDefaultLevel(log.Debug)
        t.sourcesLogger = t.logger.WithNames("sources")
        if opts.ChunkSize == 0 {
                opts.ChunkSize = defaultChunkSize
@@ -1560,6 +1606,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
        }
        c.peerImpl = c
        c.logger = cl.logger.WithDefaultLevel(log.Warning)
+       c.logger = c.logger.WithContextText(fmt.Sprintf("%T %p", c, c))
        c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
                l: cl.config.DownloadRateLimiter,
@@ -1567,8 +1614,8 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon
        }
        c.logger.Levelf(
                log.Debug,
-               "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]",
-               c, cl, opts.remoteAddr, opts.network, opts.outgoing,
+               "inited with remoteAddr %v network %v outgoing %t",
+               opts.remoteAddr, opts.network, opts.outgoing,
        )
        for _, f := range cl.config.Callbacks.NewPeer {
                f(&c.Peer)
@@ -1746,13 +1793,3 @@ func (cl *Client) Stats() ClientStats {
        defer cl.rUnlock()
        return cl.statsLocked()
 }
-
-func (cl *Client) statsLocked() (stats ClientStats) {
-       stats.ConnStats = cl.connStats.Copy()
-       stats.ActiveHalfOpenAttempts = cl.numHalfOpen
-       stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect =
-               len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect)
-       stats.NumPeersDialableOnlyAfterHolepunch =
-               len(cl.dialableOnlyAfterHolepunch)
-       return
-}