]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Store remoteAddr with each connection
authorMatt Joiner <anacrolix@gmail.com>
Sun, 4 Nov 2018 05:56:55 +0000 (16:56 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 4 Nov 2018 10:15:51 +0000 (21:15 +1100)
It would appear net.Conns returned from proxies don't have a RemoteAddr the client expects.

client.go
connection.go
connection_test.go
torrent.go

index 2c402198fcc1c65e3647bf5ef7ded546be43028f..dd3e4cca0cc0d32735140cbbfd97133ab7e5fe9b 100644 (file)
--- a/client.go
+++ b/client.go
@@ -442,7 +442,7 @@ func (cl *Client) incomingConnection(nc net.Conn) {
        if tc, ok := nc.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
-       c := cl.newConnection(nc, false)
+       c := cl.newConnection(nc, false, ipPortFromNetAddr(nc.RemoteAddr()), nc.RemoteAddr().Network())
        c.Discovery = peerSourceIncoming
        cl.runReceivedConn(c)
 }
@@ -460,7 +460,8 @@ func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
 }
 
 type dialResult struct {
-       Conn net.Conn
+       Conn    net.Conn
+       Network string
 }
 
 func countDialResult(err error) {
@@ -523,32 +524,30 @@ func peerNetworkEnabled(network string, cfg *ClientConfig) bool {
 }
 
 // Returns a connection over UTP or TCP, whichever is first to connect.
-func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
+func (cl *Client) dialFirst(ctx context.Context, addr string) dialResult {
        ctx, cancel := context.WithCancel(ctx)
        // As soon as we return one connection, cancel the others.
        defer cancel()
        left := 0
        resCh := make(chan dialResult, left)
-       dial := func(f func(_ context.Context, addr string) (net.Conn, error)) {
-               left++
-               go func() {
-                       c, err := f(ctx, addr)
-                       // This is a bit optimistic, but it looks non-trivial to thread
-                       // this through the proxy code. Set it now in case we close the
-                       // connection forthwith.
-                       if tc, ok := c.(*net.TCPConn); ok {
-                               tc.SetLinger(0)
-                       }
-                       countDialResult(err)
-                       resCh <- dialResult{c}
-               }()
-       }
        func() {
                cl.lock()
                defer cl.unlock()
                cl.eachListener(func(s socket) bool {
-                       if peerNetworkEnabled(s.Addr().Network(), cl.config) {
-                               dial(s.dial)
+                       network := s.Addr().Network()
+                       if peerNetworkEnabled(network, cl.config) {
+                               left++
+                               go func() {
+                                       c, err := s.dial(ctx, addr)
+                                       // This is a bit optimistic, but it looks non-trivial to thread
+                                       // this through the proxy code. Set it now in case we close the
+                                       // connection forthwith.
+                                       if tc, ok := c.(*net.TCPConn); ok {
+                                               tc.SetLinger(0)
+                                       }
+                                       countDialResult(err)
+                                       resCh <- dialResult{c, network}
+                               }()
                        }
                        return true
                })
@@ -573,7 +572,7 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
        if res.Conn != nil {
                go torrent.Add(fmt.Sprintf("network dialed first: %s", res.Conn.RemoteAddr().Network()), 1)
        }
-       return res.Conn
+       return res
 }
 
 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
@@ -586,8 +585,8 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
 
 // Performs initiator handshakes and returns a connection. Returns nil
 // *connection if no connection for valid reasons.
-func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
-       c = cl.newConnection(nc, true)
+func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool, remoteAddr ipPort, network string) (c *connection, err error) {
+       c = cl.newConnection(nc, true, remoteAddr, network)
        c.headerEncrypted = encryptHeader
        ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
        defer cancel()
@@ -608,8 +607,9 @@ func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torr
 
 // Returns nil connection and nil error if no connection could be established
 // for valid reasons.
-func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
-       nc := cl.dialFirst(ctx, addr)
+func (cl *Client) establishOutgoingConnEx(t *Torrent, addr ipPort, ctx context.Context, obfuscatedHeader bool) (c *connection, err error) {
+       dr := cl.dialFirst(ctx, addr.String())
+       nc := dr.Conn
        if nc == nil {
                return
        }
@@ -618,12 +618,12 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.C
                        nc.Close()
                }
        }()
-       return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader)
+       return cl.handshakesConnection(ctx, nc, t, obfuscatedHeader, addr, dr.Network)
 }
 
 // Returns nil connection and nil error if no connection could be established
 // for valid reasons.
