"net/netip"
"sort"
"strconv"
- "strings"
"time"
+ "github.com/anacrolix/torrent/internal/panicif"
+
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc"
- g "github.com/anacrolix/generics"
. "github.com/anacrolix/generics"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/v2"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
+ "github.com/anacrolix/torrent/internal/check"
"github.com/anacrolix/torrent/internal/limiter"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
+ utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
type Client struct {
// An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
// fields. See #262.
- stats ConnStats
+ connStats ConnStats
_mu lockWithDeferreds
event sync.Cond
activeAnnounceLimiter limiter.Instance
httpClient *http.Client
+
+ undialableWithoutHolepunch map[netip.AddrPort]struct{}
+ undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{}
+ dialableOnlyAfterHolepunch map[netip.AddrPort]struct{}
}
type ipStr string
fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
writeDhtServerStatus(w, s)
})
- spew.Fdump(w, &cl.stats)
+ dumpStats(w, cl.statsLocked())
torrentsSlice := cl.torrentsAsSlice()
fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
fmt.Fprintln(w)
res = <-resCh
}
}()
- // There are still incompleted dials.
+ // There are still uncompleted dials.
go func() {
for ; left > 0; left-- {
conn := (<-resCh).Conn
func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
c, err := s.Dial(ctx, addr)
+ if err != nil {
+ log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
+ }
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
- // it now in case we close the connection forthwith.
+ // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
+ // code to increase the chance it's done.
if tc, ok := c.(*net.TCPConn); ok {
tc.SetLinger(0)
}
return c
}
-func forgettableDialError(err error) bool {
- return strings.Contains(err.Error(), "no suitable address found")
-}
-
-func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
- if _, ok := t.halfOpen[addr]; !ok {
- panic("invariant broken")
+func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
+ path := t.getHalfOpenPath(addr, attemptKey)
+ if !path.Exists() {
+ panic("should exist")
}
- delete(t.halfOpen, addr)
+ path.Delete()
cl.numHalfOpen--
+ if cl.numHalfOpen < 0 {
+ panic("should not be possible")
+ }
for _, t := range cl.torrents {
t.openNewConns()
}
}
+func (cl *Client) countHalfOpenFromTorrents() (count int) {
+ for _, t := range cl.torrents {
+ count += t.numHalfOpenAttempts()
+ }
+ return
+}
+
// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
// for valid reasons.
func (cl *Client) initiateProtocolHandshakes(
return
}
+func (cl *Client) waitForRendezvousConnect(ctx context.Context, rz *utHolepunchRendezvous) error {
+ for {
+ switch {
+ case rz.gotConnect.IsSet():
+ return nil
+ case len(rz.relays) == 0:
+ return errors.New("all relays failed")
+ case ctx.Err() != nil:
+ return context.Cause(ctx)
+ }
+ relayCond := rz.relayCond.Signaled()
+ cl.unlock()
+ select {
+ case <-rz.gotConnect.Done():
+ case <-relayCond:
+ case <-ctx.Done():
+ }
+ cl.lock()
+ }
+}
+
+// Returns nil connection and nil error if no connection could be established for valid reasons.
+func (cl *Client) initiateRendezvousConnect(
+ t *Torrent, holepunchAddr netip.AddrPort,
+) (ok bool, err error) {
+ cl.lock()
+ defer cl.unlock()
+ rz, err := t.startHolepunchRendezvous(holepunchAddr)
+ if err != nil {
+ return
+ }
+ if rz == nil {
+ return
+ }
+ ok = true
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+ err = cl.waitForRendezvousConnect(ctx, rz)
+ delete(t.utHolepunchRendezvous, holepunchAddr)
+ if err != nil {
+ err = fmt.Errorf("waiting for rendezvous connect signal: %w", err)
+ }
+ return
+}
+
// Returns nil connection and nil error if no connection could be established for valid reasons.
-func (cl *Client) establishOutgoingConnEx(t *Torrent, addr PeerRemoteAddr, obfuscatedHeader bool) (*PeerConn, error) {
+func (cl *Client) establishOutgoingConnEx(
+ opts outgoingConnOpts,
+ obfuscatedHeader bool,
+) (
+ _ *PeerConn, err error,
+) {
+ t := opts.t
+ addr := opts.addr
+ holepunchAddr, err := addrPortFromPeerRemoteAddr(addr)
+ var sentRendezvous bool
+ if err == nil {
+ if !opts.skipHolepunchRendezvous {
+ sentRendezvous, err = cl.initiateRendezvousConnect(t, holepunchAddr)
+ if err != nil {
+ err = fmt.Errorf("initiating rendezvous connect: %w", err)
+ }
+ }
+ }
+ gotHolepunchConnect := (err == nil && sentRendezvous) || opts.receivedHolepunchConnect
+ if opts.requireRendezvous && !sentRendezvous {
+ return nil, err
+ }
+ if err != nil {
+ t.logger.Print(err)
+ }
dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
cl.rLock()
defer cl.rUnlock()
defer cancel()
dr := cl.dialFirst(dialCtx, addr.String())
nc := dr.Conn
+ cl.lock()
+ if gotHolepunchConnect && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+ g.MakeMapIfNilAndSet(
+ &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
+ holepunchAddr,
+ struct{}{},
+ )
+ }
+ cl.unlock()
if nc == nil {
+ if !sentRendezvous && !gotHolepunchConnect {
+ cl.lock()
+ g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
+ cl.unlock()
+ }
if dialCtx.Err() != nil {
return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
}
return nil, errors.New("dial failed")
}
+ if gotHolepunchConnect {
+ panicif.False(holepunchAddr.IsValid())
+ cl.lock()
+ if g.MapContains(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, holepunchAddr) {
+ g.MakeMapIfNilAndSet(
+ &cl.dialableOnlyAfterHolepunch,
+ holepunchAddr,
+ struct{}{},
+ )
+ }
+ cl.unlock()
+ }
addrIpPort, _ := tryIpPortFromNetAddr(addr)
c, err := cl.initiateProtocolHandshakes(
context.Background(), nc, t, obfuscatedHeader,
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
-func (cl *Client) establishOutgoingConn(t *Torrent, addr PeerRemoteAddr) (c *PeerConn, err error) {
+func (cl *Client) establishOutgoingConn(opts outgoingConnOpts) (c *PeerConn, err error) {
torrent.Add("establish outgoing connection", 1)
obfuscatedHeaderFirst := cl.config.HeaderObfuscationPolicy.Preferred
- c, err = cl.establishOutgoingConnEx(t, addr, obfuscatedHeaderFirst)
+ c, err = cl.establishOutgoingConnEx(opts, obfuscatedHeaderFirst)
if err == nil {
torrent.Add("initiated conn with preferred header obfuscation", 1)
return
return
}
// Try again with encryption if we didn't earlier, or without if we did.
- c, err = cl.establishOutgoingConnEx(t, addr, !obfuscatedHeaderFirst)
+ c, err = cl.establishOutgoingConnEx(opts, !obfuscatedHeaderFirst)
if err == nil {
torrent.Add("initiated conn with fallback header obfuscation", 1)
}
return
}
+type outgoingConnOpts struct {
+ t *Torrent
+ addr PeerRemoteAddr
+ // Don't attempt to connect unless a connect message is received after initiating a rendezvous.
+ requireRendezvous bool
+ // Don't send rendezvous requests to eligible relays.
+ skipHolepunchRendezvous bool
+ // Outgoing connection attempt is in response to holepunch connect message.
+ receivedHolepunchConnect bool
+}
+
// Called to dial out and run a connection. The addr we're given is already
// considered half-open.
-func (cl *Client) outgoingConnection(t *Torrent, addr PeerRemoteAddr, ps PeerSource, trusted bool) {
+func (cl *Client) outgoingConnection(
+ opts outgoingConnOpts,
+ ps PeerSource,
+ trusted bool,
+ attemptKey outgoingConnAttemptKey,
+) {
cl.dialRateLimiter.Wait(context.Background())
- c, err := cl.establishOutgoingConn(t, addr)
+ c, err := cl.establishOutgoingConn(opts)
if err == nil {
c.conn.SetWriteDeadline(time.Time{})
}
defer cl.unlock()
// Don't release lock between here and addPeerConn, unless it's for
// failure.
- cl.noLongerHalfOpen(t, addr.String())
+ cl.noLongerHalfOpen(opts.t, opts.addr.String(), attemptKey)
if err != nil {
if cl.config.Debug {
- cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", addr, err)
+ cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err)
}
return
}
defer c.close()
c.Discovery = ps
c.trusted = trusted
- t.runHandshookConnLoggingErr(c)
+ opts.t.runHandshookConnLoggingErr(c)
}
// The port number for incoming peer connections. 0 if the client isn't listening.
return nil
}
-const check = false
-
func (p *Peer) initUpdateRequestsTimer() {
- if check {
+ if check.Enabled {
if p.updateRequestsTimer != nil {
panic(p.updateRequestsTimer)
}
ExtendedPayload: func() []byte {
msg := pp.ExtendedHandshakeMessage{
M: map[pp.ExtensionName]pp.ExtensionNumber{
- pp.ExtensionNameMetadata: metadataExtendedId,
+ pp.ExtensionNameMetadata: metadataExtendedId,
+ utHolepunch.ExtensionName: utHolepunchExtendedId,
},
V: cl.config.ExtendedHandshakeClientVersion,
Reqq: localClientReqq,
},
conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
- halfOpen: make(map[string]PeerInfo),
-
storageOpener: storageClient,
maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
t.smartBanCache.Hash = sha1.Sum
t.smartBanCache.Init()
t.networkingEnabled.Set()
- t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
+ t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug)
t.sourcesLogger = t.logger.WithNames("sources")
if opts.ChunkSize == 0 {
opts.ChunkSize = defaultChunkSize
return fmt.Sprintf("<%[1]T %[1]p>", cl)
}
-// Returns connection-level aggregate stats at the Client level. See the comment on
+// Returns connection-level aggregate connStats at the Client level. See the comment on
// TorrentStats.ConnStats.
func (cl *Client) ConnStats() ConnStats {
- return cl.stats.Copy()
+ return cl.connStats.Copy()
+}
+
+func (cl *Client) Stats() ClientStats {
+ cl.rLock()
+ defer cl.rUnlock()
+ return cl.statsLocked()
+}
+
+func (cl *Client) statsLocked() (stats ClientStats) {
+ stats.ConnStats = cl.connStats.Copy()
+ stats.ActiveHalfOpenAttempts = cl.numHalfOpen
+ stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect =
+ len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect)
+ stats.NumPeersDialableOnlyAfterHolepunch =
+ len(cl.dialableOnlyAfterHolepunch)
+ return
}