From cea5584d6b227dfe326273b8f8b136ec7408be19 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 12 Jun 2018 20:14:00 +1000 Subject: [PATCH] Track outgoing through a new field on connection, and rework duplicate connection preferencing --- client.go | 18 ++++++++++-------- connection.go | 3 ++- connection_test.go | 4 ++-- torrent.go | 43 ++++++++++++++++++++----------------------- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/client.go b/client.go index 5f28b12d..3815c10d 100644 --- a/client.go +++ b/client.go @@ -420,7 +420,7 @@ func (cl *Client) incomingConnection(nc net.Conn) { if tc, ok := nc.(*net.TCPConn); ok { tc.SetLinger(0) } - c := cl.newConnection(nc) + c := cl.newConnection(nc, false) c.Discovery = peerSourceIncoming cl.runReceivedConn(c) } @@ -572,7 +572,7 @@ func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) { // Performs initiator handshakes and returns a connection. Returns nil // *connection if no connection for valid reasons. func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader bool) (c *connection, err error) { - c = cl.newConnection(nc) + c = cl.newConnection(nc, true) c.headerEncrypted = encryptHeader ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout) defer cancel() @@ -656,7 +656,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) { } defer c.Close() c.Discovery = ps - cl.runHandshookConn(c, t, true) + cl.runHandshookConn(c, t) } // The port number for incoming peer connections. 0 if the client isn't @@ -768,13 +768,13 @@ func (cl *Client) runReceivedConn(c *connection) { } cl.mu.Lock() defer cl.mu.Unlock() - cl.runHandshookConn(c, t, false) + cl.runHandshookConn(c, t) } -func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) { +func (cl *Client) runHandshookConn(c *connection, t *Torrent) { c.setTorrent(t) if c.PeerID == cl.peerID { - if outgoing { + if c.outgoing { connsToSelf.Add(1) addr := c.conn.RemoteAddr().String() cl.dopplegangerAddrs[addr] = struct{}{} @@ -792,7 +792,8 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) { if connIsIpv6(c.conn) { torrent.Add("completed handshake over ipv6", 1) } - if !t.addConnection(c, outgoing) { + if err := t.addConnection(c); err != nil { + log.Fmsg("error adding connection: %s", err).AddValues(c, debugLogValue).Log(t.logger) return } defer t.dropConnection(c) @@ -1148,9 +1149,10 @@ func (cl *Client) banPeerIP(ip net.IP) { cl.badPeerIPs[ip.String()] = struct{}{} } -func (cl *Client) newConnection(nc net.Conn) (c *connection) { +func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) { c = &connection{ conn: nc, + outgoing: outgoing, Choked: true, PeerChoked: true, PeerMaxRequests: 250, diff --git a/connection.go b/connection.go index e330bd29..6aa007b7 100644 --- a/connection.go +++ b/connection.go @@ -40,7 +40,8 @@ const ( type connection struct { t *Torrent // The actual Conn, used for closing, and setting socket options. - conn net.Conn + conn net.Conn + outgoing bool // The Reader and Writer for this Conn, with hooks installed for stats, // limiting, deadlines etc. w io.Writer diff --git a/connection_test.go b/connection_test.go index 92af0681..6334c52b 100644 --- a/connection_test.go +++ b/connection_test.go @@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) { r, w := io.Pipe() var cl Client cl.initLogger() - c := cl.newConnection(nil) + c := cl.newConnection(nil, false) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) c.t.setInfo(&metainfo.Info{ Pieces: make([]byte, metainfo.HashSize*3), @@ -103,7 +103,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { t.setChunkSize(defaultChunkSize) t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority()) r, w := net.Pipe() - cn := cl.newConnection(r) + cn := cl.newConnection(r, true) cn.setTorrent(t) mrlErr := make(chan error) cl.mu.Lock() diff --git a/torrent.go b/torrent.go index b8b4282b..99b71ae4 100644 --- a/torrent.go +++ b/torrent.go @@ -1489,37 +1489,34 @@ func (t *Torrent) reconcileHandshakeStats(c *connection) { } // Returns true if the connection is added. -func (t *Torrent) addConnection(c *connection, outgoing bool) bool { +func (t *Torrent) addConnection(c *connection) error { if t.closed.IsSet() { - return false + return errors.New("torrent closed") } if !t.wantConns() { - return false + return errors.New("don't want conns") } for c0 := range t.conns { - if c.PeerID == c0.PeerID { - // Already connected to a client with that ID. - duplicateClientConns.Add(1) - lower := string(t.cl.peerID[:]) < string(c.PeerID[:]) - // Retain the connection from initiated from lower peer ID to - // higher. - if outgoing == lower { - // Close the other one. - c0.Close() - // TODO: Is it safe to delete from the map while we're - // iterating over it? - t.deleteConnection(c0) - } else { - // Abandon this one. - return false - } + if c.PeerID != c0.PeerID { + continue + } + // Already connected to a client with that ID. + preferOutbound := string(t.cl.peerID[:]) < string(c.PeerID[:]) + // Retain the connection from initiated from lower peer ID to higher. + if c0.outgoing == preferOutbound { + return errors.New("existing connection preferred") } + if c.outgoing != preferOutbound { + return errors.New("prefer older connection") + } + // Close the other one. + c0.Close() + // TODO: Is it safe to delete from the map while we're iterating + // over it? + t.deleteConnection(c0) } if len(t.conns) >= t.maxEstablishedConns { c := t.worstBadConn() - if c == nil { - return false - } if t.cl.config.Debug && missinggo.CryHeard() { log.Printf("%s: dropping connection to make room for new one:\n %s", t, c) } @@ -1530,7 +1527,7 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool { panic(len(t.conns)) } t.conns[c] = struct{}{} - return true + return nil } func (t *Torrent) wantConns() bool { -- 2.48.1