client.go | 19 +++++++++++++++++-- closewrapper.go | 12 ++++++++++++ config.go | 4 ++++ diff --git a/client.go b/client.go index 81f620caf283a886badaee7fad449599169c31fd..99edceeec904fca5affecce4d5d1d01812381d93 100644 --- a/client.go +++ b/client.go @@ -21,6 +21,7 @@ "github.com/anacrolix/dht/krpc" "github.com/anacrolix/log" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/bitmap" + "github.com/anacrolix/missinggo/conntrack" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pproffd" "github.com/anacrolix/missinggo/pubsub" @@ -291,7 +292,8 @@ return cl.config.PublicIp6 } return cl.config.PublicIp4 }(), - StartingNodes: cl.config.DhtStartingNodes, + StartingNodes: cl.config.DhtStartingNodes, + ConnectionTracking: cl.config.ConnTracker, } s, err = dht.NewServer(&cfg) if err == nil { @@ -538,6 +540,9 @@ network := s.Addr().Network() if peerNetworkEnabled(network, cl.config) { left++ go func() { + cte := cl.config.ConnTracker.Wait( + conntrack.Entry{network, s.Addr().String(), addr}, + "dial torrent client") c, err := s.dial(ctx, addr) // This is a bit optimistic, but it looks non-trivial to thread // this through the proxy code. Set it now in case we close the @@ -546,7 +551,17 @@ if tc, ok := c.(*net.TCPConn); ok { tc.SetLinger(0) } countDialResult(err) - resCh <- dialResult{c, network} + dr := dialResult{c, network} + if c == nil { + cte.Done() + } else { + dr.Conn = closeWrapper{c, func() error { + err := c.Close() + cte.Done() + return err + }} + } + resCh <- dr }() } return true diff --git a/closewrapper.go b/closewrapper.go new file mode 100644 index 0000000000000000000000000000000000000000..5449418996852f706db36cd78d9d6a39cc717e60 --- /dev/null +++ b/closewrapper.go @@ -0,0 +1,12 @@ +package torrent + +import "net" + +type closeWrapper struct { + net.Conn + closer func() error +} + +func (me closeWrapper) Close() error { + return me.closer() +} diff --git a/config.go b/config.go index 31f2251387f796cdf7ff54d9d731311ec48a45ad..7e1bcdd0f92bd72e9f7c8a7b0024380331beef9e 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,7 @@ "time" "github.com/anacrolix/dht" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/conntrack" "github.com/anacrolix/missinggo/expect" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/storage" @@ -121,6 +122,8 @@ DisableAcceptRateLimiting bool // Don't add connections that have the same peer ID as an existing // connection for a given Torrent. dropDuplicatePeerIds bool + + ConnTracker *conntrack.Instance } func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig { @@ -147,6 +150,7 @@ DhtStartingNodes: dht.GlobalBootstrapAddrs, ListenHost: func(string) string { return "" }, UploadRateLimiter: unlimited, DownloadRateLimiter: unlimited, + ConnTracker: conntrack.NewInstance(), } }