X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=client.go;h=6a82774e7a0d5cf8f0fa4b265161de1076a40609;hb=3a92268f10184f5ed4602fa43d476d0ada96dc8e;hp=179031d03e3a81b7b1bd21151a1769a05f9eb141;hpb=ccce2dba13be2b86a8daa8804e7cbc04b475aa8b;p=btrtrc.git diff --git a/client.go b/client.go index 179031d0..6a82774e 100644 --- a/client.go +++ b/client.go @@ -4,7 +4,9 @@ import ( "bufio" "context" "crypto/rand" + "crypto/sha1" "encoding/binary" + "encoding/hex" "errors" "expvar" "fmt" @@ -12,37 +14,41 @@ import ( "math" "net" "net/http" + "net/netip" "sort" "strconv" - "strings" "time" + "github.com/anacrolix/chansync" "github.com/anacrolix/chansync/events" "github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2/krpc" + . "github.com/anacrolix/generics" + g "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/perf" - "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" - "github.com/google/btree" + gbtree "github.com/google/btree" "github.com/pion/datachannel" "golang.org/x/time/rate" - "github.com/anacrolix/chansync" - "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/internal/check" "github.com/anacrolix/torrent/internal/limiter" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/tracker" + "github.com/anacrolix/torrent/types/infohash" "github.com/anacrolix/torrent/webtorrent" ) @@ -51,7 +57,7 @@ import ( type Client struct { // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of // fields. See #262. - stats ConnStats + connStats ConnStats _mu lockWithDeferreds event sync.Cond @@ -72,8 +78,9 @@ type Client struct { // include ourselves if we end up trying to connect to our own address // through legitimate channels. dopplegangerAddrs map[string]struct{} - badPeerIPs map[string]struct{} + badPeerIPs map[netip.Addr]struct{} torrents map[InfoHash]*Torrent + pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder acceptLimiter map[ipStr]int dialRateLimiter *rate.Limiter @@ -82,8 +89,9 @@ type Client struct { websocketTrackers websocketTrackers activeAnnounceLimiter limiter.Instance + httpClient *http.Client - updateRequests chansync.BroadcastCond + clientHolepunchAddrSets } type ipStr string @@ -99,7 +107,7 @@ func (cl *Client) badPeerIPsLocked() (ips []string) { ips = make([]string, len(cl.badPeerIPs)) i := 0 for k := range cl.badPeerIPs { - ips[i] = k + ips[i] = k.String() i += 1 } return @@ -143,7 +151,7 @@ func (cl *Client) WriteStatus(_w io.Writer) { fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String()) writeDhtServerStatus(w, s) }) - spew.Fdump(w, &cl.stats) + dumpStats(w, cl.statsLocked()) torrentsSlice := cl.torrentsAsSlice() fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice)) fmt.Fprintln(w) @@ -162,8 +170,8 @@ func (cl *Client) WriteStatus(_w io.Writer) { w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), - *t.length, - humanize.Bytes(uint64(*t.length))) + t.length(), + humanize.Bytes(uint64(t.length()))) } else { w.WriteString("") } @@ -173,19 +181,13 @@ func (cl *Client) WriteStatus(_w io.Writer) { } } -// Filters things that are less than warning from UPnP discovery. -func upnpDiscoverLogFilter(m log.Msg) bool { - level, ok := m.GetLevel() - return !m.HasValue(UpnpDiscoverLogTag) || (!level.LessThan(log.Warning) && ok) -} - func (cl *Client) initLogger() { logger := cl.config.Logger if logger.IsZero() { logger = log.Default - if !cl.config.Debug { - logger = logger.FilterLevel(log.Info).WithFilter(upnpDiscoverLogFilter) - } + } + if cl.config.Debug { + logger = logger.FilterLevel(log.Debug) } cl.logger = logger.WithValues(cl) } @@ -197,13 +199,21 @@ func (cl *Client) announceKey() int32 { // Initializes a bare minimum Client. *Client and *ClientConfig must not be nil. func (cl *Client) init(cfg *ClientConfig) { cl.config = cfg - cl.dopplegangerAddrs = make(map[string]struct{}) + g.MakeMap(&cl.dopplegangerAddrs) cl.torrents = make(map[metainfo.Hash]*Torrent) cl.dialRateLimiter = rate.NewLimiter(10, 10) cl.activeAnnounceLimiter.SlotsPerKey = 2 - cl.event.L = cl.locker() cl.ipBlockList = cfg.IPBlocklist + cl.httpClient = &http.Client{ + Transport: &http.Transport{ + Proxy: cfg.HTTPProxy, + DialContext: cfg.HTTPDialContext, + // I think this value was observed from some webseeds. It seems reasonable to extend it + // to other uses of HTTP from the client. + MaxConnsPerHost: 10, + }, + } } func NewClient(cfg *ClientConfig) (cl *Client, err error) { @@ -246,7 +256,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { } } - sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback) + sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback, cl.logger) if err != nil { return } @@ -256,7 +266,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { for _, _s := range sockets { s := _s // Go is fucking retarded. - cl.onClose = append(cl.onClose, func() { s.Close() }) + cl.onClose = append(cl.onClose, func() { go s.Close() }) if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) { cl.dialers = append(cl.dialers, s) cl.listeners = append(cl.listeners, s) @@ -292,6 +302,9 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { } return t.announceRequest(event), nil }, + Proxy: cl.config.HTTPProxy, + WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader, + DialContext: cl.config.TrackerDialContext, OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) { cl.lock() defer cl.unlock() @@ -378,6 +391,7 @@ func (cl *Client) listenNetworks() (ns []network) { // Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn. func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) { + logger := cl.logger.WithNames("dht", conn.LocalAddr().String()) cfg := dht.ServerConfig{ IPBlocklist: cl.ipBlockList, Conn: conn, @@ -390,20 +404,14 @@ func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err }(), StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()), OnQuery: cl.config.DHTOnQuery, - Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())), + Logger: logger, } if f := cl.config.ConfigureAnacrolixDhtServer; f != nil { f(&cfg) } s, err = dht.NewServer(&cfg) if err == nil { - go func() { - ts, err := s.Bootstrap() - if err != nil { - cl.logger.Printf("error bootstrapping dht: %s", err) - } - log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger) - }() + go s.TableMaintainer() } return } @@ -418,26 +426,23 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) { } } -// Stops the client. All connections to peers are closed and all activity will -// come to a halt. +// Stops the client. All connections to peers are closed and all activity will come to a halt. func (cl *Client) Close() (errs []error) { - cl.closed.Set() var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning cl.lock() - cl.event.Broadcast() for _, t := range cl.torrents { err := t.close(&closeGroup) if err != nil { errs = append(errs, err) } } - cl.unlock() - closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock() - cl.lock() for i := range cl.onClose { cl.onClose[len(cl.onClose)-1-i]() } + cl.closed.Set() cl.unlock() + cl.event.Broadcast() + closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock() return } @@ -458,7 +463,7 @@ func (cl *Client) wantConns() bool { return true } for _, t := range cl.torrents { - if t.wantConns() { + if t.wantIncomingConns() { return true } } @@ -477,7 +482,6 @@ func (cl *Client) rejectAccepted(conn net.Conn) error { } if cl.config.DisableIPv4 && len(rip) == net.IPv4len { return errors.New("ipv4 disabled") - } if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil { return errors.New("ipv6 disabled") @@ -500,7 +504,7 @@ func (cl *Client) acceptConnections(l Listener) { cl.rLock() closed := cl.closed.IsSet() var reject error - if conn != nil { + if !closed && conn != nil { reject = cl.rejectAccepted(conn) } cl.rUnlock() @@ -511,22 +515,40 @@ func (cl *Client) acceptConnections(l Listener) { return } if err != nil { - log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger) + log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger) continue } + { + holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr()) + if holepunchErr == nil { + cl.lock() + if g.MapContains( + cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, + holepunchAddr, + ) { + g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch) + g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{}) + } + cl.unlock() + } + } go func() { if reject != nil { torrent.Add("rejected accepted connections", 1) - log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger) + cl.logger.LazyLog(log.Debug, func() log.Msg { + return log.Fmsg("rejecting accepted conn: %v", reject) + }) conn.Close() } else { go cl.incomingConnection(conn) } - log.Fmsg("accepted %q connection at %q from %q", - l.Addr().Network(), - conn.LocalAddr(), - conn.RemoteAddr(), - ).SetLevel(log.Debug).Log(cl.logger) + cl.logger.LazyLog(log.Debug, func() log.Msg { + return log.Fmsg("accepted %q connection at %q from %q", + l.Addr().Network(), + conn.LocalAddr(), + conn.RemoteAddr(), + ) + }) torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1) torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1) torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1) @@ -544,8 +566,16 @@ func (cl *Client) incomingConnection(nc net.Conn) { if tc, ok := nc.(*net.TCPConn); ok { tc.SetLinger(0) } - c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(), - regularNetConnPeerConnConnString(nc)) + remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr()) + c := cl.newConnection( + nc, + newConnectionOpts{ + outgoing: false, + remoteAddr: nc.RemoteAddr(), + localPublicAddr: cl.publicAddr(remoteAddr.IP), + network: nc.RemoteAddr().Network(), + connString: regularNetConnPeerConnConnString(nc), + }) defer func() { cl.lock() defer cl.unlock() @@ -557,8 +587,8 @@ func (cl *Client) incomingConnection(nc net.Conn) { // Returns a handle to the given torrent, if it's present in the client. func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) { - cl.lock() - defer cl.unlock() + cl.rLock() + defer cl.rUnlock() t, ok = cl.torrents[ih] return } @@ -580,7 +610,7 @@ func countDialResult(err error) { } } -func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) { +func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) { ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit) if ret < minDialTimeout { ret = minDialTimeout @@ -601,57 +631,24 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) { // Returns a connection over UTP or TCP, whichever is first to connect. func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) { - { - t := perf.NewTimer(perf.CallerName(0)) - defer func() { - if res.Conn == nil { - t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err())) - } else { - t.Mark("returned conn over " + res.Dialer.DialerNetwork()) - } - }() + pool := dialPool{ + addr: addr, } - ctx, cancel := context.WithCancel(ctx) - // As soon as we return one connection, cancel the others. - defer cancel() - left := 0 - resCh := make(chan DialResult, left) + defer pool.startDrainer() for _, _s := range dialers { - left++ - s := _s - go func() { - resCh <- DialResult{ - dialFromSocket(ctx, s, addr), - s, - } - }() - } - // Wait for a successful connection. - func() { - defer perf.ScopeTimer()() - for ; left > 0 && res.Conn == nil; left-- { - res = <-resCh - } - }() - // There are still incompleted dials. - go func() { - for ; left > 0; left-- { - conn := (<-resCh).Conn - if conn != nil { - conn.Close() - } - } - }() - if res.Conn != nil { - go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1) + pool.add(ctx, _s) } - return res + return pool.getFirst() } func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { c, err := s.Dial(ctx, addr) + if err != nil { + log.Levelf(log.Debug, "error dialing %q: %v", addr, err) + } // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set - // it now in case we close the connection forthwith. + // it now in case we close the connection forthwith. Note this is also done in the TCP dialer + // code to increase the chance it's done. if tc, ok := c.(*net.TCPConn); ok { tc.SetLinger(0) } @@ -659,34 +656,40 @@ func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { return c } -func forgettableDialError(err error) bool { - return strings.Contains(err.Error(), "no suitable address found") -} - -func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { - if _, ok := t.halfOpen[addr]; !ok { - panic("invariant broken") +func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) { + path := t.getHalfOpenPath(addr, attemptKey) + if !path.Exists() { + panic("should exist") } - delete(t.halfOpen, addr) + path.Delete() cl.numHalfOpen-- + if cl.numHalfOpen < 0 { + panic("should not be possible") + } for _, t := range cl.torrents { t.openNewConns() } } -// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection +func (cl *Client) countHalfOpenFromTorrents() (count int) { + for _, t := range cl.torrents { + count += t.numHalfOpenAttempts() + } + return +} + +// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection // for valid reasons. func (cl *Client) initiateProtocolHandshakes( ctx context.Context, nc net.Conn, t *Torrent, - outgoing, encryptHeader bool, - remoteAddr PeerRemoteAddr, - network, connString string, + encryptHeader bool, + newConnOpts newConnectionOpts, ) ( c *PeerConn, err error, ) { - c = cl.newConnection(nc, outgoing, remoteAddr, network, connString) + c = cl.newConnection(nc, newConnOpts) c.headerEncrypted = encryptHeader ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) defer cancel() @@ -702,74 +705,166 @@ func (cl *Client) initiateProtocolHandshakes( return } -// Returns nil connection and nil error if no connection could be established for valid reasons. -func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) { - dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration { - cl.rLock() - defer cl.rUnlock() - return t.dialTimeout() - }()) - defer cancel() - dr := cl.dialFirst(dialCtx, addr.String()) +func doProtocolHandshakeOnDialResult( + t *Torrent, + obfuscatedHeader bool, + addr PeerRemoteAddr, + dr DialResult, +) ( + c *PeerConn, err error, +) { + cl := t.cl nc := dr.Conn - if nc == nil { - if dialCtx.Err() != nil { - return nil, fmt.Errorf("dialing: %w", dialCtx.Err()) - } - return nil, errors.New("dial failed") - } - c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc)) + addrIpPort, _ := tryIpPortFromNetAddr(addr) + c, err = cl.initiateProtocolHandshakes( + context.Background(), nc, t, obfuscatedHeader, + newConnectionOpts{ + outgoing: true, + remoteAddr: addr, + // It would be possible to retrieve a public IP from the dialer used here? + localPublicAddr: cl.publicAddr(addrIpPort.IP), + network: dr.Dialer.DialerNetwork(), + connString: regularNetConnPeerConnConnString(nc), + }) if err != nil { nc.Close() } return c, err } -// Returns nil connection and nil error if no connection could be established -// for valid reasons. -func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) { +// Returns nil connection and nil error if no connection could be established for valid reasons. +func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) { torrent.Add("establish outgoing connection", 1) - obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred - c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst) + addr := opts.peerInfo.Addr + dialPool := dialPool{ + resCh: make(chan DialResult), + addr: addr.String(), + } + defer dialPool.startDrainer() + dialTimeout := opts.t.getDialTimeoutUnlocked() + { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + for _, d := range cl.dialers { + dialPool.add(ctx, d) + } + } + holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr) + if holepunchAddrErr == nil && opts.receivedHolepunchConnect { + cl.lock() + if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + g.MakeMapIfNilAndSet( + &cl.undialableWithoutHolepunchDialedAfterHolepunchConnect, + holepunchAddr, + struct{}{}, + ) + } + cl.unlock() + } + headerObfuscationPolicy := opts.HeaderObfuscationPolicy + obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred + firstDialResult := dialPool.getFirst() + if firstDialResult.Conn == nil { + // No dialers worked. Try to initiate a holepunching rendezvous. + if holepunchAddrErr == nil { + cl.lock() + if !opts.receivedHolepunchConnect { + g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{}) + } + opts.t.startHolepunchRendezvous(holepunchAddr) + cl.unlock() + } + err = fmt.Errorf("all initial dials failed") + return + } + if opts.receivedHolepunchConnect && holepunchAddrErr == nil { + cl.lock() + if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) { + g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{}) + } + g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect) + g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{}) + cl.unlock() + } + c, err = doProtocolHandshakeOnDialResult( + opts.t, + obfuscatedHeaderFirst, + addr, + firstDialResult, + ) if err == nil { torrent.Add("initiated conn with preferred header obfuscation", 1) return } - //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err) - if cl.config.HeaderObfuscationPolicy.RequirePreferred { - // We should have just tried with the preferred header obfuscation. If it was required, - // there's nothing else to try. + // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try. + if headerObfuscationPolicy.RequirePreferred { + return + } + // Reuse the dialer that returned already but failed to handshake. + { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + dialPool.add(ctx, firstDialResult.Dialer) + } + secondDialResult := dialPool.getFirst() + if secondDialResult.Conn == nil { return } - // Try again with encryption if we didn't earlier, or without if we did. - c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst) + c, err = doProtocolHandshakeOnDialResult( + opts.t, + !obfuscatedHeaderFirst, + addr, + secondDialResult, + ) if err == nil { torrent.Add("initiated conn with fallback header obfuscation", 1) + return } - //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err) return } +type outgoingConnOpts struct { + peerInfo PeerInfo + t *Torrent + // Don't attempt to connect unless a connect message is received after initiating a rendezvous. + requireRendezvous bool + // Don't send rendezvous requests to eligible relays. + skipHolepunchRendezvous bool + // Outgoing connection attempt is in response to holepunch connect message. + receivedHolepunchConnect bool + HeaderObfuscationPolicy HeaderObfuscationPolicy +} + // Called to dial out and run a connection. The addr we're given is already // considered half-open. -func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) { +func (cl *Client) outgoingConnection( + opts outgoingConnOpts, + attemptKey outgoingConnAttemptKey, +) { cl.dialRateLimiter.Wait(context.Background()) - c, err := cl.establishOutgoingConn(t, addr) + c, err := cl.dialAndCompleteHandshake(opts) + if err == nil { + c.conn.SetWriteDeadline(time.Time{}) + } cl.lock() defer cl.unlock() - // Don't release lock between here and addPeerConn, unless it's for - // failure. - cl.noLongerHalfOpen(t, addr.String()) + // Don't release lock between here and addPeerConn, unless it's for failure. + cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey) if err != nil { if cl.config.Debug { - cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err) + cl.logger.Levelf( + log.Debug, + "error establishing outgoing connection to %v: %v", + opts.peerInfo.Addr, + err, + ) } return } defer c.close() - c.Discovery = ps - c.trusted = trusted - t.runHandshookConnLoggingErr(c) + c.Discovery = opts.peerInfo.Source + c.trusted = opts.peerInfo.Trusted + opts.t.runHandshookConnLoggingErr(c) } // The port number for incoming peer connections. 0 if the client isn't listening. @@ -885,7 +980,8 @@ func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo. if err != nil { return } - successfulPeerWireProtocolHandshakePeerReservedBytes.Add(res.PeerExtensionBits.String(), 1) + successfulPeerWireProtocolHandshakePeerReservedBytes.Add( + hex.EncodeToString(res.PeerExtensionBits[:]), 1) ret = res.Hash c.PeerExtensionBytes = res.PeerExtensionBits c.PeerID = res.PeerID @@ -903,12 +999,13 @@ func (cl *Client) runReceivedConn(c *PeerConn) { } t, err := cl.receiveHandshakes(c) if err != nil { - log.Fmsg( - "error receiving handshakes on %v: %s", c, err, - ).SetLevel(log.Debug). - Add( + cl.logger.LazyLog(log.Debug, func() log.Msg { + return log.Fmsg( + "error receiving handshakes on %v: %s", c, err, + ).Add( "network", c.Network, - ).Log(cl.logger) + ) + }) torrent.Add("error receiving handshake", 1) cl.lock() cl.onBadAccept(c.RemoteAddr) @@ -917,13 +1014,16 @@ func (cl *Client) runReceivedConn(c *PeerConn) { } if t == nil { torrent.Add("received handshake for unloaded torrent", 1) - log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger) + cl.logger.LazyLog(log.Debug, func() log.Msg { + return log.Fmsg("received handshake for unloaded torrent") + }) cl.lock() cl.onBadAccept(c.RemoteAddr) cl.unlock() return } torrent.Add("received handshake for loaded torrent", 1) + c.conn.SetWriteDeadline(time.Time{}) cl.lock() defer cl.unlock() t.runHandshookConnLoggingErr(c) @@ -932,20 +1032,24 @@ func (cl *Client) runReceivedConn(c *PeerConn) { // Client lock must be held before entering this. func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { c.setTorrent(t) + for i, b := range cl.config.MinPeerExtensions { + if c.PeerExtensionBytes[i]&b != b { + return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:]) + } + } if c.PeerID == cl.peerID { if c.outgoing { connsToSelf.Add(1) - addr := c.conn.RemoteAddr().String() + addr := c.RemoteAddr.String() cl.dopplegangerAddrs[addr] = struct{}{} } /* else { // Because the remote address is not necessarily the same as its client's torrent listen // address, we won't record the remote address as a doppleganger. Instead, the initiator // can record *us* as the doppleganger. } */ - t.logger.WithLevel(log.Debug).Printf("local and remote peer ids are the same") + t.logger.Levelf(log.Debug, "local and remote peer ids are the same") return nil } - c.conn.SetWriteDeadline(time.Time{}) c.r = deadlineReader{c.conn, c.r} completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1) if connIsIpv6(c.conn) { @@ -955,10 +1059,9 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return fmt.Errorf("adding connection: %w", err) } defer t.dropConnection(c) - c.startWriter() + c.startMessageWriter() cl.sendInitialMessages(c, t) - c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, c.updateRequestsTimerFunc) - c.updateRequestsTimer.Stop() + c.initUpdateRequestsTimer() err := c.mainReadLoop() if err != nil { return fmt.Errorf("main read loop: %w", err) @@ -966,22 +1069,42 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return nil } -func (c *PeerConn) updateRequestsTimerFunc() { +func (p *Peer) initUpdateRequestsTimer() { + if check.Enabled { + if p.updateRequestsTimer != nil { + panic(p.updateRequestsTimer) + } + } + if enableUpdateRequestsTimer { + p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc) + } +} + +const peerUpdateRequestsTimerReason = "updateRequestsTimer" + +func (c *Peer) updateRequestsTimerFunc() { c.locker().Lock() defer c.locker().Unlock() - if c.needRequestUpdate != "" { + if c.closed.IsSet() { return } - if c.actualRequestState.Requests.IsEmpty() { - panic("updateRequestsTimer should have been stopped") + if c.isLowOnRequests() { + // If there are no outstanding requests, then a request update should have already run. + return } - c.updateRequests("updateRequestsTimer") + if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration { + // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's + // already been fired. + torrent.Add("spurious timer requests updates", 1) + return + } + c.updateRequests(peerUpdateRequestsTimerReason) } // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB // (1<<19) cached for sending, for 16KiB (1<<14) chunks. -const localClientReqq = 1 << 5 +const localClientReqq = 1024 // See the order given in Transmission's tr_peerMsgsNew. func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { @@ -992,7 +1115,8 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { ExtendedPayload: func() []byte { msg := pp.ExtendedHandshakeMessage{ M: map[pp.ExtensionName]pp.ExtensionNumber{ - pp.ExtensionNameMetadata: metadataExtendedId, + pp.ExtensionNameMetadata: metadataExtendedId, + utHolepunch.ExtensionName: utHolepunchExtendedId, }, V: cl.config.ExtendedHandshakeClientVersion, Reqq: localClientReqq, @@ -1000,8 +1124,7 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred, Port: cl.incomingPeerPort(), MetadataSize: torrent.metadataSize(), - // TODO: We can figured these out specific to the socket - // used. + // TODO: We can figure these out specific to the socket used. Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()), Ipv6: cl.config.PublicIp6.To16(), } @@ -1099,8 +1222,9 @@ func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool { return false } +// Returns whether the IP address and port are considered "bad". func (cl *Client) badPeerIPPort(ip net.IP, port int) bool { - if port == 0 { + if port == 0 || ip == nil { return true } if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) { @@ -1109,7 +1233,11 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool { if _, ok := cl.ipBlockRange(ip); ok { return true } - if _, ok := cl.badPeerIPs[ip.String()]; ok { + ipAddr, ok := netip.AddrFromSlice(ip) + if !ok { + panic(ip) + } + if _, ok := cl.badPeerIPs[ipAddr]; ok { return true } return false @@ -1117,14 +1245,14 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool { // Return a Torrent ready for insertion into a Client. func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) { - return cl.newTorrentOpt(addTorrentOpts{ + return cl.newTorrentOpt(AddTorrentOpts{ InfoHash: ih, Storage: specStorage, }) } // Return a Torrent ready for insertion into a Client. -func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) { +func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) { // use provided storage, if provided storageClient := cl.defaultStorage if opts.Storage != nil { @@ -1135,7 +1263,7 @@ func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) { cl: cl, infoHash: opts.InfoHash, peers: prioritizedPeers{ - om: btree.New(32), + om: gbtree.New(32), getPrio: func(p PeerInfo) peerPriority { ipPort := p.addr() return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort) @@ -1143,9 +1271,6 @@ func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) { }, conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent), - halfOpen: make(map[string]PeerInfo), - pieceStateChanges: pubsub.NewPubSub(), - storageOpener: storageClient, maxEstablishedConns: cl.config.EstablishedConnsPerTorrent, @@ -1155,8 +1280,11 @@ func (cl *Client) newTorrentOpt(opts addTorrentOpts) (t *Torrent) { webSeeds: make(map[string]*Peer), gotMetainfoC: make(chan struct{}), } + t.smartBanCache.Hash = sha1.Sum + t.smartBanCache.Init() t.networkingEnabled.Set() - t.logger = cl.logger.WithContextValue(t) + t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug) + t.sourcesLogger = t.logger.WithNames("sources") if opts.ChunkSize == 0 { opts.ChunkSize = defaultChunkSize } @@ -1202,10 +1330,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor return } -// Adds a torrent by InfoHash with a custom Storage implementation. -// If the torrent already exists then this Storage is ignored and the -// existing torrent returned with `new` set to `false` -func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) { +// Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists +// then this Storage is ignored and the existing torrent returned with `new` set to `false`. +func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) { infoHash := opts.InfoHash cl.lock() defer cl.unlock() @@ -1222,6 +1349,7 @@ func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) { } }) cl.torrents[infoHash] = t + t.setInfoBytesLocked(opts.InfoBytes) cl.clearAcceptLimits() t.updateWantPeersEvent() // Tickle Client.waitAccept, new torrent may want conns. @@ -1229,16 +1357,17 @@ func (cl *Client) AddTorrentOpt(opts addTorrentOpts) (t *Torrent, new bool) { return } -type addTorrentOpts struct { - InfoHash InfoHash +type AddTorrentOpts struct { + InfoHash infohash.T Storage storage.ClientImpl ChunkSize pp.Integer + InfoBytes []byte } // Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also // Torrent.MergeSpec. func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) { - t, new = cl.AddTorrentOpt(addTorrentOpts{ + t, new = cl.AddTorrentOpt(AddTorrentOpts{ InfoHash: spec.InfoHash, Storage: spec.Storage, ChunkSize: spec.ChunkSize, @@ -1256,13 +1385,6 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err e return } -type stringAddr string - -var _ net.Addr = stringAddr("") - -func (stringAddr) Network() string { return "" } -func (me stringAddr) String() string { return string(me) } - // The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set. // spec.DisallowDataDownload/Upload will be read and applied // The display name is replaced if the new spec provides one. Note that any `Storage` is ignored. @@ -1270,7 +1392,6 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { if spec.DisplayName != "" { t.SetDisplayName(spec.DisplayName) } - t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck if spec.InfoBytes != nil { err := t.SetInfoBytes(spec.InfoBytes) if err != nil { @@ -1279,15 +1400,16 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { } cl := t.cl cl.AddDhtNodes(spec.DhtNodes) + t.UseSources(spec.Sources) cl.lock() defer cl.unlock() - useTorrentSources(spec.Sources, t) + t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck for _, url := range spec.Webseeds { t.addWebSeed(url) } for _, peerAddr := range spec.PeerAddrs { t.addPeer(PeerInfo{ - Addr: stringAddr(peerAddr), + Addr: StringAddr(peerAddr), Source: PeerSourceDirect, Trusted: true, }) @@ -1302,52 +1424,6 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error { return nil } -func useTorrentSources(sources []string, t *Torrent) { - // TODO: bind context to the lifetime of *Torrent so that it's cancelled if the torrent closes - ctx := context.Background() - for i := 0; i < len(sources); i += 1 { - s := sources[i] - go func() { - if err := useTorrentSource(ctx, s, t); err != nil { - t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err) - } else { - t.logger.Printf("successfully used source %q", s) - } - }() - } -} - -func useTorrentSource(ctx context.Context, source string, t *Torrent) (err error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - go func() { - select { - case <-t.GotInfo(): - case <-t.Closed(): - case <-ctx.Done(): - } - cancel() - }() - var req *http.Request - if req, err = http.NewRequestWithContext(ctx, http.MethodGet, source, nil); err != nil { - panic(err) - } - var resp *http.Response - if resp, err = http.DefaultClient.Do(req); err != nil { - return - } - var mi metainfo.MetaInfo - err = bencode.NewDecoder(resp.Body).Decode(&mi) - resp.Body.Close() - if err != nil { - if ctx.Err() != nil { - return nil - } - return - } - return t.MergeSpec(TorrentSpecFromMetaInfo(&mi)) -} - func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) { t, ok := cl.torrents[infoHash] if !ok { @@ -1355,9 +1431,6 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err e return } err = t.close(wg) - if err != nil { - panic(err) - } delete(cl.torrents, infoHash) return } @@ -1390,8 +1463,8 @@ func (cl *Client) WaitAll() bool { // Returns handles to all the torrents loaded in the Client. func (cl *Client) Torrents() []*Torrent { - cl.lock() - defer cl.unlock() + cl.rLock() + defer cl.rUnlock() return cl.torrentsAsSlice() } @@ -1453,39 +1526,72 @@ func (cl *Client) AddDhtNodes(nodes []string) { } func (cl *Client) banPeerIP(ip net.IP) { - cl.logger.Printf("banning ip %v", ip) - if cl.badPeerIPs == nil { - cl.badPeerIPs = make(map[string]struct{}) + // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4 + // addresses directly to v4on6, which doesn't compare equal with v4. + ipAddr, ok := netip.AddrFromSlice(ip) + if !ok { + panic(ip) + } + g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{}) + for _, t := range cl.torrents { + t.iterPeers(func(p *Peer) { + if p.remoteIp().Equal(ip) { + t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip) + // Should this be a close? + p.drop() + } + }) } - cl.badPeerIPs[ip.String()] = struct{}{} } -func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) { - if network == "" { - panic(remoteAddr) +type newConnectionOpts struct { + outgoing bool + remoteAddr PeerRemoteAddr + localPublicAddr peerLocalPublicAddr + network string + connString string +} + +func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) { + if opts.network == "" { + panic(opts.remoteAddr) } c = &PeerConn{ Peer: Peer{ - outgoing: outgoing, + outgoing: opts.outgoing, choking: true, peerChoking: true, PeerMaxRequests: 250, - RemoteAddr: remoteAddr, - Network: network, - callbacks: &cl.config.Callbacks, + RemoteAddr: opts.remoteAddr, + localPublicAddr: opts.localPublicAddr, + Network: opts.network, + callbacks: &cl.config.Callbacks, }, - connString: connString, + connString: opts.connString, conn: nc, } + c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn + c.initRequestState() + // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses. + if opts.remoteAddr != nil { + netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String()) + if err == nil { + c.bannableAddr = Some(netipAddrPort.Addr()) + } + } c.peerImpl = c - c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) + c.logger = cl.logger.WithDefaultLevel(log.Warning) c.setRW(connStatsReadWriter{nc, c}) c.r = &rateLimitedReader{ l: cl.config.DownloadRateLimiter, r: c.r, } - c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing) + c.logger.Levelf( + log.Debug, + "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]", + c, cl, opts.remoteAddr, opts.network, opts.outgoing, + ) for _, f := range cl.config.Callbacks.NewPeer { f(&c.Peer) } @@ -1574,6 +1680,16 @@ func (cl *Client) ListenAddrs() (ret []net.Addr) { return } +func (cl *Client) PublicIPs() (ips []net.IP) { + if ip := cl.config.PublicIp4; ip != nil { + ips = append(ips, ip) + } + if ip := cl.config.PublicIp6; ip != nil { + ips = append(ips, ip) + } + return +} + func (cl *Client) onBadAccept(addr PeerRemoteAddr) { ipa, ok := tryIpPortFromNetAddr(addr) if !ok { @@ -1641,8 +1757,14 @@ func (cl *Client) String() string { return fmt.Sprintf("<%[1]T %[1]p>", cl) } -// Returns connection-level aggregate stats at the Client level. See the comment on +// Returns connection-level aggregate connStats at the Client level. See the comment on // TorrentStats.ConnStats. func (cl *Client) ConnStats() ConnStats { - return cl.stats.Copy() + return cl.connStats.Copy() +} + +func (cl *Client) Stats() ClientStats { + cl.rLock() + defer cl.rUnlock() + return cl.statsLocked() }