]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Prepare to allow max conns per torrent to be configured
authorMatt Joiner <anacrolix@gmail.com>
Tue, 5 Jul 2016 06:23:17 +0000 (16:23 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 5 Jul 2016 06:23:17 +0000 (16:23 +1000)
client.go
connection.go
global.go
torrent.go
worst_conns.go

index b83ef33d66993b70c84ebf8f06a9c1358d6eeb04..9a22320451a0e67e36e59a99b87349799af1a964 100644 (file)
--- a/client.go
+++ b/client.go
@@ -264,7 +264,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
                }
        }()
        cl = &Client{
-               halfOpenLimit:     socketsPerTorrent,
+               halfOpenLimit:     defaultHalfOpenConnsPerTorrent,
                config:            *cfg,
                defaultStorage:    cfg.DefaultStorage,
                dopplegangerAddrs: make(map[string]struct{}),
@@ -382,7 +382,7 @@ func (cl *Client) waitAccept() {
        defer cl.mu.Unlock()
        for {
                for _, t := range cl.torrents {
-                       if cl.wantConns(t) {
+                       if t.wantConns() {
                                return
                        }
                }
@@ -936,7 +936,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
                c.rw,
        }
        completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
-       if !cl.addConnection(t, c) {
+       if !t.addConnection(c) {
                return
        }
        defer t.dropConnection(c)
@@ -1337,67 +1337,10 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
        }
 }
 
-// Returns true if the connection is added.
-func (cl *Client) addConnection(t *Torrent, c *connection) bool {
-       if cl.closed.IsSet() {
-               return false
-       }
-       if !cl.wantConns(t) {
-               return false
-       }
-       for _, c0 := range t.conns {
-               if c.PeerID == c0.PeerID {
-                       // Already connected to a client with that ID.
-                       duplicateClientConns.Add(1)
-                       return false
-               }
-       }
-       if len(t.conns) >= socketsPerTorrent {
-               c := t.worstBadConn(cl)
-               if c == nil {
-                       return false
-               }
-               if cl.config.Debug && missinggo.CryHeard() {
-                       log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
-               }
-               c.Close()
-               t.deleteConnection(c)
-       }
-       if len(t.conns) >= socketsPerTorrent {
-               panic(len(t.conns))
-       }
-       t.conns = append(t.conns, c)
-       c.t = t
-       return true
-}
-
-func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
-       if c.closed.IsSet() {
-               return false
-       }
-       if !t.haveInfo() {
-               return c.supportsExtension("ut_metadata")
-       }
-       if t.seeding() {
-               return c.PeerInterested
-       }
-       return t.connHasWantedPieces(c)
-}
-
-func (cl *Client) wantConns(t *Torrent) bool {
-       if !t.seeding() && !t.needData() {
-               return false
-       }
-       if len(t.conns) < socketsPerTorrent {
-               return true
-       }
-       return t.worstBadConn(cl) != nil
-}
-
 func (cl *Client) openNewConns(t *Torrent) {
        defer t.updateWantPeersEvent()
        for len(t.peers) != 0 {
-               if !cl.wantConns(t) {
+               if !t.wantConns() {
                        return
                }
                if len(t.halfOpen) >= cl.halfOpenLimit {
@@ -1431,9 +1374,7 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
        return false
 }
 
-// Prepare a Torrent without any attachment to a Client. That means we can
-// initialize fields all fields that don't require the Client without locking
-// it.
+// Return a Torrent ready for insertion into a Client.
 func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
        t = &Torrent{
                cl:        cl,
@@ -1444,7 +1385,8 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
                halfOpen:          make(map[string]struct{}),
                pieceStateChanges: pubsub.NewPubSub(),
 
-               storageOpener: cl.defaultStorage,
+               storageOpener:       cl.defaultStorage,
+               maxEstablishedConns: defaultEstablishedConnsPerTorrent,
        }
        return
 }
index 7f8e10b41d6254f87ffc0b723e0e0a0aa54b4e19..1f8d3e749b5d2ebdbcaed830eaa25e11ce52c075 100644 (file)
@@ -653,3 +653,19 @@ func (cn *connection) wroteBytes(b []byte) {
        cn.stats.wroteBytes(b)
        cn.t.stats.wroteBytes(b)
 }
+
+// Returns whether the connection is currently useful to us. We're seeding and
+// they want data, we don't have metainfo and they can provide it, etc.
+func (c *connection) useful() bool {
+       t := c.t
+       if c.closed.IsSet() {
+               return false
+       }
+       if !t.haveInfo() {
+               return c.supportsExtension("ut_metadata")
+       }
+       if t.seeding() {
+               return c.PeerInterested
+       }
+       return t.connHasWantedPieces(c)
+}
index f938baa282d6fa87f39381446e805f28be8b31ad..0d2411283568081ead4d4e04ecba14f83691c696 100644 (file)
--- a/global.go
+++ b/global.go
@@ -36,9 +36,10 @@ const (
        // http://www.bittorrent.org/beps/bep_0005.html
        defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
 
-       socketsPerTorrent     = 80
-       torrentPeersHighWater = 200
-       torrentPeersLowWater  = 50
+       defaultEstablishedConnsPerTorrent = 80
+       defaultHalfOpenConnsPerTorrent    = 80
+       torrentPeersHighWater             = 200
+       torrentPeersLowWater              = 50
 
        // Limit how long handshake can take. This is to reduce the lingering
        // impact of a few bad apples. 4s loses 1% of successful handshakes that
index 6af85be70891e424599a426342b50e2a1df277cf..50441879d0c7f085a4be9cf3898a027553e743d1 100644 (file)
@@ -61,7 +61,8 @@ type Torrent struct {
        // The info dict. nil if we don't have it (yet).
        info *metainfo.InfoEx
        // Active peer connections, running message stream loops.
-       conns []*connection
+       conns               []*connection
+       maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
        // half-open until all handshakes are completed.
        halfOpen map[string]struct{}
@@ -133,12 +134,8 @@ func (t *Torrent) addrActive(addr string) bool {
        return false
 }
 
-func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
-       wcs = &worstConns{
-               c:  make([]*connection, 0, len(t.conns)),
-               t:  t,
-               cl: cl,
-       }
+func (t *Torrent) worstConns() (wcs *worstConns) {
+       wcs = &worstConns{make([]*connection, 0, len(t.conns))}
        for _, c := range t.conns {
                if !c.closed.IsSet() {
                        wcs.c = append(wcs.c, c)
@@ -446,11 +443,7 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
        fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
        fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
        fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
-       sort.Sort(&worstConns{
-               c:  t.conns,
-               t:  t,
-               cl: cl,
-       })
+       sort.Sort(&worstConns{t.conns})
        for i, c := range t.conns {
                fmt.Fprintf(w, "%2d. ", i+1)
                c.WriteStatus(w, t)
@@ -740,15 +733,15 @@ func (t *Torrent) extentPieces(off, _len int64) (pieces []int) {
        return
 }
 
-func (t *Torrent) worstBadConn(cl *Client) *connection {
-       wcs := t.worstConns(cl)
+func (t *Torrent) worstBadConn() *connection {
+       wcs := t.worstConns()
        heap.Init(wcs)
        for wcs.Len() != 0 {
                c := heap.Pop(wcs).(*connection)
                if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
                        return c
                }
-               if wcs.Len() >= (socketsPerTorrent+1)/2 {
+               if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
                        // Give connections 1 minute to prove themselves.
                        if time.Since(c.completedHandshake) > time.Minute {
                                return c
@@ -1273,3 +1266,47 @@ func (t *Torrent) addPeers(peers []Peer) {
 func (t *Torrent) Stats() TorrentStats {
        return t.stats
 }
+
+// Returns true if the connection is added.
+func (t *Torrent) addConnection(c *connection) bool {
+       if t.cl.closed.IsSet() {
+               return false
+       }
+       if !t.wantConns() {
+               return false
+       }
+       for _, c0 := range t.conns {
+               if c.PeerID == c0.PeerID {
+                       // Already connected to a client with that ID.
+                       duplicateClientConns.Add(1)
+                       return false
+               }
+       }
+       if len(t.conns) >= t.maxEstablishedConns {
+               c := t.worstBadConn()
+               if c == nil {
+                       return false
+               }
+               if t.cl.config.Debug && missinggo.CryHeard() {
+                       log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
+               }
+               c.Close()
+               t.deleteConnection(c)
+       }
+       if len(t.conns) >= t.maxEstablishedConns {
+               panic(len(t.conns))
+       }
+       t.conns = append(t.conns, c)
+       c.t = t
+       return true
+}
+
+func (t *Torrent) wantConns() bool {
+       if !t.seeding() && !t.needData() {
+               return false
+       }
+       if len(t.conns) < t.maxEstablishedConns {
+               return true
+       }
+       return t.worstBadConn() != nil
+}
index 181b9ccc1b935ff2ddba6e744e4e76d31f7e12d0..2abc8f09db3b4a76c0e57cc6d6c426da9847e8e5 100644 (file)
@@ -4,11 +4,9 @@ import (
        "time"
 )
 
-// Implements heap functions such that [0] is the worst connection.
+// Implements a heap of connections by how useful they are or have been.
 type worstConns struct {
-       c  []*connection
-       t  *Torrent
-       cl *Client
+       c []*connection
 }
 
 func (wc *worstConns) Len() int      { return len(wc.c) }
@@ -44,8 +42,8 @@ func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool {
 
 func (wc *worstConns) key(i int) (key worstConnsSortKey) {
        c := wc.c[i]
-       key.useful = wc.cl.usefulConn(wc.t, c)
-       if wc.t.seeding() {
+       key.useful = c.useful()
+       if c.t.seeding() {
                key.lastHelpful = c.lastChunkSent
        }
        // Intentionally consider the last time a chunk was received when seeding,