]> Sergey Matveev's repositories - btrtrc.git/blobdiff - client.go
Handle more PeerRemoteAddr variants when calculating dial addr
[btrtrc.git] / client.go
index 9861f7ef3ee052c9aa3e664ed6fe933a8b25f840..6a82774e7a0d5cf8f0fa4b265161de1076a40609 100644 (file)
--- a/client.go
+++ b/client.go
@@ -2,57 +2,66 @@ package torrent
 
 import (
        "bufio"
-       "bytes"
        "context"
        "crypto/rand"
+       "crypto/sha1"
        "encoding/binary"
+       "encoding/hex"
        "errors"
+       "expvar"
        "fmt"
        "io"
+       "math"
        "net"
+       "net/http"
+       "net/netip"
+       "sort"
        "strconv"
-       "strings"
        "time"
 
+       "github.com/anacrolix/chansync"
+       "github.com/anacrolix/chansync/events"
        "github.com/anacrolix/dht/v2"
        "github.com/anacrolix/dht/v2/krpc"
+       . "github.com/anacrolix/generics"
+       g "github.com/anacrolix/generics"
        "github.com/anacrolix/log"
-       "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/perf"
-       "github.com/anacrolix/missinggo/pubsub"
-       "github.com/anacrolix/missinggo/slices"
+       "github.com/anacrolix/missinggo/v2"
+       "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
-       "github.com/anacrolix/torrent/tracker"
-       "github.com/anacrolix/torrent/webtorrent"
        "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
-       "github.com/google/btree"
+       gbtree "github.com/google/btree"
        "github.com/pion/datachannel"
        "golang.org/x/time/rate"
-       "golang.org/x/xerrors"
-
-       "github.com/anacrolix/missinggo/v2"
-       "github.com/anacrolix/missinggo/v2/conntrack"
 
        "github.com/anacrolix/torrent/bencode"
+       "github.com/anacrolix/torrent/internal/check"
+       "github.com/anacrolix/torrent/internal/limiter"
        "github.com/anacrolix/torrent/iplist"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/anacrolix/torrent/storage"
+       "github.com/anacrolix/torrent/tracker"
+       "github.com/anacrolix/torrent/types/infohash"
+       "github.com/anacrolix/torrent/webtorrent"
 )
 
 // Clients contain zero or more Torrents. A Client manages a blocklist, the
 // TCP/UDP protocol ports, and DHT as desired.
 type Client struct {
-       // An aggregate of stats over all connections. First in struct to ensure
-       // 64-bit alignment of fields. See #262.
-       stats ConnStats
+       // An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
+       // fields. See #262.
+       connStats ConnStats
 
        _mu    lockWithDeferreds
        event  sync.Cond
-       closed missinggo.Event
+       closed chansync.SetOnce
 
        config *ClientConfig
        logger log.Logger
@@ -69,25 +78,39 @@ type Client struct {
        // include ourselves if we end up trying to connect to our own address
        // through legitimate channels.
        dopplegangerAddrs map[string]struct{}
-       badPeerIPs        map[string]struct{}
+       badPeerIPs        map[netip.Addr]struct{}
        torrents          map[InfoHash]*Torrent
+       pieceRequestOrder map[interface{}]*request_strategy.PieceRequestOrder
 
        acceptLimiter   map[ipStr]int
        dialRateLimiter *rate.Limiter
+       numHalfOpen     int
 
        websocketTrackers websocketTrackers
+
+       activeAnnounceLimiter limiter.Instance
+       httpClient            *http.Client
+
+       clientHolepunchAddrSets
 }
 
 type ipStr string
 
-func (cl *Client) BadPeerIPs() []string {
+func (cl *Client) BadPeerIPs() (ips []string) {
        cl.rLock()
-       defer cl.rUnlock()
-       return cl.badPeerIPsLocked()
+       ips = cl.badPeerIPsLocked()
+       cl.rUnlock()
+       return
 }
 
-func (cl *Client) badPeerIPsLocked() []string {
-       return slices.FromMapKeys(cl.badPeerIPs).([]string)
+func (cl *Client) badPeerIPsLocked() (ips []string) {
+       ips = make([]string, len(cl.badPeerIPs))
+       i := 0
+       for k := range cl.badPeerIPs {
+               ips[i] = k.String()
+               i += 1
+       }
+       return
 }
 
 func (cl *Client) PeerID() PeerID {
@@ -98,10 +121,11 @@ func (cl *Client) PeerID() PeerID {
 // numbers are the same, due to support for custom listeners. Returns zero if no port number is
 // found.
 func (cl *Client) LocalPort() (port int) {
-       cl.eachListener(func(l Listener) bool {
-               port = addrPortOrZero(l.Addr())
-               return port == 0
-       })
+       for i := 0; i < len(cl.listeners); i += 1 {
+               if port = addrPortOrZero(cl.listeners[i].Addr()); port != 0 {
+                       return
+               }
+       }
        return
 }
 
@@ -127,12 +151,14 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
                writeDhtServerStatus(w, s)
        })
-       spew.Fdump(w, &cl.stats)
-       fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
+       dumpStats(w, cl.statsLocked())
+       torrentsSlice := cl.torrentsAsSlice()
+       fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
        fmt.Fprintln(w)
-       for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
-               return l.InfoHash().AsString() < r.InfoHash().AsString()
-       }).([]*Torrent) {
+       sort.Slice(torrentsSlice, func(l, r int) bool {
+               return torrentsSlice[l].infoHash.AsString() < torrentsSlice[r].infoHash.AsString()
+       })
+       for _, t := range torrentsSlice {
                if t.name() == "" {
                        fmt.Fprint(w, "<unknown name>")
                } else {
@@ -144,8 +170,8 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                                w,
                                "%f%% of %d bytes (%s)",
                                100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
-                               *t.length,
-                               humanize.Bytes(uint64(*t.length)))
+                               t.length(),
+                               humanize.Bytes(uint64(t.length())))
                } else {
                        w.WriteString("<missing metainfo>")
                }
@@ -156,41 +182,57 @@ func (cl *Client) WriteStatus(_w io.Writer) {
 }
 
 func (cl *Client) initLogger() {
-       cl.logger = cl.config.Logger.WithValues(cl)
-       if !cl.config.Debug {
-               cl.logger = cl.logger.FilterLevel(log.Info)
+       logger := cl.config.Logger
+       if logger.IsZero() {
+               logger = log.Default
+       }
+       if cl.config.Debug {
+               logger = logger.FilterLevel(log.Debug)
        }
+       cl.logger = logger.WithValues(cl)
 }
 
 func (cl *Client) announceKey() int32 {
        return int32(binary.BigEndian.Uint32(cl.peerID[16:20]))
 }
 
+// Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
+func (cl *Client) init(cfg *ClientConfig) {
+       cl.config = cfg
+       g.MakeMap(&cl.dopplegangerAddrs)
+       cl.torrents = make(map[metainfo.Hash]*Torrent)
+       cl.dialRateLimiter = rate.NewLimiter(10, 10)
+       cl.activeAnnounceLimiter.SlotsPerKey = 2
+       cl.event.L = cl.locker()
+       cl.ipBlockList = cfg.IPBlocklist
+       cl.httpClient = &http.Client{
+               Transport: &http.Transport{
+                       Proxy:       cfg.HTTPProxy,
+                       DialContext: cfg.HTTPDialContext,
+                       // I think this value was observed from some webseeds. It seems reasonable to extend it
+                       // to other uses of HTTP from the client.
+                       MaxConnsPerHost: 10,
+               },
+       }
+}
+
 func NewClient(cfg *ClientConfig) (cl *Client, err error) {
        if cfg == nil {
                cfg = NewDefaultClientConfig()
                cfg.ListenPort = 0
        }
-       defer func() {
-               if err != nil {
-                       cl = nil
-               }
-       }()
-       cl = &Client{
-               config:            cfg,
-               dopplegangerAddrs: make(map[string]struct{}),
-               torrents:          make(map[metainfo.Hash]*Torrent),
-               dialRateLimiter:   rate.NewLimiter(10, 10),
-       }
+       var client Client
+       client.init(cfg)
+       cl = &client
        go cl.acceptLimitClearer()
        cl.initLogger()
        defer func() {
-               if err == nil {
-                       return
+               if err != nil {
+                       cl.Close()
+                       cl = nil
                }
-               cl.Close()
        }()
-       cl.event.L = cl.locker()
+
        storageImpl := cfg.DefaultStorage
        if storageImpl == nil {
                // We'd use mmap by default but HFS+ doesn't support sparse files.
@@ -203,9 +245,6 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                storageImpl = storageImplCloser
        }
        cl.defaultStorage = storage.NewClient(storageImpl)
-       if cfg.IPBlocklist != nil {
-               cl.ipBlockList = cfg.IPBlocklist
-       }
 
        if cfg.PeerID != "" {
                missinggo.CopyExact(&cl.peerID, cfg.PeerID)
@@ -217,7 +256,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                }
        }
 
-       sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback)
+       sockets, err := listenAll(cl.listenNetworks(), cl.config.ListenHost, cl.config.ListenPort, cl.firewallCallback, cl.logger)
        if err != nil {
                return
        }
@@ -227,11 +266,13 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
 
        for _, _s := range sockets {
                s := _s // Go is fucking retarded.
-               cl.onClose = append(cl.onClose, func() { s.Close() })
+               cl.onClose = append(cl.onClose, func() { go s.Close() })
                if peerNetworkEnabled(parseNetworkString(s.Addr().Network()), cl.config) {
                        cl.dialers = append(cl.dialers, s)
                        cl.listeners = append(cl.listeners, s)
-                       go cl.acceptConnections(s)
+                       if cl.config.AcceptPeerConnections {
+                               go cl.acceptConnections(s)
+                       }
                }
        }
 
@@ -239,11 +280,11 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
        if !cfg.NoDHT {
                for _, s := range sockets {
                        if pc, ok := s.(net.PacketConn); ok {
-                               ds, err := cl.newAnacrolixDhtServer(pc)
+                               ds, err := cl.NewAnacrolixDhtServer(pc)
                                if err != nil {
                                        panic(err)
                                }
-                               cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds})
+                               cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
                                cl.onClose = append(cl.onClose, func() { ds.Close() })
                        }
                }
@@ -261,6 +302,9 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
                        }
                        return t.announceRequest(event), nil
                },
