}
}()
cl = &Client{
- halfOpenLimit: socketsPerTorrent,
+ halfOpenLimit: defaultHalfOpenConnsPerTorrent,
config: *cfg,
defaultStorage: cfg.DefaultStorage,
dopplegangerAddrs: make(map[string]struct{}),
defer cl.mu.Unlock()
for {
for _, t := range cl.torrents {
- if cl.wantConns(t) {
+ if t.wantConns() {
return
}
}
c.rw,
}
completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
- if !cl.addConnection(t, c) {
+ if !t.addConnection(c) {
return
}
defer t.dropConnection(c)
}
}
-// Returns true if the connection is added.
-func (cl *Client) addConnection(t *Torrent, c *connection) bool {
- if cl.closed.IsSet() {
- return false
- }
- if !cl.wantConns(t) {
- return false
- }
- for _, c0 := range t.conns {
- if c.PeerID == c0.PeerID {
- // Already connected to a client with that ID.
- duplicateClientConns.Add(1)
- return false
- }
- }
- if len(t.conns) >= socketsPerTorrent {
- c := t.worstBadConn(cl)
- if c == nil {
- return false
- }
- if cl.config.Debug && missinggo.CryHeard() {
- log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
- }
- c.Close()
- t.deleteConnection(c)
- }
- if len(t.conns) >= socketsPerTorrent {
- panic(len(t.conns))
- }
- t.conns = append(t.conns, c)
- c.t = t
- return true
-}
-
-func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
- if c.closed.IsSet() {
- return false
- }
- if !t.haveInfo() {
- return c.supportsExtension("ut_metadata")
- }
- if t.seeding() {
- return c.PeerInterested
- }
- return t.connHasWantedPieces(c)
-}
-
-func (cl *Client) wantConns(t *Torrent) bool {
- if !t.seeding() && !t.needData() {
- return false
- }
- if len(t.conns) < socketsPerTorrent {
- return true
- }
- return t.worstBadConn(cl) != nil
-}
-
func (cl *Client) openNewConns(t *Torrent) {
defer t.updateWantPeersEvent()
for len(t.peers) != 0 {
- if !cl.wantConns(t) {
+ if !t.wantConns() {
return
}
if len(t.halfOpen) >= cl.halfOpenLimit {
return false
}
-// Prepare a Torrent without any attachment to a Client. That means we can
-// initialize fields all fields that don't require the Client without locking
-// it.
+// Return a Torrent ready for insertion into a Client.
func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
t = &Torrent{
cl: cl,
halfOpen: make(map[string]struct{}),
pieceStateChanges: pubsub.NewPubSub(),
- storageOpener: cl.defaultStorage,
+ storageOpener: cl.defaultStorage,
+ maxEstablishedConns: defaultEstablishedConnsPerTorrent,
}
return
}
cn.stats.wroteBytes(b)
cn.t.stats.wroteBytes(b)
}
+
+// Returns whether the connection is currently useful to us. We're seeding and
+// they want data, we don't have metainfo and they can provide it, etc.
+func (c *connection) useful() bool {
+ t := c.t
+ if c.closed.IsSet() {
+ return false
+ }
+ if !t.haveInfo() {
+ return c.supportsExtension("ut_metadata")
+ }
+ if t.seeding() {
+ return c.PeerInterested
+ }
+ return t.connHasWantedPieces(c)
+}
// http://www.bittorrent.org/beps/bep_0005.html
defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
- socketsPerTorrent = 80
- torrentPeersHighWater = 200
- torrentPeersLowWater = 50
+ defaultEstablishedConnsPerTorrent = 80
+ defaultHalfOpenConnsPerTorrent = 80
+ torrentPeersHighWater = 200
+ torrentPeersLowWater = 50
// Limit how long handshake can take. This is to reduce the lingering
// impact of a few bad apples. 4s loses 1% of successful handshakes that
// The info dict. nil if we don't have it (yet).
info *metainfo.InfoEx
// Active peer connections, running message stream loops.
- conns []*connection
+ conns []*connection
+ maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
halfOpen map[string]struct{}
return false
}
-func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
- wcs = &worstConns{
- c: make([]*connection, 0, len(t.conns)),
- t: t,
- cl: cl,
- }
+func (t *Torrent) worstConns() (wcs *worstConns) {
+ wcs = &worstConns{make([]*connection, 0, len(t.conns))}
for _, c := range t.conns {
if !c.closed.IsSet() {
wcs.c = append(wcs.c, c)
fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
- sort.Sort(&worstConns{
- c: t.conns,
- t: t,
- cl: cl,
- })
+ sort.Sort(&worstConns{t.conns})
for i, c := range t.conns {
fmt.Fprintf(w, "%2d. ", i+1)
c.WriteStatus(w, t)
return
}
-func (t *Torrent) worstBadConn(cl *Client) *connection {
- wcs := t.worstConns(cl)
+func (t *Torrent) worstBadConn() *connection {
+ wcs := t.worstConns()
heap.Init(wcs)
for wcs.Len() != 0 {
c := heap.Pop(wcs).(*connection)
if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
return c
}
- if wcs.Len() >= (socketsPerTorrent+1)/2 {
+ if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
// Give connections 1 minute to prove themselves.
if time.Since(c.completedHandshake) > time.Minute {
return c
func (t *Torrent) Stats() TorrentStats {
return t.stats
}
+
+// Returns true if the connection is added.
+func (t *Torrent) addConnection(c *connection) bool {
+ if t.cl.closed.IsSet() {
+ return false
+ }
+ if !t.wantConns() {
+ return false
+ }
+ for _, c0 := range t.conns {
+ if c.PeerID == c0.PeerID {
+ // Already connected to a client with that ID.
+ duplicateClientConns.Add(1)
+ return false
+ }
+ }
+ if len(t.conns) >= t.maxEstablishedConns {
+ c := t.worstBadConn()
+ if c == nil {
+ return false
+ }
+ if t.cl.config.Debug && missinggo.CryHeard() {
+ log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
+ }
+ c.Close()
+ t.deleteConnection(c)
+ }
+ if len(t.conns) >= t.maxEstablishedConns {
+ panic(len(t.conns))
+ }
+ t.conns = append(t.conns, c)
+ c.t = t
+ return true
+}
+
+func (t *Torrent) wantConns() bool {
+ if !t.seeding() && !t.needData() {
+ return false
+ }
+ if len(t.conns) < t.maxEstablishedConns {
+ return true
+ }
+ return t.worstBadConn() != nil
+}
"time"
)
-// Implements heap functions such that [0] is the worst connection.
+// Implements a heap of connections by how useful they are or have been.
type worstConns struct {
- c []*connection
- t *Torrent
- cl *Client
+ c []*connection
}
func (wc *worstConns) Len() int { return len(wc.c) }
func (wc *worstConns) key(i int) (key worstConnsSortKey) {
c := wc.c[i]
- key.useful = wc.cl.usefulConn(wc.t, c)
- if wc.t.seeding() {
+ key.useful = c.useful()
+ if c.t.seeding() {
key.lastHelpful = c.lastChunkSent
}
// Intentionally consider the last time a chunk was received when seeding,