}
}
-// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
-// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
-// established connections for more than a minute. This is O(n log n). If there was a way to not
-// consider the position of a conn relative to the total number, it could be reduced to O(n).
-func (t *Torrent) worstBadConn() (ret *PeerConn) {
- wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
- defer peerConnSlices.Put(wcs.conns)
- wcs.initKeys()
+// Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
+// this is a frequent occurrence.
+func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
+ sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
+ f(sl)
+ peerConnSlices.Put(sl)
+}
+
+func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
+ wcs := worseConnSlice{conns: sl}
+ wcs.initKeys(opts)
heap.Init(&wcs)
for wcs.Len() != 0 {
c := heap.Pop(&wcs).(*PeerConn)
+ if opts.incomingIsBad && !c.outgoing {
+ return c
+ }
+ if opts.outgoingIsBad && c.outgoing {
+ return c
+ }
if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
return c
}
return nil
}
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
+// established connections for more than a minute. This is O(n log n). If there was a way to not
+// consider the position of a conn relative to the total number, it could be reduced to O(n).
+func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
+ t.withUnclosedConns(func(ucs []*PeerConn) {
+ ret = t.worstBadConnFromSlice(opts, ucs)
+ })
+ return
+}
+
type PieceStateChange struct {
Index int
PieceState
return
}
+func (t *Torrent) numOutgoingConns() (ret int) {
+ for c := range t.conns {
+ if c.outgoing {
+ ret++
+ }
+ }
+ return
+}
+
func (t *Torrent) maxHalfOpen() int {
// Note that if we somehow exceed the maximum established conns, we want
// the negative value to have an effect.
extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
// We want to allow some experimentation with new peers, and to try to
// upset an oversupply of received connections.
- return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
+ return int(min(
+ max(5, extraIncoming)+establishedHeadroom,
+ int64(t.cl.config.HalfOpenConnsPerTorrent),
+ ))
}
func (t *Torrent) openNewConns() (initiated int) {
defer t.updateWantPeersEvent()
for t.peers.Len() != 0 {
- if !t.wantConns() {
+ if !t.wantOutgoingConns() {
return
}
if len(t.halfOpen) >= t.maxHalfOpen() {
if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
return false
}
- return t.wantConns()
+ return t.wantOutgoingConns()
}
func (t *Torrent) updateWantPeersEvent() {
// We're also announcing ourselves as a listener, so we don't just want peer addresses.
// TODO: We can include the announce_peer step depending on whether we can receive
// inbound connections. We should probably only announce once every 15 mins too.
- if !t.wantConns() {
+ if !t.wantAnyConns() {
goto wait
}
// TODO: Determine if there's a listener on the port we're announcing.
}
}
if len(t.conns) >= t.maxEstablishedConns {
- c := t.worstBadConn()
+ numOutgoing := t.numOutgoingConns()
+ numIncoming := len(t.conns) - numOutgoing
+ c := t.worstBadConn(worseConnLensOpts{
+ // We've already established that we have too many connections at this point, so we just
+ // need to match what kind we have too many of vs. what we're trying to add now.
+ incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
+ outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
+ })
if c == nil {
- return errors.New("don't want conns")
+ return errors.New("don't want conn")
}
c.close()
t.deletePeerConn(c)
return nil
}
-func (t *Torrent) wantConns() bool {
+func (t *Torrent) newConnsAllowed() bool {
+ if !t.networkingEnabled.Bool() {
+ return false
+ }
+ if t.closed.IsSet() {
+ return false
+ }
+ if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
+ return false
+ }
+ return true
+}
+
+func (t *Torrent) wantAnyConns() bool {
if !t.networkingEnabled.Bool() {
return false
}
if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
return false
}
- return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
+ return len(t.conns) < t.maxEstablishedConns
+}
+
+func (t *Torrent) wantOutgoingConns() bool {
+ if !t.newConnsAllowed() {
+ return false
+ }
+ if len(t.conns) < t.maxEstablishedConns {
+ return true
+ }
+ numIncomingConns := len(t.conns) - t.numOutgoingConns()
+ return t.worstBadConn(worseConnLensOpts{
+ incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
+ outgoingIsBad: false,
+ }) != nil
+}
+
+func (t *Torrent) wantIncomingConns() bool {
+ if !t.newConnsAllowed() {
+ return false
+ }
+ if len(t.conns) < t.maxEstablishedConns {
+ return true
+ }
+ numIncomingConns := len(t.conns) - t.numOutgoingConns()
+ return t.worstBadConn(worseConnLensOpts{
+ incomingIsBad: false,
+ outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
+ }) != nil
}
func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
return true
}),
}
- wcs.initKeys()
+ wcs.initKeys(worseConnLensOpts{})
heap.Init(&wcs)
for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
t.dropConnection(heap.Pop(&wcs).(*PeerConn))
)
type worseConnInput struct {
+ BadDirection bool
Useful bool
LastHelpful time.Time
CompletedHandshake time.Time
me.getPeerPriorityOnce.Do(me.doGetPeerPriority)
}
-func worseConnInputFromPeer(p *Peer) worseConnInput {
+type worseConnLensOpts struct {
+ incomingIsBad, outgoingIsBad bool
+}
+
+func worseConnInputFromPeer(p *Peer, opts worseConnLensOpts) worseConnInput {
ret := worseConnInput{
Useful: p.useful(),
LastHelpful: p.lastHelpful(),
Pointer: uintptr(unsafe.Pointer(p)),
GetPeerPriority: p.peerPriority,
}
+ if opts.incomingIsBad && !p.outgoing {
+ ret.BadDirection = true
+ } else if opts.outgoingIsBad && p.outgoing {
+ ret.BadDirection = true
+ }
return ret
}
func worseConn(_l, _r *Peer) bool {
// TODO: Use generics for ptr to
- l := worseConnInputFromPeer(_l)
- r := worseConnInputFromPeer(_r)
+ l := worseConnInputFromPeer(_l, worseConnLensOpts{})
+ r := worseConnInputFromPeer(_r, worseConnLensOpts{})
return l.Less(&r)
}
func (l *worseConnInput) Less(r *worseConnInput) bool {
less, ok := multiless.New().Bool(
+ r.BadDirection, l.BadDirection).Bool(
l.Useful, r.Useful).CmpInt64(
l.LastHelpful.Sub(r.LastHelpful).Nanoseconds()).CmpInt64(
l.CompletedHandshake.Sub(r.CompletedHandshake).Nanoseconds()).LazySameLess(
keys []worseConnInput
}
-func (me *worseConnSlice) initKeys() {
+func (me *worseConnSlice) initKeys(opts worseConnLensOpts) {
me.keys = make([]worseConnInput, len(me.conns))
for i, c := range me.conns {
- me.keys[i] = worseConnInputFromPeer(&c.Peer)
+ me.keys[i] = worseConnInputFromPeer(&c.Peer, opts)
}
}