-func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
+func (cl *Client) establishOutgoingConn(t *Torrent, addr ipPort) (c *connection, err error) {
        torrent.Add("establish outgoing connection", 1)
        ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
                cl.rLock()
@@ -658,14 +658,14 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
 
 // Called to dial out and run a connection. The addr we're given is already
 // considered half-open.
-func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
+func (cl *Client) outgoingConnection(t *Torrent, addr ipPort, ps peerSource) {
        cl.dialRateLimiter.Wait(context.Background())
        c, err := cl.establishOutgoingConn(t, addr)
        cl.lock()
        defer cl.unlock()
        // Don't release lock between here and addConnection, unless it's for
        // failure.
-       cl.noLongerHalfOpen(t, addr)
+       cl.noLongerHalfOpen(t, addr.String())
        if err != nil {
                if cl.config.Debug {
                        log.Printf("error establishing outgoing connection: %s", err)
@@ -794,18 +794,18 @@ func (cl *Client) runReceivedConn(c *connection) {
                ).AddValue(
                        debugLogValue,
                ).Add(
-                       "network", c.remoteAddr().Network(),
+                       "network", c.network,
                ).Log(cl.logger)
                torrent.Add("error receiving handshake", 1)
                cl.lock()
-               cl.onBadAccept(c.remoteAddr())
+               cl.onBadAccept(c.remoteAddr)
                cl.unlock()
                return
        }
        if t == nil {
                torrent.Add("received handshake for unloaded torrent", 1)
                cl.lock()
-               cl.onBadAccept(c.remoteAddr())
+               cl.onBadAccept(c.remoteAddr)
                cl.unlock()
                return
        }
@@ -862,7 +862,7 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
                                        },
                                        V:            cl.config.ExtendedHandshakeClientVersion,
                                        Reqq:         64, // TODO: Really?
-                                       YourIp:       pp.CompactIp(missinggo.AddrIP(conn.remoteAddr())),
+                                       YourIp:       pp.CompactIp(conn.remoteAddr.IP),
                                        Encryption:   !cl.config.DisableEncryption,
                                        Port:         cl.incomingPeerPort(),
                                        MetadataSize: torrent.metadataSize(),
@@ -1182,7 +1182,7 @@ func (cl *Client) banPeerIP(ip net.IP) {
        cl.badPeerIPs[ip.String()] = struct{}{}
 }
 
-func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
+func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr ipPort, network string) (c *connection) {
        c = &connection{
                conn:            nc,
                outgoing:        outgoing,
@@ -1190,6 +1190,8 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
                PeerChoked:      true,
                PeerMaxRequests: 250,
                writeBuffer:     new(bytes.Buffer),
+               remoteAddr:      remoteAddr,
+               network:         network,
        }
        c.writerCond.L = cl.locker()
        c.setRW(connStatsReadWriter{nc, c})
@@ -1275,8 +1277,8 @@ func (cl *Client) ListenAddrs() (ret []net.Addr) {
        return
 }
 
-func (cl *Client) onBadAccept(addr net.Addr) {
-       ip := maskIpForAcceptLimiting(missinggo.AddrIP(addr))
+func (cl *Client) onBadAccept(addr ipPort) {
+       ip := maskIpForAcceptLimiting(addr.IP)
        if cl.acceptLimiter == nil {
                cl.acceptLimiter = make(map[ipStr]int)
        }
index f58fca217ef2349ce8dc5e245619cf24ac7312ad..bcbc5bb53841e8950900cb7be35606328266c33a 100644 (file)
@@ -42,8 +42,10 @@ type connection struct {
 
        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
-       conn     net.Conn
-       outgoing bool
+       conn       net.Conn
+       outgoing   bool
+       network    string
+       remoteAddr ipPort
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
@@ -133,7 +135,7 @@ func (cn *connection) expectingChunks() bool {
 
 // Returns true if the connection is over IPv6.
 func (cn *connection) ipv6() bool {
-       ip := missinggo.AddrIP(cn.remoteAddr())
+       ip := cn.remoteAddr.IP
        if ip.To4() != nil {
                return false
        }
@@ -179,10 +181,6 @@ func (cn *connection) mu() sync.Locker {
        return cn.t.cl.locker()
 }
 
-func (cn *connection) remoteAddr() net.Addr {
-       return cn.conn.RemoteAddr()
-}
-
 func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
 }
@@ -241,7 +239,7 @@ func (cn *connection) connectionFlags() (ret string) {
 }
 
 func (cn *connection) utp() bool {
-       return isUtpNetwork(cn.remoteAddr().Network())
+       return isUtpNetwork(cn.network)
 }
 
 // Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
@@ -279,7 +277,7 @@ func (cn *connection) downloadRate() float64 {
 
 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
-       fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
+       fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr)
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
@@ -1150,15 +1148,15 @@ func (c *connection) mainReadLoop() (err error) {
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
                case pp.Port:
-                       pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
-                       if err != nil {
-                               panic(err)
+                       pingAddr := net.UDPAddr{
+                               IP:   c.remoteAddr.IP,
+                               Port: int(c.remoteAddr.Port),
                        }
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
                        cl.eachDhtServer(func(s *dht.Server) {
-                               go s.Ping(pingAddr, nil)
+                               go s.Ping(&pingAddr, nil)
                        })
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
@@ -1550,9 +1548,10 @@ func (c *connection) peerPriority() peerPriority {
 }
 
 func (c *connection) remoteIp() net.IP {
-       return missinggo.AddrIP(c.remoteAddr())
+       return c.remoteAddr.IP
 }
 
+// ???
 func (c *connection) remoteIpPort() ipPort {
-       return ipPort{missinggo.AddrIP(c.remoteAddr()), uint16(missinggo.AddrPort(c.remoteAddr()))}
+       return c.remoteAddr
 }
index 673b5e514a48c65d7aca9a8bb9db6c43a6b1a7e4..4239745924cd5eaaa061f5dc0af36b36e05624bc 100644 (file)
@@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
                config: &ClientConfig{DownloadRateLimiter: unlimited},
        }
        cl.initLogger()
-       c := cl.newConnection(nil, false)
+       c := cl.newConnection(nil, false, ipPort{}, "")
        c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
        c.t.setInfo(&metainfo.Info{
                Pieces: make([]byte, metainfo.HashSize*3),
@@ -105,7 +105,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        t.setChunkSize(defaultChunkSize)
        t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
        r, w := net.Pipe()
-       cn := cl.newConnection(r, true)
+       cn := cl.newConnection(r, true, ipPort{}, "")
        cn.setTorrent(t)
        mrlErr := make(chan error)
        msg := pp.Message{
index 9400bf281d52b455aa2bfa11e4a056e8d80f6fa2..da5e3fa7a95c75386182734f5c71a25da51df2c5 100644 (file)
@@ -11,7 +11,6 @@ import (
        "net"
        "net/url"
        "os"
-       "strconv"
        "sync"
        "text/tabwriter"
        "time"
@@ -172,21 +171,11 @@ func (t *Torrent) KnownSwarm() (ks []Peer) {
 
        // Add active peers to the list
        for conn := range t.conns {
-               host, portString, err := net.SplitHostPort(conn.remoteAddr().String())
-               if err != nil {
-                       panic(err)
-               }
-
-               ip := net.ParseIP(host)
-               port, err := strconv.Atoi(portString)
-               if err != nil {
-                       panic(err)
-               }
 
                ks = append(ks, Peer{
                        Id:     conn.PeerID,
-                       IP:     ip,
-                       Port:   port,
+                       IP:     conn.remoteAddr.IP,
+                       Port:   int(conn.remoteAddr.Port),
                        Source: conn.Discovery,
                        // > If the connection is encrypted, that's certainly enough to set SupportsEncryption.
                        // > But if we're not connected to them with an encrypted connection, I couldn't say
@@ -232,10 +221,7 @@ func (t *Torrent) addrActive(addr string) bool {
                return true
        }
        for c := range t.conns {
-               ra := c.remoteAddr()
-               if ra == nil {
-                       continue
-               }
+               ra := c.remoteAddr
                if ra.String() == addr {
                        return true
                }
@@ -1621,7 +1607,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
                                }())
                        }
                        c := touchers[0]
-                       t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
+                       t.cl.banPeerIP(c.remoteAddr.IP)
                        c.Drop()
                }
                t.onIncompletePiece(piece)
@@ -1750,11 +1736,11 @@ func (t *Torrent) initiateConn(peer Peer) {
        if t.cl.badPeerIPPort(peer.IP, peer.Port) {
                return
        }
-       addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
-       if t.addrActive(addr) {
+       addr := ipPort{peer.IP, uint16(peer.Port)}
+       if t.addrActive(addr.String()) {
                return
        }
-       t.halfOpen[addr] = peer
+       t.halfOpen[addr.String()] = peer
        go t.cl.outgoingConnection(t, addr, peer.Source)
 }