From 0032b45a02e5c96b14e383ed37552d07884580c2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 4 Nov 2018 16:56:55 +1100 Subject: [PATCH] Store remoteAddr with each connection It would appear net.Conns returned from proxies don't have a RemoteAddr the client expects. --- client.go | 72 ++++++++++++++++++++++++---------------------- connection.go | 29 +++++++++---------- connection_test.go | 4 +-- torrent.go | 28 +++++------------- 4 files changed, 60 insertions(+), 73 deletions(-) diff --git a/client.go b/client.go index 2c402198..dd3e4cca 100644 --- a/client.go +++ b/client.go @@ -442,7 +442,7 @@ func (cl *Client) incomingConnection(nc net.Conn) { if tc, ok := nc.(*net.TCPConn); ok { tc.SetLinger(0) } - c := cl.newConnection(nc, false) + c := cl.newConnection(nc, false, ipPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network()) c.Discovery = peerSourceIncoming cl.runReceivedConn(c) } @@ -460,7 +460,8 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent { } type dialResult struct { - Conn net.Conn + Conn net.Conn + Network string } func countDialResult(err error) { @@ -523,32 +524,30 @@ func peerNetworkEnabled(network string, cfg *ClientConfig) bool { } // Returns a connection over UTP or TCP, whichever is first to connect. -func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn { +func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult { ctx, cancel := context.WithCancel(ctx) // As soon as we return one connection, cancel the others. defer cancel() left := 0 resCh := make(chan dialResult, left) - dial := func(f func(_ context.Context, addr string) (net.Conn, error)) { - left++ - go func() { - c, err := f(ctx, addr) - // This is a bit optimistic, but it looks non-trivial to thread - // this through the proxy code. Set it now in case we close the - // connection forthwith. - if tc, ok := c.(*net.TCPConn); ok { - tc.SetLinger(0) - } - countDialResult(err) - resCh <- dialResult{c} - }() - } func() { cl.lock() defer cl.unlock() cl.eachListener(func(s socket) bool { - if peerNetworkEnabled(s.Addr().Network(), cl.config) { - dial(s.dial) + network := s.Addr().Network() + if peerNetworkEnabled(network, cl.config) { + left++ + go func() { + c, err := s.dial(ctx, addr) + // This is a bit optimistic, but it looks non-trivial to thread + // this through the proxy code. Set it now in case we close the + // connection forthwith. + if tc, ok := c.(*net.TCPConn); ok { + tc.SetLinger(0) + } + countDialResult(err) + resCh <- dialResult{c, network} + }() } return true }) @@ -573,7 +572,7 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn { if res.Conn != nil { go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1) } - return res.Conn + return res } func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { @@ -586,8 +585,8 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { // Performs initiator handshakes and returns a connection. Returns nil // *connection if no connection for valid reasons. -func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) { - c = cl.newConnection(nc, true) +func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr ipPort, network string) (c *connection, err error) { + c = cl.newConnection(nc, true, remoteAddr, network) c.headerEncrypted = encryptHeader ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) defer cancel() @@ -608,8 +607,9 @@ func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torr // Returns nil connection and nil error if no connection could be established // for valid reasons. -func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) { - nc := cl.dialFirst(ctx, addr) +func (cl *Client) establishOutgoingConnEx(t *Torrent, addr ipPort, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) { + dr := cl.dialFirst(ctx, addr.String()) + nc := dr.Conn if nc == nil { return } @@ -618,12 +618,12 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.C nc.Close() } }() - return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader) + return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader, addr, dr.Network) } // Returns nil connection and nil error if no connection could be established // for valid reasons. -func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) { +func (cl *Client) establishOutgoingConn(t *Torrent, addr ipPort) (c *connection, err error) { torrent.Add("establish outgoing connection", 1) ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration { cl.rLock() @@ -658,14 +658,14 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, // Called to dial out and run a connection. The addr we're given is already // considered half-open. -func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) { +func (cl *Client) outgoingConnection(t *Torrent, addr ipPort, ps peerSource) { cl.dialRateLimiter.Wait(context.Background()) c, err := cl.establishOutgoingConn(t, addr) cl.lock() defer cl.unlock() // Don't release lock between here and addConnection, unless it's for // failure. - cl.noLongerHalfOpen(t, addr) + cl.noLongerHalfOpen(t, addr.String()) if err != nil { if cl.config.Debug { log.Printf("error establishing outgoing connection: %s", err) @@ -794,18 +794,18 @@ func (cl *Client) runReceivedConn(c *connection) { ).AddValue( debugLogValue, ).Add( - "network", c.remoteAddr().Network(), + "network", c.network, ).Log(cl.logger) torrent.Add("error receiving handshake", 1) cl.lock() - cl.onBadAccept(c.remoteAddr()) + cl.onBadAccept(c.remoteAddr) cl.unlock() return } if t == nil { torrent.Add("received handshake for unloaded torrent", 1) cl.lock() - cl.onBadAccept(c.remoteAddr()) + cl.onBadAccept(c.remoteAddr) cl.unlock() return } @@ -862,7 +862,7 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) { }, V: cl.config.ExtendedHandshakeClientVersion, Reqq: 64, // TODO: Really? - YourIp: pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())), + YourIp: pp.CompactIp(conn.remoteAddr.IP), Encryption: !cl.config.DisableEncryption, Port: cl.incomingPeerPort(), MetadataSize: torrent.metadataSize(), @@ -1182,7 +1182,7 @@ func (cl *Client) banPeerIP(ip net.IP) { cl.badPeerIPs[ip.String()] = struct{}{} } -func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) { +func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr ipPort, network string) (c *connection) { c = &connection{ conn: nc, outgoing: outgoing, @@ -1190,6 +1190,8 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) { PeerChoked: true, PeerMaxRequests: 250, writeBuffer: new(bytes.Buffer), + remoteAddr: remoteAddr, + network: network, } c.writerCond.L = cl.locker() c.setRW(connStatsReadWriter{nc, c}) @@ -1275,8 +1277,8 @@ func (cl *Client) ListenAddrs() (ret []net.Addr) { return } -func (cl *Client) onBadAccept(addr net.Addr) { - ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr)) +func (cl *Client) onBadAccept(addr ipPort) { + ip := maskIpForAcceptLimiting(addr.IP) if cl.acceptLimiter == nil { cl.acceptLimiter = make(map[ipStr]int) } diff --git a/connection.go b/connection.go index f58fca21..bcbc5bb5 100644 --- a/connection.go +++ b/connection.go @@ -42,8 +42,10 @@ type connection struct { t *Torrent // The actual Conn, used for closing, and setting socket options. - conn net.Conn - outgoing bool + conn net.Conn + outgoing bool + network string + remoteAddr ipPort // The Reader and Writer for this Conn, with hooks installed for stats, // limiting, deadlines etc. w io.Writer @@ -133,7 +135,7 @@ func (cn *connection) expectingChunks() bool { // Returns true if the connection is over IPv6. func (cn *connection) ipv6() bool { - ip := missinggo.AddrIP(cn.remoteAddr()) + ip := cn.remoteAddr.IP if ip.To4() != nil { return false } @@ -179,10 +181,6 @@ func (cn *connection) mu() sync.Locker { return cn.t.cl.locker() } -func (cn *connection) remoteAddr() net.Addr { - return cn.conn.RemoteAddr() -} - func (cn *connection) localAddr() net.Addr { return cn.conn.LocalAddr() } @@ -241,7 +239,7 @@ func (cn *connection) connectionFlags() (ret string) { } func (cn *connection) utp() bool { - return isUtpNetwork(cn.remoteAddr().Network()) + return isUtpNetwork(cn.network) } // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text. @@ -279,7 +277,7 @@ func (cn *connection) downloadRate() float64 { func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { // \t isn't preserved in
 blocks?
