]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Track outgoing through a new field on connection, and rework duplicate connection...
authorMatt Joiner <anacrolix@gmail.com>
Tue, 12 Jun 2018 10:14:00 +0000 (20:14 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 12 Jun 2018 10:14:00 +0000 (20:14 +1000)
client.go
connection.go
connection_test.go
torrent.go

index 5f28b12d078833232158e2920991316a4813a4ef..3815c10d91f9a6bd6a201e017a4273426e8f5f73 100644 (file)
--- a/client.go
+++ b/client.go
@@ -420,7 +420,7 @@ func (cl *Client) incomingConnection(nc net.Conn) {
        if tc, ok := nc.(*net.TCPConn); ok {
                tc.SetLinger(0)
        }
-       c := cl.newConnection(nc)
+       c := cl.newConnection(nc, false)
        c.Discovery = peerSourceIncoming
        cl.runReceivedConn(c)
 }
@@ -572,7 +572,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
 // Performs initiator handshakes and returns a connection. Returns nil
 // *connection if no connection for valid reasons.
 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) {
-       c = cl.newConnection(nc)
+       c = cl.newConnection(nc, true)
        c.headerEncrypted = encryptHeader
        ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
        defer cancel()
@@ -656,7 +656,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
        }
        defer c.Close()
        c.Discovery = ps
-       cl.runHandshookConn(c, t, true)
+       cl.runHandshookConn(c, t)
 }
 
 // The port number for incoming peer connections. 0 if the client isn't
@@ -768,13 +768,13 @@ func (cl *Client) runReceivedConn(c *connection) {
        }
        cl.mu.Lock()
        defer cl.mu.Unlock()
-       cl.runHandshookConn(c, t, false)
+       cl.runHandshookConn(c, t)
 }
 
-func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
+func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
        c.setTorrent(t)
        if c.PeerID == cl.peerID {
-               if outgoing {
+               if c.outgoing {
                        connsToSelf.Add(1)
                        addr := c.conn.RemoteAddr().String()
                        cl.dopplegangerAddrs[addr] = struct{}{}
@@ -792,7 +792,8 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
        if connIsIpv6(c.conn) {
                torrent.Add("completed handshake over ipv6", 1)
        }
-       if !t.addConnection(c, outgoing) {
+       if err := t.addConnection(c); err != nil {
+               log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger)
                return
        }
        defer t.dropConnection(c)
@@ -1148,9 +1149,10 @@ func (cl *Client) banPeerIP(ip net.IP) {
        cl.badPeerIPs[ip.String()] = struct{}{}
 }
 
-func (cl *Client) newConnection(nc net.Conn) (c *connection) {
+func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
        c = &connection{
                conn:            nc,
+               outgoing:        outgoing,
                Choked:          true,
                PeerChoked:      true,
                PeerMaxRequests: 250,
index e330bd297ee0591925079af4860fa5e5e4f2e193..6aa007b7369b2bfbaaaeacb7077ab79407f5421b 100644 (file)
@@ -40,7 +40,8 @@ const (
 type connection struct {
        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
-       conn net.Conn
+       conn     net.Conn
+       outgoing bool
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
index 92af06819e89c651e9f50bb4d3d265a5465fbdd4..6334c52bc8a3639d066abeae1167485b7e4af660 100644 (file)
@@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
        r, w := io.Pipe()
        var cl Client
        cl.initLogger()
-       c := cl.newConnection(nil)
+       c := cl.newConnection(nil, false)
        c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
        c.t.setInfo(&metainfo.Info{
                Pieces: make([]byte, metainfo.HashSize*3),
@@ -103,7 +103,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        t.setChunkSize(defaultChunkSize)
        t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
        r, w := net.Pipe()
-       cn := cl.newConnection(r)
+       cn := cl.newConnection(r, true)
        cn.setTorrent(t)
        mrlErr := make(chan error)
        cl.mu.Lock()
index b8b4282b100f2fb9f7d82fbaea2d2f434d0ded2b..99b71ae4f3d585cbd8033a7a7833dad4a7a145f2 100644 (file)
@@ -1489,37 +1489,34 @@ func (t *Torrent) reconcileHandshakeStats(c *connection) {
 }
 
 // Returns true if the connection is added.
-func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
+func (t *Torrent) addConnection(c *connection) error {
        if t.closed.IsSet() {
-               return false
+               return errors.New("torrent closed")
        }
        if !t.wantConns() {
-               return false
+               return errors.New("don't want conns")
        }
        for c0 := range t.conns {
-               if c.PeerID == c0.PeerID {
-                       // Already connected to a client with that ID.
-                       duplicateClientConns.Add(1)
-                       lower := string(t.cl.peerID[:]) < string(c.PeerID[:])
-                       // Retain the connection from initiated from lower peer ID to
-                       // higher.
-                       if outgoing == lower {
-                               // Close the other one.
-                               c0.Close()
-                               // TODO: Is it safe to delete from the map while we're
-                               // iterating over it?
-                               t.deleteConnection(c0)
-                       } else {
-                               // Abandon this one.
-                               return false
-                       }
+               if c.PeerID != c0.PeerID {
+                       continue
+               }
+               // Already connected to a client with that ID.
+               preferOutbound := string(t.cl.peerID[:]) < string(c.PeerID[:])
+               // Retain the connection from initiated from lower peer ID to higher.
+               if c0.outgoing == preferOutbound {
+                       return errors.New("existing connection preferred")
                }
+               if c.outgoing != preferOutbound {
+                       return errors.New("prefer older connection")
+               }
+               // Close the other one.
+               c0.Close()
+               // TODO: Is it safe to delete from the map while we're iterating
+               // over it?
+               t.deleteConnection(c0)
        }
        if len(t.conns) >= t.maxEstablishedConns {
                c := t.worstBadConn()
-               if c == nil {
-                       return false
-               }
                if t.cl.config.Debug && missinggo.CryHeard() {
                        log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
                }
@@ -1530,7 +1527,7 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
                panic(len(t.conns))
        }
        t.conns[c] = struct{}{}
-       return true
+       return nil
 }
 
 func (t *Torrent) wantConns() bool {