From 326b36545b0a716eca9dcc5bf385e7e2f2ccafac Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 5 Jul 2016 16:23:17 +1000 Subject: [PATCH] Prepare to allow max conns per torrent to be configured --- client.go | 72 +++++--------------------------------------------- connection.go | 16 +++++++++++ global.go | 7 ++--- torrent.go | 67 +++++++++++++++++++++++++++++++++++----------- worst_conns.go | 10 +++---- 5 files changed, 83 insertions(+), 89 deletions(-) diff --git a/client.go b/client.go index b83ef33d..9a223204 100644 --- a/client.go +++ b/client.go @@ -264,7 +264,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { } }() cl = &Client{ - halfOpenLimit: socketsPerTorrent, + halfOpenLimit: defaultHalfOpenConnsPerTorrent, config: *cfg, defaultStorage: cfg.DefaultStorage, dopplegangerAddrs: make(map[string]struct{}), @@ -382,7 +382,7 @@ func (cl *Client) waitAccept() { defer cl.mu.Unlock() for { for _, t := range cl.torrents { - if cl.wantConns(t) { + if t.wantConns() { return } } @@ -936,7 +936,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) { c.rw, } completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1) - if !cl.addConnection(t, c) { + if !t.addConnection(c) { return } defer t.dropConnection(c) @@ -1337,67 +1337,10 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error { } } -// Returns true if the connection is added. -func (cl *Client) addConnection(t *Torrent, c *connection) bool { - if cl.closed.IsSet() { - return false - } - if !cl.wantConns(t) { - return false - } - for _, c0 := range t.conns { - if c.PeerID == c0.PeerID { - // Already connected to a client with that ID. - duplicateClientConns.Add(1) - return false - } - } - if len(t.conns) >= socketsPerTorrent { - c := t.worstBadConn(cl) - if c == nil { - return false - } - if cl.config.Debug && missinggo.CryHeard() { - log.Printf("%s: dropping connection to make room for new one:\n %s", t, c) - } - c.Close() - t.deleteConnection(c) - } - if len(t.conns) >= socketsPerTorrent { - panic(len(t.conns)) - } - t.conns = append(t.conns, c) - c.t = t - return true -} - -func (cl *Client) usefulConn(t *Torrent, c *connection) bool { - if c.closed.IsSet() { - return false - } - if !t.haveInfo() { - return c.supportsExtension("ut_metadata") - } - if t.seeding() { - return c.PeerInterested - } - return t.connHasWantedPieces(c) -} - -func (cl *Client) wantConns(t *Torrent) bool { - if !t.seeding() && !t.needData() { - return false - } - if len(t.conns) < socketsPerTorrent { - return true - } - return t.worstBadConn(cl) != nil -} - func (cl *Client) openNewConns(t *Torrent) { defer t.updateWantPeersEvent() for len(t.peers) != 0 { - if !cl.wantConns(t) { + if !t.wantConns() { return } if len(t.halfOpen) >= cl.halfOpenLimit { @@ -1431,9 +1374,7 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool { return false } -// Prepare a Torrent without any attachment to a Client. That means we can -// initialize fields all fields that don't require the Client without locking -// it. +// Return a Torrent ready for insertion into a Client. func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) { t = &Torrent{ cl: cl, @@ -1444,7 +1385,8 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) { halfOpen: make(map[string]struct{}), pieceStateChanges: pubsub.NewPubSub(), - storageOpener: cl.defaultStorage, + storageOpener: cl.defaultStorage, + maxEstablishedConns: defaultEstablishedConnsPerTorrent, } return } diff --git a/connection.go b/connection.go index 7f8e10b4..1f8d3e74 100644 --- a/connection.go +++ b/connection.go @@ -653,3 +653,19 @@ func (cn *connection) wroteBytes(b []byte) { cn.stats.wroteBytes(b) cn.t.stats.wroteBytes(b) } + +// Returns whether the connection is currently useful to us. We're seeding and +// they want data, we don't have metainfo and they can provide it, etc. +func (c *connection) useful() bool { + t := c.t + if c.closed.IsSet() { + return false + } + if !t.haveInfo() { + return c.supportsExtension("ut_metadata") + } + if t.seeding() { + return c.PeerInterested + } + return t.connHasWantedPieces(c) +} diff --git a/global.go b/global.go index f938baa2..0d241128 100644 --- a/global.go +++ b/global.go @@ -36,9 +36,10 @@ const ( // http://www.bittorrent.org/beps/bep_0005.html defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01" - socketsPerTorrent = 80 - torrentPeersHighWater = 200 - torrentPeersLowWater = 50 + defaultEstablishedConnsPerTorrent = 80 + defaultHalfOpenConnsPerTorrent = 80 + torrentPeersHighWater = 200 + torrentPeersLowWater = 50 // Limit how long handshake can take. This is to reduce the lingering // impact of a few bad apples. 4s loses 1% of successful handshakes that diff --git a/torrent.go b/torrent.go index 6af85be7..50441879 100644 --- a/torrent.go +++ b/torrent.go @@ -61,7 +61,8 @@ type Torrent struct { // The info dict. nil if we don't have it (yet). info *metainfo.InfoEx // Active peer connections, running message stream loops. - conns []*connection + conns []*connection + maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. halfOpen map[string]struct{} @@ -133,12 +134,8 @@ func (t *Torrent) addrActive(addr string) bool { return false } -func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) { - wcs = &worstConns{ - c: make([]*connection, 0, len(t.conns)), - t: t, - cl: cl, - } +func (t *Torrent) worstConns() (wcs *worstConns) { + wcs = &worstConns{make([]*connection, 0, len(t.conns))} for _, c := range t.conns { if !c.closed.IsSet() { wcs.c = append(wcs.c, c) @@ -446,11 +443,7 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) { fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers)) fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen)) fmt.Fprintf(w, "Active peers: %d\n", len(t.conns)) - sort.Sort(&worstConns{ - c: t.conns, - t: t, - cl: cl, - }) + sort.Sort(&worstConns{t.conns}) for i, c := range t.conns { fmt.Fprintf(w, "%2d. ", i+1) c.WriteStatus(w, t) @@ -740,15 +733,15 @@ func (t *Torrent) extentPieces(off, _len int64) (pieces []int) { return } -func (t *Torrent) worstBadConn(cl *Client) *connection { - wcs := t.worstConns(cl) +func (t *Torrent) worstBadConn() *connection { + wcs := t.worstConns() heap.Init(wcs) for wcs.Len() != 0 { c := heap.Pop(wcs).(*connection) if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived { return c } - if wcs.Len() >= (socketsPerTorrent+1)/2 { + if wcs.Len() >= (t.maxEstablishedConns+1)/2 { // Give connections 1 minute to prove themselves. if time.Since(c.completedHandshake) > time.Minute { return c @@ -1273,3 +1266,47 @@ func (t *Torrent) addPeers(peers []Peer) { func (t *Torrent) Stats() TorrentStats { return t.stats } + +// Returns true if the connection is added. +func (t *Torrent) addConnection(c *connection) bool { + if t.cl.closed.IsSet() { + return false + } + if !t.wantConns() { + return false + } + for _, c0 := range t.conns { + if c.PeerID == c0.PeerID { + // Already connected to a client with that ID. + duplicateClientConns.Add(1) + return false + } + } + if len(t.conns) >= t.maxEstablishedConns { + c := t.worstBadConn() + if c == nil { + return false + } + if t.cl.config.Debug && missinggo.CryHeard() { + log.Printf("%s: dropping connection to make room for new one:\n %s", t, c) + } + c.Close() + t.deleteConnection(c) + } + if len(t.conns) >= t.maxEstablishedConns { + panic(len(t.conns)) + } + t.conns = append(t.conns, c) + c.t = t + return true +} + +func (t *Torrent) wantConns() bool { + if !t.seeding() && !t.needData() { + return false + } + if len(t.conns) < t.maxEstablishedConns { + return true + } + return t.worstBadConn() != nil +} diff --git a/worst_conns.go b/worst_conns.go index 181b9ccc..2abc8f09 100644 --- a/worst_conns.go +++ b/worst_conns.go @@ -4,11 +4,9 @@ import ( "time" ) -// Implements heap functions such that [0] is the worst connection. +// Implements a heap of connections by how useful they are or have been. type worstConns struct { - c []*connection - t *Torrent - cl *Client + c []*connection } func (wc *worstConns) Len() int { return len(wc.c) } @@ -44,8 +42,8 @@ func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool { func (wc *worstConns) key(i int) (key worstConnsSortKey) { c := wc.c[i] - key.useful = wc.cl.usefulConn(wc.t, c) - if wc.t.seeding() { + key.useful = c.useful() + if c.t.seeding() { key.lastHelpful = c.lastChunkSent } // Intentionally consider the last time a chunk was received when seeding, -- 2.44.0