-	fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
+	fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr)
 	fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
 		eventAgeString(cn.lastMessageReceived),
 		eventAgeString(cn.completedHandshake),
@@ -1150,15 +1148,15 @@ func (c *connection) mainReadLoop() (err error) {
 		case pp.Extended:
 			err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
 		case pp.Port:
-			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
-			if err != nil {
-				panic(err)
+			pingAddr := net.UDPAddr{
+				IP:   c.remoteAddr.IP,
+				Port: int(c.remoteAddr.Port),
 			}
 			if msg.Port != 0 {
 				pingAddr.Port = int(msg.Port)
 			}
 			cl.eachDhtServer(func(s *dht.Server) {
-				go s.Ping(pingAddr, nil)
+				go s.Ping(&pingAddr, nil)
 			})
 		case pp.AllowedFast:
 			torrent.Add("allowed fasts received", 1)
@@ -1550,9 +1548,10 @@ func (c *connection) peerPriority() peerPriority {
 }
 
 func (c *connection) remoteIp() net.IP {
-	return missinggo.AddrIP(c.remoteAddr())
+	return c.remoteAddr.IP
 }
 
+// ???
 func (c *connection) remoteIpPort() ipPort {
-	return ipPort{missinggo.AddrIP(c.remoteAddr()), uint16(missinggo.AddrPort(c.remoteAddr()))}
+	return c.remoteAddr
 }
diff --git a/connection_test.go b/connection_test.go
index 673b5e51..42397459 100644
--- a/connection_test.go
+++ b/connection_test.go
@@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
 		config: &ClientConfig{DownloadRateLimiter: unlimited},
 	}
 	cl.initLogger()
-	c := cl.newConnection(nil, false)
+	c := cl.newConnection(nil, false, ipPort{}, "")
 	c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
 	c.t.setInfo(&metainfo.Info{
 		Pieces: make([]byte, metainfo.HashSize*3),
@@ -105,7 +105,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
 	t.setChunkSize(defaultChunkSize)
 	t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
 	r, w := net.Pipe()
-	cn := cl.newConnection(r, true)
+	cn := cl.newConnection(r, true, ipPort{}, "")
 	cn.setTorrent(t)
 	mrlErr := make(chan error)
 	msg := pp.Message{
diff --git a/torrent.go b/torrent.go
index 9400bf28..da5e3fa7 100644
--- a/torrent.go
+++ b/torrent.go
@@ -11,7 +11,6 @@ import (
 	"net"
 	"net/url"
 	"os"
-	"strconv"
 	"sync"
 	"text/tabwriter"
 	"time"
@@ -172,21 +171,11 @@ func (t *Torrent) KnownSwarm() (ks []Peer) {
 
 	// Add active peers to the list
 	for conn := range t.conns {
-		host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
-		if err != nil {
-			panic(err)
-		}
-
-		ip := net.ParseIP(host)
-		port, err := strconv.Atoi(portString)
-		if err != nil {
-			panic(err)
-		}
 
 		ks = append(ks, Peer{
 			Id:     conn.PeerID,
-			IP:     ip,
-			Port:   port,
+			IP:     conn.remoteAddr.IP,
+			Port:   int(conn.remoteAddr.Port),
 			Source: conn.Discovery,
 			// > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
 			// > But if we're not connected to them with an encrypted connection, I couldn't say
@@ -232,10 +221,7 @@ func (t *Torrent) addrActive(addr string) bool {
 		return true
 	}
 	for c := range t.conns {
-		ra := c.remoteAddr()
-		if ra == nil {
-			continue
-		}
+		ra := c.remoteAddr
 		if ra.String() == addr {
 			return true
 		}
@@ -1621,7 +1607,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
 				}())
 			}
 			c := touchers[0]
-			t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
+			t.cl.banPeerIP(c.remoteAddr.IP)
 			c.Drop()
 		}
 		t.onIncompletePiece(piece)
@@ -1750,11 +1736,11 @@ func (t *Torrent) initiateConn(peer Peer) {
 	if t.cl.badPeerIPPort(peer.IP, peer.Port) {
 		return
 	}
-	addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
-	if t.addrActive(addr) {
+	addr := ipPort{peer.IP, uint16(peer.Port)}
+	if t.addrActive(addr.String()) {
 		return
 	}
-	t.halfOpen[addr] = peer
+	t.halfOpen[addr.String()] = peer
 	go t.cl.outgoingConnection(t, addr, peer.Source)
 }
 
-- 
2.48.1