X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=client.go;h=f1a054abc6708a223e3c4159de7e53cb0c35ce84;hb=5efb4dd9410e28bb2d6320268af4f98366be6508;hp=bc9bafbbd8fd95b065ebd03bc154426729fc37e9;hpb=9352f6cf8e1fb6c125e205d0a10f8068e511d663;p=btrtrc.git diff --git a/client.go b/client.go index bc9bafbb..f1a054ab 100644 --- a/client.go +++ b/client.go @@ -35,7 +35,6 @@ import ( "github.com/dustin/go-humanize" gbtree "github.com/google/btree" "github.com/pion/datachannel" - "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/internal/check" @@ -82,9 +81,8 @@ type Client struct { torrents map[InfoHash]*Torrent pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder - acceptLimiter map[ipStr]int - dialRateLimiter *rate.Limiter - numHalfOpen int + acceptLimiter map[ipStr]int + numHalfOpen int websocketTrackers websocketTrackers @@ -201,7 +199,6 @@ func (cl *Client) init(cfg *ClientConfig) { cl.config = cfg g.MakeMap(&cl.dopplegangerAddrs) cl.torrents = make(map[metainfo.Hash]*Torrent) - cl.dialRateLimiter = rate.NewLimiter(10, 10) cl.activeAnnounceLimiter.SlotsPerKey = 2 cl.event.L = cl.locker() cl.ipBlockList = cfg.IPBlocklist @@ -304,6 +301,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { }, Proxy: cl.config.HTTPProxy, WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, + ICEServers: cl.config.ICEServers, DialContext: cl.config.TrackerDialContext, OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) { cl.lock() @@ -500,6 +498,22 @@ func (cl *Client) acceptConnections(l Listener) { for { conn, err := l.Accept() torrent.Add("client listener accepts", 1) + if err == nil { + holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr()) + if holepunchErr == nil { + cl.lock() + if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + setAdd(&cl.accepted, holepunchAddr) + } + if g.MapContains( + cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, + holepunchAddr, + ) { + setAdd(&cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr) + } + cl.unlock() + } + } conn = pproffd.WrapNetConn(conn) cl.rLock() closed := cl.closed.IsSet() @@ -518,20 +532,6 @@ func (cl *Client) acceptConnections(l Listener) { log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger) continue } - { - holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr()) - if holepunchErr == nil { - cl.lock() - if g.MapContains( - cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, - holepunchAddr, - ) { - g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch) - g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{}) - } - cl.unlock() - } - } go func() { if reject != nil { torrent.Add("rejected accepted connections", 1) @@ -734,6 +734,19 @@ func doProtocolHandshakeOnDialResult( // Returns nil connection and nil error if no connection could be established for valid reasons. func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) { + // It would be better if dial rate limiting could be tested when considering to open connections + // instead. Doing it here means if the limit is low, and the half-open limit is high, we could + // end up with lots of outgoing connection attempts pending that were initiated on stale data. + { + dialReservation := cl.config.DialRateLimiter.Reserve() + if !opts.receivedHolepunchConnect { + if !dialReservation.OK() { + err = errors.New("can't make dial limit reservation") + return + } + time.Sleep(dialReservation.Delay()) + } + } torrent.Add("establish outgoing connection", 1) addr := opts.peerInfo.Addr dialPool := dialPool{ @@ -750,17 +763,6 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, } } holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr) - if holepunchAddrErr == nil && opts.receivedHolepunchConnect { - cl.lock() - if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { - g.MakeMapIfNilAndSet( - &cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, - holepunchAddr, - struct{}{}, - ) - } - cl.unlock() - } headerObfuscationPolicy := opts.HeaderObfuscationPolicy obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred firstDialResult := dialPool.getFirst() @@ -798,6 +800,12 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, torrent.Add("initiated conn with preferred header obfuscation", 1) return } + c.logger.Levelf( + log.Debug, + "error doing protocol handshake with header obfuscation %v", + obfuscatedHeaderFirst, + ) + firstDialResult.Conn.Close() // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try. if headerObfuscationPolicy.RequirePreferred { return @@ -822,6 +830,12 @@ func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, torrent.Add("initiated conn with fallback header obfuscation", 1) return } + c.logger.Levelf( + log.Debug, + "error doing protocol handshake with header obfuscation %v", + !obfuscatedHeaderFirst, + ) + secondDialResult.Conn.Close() return } @@ -843,7 +857,6 @@ func (cl *Client) outgoingConnection( opts outgoingConnOpts, attemptKey outgoingConnAttemptKey, ) { - cl.dialRateLimiter.Wait(context.Background()) c, err := cl.dialAndCompleteHandshake(opts) if err == nil { c.conn.SetWriteDeadline(time.Time{}) @@ -1285,7 +1298,7 @@ func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { t.smartBanCache.Hash = sha1.Sum t.smartBanCache.Init() t.networkingEnabled.Set() - t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug) + t.logger = cl.logger.WithDefaultLevel(log.Debug) t.sourcesLogger = t.logger.WithNames("sources") if opts.ChunkSize == 0 { opts.ChunkSize = defaultChunkSize @@ -1592,8 +1605,8 @@ func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerCon } c.logger.Levelf( log.Debug, - "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]", - c, cl, opts.remoteAddr, opts.network, opts.outgoing, + "inited with remoteAddr %v network %v outgoing %t", + opts.remoteAddr, opts.network, opts.outgoing, ) for _, f := range cl.config.Callbacks.NewPeer { f(&c.Peer)