"net/netip"
"sort"
"strconv"
- "strings"
"time"
+ "github.com/anacrolix/torrent/internal/panicif"
+
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
+ "github.com/anacrolix/torrent/internal/check"
"github.com/anacrolix/torrent/internal/limiter"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo"
type Client struct {
// An aggregate of stats over all connections. First in struct to ensure 64-bit alignment of
// fields. See #262.
- stats ConnStats
+ connStats ConnStats
_mu lockWithDeferreds
event sync.Cond
activeAnnounceLimiter limiter.Instance
httpClient *http.Client
+
+ undialableWithoutHolepunch map[netip.AddrPort]struct{}
+ undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect map[netip.AddrPort]struct{}
+ dialableOnlyAfterHolepunch map[netip.AddrPort]struct{}
}
type ipStr string
fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
writeDhtServerStatus(w, s)
})
- spew.Fdump(w, &cl.stats)
+ dumpStats(w, cl.statsLocked())
torrentsSlice := cl.torrentsAsSlice()
fmt.Fprintf(w, "# Torrents: %d\n", len(torrentsSlice))
fmt.Fprintln(w)
res = <-resCh
}
}()
- // There are still incompleted dials.
+ // There are still uncompleted dials.
go func() {
for ; left > 0; left-- {
conn := (<-resCh).Conn
func dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
c, err := s.Dial(ctx, addr)
+ if err != nil {
+ log.Levelf(log.Debug, "error dialing %q: %v", addr, err)
+ }
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
- // it now in case we close the connection forthwith.
+ // it now in case we close the connection forthwith. Note this is also done in the TCP dialer
+ // code to increase the chance it's done.
if tc, ok := c.(*net.TCPConn); ok {
tc.SetLinger(0)
}
return c
}
-func forgettableDialError(err error) bool {
- return strings.Contains(err.Error(), "no suitable address found")
-}
-
-func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
- if _, ok := t.halfOpen[addr]; !ok {
- panic("invariant broken")
+func (cl *Client) noLongerHalfOpen(t *Torrent, addr string, attemptKey outgoingConnAttemptKey) {
+ path := t.getHalfOpenPath(addr, attemptKey)
+ if !path.Exists() {
+ panic("should exist")
}
- delete(t.halfOpen, addr)
+ path.Delete()
cl.numHalfOpen--
+ if cl.numHalfOpen < 0 {
+ panic("should not be possible")
+ }
for _, t := range cl.torrents {
t.openNewConns()
}
}
+func (cl *Client) countHalfOpenFromTorrents() (count int) {
+ for _, t := range cl.torrents {
+ count += t.numHalfOpenAttempts()
+ }
+ return
+}
+
// Performs initiator handshakes and returns a connection. Returns nil *PeerConn if no connection
// for valid reasons.
func (cl *Client) initiateProtocolHandshakes(
// Returns nil connection and nil error if no connection could be established for valid reasons.
func (cl *Client) initiateRendezvousConnect(
- t *Torrent, addr PeerRemoteAddr,
+ t *Torrent, holepunchAddr netip.AddrPort,
) (ok bool, err error) {
- holepunchAddr, err := addrPortFromPeerRemoteAddr(addr)
- if err != nil {
- return
- }
cl.lock()
defer cl.unlock()
rz, err := t.startHolepunchRendezvous(holepunchAddr)
) {
t := opts.t
addr := opts.addr
- var rzOk bool
- if !opts.skipHolepunchRendezvous {
- rzOk, err = cl.initiateRendezvousConnect(t, addr)
- if err != nil {
- err = fmt.Errorf("initiating rendezvous connect: %w", err)
+ holepunchAddr, err := addrPortFromPeerRemoteAddr(addr)
+ var sentRendezvous bool
+ if err == nil {
+ if !opts.skipHolepunchRendezvous {
+ sentRendezvous, err = cl.initiateRendezvousConnect(t, holepunchAddr)
+ if err != nil {
+ err = fmt.Errorf("initiating rendezvous connect: %w", err)
+ }
}
}
- if opts.requireRendezvous && !rzOk {
+ gotHolepunchConnect := (err == nil && sentRendezvous) || opts.receivedHolepunchConnect
+ if opts.requireRendezvous && !sentRendezvous {
return nil, err
}
if err != nil {
- log.Print(err)
+ t.logger.Print(err)
}
dialCtx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
cl.rLock()
defer cancel()
dr := cl.dialFirst(dialCtx, addr.String())
nc := dr.Conn
+ cl.lock()
+ if gotHolepunchConnect && g.MapContains(cl.undialableWithoutHolepunch, holepunchAddr) {
+ g.MakeMapIfNilAndSet(
+ &cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect,
+ holepunchAddr,
+ struct{}{},
+ )
+ }
+ cl.unlock()
if nc == nil {
+ if !sentRendezvous && !gotHolepunchConnect {
+ cl.lock()
+ g.MakeMapIfNilAndSet(&cl.undialableWithoutHolepunch, holepunchAddr, struct{}{})
+ cl.unlock()
+ }
if dialCtx.Err() != nil {
return nil, fmt.Errorf("dialing: %w", dialCtx.Err())
}
return nil, errors.New("dial failed")
}
+ if gotHolepunchConnect {
+ panicif.False(holepunchAddr.IsValid())
+ cl.lock()
+ if g.MapContains(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect, holepunchAddr) {
+ g.MakeMapIfNilAndSet(
+ &cl.dialableOnlyAfterHolepunch,
+ holepunchAddr,
+ struct{}{},
+ )
+ }
+ cl.unlock()
+ }
addrIpPort, _ := tryIpPortFromNetAddr(addr)
c, err := cl.initiateProtocolHandshakes(
context.Background(), nc, t, obfuscatedHeader,
requireRendezvous bool
// Don't send rendezvous requests to eligible relays.
skipHolepunchRendezvous bool
+ // Outgoing connection attempt is in response to holepunch connect message.
+ receivedHolepunchConnect bool
}
// Called to dial out and run a connection. The addr we're given is already
// considered half-open.
-func (cl *Client) outgoingConnection(opts outgoingConnOpts, ps PeerSource, trusted bool) {
+func (cl *Client) outgoingConnection(
+ opts outgoingConnOpts,
+ ps PeerSource,
+ trusted bool,
+ attemptKey outgoingConnAttemptKey,
+) {
cl.dialRateLimiter.Wait(context.Background())
c, err := cl.establishOutgoingConn(opts)
if err == nil {
defer cl.unlock()
// Don't release lock between here and addPeerConn, unless it's for
// failure.
- cl.noLongerHalfOpen(opts.t, opts.addr.String())
+ cl.noLongerHalfOpen(opts.t, opts.addr.String(), attemptKey)
if err != nil {
if cl.config.Debug {
cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", opts.addr, err)
return nil
}
-const check = false
-
func (p *Peer) initUpdateRequestsTimer() {
- if check {
+ if check.Enabled {
if p.updateRequestsTimer != nil {
panic(p.updateRequestsTimer)
}
},
conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
- halfOpen: make(map[string]PeerInfo),
-
storageOpener: storageClient,
maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
t.smartBanCache.Hash = sha1.Sum
t.smartBanCache.Init()
t.networkingEnabled.Set()
- t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString())
+ t.logger = cl.logger.WithContextValue(t).WithNames("torrent", t.infoHash.HexString()).WithDefaultLevel(log.Debug)
t.sourcesLogger = t.logger.WithNames("sources")
if opts.ChunkSize == 0 {
opts.ChunkSize = defaultChunkSize
return fmt.Sprintf("<%[1]T %[1]p>", cl)
}
-// Returns connection-level aggregate stats at the Client level. See the comment on
+// Returns connection-level aggregate connStats at the Client level. See the comment on
// TorrentStats.ConnStats.
func (cl *Client) ConnStats() ConnStats {
- return cl.stats.Copy()
+ return cl.connStats.Copy()
+}
+
+func (cl *Client) Stats() ClientStats {
+ cl.rLock()
+ defer cl.rUnlock()
+ return cl.statsLocked()
+}
+
+func (cl *Client) statsLocked() (stats ClientStats) {
+ stats.ConnStats = cl.connStats.Copy()
+ stats.ActiveHalfOpenAttempts = cl.numHalfOpen
+ stats.NumPeersUndialableWithoutHolepunchDialedAfterHolepunchConnect =
+ len(cl.undialableWithoutHolepunchDialAttemptedAfterHolepunchConnect)
+ stats.NumPeersDialableOnlyAfterHolepunch =
+ len(cl.dialableOnlyAfterHolepunch)
+ return
}