+               Proxy:                      cl.config.HTTPProxy,
+               WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
+               DialContext:                cl.config.TrackerDialContext,
                OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
                        cl.lock()
                        defer cl.unlock()
@@ -295,16 +339,22 @@ func (cl *Client) AddDialer(d Dialer) {
        }
 }
 
+func (cl *Client) Listeners() []Listener {
+       return cl.listeners
+}
+
 // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
 // yourself.
 func (cl *Client) AddListener(l Listener) {
        cl.listeners = append(cl.listeners, l)
-       go cl.acceptConnections(l)
+       if cl.config.AcceptPeerConnections {
+               go cl.acceptConnections(l)
+       }
 }
 
 func (cl *Client) firewallCallback(net.Addr) bool {
        cl.rLock()
-       block := !cl.wantConns()
+       block := !cl.wantConns() || !cl.config.AcceptPeerConnections
        cl.rUnlock()
        if block {
                torrent.Add("connections firewalled", 1)
@@ -339,7 +389,9 @@ func (cl *Client) listenNetworks() (ns []network) {
        return
 }
 
-func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
+// Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
+func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
+       logger := cl.logger.WithNames("dht", conn.LocalAddr().String())
        cfg := dht.ServerConfig{
                IPBlocklist:    cl.ipBlockList,
                Conn:           conn,
@@ -350,30 +402,22 @@ func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err
                        }
                        return cl.config.PublicIp4
                }(),
-               StartingNodes:      cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
-               ConnectionTracking: cl.config.ConnTracker,
-               OnQuery:            cl.config.DHTOnQuery,
-               Logger: cl.logger.WithText(func(m log.Msg) string {
-                       return fmt.Sprintf("dht server on %v: %s", conn.LocalAddr().String(), m.Text())
-               }),
+               StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
+               OnQuery:       cl.config.DHTOnQuery,
+               Logger:        logger,
+       }
+       if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
+               f(&cfg)
        }
        s, err = dht.NewServer(&cfg)
        if err == nil {
-               go func() {
-                       ts, err := s.Bootstrap()
-                       if err != nil {
-                               cl.logger.Printf("error bootstrapping dht: %s", err)
-                       }
-                       log.Fstr("%v completed bootstrap (%v)", s, ts).AddValues(s, ts).Log(cl.logger)
-               }()
+               go s.TableMaintainer()
        }
        return
 }
 
