X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=client.go;h=1ea1d610482551bfef1d621f0ba03bd5cf6f3667;hb=HEAD;hp=4c2d9123a2f2ba0fedeb8180e24bf5719d4dca78;hpb=c8fd8884b124af61b98e435711733d11270265bf;p=btrtrc.git diff --git a/client.go b/client.go index 4c2d9123..1ea1d610 100644 --- a/client.go +++ b/client.go @@ -19,8 +19,6 @@ import ( "strconv" "time" - "github.com/anacrolix/torrent/internal/panicif" - "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" @@ -37,7 +35,6 @@ import ( "github.com/dustin/go-humanize" gbtree "github.com/google/btree" "github.com/pion/datachannel" - "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/check" @@ -84,18 +81,15 @@ type Client struct { torrents map[InfoHash]*Torrent pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder - acceptLimiter map[ipStr]int - dialRateLimiter *rate.Limiter - numHalfOpen int + acceptLimiter map[ipStr]int + numHalfOpen int websocketTrackers websocketTrackers activeAnnounceLimiter limiter.Instance httpClient *http.Client - undialableWithoutHolepunch map[netip.AddrPort]struct{} - undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{} - dialableOnlyAfterHolepunch map[netip.AddrPort]struct{} + clientHolepunchAddrSets } type ipStr string @@ -205,18 +199,20 @@ func (cl *Client) init(cfg *ClientConfig) { cl.config = cfg g.MakeMap(&cl.dopplegangerAddrs) cl.torrents = make(map[metainfo.Hash]*Torrent) - cl.dialRateLimiter = rate.NewLimiter(10, 10) cl.activeAnnounceLimiter.SlotsPerKey = 2 cl.event.L = cl.locker() cl.ipBlockList = cfg.IPBlocklist cl.httpClient = &http.Client{ - Transport: &http.Transport{ + Transport: cfg.WebTransport, + } + if cl.httpClient.Transport == nil { + cl.httpClient.Transport = &http.Transport{ Proxy: cfg.HTTPProxy, DialContext: cfg.HTTPDialContext, // I think this value was observed from some webseeds. It seems reasonable to extend it // to other uses of HTTP from the client. MaxConnsPerHost: 10, - }, + } } } @@ -308,6 +304,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { }, Proxy: cl.config.HTTPProxy, WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, + ICEServers: cl.config.ICEServers, DialContext: cl.config.TrackerDialContext, OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) { cl.lock() @@ -504,6 +501,22 @@ func (cl *Client) acceptConnections(l Listener) { for { conn, err := l.Accept() torrent.Add("client listener accepts", 1) + if err == nil { + holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr()) + if holepunchErr == nil { + cl.lock() + if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + setAdd(&cl.accepted, holepunchAddr) + } + if g.MapContains( + cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, + holepunchAddr, + ) { + setAdd(&cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr) + } + cl.unlock() + } + } conn = pproffd.WrapNetConn(conn) cl.rLock() closed := cl.closed.IsSet() @@ -621,51 +634,14 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) { // Returns a connection over UTP or TCP, whichever is first to connect. func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) { - { - t := perf.NewTimer(perf.CallerName(0)) - defer func() { - if res.Conn == nil { - t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err())) - } else { - t.Mark("returned conn over " + res.Dialer.DialerNetwork()) - } - }() + pool := dialPool{ + addr: addr, } - ctx, cancel := context.WithCancel(ctx) - // As soon as we return one connection, cancel the others. - defer cancel() - left := 0 - resCh := make(chan DialResult, left) + defer pool.startDrainer() for _, _s := range dialers { - left++ - s := _s - go func() { - resCh <- DialResult{ - dialFromSocket(ctx, s, addr), - s, - } - }() + pool.add(ctx, _s) } - // Wait for a successful connection. - func() { - defer perf.ScopeTimer()() - for ; left > 0 && res.Conn == nil; left-- { - res = <-resCh - } - }() - // There are still uncompleted dials. - go func() { - for ; left > 0; left-- { - conn := (<-resCh).Conn - if conn != nil { - conn.Close() - } - } - }() - if res.Conn != nil { - go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1) - } - return res + return pool.getFirst() } func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { @@ -732,119 +708,18 @@ func (cl *Client) initiateProtocolHandshakes( return } -func (cl *Client) waitForRendezvousConnect(ctx context.Context, rz *utHolepunchRendezvous) error { - for { - switch { - case rz.gotConnect.IsSet(): - return nil - case len(rz.relays) == 0: - return errors.New("all relays failed") - case ctx.Err() != nil: - return context.Cause(ctx) - } - relayCond := rz.relayCond.Signaled() - cl.unlock() - select { - case <-rz.gotConnect.Done(): - case <-relayCond: - case <-ctx.Done(): - } - cl.lock() - } -} - -// Returns nil connection and nil error if no connection could be established for valid reasons. -func (cl *Client) initiateRendezvousConnect( - t *Torrent, holepunchAddr netip.AddrPort, -) (ok bool, err error) { - cl.lock() - defer cl.unlock() - rz, err := t.startHolepunchRendezvous(holepunchAddr) - if err != nil { - return - } - if rz == nil { - return - } - ok = true - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - err = cl.waitForRendezvousConnect(ctx, rz) - delete(t.utHolepunchRendezvous, holepunchAddr) - if err != nil { - err = fmt.Errorf("waiting for rendezvous connect signal: %w", err) - } - return -} - -// Returns nil connection and nil error if no connection could be established for valid reasons. -func (cl *Client) establishOutgoingConnEx( - opts outgoingConnOpts, +func doProtocolHandshakeOnDialResult( + t *Torrent, obfuscatedHeader bool, + addr PeerRemoteAddr, + dr DialResult, ) ( - _ *PeerConn, err error, + c *PeerConn, err error, ) { - t := opts.t - addr := opts.addr - holepunchAddr, err := addrPortFromPeerRemoteAddr(addr) - var sentRendezvous bool - if err == nil { - if !opts.skipHolepunchRendezvous { - sentRendezvous, err = cl.initiateRendezvousConnect(t, holepunchAddr) - if err != nil { - err = fmt.Errorf("initiating rendezvous connect: %w", err) - } - } - } - gotHolepunchConnect := (err == nil && sentRendezvous) || opts.receivedHolepunchConnect - if opts.requireRendezvous && !sentRendezvous { - return nil, err - } - if err != nil { - t.logger.Print(err) - } - dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration { - cl.rLock() - defer cl.rUnlock() - return t.dialTimeout() - }()) - defer cancel() - dr := cl.dialFirst(dialCtx, addr.String()) + cl := t.cl nc := dr.Conn - cl.lock() - if gotHolepunchConnect && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { - g.MakeMapIfNilAndSet( - &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, - holepunchAddr, - struct{}{}, - ) - } - cl.unlock() - if nc == nil { - if !sentRendezvous && !gotHolepunchConnect { - cl.lock() - g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{}) - cl.unlock() - } - if dialCtx.Err() != nil { - return nil, fmt.Errorf("dialing: %w", dialCtx.Err()) - } - return nil, errors.New("dial failed") - } - if gotHolepunchConnect { - panicif.False(holepunchAddr.IsValid()) - cl.lock() - if g.MapContains(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, holepunchAddr) { - g.MakeMapIfNilAndSet( - &cl.dialableOnlyAfterHolepunch, - holepunchAddr, - struct{}{}, - ) - } - cl.unlock() - } addrIpPort, _ := tryIpPortFromNetAddr(addr) - c, err := cl.initiateProtocolHandshakes( + c, err = cl.initiateProtocolHandshakes( context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{ outgoing: true, @@ -860,69 +735,153 @@ func (cl *Client) establishOutgoingConnEx( return c, err } -// Returns nil connection and nil error if no connection could be established -// for valid reasons. -func (cl *Client) establishOutgoingConn(opts outgoingConnOpts) (c *PeerConn, err error) { +// Returns nil connection and nil error if no connection could be established for valid reasons. +func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) { + // It would be better if dial rate limiting could be tested when considering to open connections + // instead. Doing it here means if the limit is low, and the half-open limit is high, we could + // end up with lots of outgoing connection attempts pending that were initiated on stale data. + { + dialReservation := cl.config.DialRateLimiter.Reserve() + if !opts.receivedHolepunchConnect { + if !dialReservation.OK() { + err = errors.New("can't make dial limit reservation") + return + } + time.Sleep(dialReservation.Delay()) + } + } torrent.Add("establish outgoing connection", 1) - obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred - c, err = cl.establishOutgoingConnEx(opts, obfuscatedHeaderFirst) + addr := opts.peerInfo.Addr + dialPool := dialPool{ + resCh: make(chan DialResult), + addr: addr.String(), + } + defer dialPool.startDrainer() + dialTimeout := opts.t.getDialTimeoutUnlocked() + { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + for _, d := range cl.dialers { + dialPool.add(ctx, d) + } + } + holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr) + headerObfuscationPolicy := opts.HeaderObfuscationPolicy + obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred + firstDialResult := dialPool.getFirst() + if firstDialResult.Conn == nil { + // No dialers worked. Try to initiate a holepunching rendezvous. + if holepunchAddrErr == nil { + cl.lock() + if !opts.receivedHolepunchConnect { + g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{}) + } + if !opts.skipHolepunchRendezvous { + opts.t.trySendHolepunchRendezvous(holepunchAddr) + } + cl.unlock() + } + err = fmt.Errorf("all initial dials failed") + return + } + if opts.receivedHolepunchConnect && holepunchAddrErr == nil { + cl.lock() + if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{}) + } + g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect) + g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{}) + cl.unlock() + } + c, err = doProtocolHandshakeOnDialResult( + opts.t, + obfuscatedHeaderFirst, + addr, + firstDialResult, + ) if err == nil { torrent.Add("initiated conn with preferred header obfuscation", 1) return } - // cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err) - if cl.config.HeaderObfuscationPolicy.RequirePreferred { - // We should have just tried with the preferred header obfuscation. If it was required, - // there's nothing else to try. + c.logger.Levelf( + log.Debug, + "error doing protocol handshake with header obfuscation %v", + obfuscatedHeaderFirst, + ) + firstDialResult.Conn.Close() + // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try. + if headerObfuscationPolicy.RequirePreferred { return } - // Try again with encryption if we didn't earlier, or without if we did. - c, err = cl.establishOutgoingConnEx(opts, !obfuscatedHeaderFirst) + // Reuse the dialer that returned already but failed to handshake. + { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + dialPool.add(ctx, firstDialResult.Dialer) + } + secondDialResult := dialPool.getFirst() + if secondDialResult.Conn == nil { + return + } + c, err = doProtocolHandshakeOnDialResult( + opts.t, + !obfuscatedHeaderFirst, + addr, + secondDialResult, + ) if err == nil { torrent.Add("initiated conn with fallback header obfuscation", 1) + return } - // cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err) + c.logger.Levelf( + log.Debug, + "error doing protocol handshake with header obfuscation %v", + !obfuscatedHeaderFirst, + ) + secondDialResult.Conn.Close() return } type outgoingConnOpts struct { - t *Torrent - addr PeerRemoteAddr + peerInfo PeerInfo + t *Torrent // Don't attempt to connect unless a connect message is received after initiating a rendezvous. requireRendezvous bool // Don't send rendezvous requests to eligible relays. skipHolepunchRendezvous bool // Outgoing connection attempt is in response to holepunch connect message. receivedHolepunchConnect bool + HeaderObfuscationPolicy HeaderObfuscationPolicy } // Called to dial out and run a connection. The addr we're given is already // considered half-open. func (cl *Client) outgoingConnection( opts outgoingConnOpts, - ps PeerSource, - trusted bool, attemptKey outgoingConnAttemptKey, ) { - cl.dialRateLimiter.Wait(context.Background()) - c, err := cl.establishOutgoingConn(opts) + c, err := cl.dialAndCompleteHandshake(opts) if err == nil { c.conn.SetWriteDeadline(time.Time{}) } cl.lock() defer cl.unlock() - // Don't release lock between here and addPeerConn, unless it's for - // failure. - cl.noLongerHalfOpen(opts.t, opts.addr.String(), attemptKey) + // Don't release lock between here and addPeerConn, unless it's for failure. + cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey) if err != nil { if cl.config.Debug { - cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err) + cl.logger.Levelf( + log.Debug, + "error establishing outgoing connection to %v: %v", + opts.peerInfo.Addr, + err, + ) } return } defer c.close() - c.Discovery = ps - c.trusted = trusted + c.Discovery = opts.peerInfo.Source + c.trusted = opts.peerInfo.Trusted opts.t.runHandshookConnLoggingErr(c) } @@ -1342,7 +1301,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { t.smartBanCache.Hash = sha1.Sum t.smartBanCache.Init() t.networkingEnabled.Set() - t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug) + t.logger = cl.logger.WithDefaultLevel(log.Debug) t.sourcesLogger = t.logger.WithNames("sources") if opts.ChunkSize == 0 { opts.ChunkSize = defaultChunkSize @@ -1641,6 +1600,7 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon } c.peerImpl = c c.logger = cl.logger.WithDefaultLevel(log.Warning) + c.logger = c.logger.WithContextText(fmt.Sprintf("%T %p", c, c)) c.setRW(connStatsReadWriter{nc, c}) c.r = &rateLimitedReader{ l: cl.config.DownloadRateLimiter, @@ -1648,8 +1608,8 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon } c.logger.Levelf( log.Debug, - "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]", - c, cl, opts.remoteAddr, opts.network, opts.outgoing, + "inited with remoteAddr %v network %v outgoing %t", + opts.remoteAddr, opts.network, opts.outgoing, ) for _, f := range cl.config.Callbacks.NewPeer { f(&c.Peer) @@ -1827,13 +1787,3 @@ func (cl *Client) Stats() ClientStats { defer cl.rUnlock() return cl.statsLocked() } - -func (cl *Client) statsLocked() (stats ClientStats) { - stats.ConnStats = cl.connStats.Copy() - stats.ActiveHalfOpenAttempts = cl.numHalfOpen - stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect = - len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect) - stats.NumPeersDialableOnlyAfterHolepunch = - len(cl.dialableOnlyAfterHolepunch) - return -}