]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Try to balance incoming and outgoing conns per torrent
authorMatt Joiner <anacrolix@gmail.com>
Sat, 22 Apr 2023 05:42:13 +0000 (15:42 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sat, 29 Apr 2023 04:54:49 +0000 (14:54 +1000)
client.go
torrent.go
worse-conns.go

index cc328cc6567976b1635930467a5d295a529f530f..b6fed302f3a5dcca68ad1f0071cff0c05d62d440 100644 (file)
--- a/client.go
+++ b/client.go
@@ -459,7 +459,7 @@ func (cl *Client) wantConns() bool {
                return true
        }
        for _, t := range cl.torrents {
-               if t.wantConns() {
+               if t.wantIncomingConns() {
                        return true
                }
        }
index 637a9897b67761d602028cf02e4c4df97936abcd..f8d31b647d5a9949686d98a3f1c3007117935170 100644 (file)
@@ -1090,17 +1090,26 @@ func getPeerConnSlice(cap int) []*PeerConn {
        }
 }
 
-// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
-// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
-// established connections for more than a minute. This is O(n log n). If there was a way to not
-// consider the position of a conn relative to the total number, it could be reduced to O(n).
-func (t *Torrent) worstBadConn() (ret *PeerConn) {
-       wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
-       defer peerConnSlices.Put(wcs.conns)
-       wcs.initKeys()
+// Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
+// this is a frequent occurrence.
+func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
+       sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
+       f(sl)
+       peerConnSlices.Put(sl)
+}
+
+func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
+       wcs := worseConnSlice{conns: sl}
+       wcs.initKeys(opts)
        heap.Init(&wcs)
        for wcs.Len() != 0 {
                c := heap.Pop(&wcs).(*PeerConn)
+               if opts.incomingIsBad && !c.outgoing {
+                       return c
+               }
+               if opts.outgoingIsBad && c.outgoing {
+                       return c
+               }
                if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
                        return c
                }
@@ -1116,6 +1125,17 @@ func (t *Torrent) worstBadConn() (ret *PeerConn) {
        return nil
 }
 
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
+// established connections for more than a minute. This is O(n log n). If there was a way to not
+// consider the position of a conn relative to the total number, it could be reduced to O(n).
+func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
+       t.withUnclosedConns(func(ucs []*PeerConn) {
+               ret = t.worstBadConnFromSlice(opts, ucs)
+       })
+       return
+}
+
 type PieceStateChange struct {
        Index int
        PieceState
@@ -1303,6 +1323,15 @@ func (t *Torrent) numReceivedConns() (ret int) {
        return
 }
 
+func (t *Torrent) numOutgoingConns() (ret int) {
+       for c := range t.conns {
+               if c.outgoing {
+                       ret++
+               }
+       }
+       return
+}
+
 func (t *Torrent) maxHalfOpen() int {
        // Note that if we somehow exceed the maximum established conns, we want
        // the negative value to have an effect.
@@ -1310,13 +1339,16 @@ func (t *Torrent) maxHalfOpen() int {
        extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
        // We want to allow some experimentation with new peers, and to try to
        // upset an oversupply of received connections.
-       return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
+       return int(min(
+               max(5, extraIncoming)+establishedHeadroom,
+               int64(t.cl.config.HalfOpenConnsPerTorrent),
+       ))
 }
 
 func (t *Torrent) openNewConns() (initiated int) {
        defer t.updateWantPeersEvent()
        for t.peers.Len() != 0 {
-               if !t.wantConns() {
+               if !t.wantOutgoingConns() {
                        return
                }
                if len(t.halfOpen) >= t.maxHalfOpen() {
@@ -1542,7 +1574,7 @@ func (t *Torrent) wantPeers() bool {
        if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
                return false
        }
-       return t.wantConns()
+       return t.wantOutgoingConns()
 }
 
 func (t *Torrent) updateWantPeersEvent() {
@@ -1815,7 +1847,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
                        // We're also announcing ourselves as a listener, so we don't just want peer addresses.
                        // TODO: We can include the announce_peer step depending on whether we can receive
                        // inbound connections. We should probably only announce once every 15 mins too.
-                       if !t.wantConns() {
+                       if !t.wantAnyConns() {
                                goto wait
                        }
                        // TODO: Determine if there's a listener on the port we're announcing.
@@ -1933,9 +1965,16 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
                }
        }
        if len(t.conns) >= t.maxEstablishedConns {
-               c := t.worstBadConn()
+               numOutgoing := t.numOutgoingConns()
+               numIncoming := len(t.conns) - numOutgoing
+               c := t.worstBadConn(worseConnLensOpts{
+                       // We've already established that we have too many connections at this point, so we just
+                       // need to match what kind we have too many of vs. what we're trying to add now.
+                       incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
+                       outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
+               })
                if c == nil {
-                       return errors.New("don't want conns")
+                       return errors.New("don't want conn")
                }
                c.close()
                t.deletePeerConn(c)
@@ -1950,7 +1989,20 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
        return nil
 }
 
-func (t *Torrent) wantConns() bool {
+func (t *Torrent) newConnsAllowed() bool {
+       if !t.networkingEnabled.Bool() {
+               return false
+       }
+       if t.closed.IsSet() {
+               return false
+       }
+       if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
+               return false
+       }
+       return true
+}
+
+func (t *Torrent) wantAnyConns() bool {
        if !t.networkingEnabled.Bool() {
                return false
        }
@@ -1960,7 +2012,35 @@ func (t *Torrent) wantConns() bool {
        if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
                return false
        }
-       return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
+       return len(t.conns) < t.maxEstablishedConns
+}
+
+func (t *Torrent) wantOutgoingConns() bool {
+       if !t.newConnsAllowed() {
+               return false
+       }
+       if len(t.conns) < t.maxEstablishedConns {
+               return true
+       }
+       numIncomingConns := len(t.conns) - t.numOutgoingConns()
+       return t.worstBadConn(worseConnLensOpts{
+               incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
+               outgoingIsBad: false,
+       }) != nil
+}
+
+func (t *Torrent) wantIncomingConns() bool {
+       if !t.newConnsAllowed() {
+               return false
+       }
+       if len(t.conns) < t.maxEstablishedConns {
+               return true
+       }
+       numIncomingConns := len(t.conns) - t.numOutgoingConns()
+       return t.worstBadConn(worseConnLensOpts{
+               incomingIsBad: false,
+               outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
+       }) != nil
 }
 
 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
@@ -1973,7 +2053,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
                        return true
                }),
        }
-       wcs.initKeys()
+       wcs.initKeys(worseConnLensOpts{})
        heap.Init(&wcs)
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
                t.dropConnection(heap.Pop(&wcs).(*PeerConn))
index b0e0b4f21be7302297b54208bec3b17b20e0cea6..62895e69151cf3bbfffcd79f6616eee6a53df776 100644 (file)
@@ -11,6 +11,7 @@ import (
 )
 
 type worseConnInput struct {
+       BadDirection        bool
        Useful              bool
        LastHelpful         time.Time
        CompletedHandshake  time.Time
@@ -29,7 +30,11 @@ func (me *worseConnInput) doGetPeerPriorityOnce() {
        me.getPeerPriorityOnce.Do(me.doGetPeerPriority)
 }
 
-func worseConnInputFromPeer(p *Peer) worseConnInput {
+type worseConnLensOpts struct {
+       incomingIsBad, outgoingIsBad bool
+}
+
+func worseConnInputFromPeer(p *Peer, opts worseConnLensOpts) worseConnInput {
        ret := worseConnInput{
                Useful:             p.useful(),
                LastHelpful:        p.lastHelpful(),
@@ -37,18 +42,24 @@ func worseConnInputFromPeer(p *Peer) worseConnInput {
                Pointer:            uintptr(unsafe.Pointer(p)),
                GetPeerPriority:    p.peerPriority,
        }
+       if opts.incomingIsBad && !p.outgoing {
+               ret.BadDirection = true
+       } else if opts.outgoingIsBad && p.outgoing {
+               ret.BadDirection = true
+       }
        return ret
 }
 
 func worseConn(_l, _r *Peer) bool {
        // TODO: Use generics for ptr to
-       l := worseConnInputFromPeer(_l)
-       r := worseConnInputFromPeer(_r)
+       l := worseConnInputFromPeer(_l, worseConnLensOpts{})
+       r := worseConnInputFromPeer(_r, worseConnLensOpts{})
        return l.Less(&r)
 }
 
 func (l *worseConnInput) Less(r *worseConnInput) bool {
        less, ok := multiless.New().Bool(
+               r.BadDirection, l.BadDirection).Bool(
                l.Useful, r.Useful).CmpInt64(
                l.LastHelpful.Sub(r.LastHelpful).Nanoseconds()).CmpInt64(
                l.CompletedHandshake.Sub(r.CompletedHandshake).Nanoseconds()).LazySameLess(
@@ -80,10 +91,10 @@ type worseConnSlice struct {
        keys  []worseConnInput
 }
 
-func (me *worseConnSlice) initKeys() {
+func (me *worseConnSlice) initKeys(opts worseConnLensOpts) {
        me.keys = make([]worseConnInput, len(me.conns))
        for i, c := range me.conns {
-               me.keys[i] = worseConnInputFromPeer(&c.Peer)
+               me.keys[i] = worseConnInputFromPeer(&c.Peer, opts)
        }
 }