-func (cl *Client) Closed() <-chan struct{} {
-       cl.lock()
-       defer cl.unlock()
-       return cl.closed.C()
+func (cl *Client) Closed() events.Done {
+       return cl.closed.Done()
 }
 
 func (cl *Client) eachDhtServer(f func(DhtServer)) {
@@ -382,19 +426,24 @@ func (cl *Client) eachDhtServer(f func(DhtServer)) {
        }
 }
 
-// Stops the client. All connections to peers are closed and all activity will
-// come to a halt.
-func (cl *Client) Close() {
+// Stops the client. All connections to peers are closed and all activity will come to a halt.
+func (cl *Client) Close() (errs []error) {
+       var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
        cl.lock()
-       defer cl.unlock()
-       cl.closed.Set()
        for _, t := range cl.torrents {
-               t.close()
+               err := t.close(&closeGroup)
+               if err != nil {
+                       errs = append(errs, err)
+               }
        }
        for i := range cl.onClose {
                cl.onClose[len(cl.onClose)-1-i]()
        }
+       cl.closed.Set()
+       cl.unlock()
        cl.event.Broadcast()
+       closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
+       return
 }
 
 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
@@ -410,28 +459,22 @@ func (cl *Client) ipIsBlocked(ip net.IP) bool {
 }
 
 func (cl *Client) wantConns() bool {
+       if cl.config.AlwaysWantConns {
+               return true
+       }
        for _, t := range cl.torrents {
-               if t.wantConns() {
+               if t.wantIncomingConns() {
                        return true
                }
        }
        return false
 }
 
-func (cl *Client) waitAccept() {
-       for {
-               if cl.closed.IsSet() {
-                       return
-               }
-               if cl.wantConns() {
-                       return
-               }
-               cl.event.Wait()
-       }
-}
-
 // TODO: Apply filters for non-standard networks, particularly rate-limiting.
 func (cl *Client) rejectAccepted(conn net.Conn) error {
+       if !cl.wantConns() {
+               return errors.New("don't want conns right now")
+       }
        ra := conn.RemoteAddr()
        if rip := addrIpOrNil(ra); rip != nil {
                if cl.config.DisableIPv4Peers && rip.To4() != nil {
@@ -439,7 +482,6 @@ func (cl *Client) rejectAccepted(conn net.Conn) error {
                }
                if cl.config.DisableIPv4 && len(rip) == net.IPv4len {
                        return errors.New("ipv4 disabled")
-
                }
                if cl.config.DisableIPv6 && len(rip) == net.IPv6len && rip.To4() == nil {
                        return errors.New("ipv6 disabled")
@@ -454,7 +496,7 @@ func (cl *Client) rejectAccepted(conn net.Conn) error {
        return nil
 }
 
-func (cl *Client) acceptConnections(l net.Listener) {
+func (cl *Client) acceptConnections(l Listener) {
        for {
                conn, err := l.Accept()
                torrent.Add("client listener accepts", 1)
@@ -462,7 +504,7 @@ func (cl *Client) acceptConnections(l net.Listener) {
                cl.rLock()
                closed := cl.closed.IsSet()
                var reject error
-               if conn != nil {
+               if !closed && conn != nil {
                        reject = cl.rejectAccepted(conn)
                }
                cl.rUnlock()
@@ -473,22 +515,40 @@ func (cl *Client) acceptConnections(l net.Listener) {
                        return
                }
                if err != nil {
-                       log.Fmsg("error accepting connection: %s", err).SetLevel(log.Debug).Log(cl.logger)
+                       log.Fmsg("error accepting connection: %s", err).LogLevel(log.Debug, cl.logger)
                        continue
                }
+               {
+                       holepunchAddr, holepunchErr := addrPortFromPeerRemoteAddr(conn.RemoteAddr())
+                       if holepunchErr == nil {
+                               cl.lock()
+                               if g.MapContains(
+                                       cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+                                       holepunchAddr,
+                               ) {
+                                       g.MakeMapIfNil(&cl.probablyOnlyConnectedDueToHolepunch)
+                                       g.MapInsert(cl.probablyOnlyConnectedDueToHolepunch, holepunchAddr, struct{}{})
+                               }
+                               cl.unlock()
+                       }
+               }
                go func() {
                        if reject != nil {
                                torrent.Add("rejected accepted connections", 1)
-                               log.Fmsg("rejecting accepted conn: %v", reject).SetLevel(log.Debug).Log(cl.logger)
+                               cl.logger.LazyLog(log.Debug, func() log.Msg {
+                                       return log.Fmsg("rejecting accepted conn: %v", reject)
+                               })
                                conn.Close()
                        } else {
                                go cl.incomingConnection(conn)
                        }
-                       log.Fmsg("accepted %q connection at %q from %q",
-                               l.Addr().Network(),
-                               conn.LocalAddr(),
-                               conn.RemoteAddr(),
-                       ).SetLevel(log.Debug).Log(cl.logger)
+                       cl.logger.LazyLog(log.Debug, func() log.Msg {
+                               return log.Fmsg("accepted %q connection at %q from %q",
+                                       l.Addr().Network(),
+                                       conn.LocalAddr(),
+                                       conn.RemoteAddr(),
+                               )
+                       })
                        torrent.Add(fmt.Sprintf("accepted conn remote IP len=%d", len(addrIpOrNil(conn.RemoteAddr()))), 1)
                        torrent.Add(fmt.Sprintf("accepted conn network=%s", conn.RemoteAddr().Network()), 1)
                        torrent.Add(fmt.Sprintf("accepted on %s listener", l.Addr().Network()), 1)
@@ -496,7 +556,8 @@ func (cl *Client) acceptConnections(l net.Listener) {
        }
 }
 
-func regularConnString(nc net.Conn) string {
+// Creates the PeerConn.connString for a regular net.Conn PeerConn.
+func regularNetConnPeerConnConnString(nc net.Conn) string {
        return fmt.Sprintf("%s-%s", nc.LocalAddr(), nc.RemoteAddr())
 }
 
@@ -505,16 +566,29 @@ func (cl *Client) incomingConnection(nc net.Conn) {
        if tc, ok := nc.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
-       c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
-               regularConnString(nc))
+       remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
+       c := cl.newConnection(
+               nc,
+               newConnectionOpts{
+                       outgoing:        false,
+                       remoteAddr:      nc.RemoteAddr(),
+                       localPublicAddr: cl.publicAddr(remoteAddr.IP),
+                       network:         nc.RemoteAddr().Network(),
+                       connString:      regularNetConnPeerConnConnString(nc),
+               })
+       defer func() {
+               cl.lock()
+               defer cl.unlock()
+               c.close()
+       }()
        c.Discovery = PeerSourceIncoming
        cl.runReceivedConn(c)
 }
 
 // Returns a handle to the given torrent, if it's present in the client.
 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
-       cl.lock()
-       defer cl.unlock()
+       cl.rLock()
+       defer cl.rUnlock()
        t, ok = cl.torrents[ih]
        return
 }
@@ -523,9 +597,9 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
        return cl.torrents[ih]
 }
 
-type dialResult struct {
-       Conn    net.Conn
-       Network string
+type DialResult struct {
+       Conn   net.Conn
+       Dialer Dialer
 }
 
 func countDialResult(err error) {
@@ -536,7 +610,7 @@ func countDialResult(err error) {
        }
 }
 
-func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
+func reducedDialTimeout(minDialTimeout, max time.Duration, halfOpenLimit, pendingPeers int) (ret time.Duration) {
        ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
        if ret < minDialTimeout {
                ret = minDialTimeout
@@ -551,129 +625,71 @@ func (cl *Client) dopplegangerAddr(addr string) bool {
 }
 
 // Returns a connection over UTP or TCP, whichever is first to connect.
-func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
-       {
-               t := perf.NewTimer(perf.CallerName(0))
-               defer func() {
-                       if res.Conn == nil {
-                               t.Mark(fmt.Sprintf("returned no conn (context: %v)", ctx.Err()))
-                       } else {
-                               t.Mark("returned conn over " + res.Network)
-                       }
-               }()
+func (cl *Client) dialFirst(ctx context.Context, addr string) (res DialResult) {
+       return DialFirst(ctx, addr, cl.dialers)
+}
+
+// Returns a connection over UTP or TCP, whichever is first to connect.
+func DialFirst(ctx context.Context, addr string, dialers []Dialer) (res DialResult) {
+       pool := dialPool{
+               addr: addr,
        }
-       ctx, cancel := context.WithCancel(ctx)
-       // As soon as we return one connection, cancel the others.
-       defer cancel()
-       left := 0
-       resCh := make(chan dialResult, left)
-       func() {
-               cl.lock()
-               defer cl.unlock()
-               cl.eachDialer(func(s Dialer) bool {
-                       func() {
-                               left++
-                               //cl.logger.Printf("dialing %s on %s/%s", addr, s.Addr().Network(), s.Addr())
-                               go func() {
-                                       resCh <- dialResult{
-                                               cl.dialFromSocket(ctx, s, addr),
-                                               s.LocalAddr().Network(),
-                                       }
-                               }()
-                       }()
-                       return true
-               })
-       }()
-       // Wait for a successful connection.
-       func() {
-               defer perf.ScopeTimer()()
-               for ; left > 0 && res.Conn == nil; left-- {
-                       res = <-resCh
-               }
-       }()
-       // There are still incompleted dials.
-       go func() {
-               for ; left > 0; left-- {
-                       conn := (<-resCh).Conn
-                       if conn != nil {
-                               conn.Close()
-                       }
-               }
-       }()
-       if res.Conn != nil {
-               go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
-       }
-       //if res.Conn != nil {
-       //      cl.logger.Printf("first connection for %s from %s/%s", addr, res.Conn.LocalAddr().Network(), res.Conn.LocalAddr().String())
-       //} else {
-       //      cl.logger.Printf("failed to dial %s", addr)
-       //}
-       return res
-}
-
-func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
-       network := s.LocalAddr().Network()
-       cte := cl.config.ConnTracker.Wait(
-               ctx,
-               conntrack.Entry{network, s.LocalAddr().String(), addr},
-               "dial torrent client",
-               0,
-       )
-       // Try to avoid committing to a dial if the context is complete as it's difficult to determine
-       // which dial errors allow us to forget the connection tracking entry handle.
-       if ctx.Err() != nil {
-               if cte != nil {
-                       cte.Forget()
-               }
-               return nil
+       defer pool.startDrainer()
+       for _, _s := range dialers {
+               pool.add(ctx, _s)
        }
+       return pool.getFirst()
+}
+
+func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
        c, err := s.Dial(ctx, addr)
+       if err != nil {
+               log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
+       }
        // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
-       // it now in case we close the connection forthwith.
+       // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
+       // code to increase the chance it's done.
        if tc, ok := c.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
        countDialResult(err)
-       if c == nil {
-               if err != nil && forgettableDialError(err) {
-                       cte.Forget()
-               } else {
-                       cte.Done()
-               }
-               return nil
-       }
-       return closeWrapper{c, func() error {
-               err := c.Close()
-               cte.Done()
-               return err
-       }}
+       return c
 }
 
-func forgettableDialError(err error) bool {
-       return strings.Contains(err.Error(), "no suitable address found")
+func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
+       path := t.getHalfOpenPath(addr, attemptKey)
+       if !path.Exists() {
+               panic("should exist")
+       }
+       path.Delete()
+       cl.numHalfOpen--
+       if cl.numHalfOpen < 0 {
+               panic("should not be possible")
+       }
+       for _, t := range cl.torrents {
+               t.openNewConns()
+       }
 }
 
-func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
-       if _, ok := t.halfOpen[addr]; !ok {
-               panic("invariant broken")
+func (cl *Client) countHalfOpenFromTorrents() (count int) {
+       for _, t := range cl.torrents {
+               count += t.numHalfOpenAttempts()
        }
-       delete(t.halfOpen, addr)
-       t.openNewConns()
+       return
 }
 
-// Performs initiator handshakes and returns a connection. Returns nil *connection if no connection
+// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
 // for valid reasons.
 func (cl *Client) initiateProtocolHandshakes(
        ctx context.Context,
        nc net.Conn,
        t *Torrent,
-       outgoing, encryptHeader bool,
-       remoteAddr net.Addr,
-       network, connString string,
+       encryptHeader bool,
+       newConnOpts newConnectionOpts,
 ) (
        c *PeerConn, err error,
 ) {
-       c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
+       c = cl.newConnection(nc, newConnOpts)
        c.headerEncrypted = encryptHeader
        ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
        defer cancel()
@@ -689,74 +705,166 @@ func (cl *Client) initiateProtocolHandshakes(
        return
 }
 
-// Returns nil connection and nil error if no connection could be established for valid reasons.
-func (cl *Client) establishOutgoingConnEx(t *Torrent, addr net.Addr, obfuscatedHeader bool) (*PeerConn, error) {
-       dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
-               cl.rLock()
-               defer cl.rUnlock()
-               return t.dialTimeout()
-       }())
-       defer cancel()
-       dr := cl.dialFirst(dialCtx, addr.String())
+func doProtocolHandshakeOnDialResult(
+       t *Torrent,
+       obfuscatedHeader bool,
+       addr PeerRemoteAddr,
+       dr DialResult,
+) (
+       c *PeerConn, err error,
+) {
+       cl := t.cl
        nc := dr.Conn
-       if nc == nil {
-               if dialCtx.Err() != nil {
-                       return nil, xerrors.Errorf("dialing: %w", dialCtx.Err())
-               }
-               return nil, errors.New("dial failed")
-       }
-       c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Network, regularConnString(nc))
+       addrIpPort, _ := tryIpPortFromNetAddr(addr)
+       c, err = cl.initiateProtocolHandshakes(
+               context.Background(), nc, t, obfuscatedHeader,
+               newConnectionOpts{
+                       outgoing:   true,
+                       remoteAddr: addr,
+                       // It would be possible to retrieve a public IP from the dialer used here?
+                       localPublicAddr: cl.publicAddr(addrIpPort.IP),
+                       network:         dr.Dialer.DialerNetwork(),
+                       connString:      regularNetConnPeerConnConnString(nc),
+               })
        if err != nil {
                nc.Close()
        }
        return c, err
 }
 
-// Returns nil connection and nil error if no connection could be established
-// for valid reasons.
-func (cl *Client) establishOutgoingConn(t *Torrent, addr net.Addr) (c *PeerConn, err error) {
+// Returns nil connection and nil error if no connection could be established for valid reasons.
+func (cl *Client) dialAndCompleteHandshake(opts outgoingConnOpts) (c *PeerConn, err error) {
        torrent.Add("establish outgoing connection", 1)
-       obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
-       c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
+       addr := opts.peerInfo.Addr
+       dialPool := dialPool{
+               resCh: make(chan DialResult),
+               addr:  addr.String(),
+       }
+       defer dialPool.startDrainer()
+       dialTimeout := opts.t.getDialTimeoutUnlocked()
+       {
+               ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
+               defer cancel()
+               for _, d := range cl.dialers {
+                       dialPool.add(ctx, d)
+               }
+       }
+       holepunchAddr, holepunchAddrErr := addrPortFromPeerRemoteAddr(addr)
+       if holepunchAddrErr == nil && opts.receivedHolepunchConnect {
+               cl.lock()
+               if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+                       g.MakeMapIfNilAndSet(
+                               &cl.undialableWithoutHolepunchDialedAfterHolepunchConnect,
+                               holepunchAddr,
+                               struct{}{},
+                       )
+               }
+               cl.unlock()
+       }
+       headerObfuscationPolicy := opts.HeaderObfuscationPolicy
+       obfuscatedHeaderFirst := headerObfuscationPolicy.Preferred
+       firstDialResult := dialPool.getFirst()
+       if firstDialResult.Conn == nil {
+               // No dialers worked. Try to initiate a holepunching rendezvous.
+               if holepunchAddrErr == nil {
+                       cl.lock()
+                       if !opts.receivedHolepunchConnect {
+                               g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
+                       }
+                       opts.t.startHolepunchRendezvous(holepunchAddr)
+                       cl.unlock()
+               }
+               err = fmt.Errorf("all initial dials failed")
+               return
+       }
+       if opts.receivedHolepunchConnect && holepunchAddrErr == nil {
+               cl.lock()
+               if g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+                       g.MakeMapIfNilAndSet(&cl.dialableOnlyAfterHolepunch, holepunchAddr, struct{}{})
+               }
+               g.MakeMapIfNil(&cl.dialedSuccessfullyAfterHolepunchConnect)
+               g.MapInsert(cl.dialedSuccessfullyAfterHolepunchConnect, holepunchAddr, struct{}{})
+               cl.unlock()
+       }
+       c, err = doProtocolHandshakeOnDialResult(
+               opts.t,
+               obfuscatedHeaderFirst,
+               addr,
+               firstDialResult,
+       )
        if err == nil {
                torrent.Add("initiated conn with preferred header obfuscation", 1)
                return
        }
-       //cl.logger.Printf("error establishing connection to %s (obfuscatedHeader=%t): %v", addr, obfuscatedHeaderFirst, err)
-       if cl.config.HeaderObfuscationPolicy.RequirePreferred {
-               // We should have just tried with the preferred header obfuscation. If it was required,
-               // there's nothing else to try.
+       // We should have just tried with the preferred header obfuscation. If it was required, there's nothing else to try.
+       if headerObfuscationPolicy.RequirePreferred {
+               return
+       }
+       // Reuse the dialer that returned already but failed to handshake.
+       {
+               ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
+               defer cancel()
+               dialPool.add(ctx, firstDialResult.Dialer)
+       }
+       secondDialResult := dialPool.getFirst()
+       if secondDialResult.Conn == nil {
                return
        }
-       // Try again with encryption if we didn't earlier, or without if we did.
-       c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
+       c, err = doProtocolHandshakeOnDialResult(
+               opts.t,
+               !obfuscatedHeaderFirst,
+               addr,
+               secondDialResult,
+       )
        if err == nil {
                torrent.Add("initiated conn with fallback header obfuscation", 1)
+               return
        }
-       //cl.logger.Printf("error establishing fallback connection to %v: %v", addr, err)
        return
 }
 
+type outgoingConnOpts struct {
+       peerInfo PeerInfo
+       t        *Torrent
+       // Don't attempt to connect unless a connect message is received after initiating a rendezvous.
+       requireRendezvous bool
+       // Don't send rendezvous requests to eligible relays.
+       skipHolepunchRendezvous bool
+       // Outgoing connection attempt is in response to holepunch connect message.
+       receivedHolepunchConnect bool
+       HeaderObfuscationPolicy  HeaderObfuscationPolicy
+}
+
 // Called to dial out and run a connection. The addr we're given is already
 // considered half-open.
-func (cl *Client) outgoingConnection(t *Torrent, addr net.Addr, ps PeerSource, trusted bool) {
+func (cl *Client) outgoingConnection(
+       opts outgoingConnOpts,
+       attemptKey outgoingConnAttemptKey,
+) {
        cl.dialRateLimiter.Wait(context.Background())
-       c, err := cl.establishOutgoingConn(t, addr)
+       c, err := cl.dialAndCompleteHandshake(opts)
+       if err == nil {
+               c.conn.SetWriteDeadline(time.Time{})
+       }
        cl.lock()
        defer cl.unlock()
-       // Don't release lock between here and addConnection, unless it's for
-       // failure.
-       cl.noLongerHalfOpen(t, addr.String())
+       // Don't release lock between here and addPeerConn, unless it's for failure.
+       cl.noLongerHalfOpen(opts.t, opts.peerInfo.Addr.String(), attemptKey)
        if err != nil {
                if cl.config.Debug {
-                       cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
+                       cl.logger.Levelf(
+                               log.Debug,
+                               "error establishing outgoing connection to %v: %v",
+                               opts.peerInfo.Addr,
+                               err,
+                       )
                }
                return
        }
        defer c.close()
-       c.Discovery = ps
-       c.trusted = trusted
-       t.runHandshookConnLoggingErr(c)
+       c.Discovery = opts.peerInfo.Source
+       c.trusted = opts.peerInfo.Trusted
+       opts.t.runHandshookConnLoggingErr(c)
 }
 
 // The port number for incoming peer connections. 0 if the client isn't listening.
@@ -779,12 +887,12 @@ func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
                )
                c.setRW(rw)
                if err != nil {
-                       return xerrors.Errorf("header obfuscation handshake: %w", err)
+                       return fmt.Errorf("header obfuscation handshake: %w", err)
                }
        }
        ih, err := cl.connBtHandshake(c, &t.infoHash)
        if err != nil {
-               return xerrors.Errorf("bittorrent protocol handshake: %w", err)
+               return fmt.Errorf("bittorrent protocol handshake: %w", err)
        }
        if ih != t.infoHash {
                return errors.New("bittorrent protocol handshake: peer infohash didn't match")
@@ -792,10 +900,11 @@ func (cl *Client) initiateHandshakes(c *PeerConn, t *Torrent) error {
        return nil
 }
 
-// Calls f with any secret keys.
+// Calls f with any secret keys. Note that it takes the Client lock, and so must be used from code
+// that won't also try to take the lock. This saves us copying all the infohashes everytime.
 func (cl *Client) forSkeys(f func([]byte) bool) {
-       cl.lock()
-       defer cl.unlock()
+       cl.rLock()
+       defer cl.rUnlock()
        if false { // Emulate the bug from #114
                var firstIh InfoHash
                for ih := range cl.torrents {
@@ -816,11 +925,18 @@ func (cl *Client) forSkeys(f func([]byte) bool) {
        }
 }
 
+func (cl *Client) handshakeReceiverSecretKeys() mse.SecretKeyIter {
+       if ret := cl.config.Callbacks.ReceiveEncryptedHandshakeSkeys; ret != nil {
+               return ret
+       }
+       return cl.forSkeys
+}
+
 // Do encryption and bittorrent handshakes as receiver.
 func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
        defer perf.ScopeTimerErr(&err)()
        var rw io.ReadWriter
-       rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
+       rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.handshakeReceiverSecretKeys(), cl.config.HeaderObfuscationPolicy, cl.config.CryptoSelector)
        c.setRW(rw)
        if err == nil || err == mse.ErrNoSecretKeyMatch {
                if c.headerEncrypted {
@@ -838,13 +954,12 @@ func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
                return
        }
        if cl.config.HeaderObfuscationPolicy.RequirePreferred && c.headerEncrypted != cl.config.HeaderObfuscationPolicy.Preferred {
-               err = errors.New("connection not have required header obfuscation")
+               err = errors.New("connection does not have required header obfuscation")
                return
        }
        ih, err := cl.connBtHandshake(c, nil)
        if err != nil {
-               err = xerrors.Errorf("during bt handshake: %w", err)
-               return
+               return nil, fmt.Errorf("during bt handshake: %w", err)
        }
        cl.lock()
        t = cl.torrents[ih]
@@ -852,15 +967,28 @@ func (cl *Client) receiveHandshakes(c *PeerConn) (t *Torrent, err error) {
        return
 }
 
+var successfulPeerWireProtocolHandshakePeerReservedBytes expvar.Map
+
+func init() {
+       torrent.Set(
+               "successful_peer_wire_protocol_handshake_peer_reserved_bytes",
+               &successfulPeerWireProtocolHandshakePeerReservedBytes)
+}
+
 func (cl *Client) connBtHandshake(c *PeerConn, ih *metainfo.Hash) (ret metainfo.Hash, err error) {
        res, err := pp.Handshake(c.rw(), ih, cl.peerID, cl.config.Extensions)
        if err != nil {
                return
        }
+       successfulPeerWireProtocolHandshakePeerReservedBytes.Add(
+               hex.EncodeToString(res.PeerExtensionBits[:]), 1)
        ret = res.Hash
        c.PeerExtensionBytes = res.PeerExtensionBits
        c.PeerID = res.PeerID
        c.completedHandshake = time.Now()
+       if cb := cl.config.Callbacks.CompletedHandshake; cb != nil {
+               cb(c, res.Hash)
+       }
        return
 }
 
@@ -871,27 +999,31 @@ func (cl *Client) runReceivedConn(c *PeerConn) {
        }
        t, err := cl.receiveHandshakes(c)
        if err != nil {
-               log.Fmsg(
-                       "error receiving handshakes on %v: %s", c, err,
-               ).SetLevel(log.Debug).
-                       Add(
-                               "network", c.network,
-                       ).Log(cl.logger)
+               cl.logger.LazyLog(log.Debug, func() log.Msg {
+                       return log.Fmsg(
+                               "error receiving handshakes on %v: %s", c, err,
+                       ).Add(
+                               "network", c.Network,
+                       )
+               })
                torrent.Add("error receiving handshake", 1)
                cl.lock()
-               cl.onBadAccept(c.remoteAddr)
+               cl.onBadAccept(c.RemoteAddr)
                cl.unlock()
                return
        }
        if t == nil {
                torrent.Add("received handshake for unloaded torrent", 1)
-               log.Fmsg("received handshake for unloaded torrent").SetLevel(log.Debug).Log(cl.logger)
+               cl.logger.LazyLog(log.Debug, func() log.Msg {
+                       return log.Fmsg("received handshake for unloaded torrent")
+               })
                cl.lock()
-               cl.onBadAccept(c.remoteAddr)
+               cl.onBadAccept(c.RemoteAddr)
                cl.unlock()
                return
        }
        torrent.Add("received handshake for loaded torrent", 1)
+       c.conn.SetWriteDeadline(time.Time{})
        cl.lock()
        defer cl.unlock()
        t.runHandshookConnLoggingErr(c)
@@ -900,30 +1032,36 @@ func (cl *Client) runReceivedConn(c *PeerConn) {
 // Client lock must be held before entering this.
 func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        c.setTorrent(t)
+       for i, b := range cl.config.MinPeerExtensions {
+               if c.PeerExtensionBytes[i]&b != b {
+                       return fmt.Errorf("peer did not meet minimum peer extensions: %x", c.PeerExtensionBytes[:])
+               }
+       }
        if c.PeerID == cl.peerID {
                if c.outgoing {
                        connsToSelf.Add(1)
-                       addr := c.conn.RemoteAddr().String()
+                       addr := c.RemoteAddr.String()
                        cl.dopplegangerAddrs[addr] = struct{}{}
-               } else {
+               } /* else {
                        // Because the remote address is not necessarily the same as its client's torrent listen
                        // address, we won't record the remote address as a doppleganger. Instead, the initiator
                        // can record *us* as the doppleganger.
-               }
-               return errors.New("local and remote peer ids are the same")
+               } */
+               t.logger.Levelf(log.Debug, "local and remote peer ids are the same")
+               return nil
        }
-       c.conn.SetWriteDeadline(time.Time{})
        c.r = deadlineReader{c.conn, c.r}
-       completedHandshakeConnectionFlags.Add(c.ConnectionFlags(), 1)
+       completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
        if connIsIpv6(c.conn) {
                torrent.Add("completed handshake over ipv6", 1)
        }
-       if err := t.addConnection(c); err != nil {
+       if err := t.addPeerConn(c); err != nil {
                return fmt.Errorf("adding connection: %w", err)
        }
        defer t.dropConnection(c)
-       go c.writer(time.Minute)
+       c.startMessageWriter()
        cl.sendInitialMessages(c, t)
+       c.initUpdateRequestsTimer()
        err := c.mainReadLoop()
        if err != nil {
                return fmt.Errorf("main read loop: %w", err)
@@ -931,25 +1069,62 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        return nil
 }
 
+func (p *Peer) initUpdateRequestsTimer() {
+       if check.Enabled {
+               if p.updateRequestsTimer != nil {
+                       panic(p.updateRequestsTimer)
+               }
+       }
+       if enableUpdateRequestsTimer {
+               p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
+       }
+}
+
+const peerUpdateRequestsTimerReason = "updateRequestsTimer"
+
+func (c *Peer) updateRequestsTimerFunc() {
+       c.locker().Lock()
+       defer c.locker().Unlock()
+       if c.closed.IsSet() {
+               return
+       }
+       if c.isLowOnRequests() {
+               // If there are no outstanding requests, then a request update should have already run.
+               return
+       }
+       if d := time.Since(c.lastRequestUpdate); d < updateRequestsTimerDuration {
+               // These should be benign, Timer.Stop doesn't guarantee that its function won't run if it's
+               // already been fired.
+               torrent.Add("spurious timer requests updates", 1)
+               return
+       }
+       c.updateRequests(peerUpdateRequestsTimerReason)
+}
+
+// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
+// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
+// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
+const localClientReqq = 1024
+
 // See the order given in Transmission's tr_peerMsgsNew.
 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
        if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
-               conn.post(pp.Message{
+               conn.write(pp.Message{
                        Type:       pp.Extended,
                        ExtendedID: pp.HandshakeExtendedID,
                        ExtendedPayload: func() []byte {
                                msg := pp.ExtendedHandshakeMessage{
                                        M: map[pp.ExtensionName]pp.ExtensionNumber{
-                                               pp.ExtensionNameMetadata: metadataExtendedId,
+                                               pp.ExtensionNameMetadata:  metadataExtendedId,
+                                               utHolepunch.ExtensionName: utHolepunchExtendedId,
                                        },
                                        V:            cl.config.ExtendedHandshakeClientVersion,
-                                       Reqq:         64, // TODO: Really?
-                                       YourIp:       pp.CompactIp(addrIpOrNil(conn.remoteAddr)),
+                                       Reqq:         localClientReqq,
+                                       YourIp:       pp.CompactIp(conn.remoteIp()),
                                        Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
                                        Port:         cl.incomingPeerPort(),
                                        MetadataSize: torrent.metadataSize(),
-                                       // TODO: We can figured these out specific to the socket
-                                       // used.
+                                       // TODO: We can figure these out specific to the socket used.
                                        Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
                                        Ipv6: cl.config.PublicIp6.To16(),
                                }
@@ -963,11 +1138,11 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
        func() {
                if conn.fastEnabled() {
                        if torrent.haveAllPieces() {
-                               conn.post(pp.Message{Type: pp.HaveAll})
-                               conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces()))
+                               conn.write(pp.Message{Type: pp.HaveAll})
+                               conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
                                return
                        } else if !torrent.haveAnyPieces() {
-                               conn.post(pp.Message{Type: pp.HaveNone})
+                               conn.write(pp.Message{Type: pp.HaveNone})
                                conn.sentHaves.Clear()
                                return
                        }
@@ -975,7 +1150,7 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
                conn.postBitfield()
        }()
        if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
-               conn.post(pp.Message{
+               conn.write(pp.Message{
                        Type: pp.Port,
                        Port: cl.dhtPort(),
                })
@@ -983,54 +1158,55 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
 }
 
 func (cl *Client) dhtPort() (ret uint16) {
-       cl.eachDhtServer(func(s DhtServer) {
-               ret = uint16(missinggo.AddrPort(s.Addr()))
-       })
-       return
+       if len(cl.dhtServers) == 0 {
+               return
+       }
+       return uint16(missinggo.AddrPort(cl.dhtServers[len(cl.dhtServers)-1].Addr()))
 }
 
-func (cl *Client) haveDhtServer() (ret bool) {
-       cl.eachDhtServer(func(_ DhtServer) {
-               ret = true
-       })
-       return
+func (cl *Client) haveDhtServer() bool {
+       return len(cl.dhtServers) > 0
 }
 
 // Process incoming ut_metadata message.
 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerConn) error {
-       var d map[string]int
+       var d pp.ExtendedMetadataRequestMsg
        err := bencode.Unmarshal(payload, &d)
        if _, ok := err.(bencode.ErrUnusedTrailingBytes); ok {
        } else if err != nil {
                return fmt.Errorf("error unmarshalling bencode: %s", err)
        }
-       msgType, ok := d["msg_type"]
-       if !ok {
-               return errors.New("missing msg_type field")
-       }
-       piece := d["piece"]
-       switch msgType {
+       piece := d.Piece
+       switch d.Type {
        case pp.DataMetadataExtensionMsgType:
                c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.MetadataChunksRead }))
                if !c.requestedMetadataPiece(piece) {
                        return fmt.Errorf("got unexpected piece %d", piece)
                }
                c.metadataRequests[piece] = false
-               begin := len(payload) - metadataPieceSize(d["total_size"], piece)
+               begin := len(payload) - d.PieceSize()
                if begin < 0 || begin >= len(payload) {
                        return fmt.Errorf("data has bad offset in payload: %d", begin)
                }
                t.saveMetadataPiece(piece, payload[begin:])
                c.lastUsefulChunkReceived = time.Now()
-               return t.maybeCompleteMetadata()
+               err = t.maybeCompleteMetadata()
+               if err != nil {
+                       // Log this at the Torrent-level, as we don't partition metadata by Peer yet, so we
+                       // don't know who to blame. TODO: Also errors can be returned here that aren't related
+                       // to verifying metadata, which should be fixed. This should be tagged with metadata, so
+                       // log consumers can filter for this message.
+                       t.logger.WithDefaultLevel(log.Warning).Printf("error completing metadata: %v", err)
+               }
+               return err
        case pp.RequestMetadataExtensionMsgType:
                if !t.haveMetadataPiece(piece) {
-                       c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
+                       c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d.Piece, nil))
                        return nil
                }
                start := (1 << 14) * piece
-               c.logger.Printf("sending metadata piece %d", piece)
-               c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
+               c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
+               c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
                return nil
        case pp.RejectMetadataExtensionMsgType:
                return nil
@@ -1039,15 +1215,16 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
        }
 }
 
-func (cl *Client) badPeerAddr(addr net.Addr) bool {
+func (cl *Client) badPeerAddr(addr PeerRemoteAddr) bool {
        if ipa, ok := tryIpPortFromNetAddr(addr); ok {
                return cl.badPeerIPPort(ipa.IP, ipa.Port)
        }
        return false
 }
 
+// Returns whether the IP address and port are considered "bad".
 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
-       if port == 0 {
+       if port == 0 || ip == nil {
                return true
        }
        if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
@@ -1056,7 +1233,11 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
        if _, ok := cl.ipBlockRange(ip); ok {
                return true
        }
-       if _, ok := cl.badPeerIPs[ip.String()]; ok {
+       ipAddr, ok := netip.AddrFromSlice(ip)
+       if !ok {
+               panic(ip)
+       }
+       if _, ok := cl.badPeerIPs[ipAddr]; ok {
                return true
        }
        return false
@@ -1064,41 +1245,50 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
 
 // Return a Torrent ready for insertion into a Client.
 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
+       return cl.newTorrentOpt(AddTorrentOpts{
+               InfoHash: ih,
+               Storage:  specStorage,
+       })
+}
+
+// Return a Torrent ready for insertion into a Client.
+func (cl *Client) newTorrentOpt(opts AddTorrentOpts) (t *Torrent) {
        // use provided storage, if provided
        storageClient := cl.defaultStorage
-       if specStorage != nil {
-               storageClient = storage.NewClient(specStorage)
+       if opts.Storage != nil {
+               storageClient = storage.NewClient(opts.Storage)
        }
 
        t = &Torrent{
                cl:       cl,
-               infoHash: ih,
+               infoHash: opts.InfoHash,
                peers: prioritizedPeers{
-                       om: btree.New(32),
+                       om: gbtree.New(32),
                        getPrio: func(p PeerInfo) peerPriority {
-                               return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
+                               ipPort := p.addr()
+                               return bep40PriorityIgnoreError(cl.publicAddr(ipPort.IP), ipPort)
                        },
                },
                conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
 
-               halfOpen:          make(map[string]PeerInfo),
-               pieceStateChanges: pubsub.NewPubSub(),
-
                storageOpener:       storageClient,
                maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
 
-               networkingEnabled: true,
                metadataChanged: sync.Cond{
                        L: cl.locker(),
                },
-               webSeeds: make(map[string]*peer),
-       }
-       t._pendingPieces.NewSet = priorityBitmapStableNewSet
-       t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
-       t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
-               return fmt.Sprintf("%v: %s", t, m.Text())
-       })
-       t.setChunkSize(defaultChunkSize)
+               webSeeds:     make(map[string]*Peer),
+               gotMetainfoC: make(chan struct{}),
+       }
+       t.smartBanCache.Hash = sha1.Sum
+       t.smartBanCache.Init()
+       t.networkingEnabled.Set()
+       t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug)
+       t.sourcesLogger = t.logger.WithNames("sources")
+       if opts.ChunkSize == 0 {
+               opts.ChunkSize = defaultChunkSize
+       }
+       t.setChunkSize(opts.ChunkSize)
        return
 }
 
@@ -1128,7 +1318,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
 
        t = cl.newTorrent(infoHash, specStorage)
        cl.eachDhtServer(func(s DhtServer) {
-               go t.dhtAnnouncer(s)
+               if cl.config.PeriodicallyAnnounceTorrentsToDht {
+                       go t.dhtAnnouncer(s)
+               }
        })
        cl.torrents[infoHash] = t
        cl.clearAcceptLimits()
@@ -1138,43 +1330,107 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
        return
 }
 
-// Add or merge a torrent spec. If the torrent is already present, the
-// trackers will be merged with the existing ones. If the Info isn't yet
-// known, it will be set. The display name is replaced if the new spec
-// provides one. Returns new if the torrent wasn't already in the client.
-// Note that any `Storage` defined on the spec will be ignored if the
-// torrent is already present (i.e. `new` return value is `true`)
+// Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
+// then this Storage is ignored and the existing torrent returned with `new` set to `false`.
+func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
+       infoHash := opts.InfoHash
+       cl.lock()
+       defer cl.unlock()
+       t, ok := cl.torrents[infoHash]
+       if ok {
+               return
+       }
+       new = true
+
+       t = cl.newTorrentOpt(opts)
+       cl.eachDhtServer(func(s DhtServer) {
+               if cl.config.PeriodicallyAnnounceTorrentsToDht {
+                       go t.dhtAnnouncer(s)
+               }
+       })
+       cl.torrents[infoHash] = t
+       t.setInfoBytesLocked(opts.InfoBytes)
+       cl.clearAcceptLimits()
+       t.updateWantPeersEvent()
+       // Tickle Client.waitAccept, new torrent may want conns.
+       cl.event.Broadcast()
+       return
+}
+
+type AddTorrentOpts struct {
+       InfoHash  infohash.T
+       Storage   storage.ClientImpl
+       ChunkSize pp.Integer
+       InfoBytes []byte
+}
+
+// Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
+// Torrent.MergeSpec.
 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
-       t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
+       t, new = cl.AddTorrentOpt(AddTorrentOpts{
+               InfoHash:  spec.InfoHash,
+               Storage:   spec.Storage,
+               ChunkSize: spec.ChunkSize,
+       })
+       modSpec := *spec
+       if new {
+               // ChunkSize was already applied by adding a new Torrent, and MergeSpec disallows changing
+               // it.
+               modSpec.ChunkSize = 0
+       }
+       err = t.MergeSpec(&modSpec)
+       if err != nil && new {
+               t.Drop()
+       }
+       return
+}
+
+// The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
+// spec.DisallowDataDownload/Upload will be read and applied
+// The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
+func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
        if spec.DisplayName != "" {
                t.SetDisplayName(spec.DisplayName)
        }
        if spec.InfoBytes != nil {
-               err = t.SetInfoBytes(spec.InfoBytes)
+               err := t.SetInfoBytes(spec.InfoBytes)
                if err != nil {
-                       return
+                       return err
                }
        }
+       cl := t.cl
+       cl.AddDhtNodes(spec.DhtNodes)
+       t.UseSources(spec.Sources)
        cl.lock()
        defer cl.unlock()
+       t.initialPieceCheckDisabled = spec.DisableInitialPieceCheck
+       for _, url := range spec.Webseeds {
+               t.addWebSeed(url)
+       }
+       for _, peerAddr := range spec.PeerAddrs {
+               t.addPeer(PeerInfo{
+                       Addr:    StringAddr(peerAddr),
+                       Source:  PeerSourceDirect,
+                       Trusted: true,
+               })
+       }
        if spec.ChunkSize != 0 {
-               t.setChunkSize(pp.Integer(spec.ChunkSize))
+               panic("chunk size cannot be changed for existing Torrent")
        }
        t.addTrackers(spec.Trackers)
        t.maybeNewConns()
-       return
+       t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
+       t.dataUploadDisallowed = spec.DisallowDataUpload
+       return nil
 }
 
-func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
+func (cl *Client) dropTorrent(infoHash metainfo.Hash, wg *sync.WaitGroup) (err error) {
        t, ok := cl.torrents[infoHash]
        if !ok {
                err = fmt.Errorf("no such torrent")
                return
        }
-       err = t.close()
-       if err != nil {
-               panic(err)
-       }
+       err = t.close(wg)
        delete(cl.torrents, infoHash)
        return
 }
@@ -1207,8 +1463,8 @@ func (cl *Client) WaitAll() bool {
 
 // Returns handles to all the torrents loaded in the Client.
 func (cl *Client) Torrents() []*Torrent {
-       cl.lock()
-       defer cl.unlock()
+       cl.rLock()
+       defer cl.rUnlock()
        return cl.torrentsAsSlice()
 }
 
@@ -1220,7 +1476,7 @@ func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
 }
 
 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
-       spec, err := TorrentSpecFromMagnetURI(uri)
+       spec, err := TorrentSpecFromMagnetUri(uri)
        if err != nil {
                return
        }
@@ -1229,13 +1485,11 @@ func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
 }
 
 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
-       T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
-       var ss []string
-       slices.MakeInto(&ss, mi.Nodes)
-       cl.AddDHTNodes(ss)
-       for _, url := range mi.UrlList {
-               T.addWebSeed(url)
+       ts, err := TorrentSpecFromMetaInfoErr(mi)
+       if err != nil {
+               return
        }
+       T, _, err = cl.AddTorrentSpec(ts)
        return
 }
 
@@ -1251,7 +1505,7 @@ func (cl *Client) DhtServers() []DhtServer {
        return cl.dhtServers
 }
 
-func (cl *Client) AddDHTNodes(nodes []string) {
+func (cl *Client) AddDhtNodes(nodes []string) {
        for _, n := range nodes {
                hmp := missinggo.SplitHostMaybePort(n)
                ip := net.ParseIP(hmp.Host)
@@ -1272,39 +1526,75 @@ func (cl *Client) AddDHTNodes(nodes []string) {
 }
 
 func (cl *Client) banPeerIP(ip net.IP) {
-       cl.logger.Printf("banning ip %v", ip)
-       if cl.badPeerIPs == nil {
-               cl.badPeerIPs = make(map[string]struct{})
+       // We can't take this from string, because it will lose netip's v4on6. net.ParseIP parses v4
+       // addresses directly to v4on6, which doesn't compare equal with v4.
+       ipAddr, ok := netip.AddrFromSlice(ip)
+       if !ok {
+               panic(ip)
+       }
+       g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
+       for _, t := range cl.torrents {
+               t.iterPeers(func(p *Peer) {
+                       if p.remoteIp().Equal(ip) {
+                               t.logger.Levelf(log.Warning, "dropping peer %v with banned ip %v", p, ip)
+                               // Should this be a close?
+                               p.drop()
+                       }
+               })
        }
-       cl.badPeerIPs[ip.String()] = struct{}{}
 }
 
-func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
+type newConnectionOpts struct {
+       outgoing        bool
+       remoteAddr      PeerRemoteAddr
+       localPublicAddr peerLocalPublicAddr
+       network         string
+       connString      string
+}
+
+func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
+       if opts.network == "" {
+               panic(opts.remoteAddr)
+       }
        c = &PeerConn{
-               peer: peer{
-                       outgoing:        outgoing,
+               Peer: Peer{
+                       outgoing:        opts.outgoing,
                        choking:         true,
                        peerChoking:     true,
                        PeerMaxRequests: 250,
 
-                       remoteAddr: remoteAddr,
-                       network:    network,
-                       connString: connString,
+                       RemoteAddr:      opts.remoteAddr,
+                       localPublicAddr: opts.localPublicAddr,
+                       Network:         opts.network,
+                       callbacks:       &cl.config.Callbacks,
                },
-               conn:        nc,
-               writeBuffer: new(bytes.Buffer),
+               connString: opts.connString,
+               conn:       nc,
+       }
+       c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
+       c.initRequestState()
+       // TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
+       if opts.remoteAddr != nil {
+               netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
+               if err == nil {
+                       c.bannableAddr = Some(netipAddrPort.Addr())
+               }
        }
-       c.PeerImpl = c
-       c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
-               return fmt.Sprintf("%v: %s", c, m.Text())
-       })
-       c.writerCond.L = cl.locker()
+       c.peerImpl = c
+       c.logger = cl.logger.WithDefaultLevel(log.Warning)
        c.setRW(connStatsReadWriter{nc, c})
        c.r = &rateLimitedReader{
                l: cl.config.DownloadRateLimiter,
                r: c.r,
        }
-       c.logger.Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
+       c.logger.Levelf(
+               log.Debug,
+               "new PeerConn %p [Client %p remoteAddr %v network %v outgoing %t]",
+               c, cl, opts.remoteAddr, opts.network, opts.outgoing,
+       )
+       for _, f := range cl.config.Callbacks.NewPeer {
+               f(&c.Peer)
+       }
        return
 }
 
@@ -1330,14 +1620,6 @@ func firstNotNil(ips ...net.IP) net.IP {
        return nil
 }
 
-func (cl *Client) eachDialer(f func(Dialer) bool) {
-       for _, s := range cl.dialers {
-               if !f(s) {
-                       break
-               }
-       }
-}
-
 func (cl *Client) eachListener(f func(Listener) bool) {
        for _, s := range cl.listeners {
                if !f(s) {
@@ -1346,12 +1628,13 @@ func (cl *Client) eachListener(f func(Listener) bool) {
        }
 }
 
-func (cl *Client) findListener(f func(net.Listener) bool) (ret net.Listener) {
-       cl.eachListener(func(l Listener) bool {
-               ret = l
-               return !f(l)
-       })
-       return
+func (cl *Client) findListener(f func(Listener) bool) (ret Listener) {
+       for i := 0; i < len(cl.listeners); i += 1 {
+               if ret = cl.listeners[i]; f(ret) {
+                       return
+               }
+       }
+       return nil
 }
 
 func (cl *Client) publicIp(peer net.IP) net.IP {
@@ -1371,7 +1654,7 @@ func (cl *Client) publicIp(peer net.IP) net.IP {
 
 func (cl *Client) findListenerIp(f func(net.IP) bool) net.IP {
        l := cl.findListener(
-               func(l net.Listener) bool {
+               func(l Listener) bool {
                        return f(addrIpOrNil(l.Addr()))
                },
        )
@@ -1389,15 +1672,25 @@ func (cl *Client) publicAddr(peer net.IP) IpPort {
 // ListenAddrs addresses currently being listened to.
 func (cl *Client) ListenAddrs() (ret []net.Addr) {
        cl.lock()
-       defer cl.unlock()
-       cl.eachListener(func(l Listener) bool {
-               ret = append(ret, l.Addr())
-               return true
-       })
+       ret = make([]net.Addr, len(cl.listeners))
+       for i := 0; i < len(cl.listeners); i += 1 {
+               ret[i] = cl.listeners[i].Addr()
+       }
+       cl.unlock()
        return
 }
 
-func (cl *Client) onBadAccept(addr net.Addr) {
+func (cl *Client) PublicIPs() (ips []net.IP) {
+       if ip := cl.config.PublicIp4; ip != nil {
+               ips = append(ips, ip)
+       }
+       if ip := cl.config.PublicIp6; ip != nil {
+               ips = append(ips, ip)
+       }
+       return
+}
+
+func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
        ipa, ok := tryIpPortFromNetAddr(addr)
        if !ok {
                return
@@ -1423,7 +1716,7 @@ func (cl *Client) clearAcceptLimits() {
 func (cl *Client) acceptLimitClearer() {
        for {
                select {
-               case <-cl.closed.LockedChan(cl.locker()):
+               case <-cl.closed.Done():
                        return
                case <-time.After(15 * time.Minute):
                        cl.lock()
@@ -1463,3 +1756,15 @@ func (cl *Client) locker() *lockWithDeferreds {
 func (cl *Client) String() string {
        return fmt.Sprintf("<%[1]T %[1]p>", cl)
 }
+
+// Returns connection-level aggregate connStats at the Client level. See the comment on
+// TorrentStats.ConnStats.
+func (cl *Client) ConnStats() ConnStats {
+       return cl.connStats.Copy()
+}
+
+func (cl *Client) Stats() ClientStats {
+       cl.rLock()
+       defer cl.rUnlock()
+       return cl.statsLocked()
+}