From 79ab1ffe2b11dda5698350799392851691a5e169 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sat, 22 Apr 2023 15:42:13 +1000 Subject: [PATCH] Try to balance incoming and outgoing conns per torrent --- client.go | 2 +- torrent.go | 114 +++++++++++++++++++++++++++++++++++++++++-------- worse-conns.go | 21 ++++++--- 3 files changed, 114 insertions(+), 23 deletions(-) diff --git a/client.go b/client.go index cc328cc6..b6fed302 100644 --- a/client.go +++ b/client.go @@ -459,7 +459,7 @@ func (cl *Client) wantConns() bool { return true } for _, t := range cl.torrents { - if t.wantConns() { + if t.wantIncomingConns() { return true } } diff --git a/torrent.go b/torrent.go index 637a9897..f8d31b64 100644 --- a/torrent.go +++ b/torrent.go @@ -1090,17 +1090,26 @@ func getPeerConnSlice(cap int) []*PeerConn { } } -// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad -// connection is one that usually sends us unwanted pieces, or has been in the worse half of the -// established connections for more than a minute. This is O(n log n). If there was a way to not -// consider the position of a conn relative to the total number, it could be reduced to O(n). -func (t *Torrent) worstBadConn() (ret *PeerConn) { - wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))} - defer peerConnSlices.Put(wcs.conns) - wcs.initKeys() +// Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as +// this is a frequent occurrence. +func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) { + sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns))) + f(sl) + peerConnSlices.Put(sl) +} + +func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn { + wcs := worseConnSlice{conns: sl} + wcs.initKeys(opts) heap.Init(&wcs) for wcs.Len() != 0 { c := heap.Pop(&wcs).(*PeerConn) + if opts.incomingIsBad && !c.outgoing { + return c + } + if opts.outgoingIsBad && c.outgoing { + return c + } if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() { return c } @@ -1116,6 +1125,17 @@ func (t *Torrent) worstBadConn() (ret *PeerConn) { return nil } +// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad +// connection is one that usually sends us unwanted pieces, or has been in the worse half of the +// established connections for more than a minute. This is O(n log n). If there was a way to not +// consider the position of a conn relative to the total number, it could be reduced to O(n). +func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) { + t.withUnclosedConns(func(ucs []*PeerConn) { + ret = t.worstBadConnFromSlice(opts, ucs) + }) + return +} + type PieceStateChange struct { Index int PieceState @@ -1303,6 +1323,15 @@ func (t *Torrent) numReceivedConns() (ret int) { return } +func (t *Torrent) numOutgoingConns() (ret int) { + for c := range t.conns { + if c.outgoing { + ret++ + } + } + return +} + func (t *Torrent) maxHalfOpen() int { // Note that if we somehow exceed the maximum established conns, we want // the negative value to have an effect. @@ -1310,13 +1339,16 @@ func (t *Torrent) maxHalfOpen() int { extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2) // We want to allow some experimentation with new peers, and to try to // upset an oversupply of received connections. - return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent))) + return int(min( + max(5, extraIncoming)+establishedHeadroom, + int64(t.cl.config.HalfOpenConnsPerTorrent), + )) } func (t *Torrent) openNewConns() (initiated int) { defer t.updateWantPeersEvent() for t.peers.Len() != 0 { - if !t.wantConns() { + if !t.wantOutgoingConns() { return } if len(t.halfOpen) >= t.maxHalfOpen() { @@ -1542,7 +1574,7 @@ func (t *Torrent) wantPeers() bool { if t.peers.Len() > t.cl.config.TorrentPeersLowWater { return false } - return t.wantConns() + return t.wantOutgoingConns() } func (t *Torrent) updateWantPeersEvent() { @@ -1815,7 +1847,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) { // We're also announcing ourselves as a listener, so we don't just want peer addresses. // TODO: We can include the announce_peer step depending on whether we can receive // inbound connections. We should probably only announce once every 15 mins too. - if !t.wantConns() { + if !t.wantAnyConns() { goto wait } // TODO: Determine if there's a listener on the port we're announcing. @@ -1933,9 +1965,16 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { } } if len(t.conns) >= t.maxEstablishedConns { - c := t.worstBadConn() + numOutgoing := t.numOutgoingConns() + numIncoming := len(t.conns) - numOutgoing + c := t.worstBadConn(worseConnLensOpts{ + // We've already established that we have too many connections at this point, so we just + // need to match what kind we have too many of vs. what we're trying to add now. + incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing, + outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing, + }) if c == nil { - return errors.New("don't want conns") + return errors.New("don't want conn") } c.close() t.deletePeerConn(c) @@ -1950,7 +1989,20 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) { return nil } -func (t *Torrent) wantConns() bool { +func (t *Torrent) newConnsAllowed() bool { + if !t.networkingEnabled.Bool() { + return false + } + if t.closed.IsSet() { + return false + } + if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) { + return false + } + return true +} + +func (t *Torrent) wantAnyConns() bool { if !t.networkingEnabled.Bool() { return false } @@ -1960,7 +2012,35 @@ func (t *Torrent) wantConns() bool { if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) { return false } - return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil + return len(t.conns) < t.maxEstablishedConns +} + +func (t *Torrent) wantOutgoingConns() bool { + if !t.newConnsAllowed() { + return false + } + if len(t.conns) < t.maxEstablishedConns { + return true + } + numIncomingConns := len(t.conns) - t.numOutgoingConns() + return t.worstBadConn(worseConnLensOpts{ + incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1, + outgoingIsBad: false, + }) != nil +} + +func (t *Torrent) wantIncomingConns() bool { + if !t.newConnsAllowed() { + return false + } + if len(t.conns) < t.maxEstablishedConns { + return true + } + numIncomingConns := len(t.conns) - t.numOutgoingConns() + return t.worstBadConn(worseConnLensOpts{ + incomingIsBad: false, + outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1, + }) != nil } func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { @@ -1973,7 +2053,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) { return true }), } - wcs.initKeys() + wcs.initKeys(worseConnLensOpts{}) heap.Init(&wcs) for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 { t.dropConnection(heap.Pop(&wcs).(*PeerConn)) diff --git a/worse-conns.go b/worse-conns.go index b0e0b4f2..62895e69 100644 --- a/worse-conns.go +++ b/worse-conns.go @@ -11,6 +11,7 @@ import ( ) type worseConnInput struct { + BadDirection bool Useful bool LastHelpful time.Time CompletedHandshake time.Time @@ -29,7 +30,11 @@ func (me *worseConnInput) doGetPeerPriorityOnce() { me.getPeerPriorityOnce.Do(me.doGetPeerPriority) } -func worseConnInputFromPeer(p *Peer) worseConnInput { +type worseConnLensOpts struct { + incomingIsBad, outgoingIsBad bool +} + +func worseConnInputFromPeer(p *Peer, opts worseConnLensOpts) worseConnInput { ret := worseConnInput{ Useful: p.useful(), LastHelpful: p.lastHelpful(), @@ -37,18 +42,24 @@ func worseConnInputFromPeer(p *Peer) worseConnInput { Pointer: uintptr(unsafe.Pointer(p)), GetPeerPriority: p.peerPriority, } + if opts.incomingIsBad && !p.outgoing { + ret.BadDirection = true + } else if opts.outgoingIsBad && p.outgoing { + ret.BadDirection = true + } return ret } func worseConn(_l, _r *Peer) bool { // TODO: Use generics for ptr to - l := worseConnInputFromPeer(_l) - r := worseConnInputFromPeer(_r) + l := worseConnInputFromPeer(_l, worseConnLensOpts{}) + r := worseConnInputFromPeer(_r, worseConnLensOpts{}) return l.Less(&r) } func (l *worseConnInput) Less(r *worseConnInput) bool { less, ok := multiless.New().Bool( + r.BadDirection, l.BadDirection).Bool( l.Useful, r.Useful).CmpInt64( l.LastHelpful.Sub(r.LastHelpful).Nanoseconds()).CmpInt64( l.CompletedHandshake.Sub(r.CompletedHandshake).Nanoseconds()).LazySameLess( @@ -80,10 +91,10 @@ type worseConnSlice struct { keys []worseConnInput } -func (me *worseConnSlice) initKeys() { +func (me *worseConnSlice) initKeys(opts worseConnLensOpts) { me.keys = make([]worseConnInput, len(me.conns)) for i, c := range me.conns { - me.keys[i] = worseConnInputFromPeer(&c.Peer) + me.keys[i] = worseConnInputFromPeer(&c.Peer, opts) } } -- 2.44.0