]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Try to balance incoming and outgoing conns per torrent
[btrtrc.git] / torrent.go
index 637a9897b67761d602028cf02e4c4df97936abcd..f8d31b647d5a9949686d98a3f1c3007117935170 100644 (file)
@@ -1090,17 +1090,26 @@ func getPeerConnSlice(cap int) []*PeerConn {
        }
 }
 
-// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
-// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
-// established connections for more than a minute. This is O(n log n). If there was a way to not
-// consider the position of a conn relative to the total number, it could be reduced to O(n).
-func (t *Torrent) worstBadConn() (ret *PeerConn) {
-       wcs := worseConnSlice{conns: t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))}
-       defer peerConnSlices.Put(wcs.conns)
-       wcs.initKeys()
+// Calls the given function with a slice of unclosed conns. It uses a pool to reduce allocations as
+// this is a frequent occurrence.
+func (t *Torrent) withUnclosedConns(f func([]*PeerConn)) {
+       sl := t.appendUnclosedConns(getPeerConnSlice(len(t.conns)))
+       f(sl)
+       peerConnSlices.Put(sl)
+}
+
+func (t *Torrent) worstBadConnFromSlice(opts worseConnLensOpts, sl []*PeerConn) *PeerConn {
+       wcs := worseConnSlice{conns: sl}
+       wcs.initKeys(opts)
        heap.Init(&wcs)
        for wcs.Len() != 0 {
                c := heap.Pop(&wcs).(*PeerConn)
+               if opts.incomingIsBad && !c.outgoing {
+                       return c
+               }
+               if opts.outgoingIsBad && c.outgoing {
+                       return c
+               }
                if c._stats.ChunksReadWasted.Int64() >= 6 && c._stats.ChunksReadWasted.Int64() > c._stats.ChunksReadUseful.Int64() {
                        return c
                }
@@ -1116,6 +1125,17 @@ func (t *Torrent) worstBadConn() (ret *PeerConn) {
        return nil
 }
 
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in the worse half of the
+// established connections for more than a minute. This is O(n log n). If there was a way to not
+// consider the position of a conn relative to the total number, it could be reduced to O(n).
+func (t *Torrent) worstBadConn(opts worseConnLensOpts) (ret *PeerConn) {
+       t.withUnclosedConns(func(ucs []*PeerConn) {
+               ret = t.worstBadConnFromSlice(opts, ucs)
+       })
+       return
+}
+
 type PieceStateChange struct {
        Index int
        PieceState
@@ -1303,6 +1323,15 @@ func (t *Torrent) numReceivedConns() (ret int) {
        return
 }
 
+func (t *Torrent) numOutgoingConns() (ret int) {
+       for c := range t.conns {
+               if c.outgoing {
+                       ret++
+               }
+       }
+       return
+}
+
 func (t *Torrent) maxHalfOpen() int {
        // Note that if we somehow exceed the maximum established conns, we want
        // the negative value to have an effect.
@@ -1310,13 +1339,16 @@ func (t *Torrent) maxHalfOpen() int {
        extraIncoming := int64(t.numReceivedConns() - t.maxEstablishedConns/2)
        // We want to allow some experimentation with new peers, and to try to
        // upset an oversupply of received connections.
-       return int(min(max(5, extraIncoming)+establishedHeadroom, int64(t.cl.config.HalfOpenConnsPerTorrent)))
+       return int(min(
+               max(5, extraIncoming)+establishedHeadroom,
+               int64(t.cl.config.HalfOpenConnsPerTorrent),
+       ))
 }
 
 func (t *Torrent) openNewConns() (initiated int) {
        defer t.updateWantPeersEvent()
        for t.peers.Len() != 0 {
-               if !t.wantConns() {
+               if !t.wantOutgoingConns() {
                        return
                }
                if len(t.halfOpen) >= t.maxHalfOpen() {
@@ -1542,7 +1574,7 @@ func (t *Torrent) wantPeers() bool {
        if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
                return false
        }
-       return t.wantConns()
+       return t.wantOutgoingConns()
 }
 
 func (t *Torrent) updateWantPeersEvent() {
@@ -1815,7 +1847,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
                        // We're also announcing ourselves as a listener, so we don't just want peer addresses.
                        // TODO: We can include the announce_peer step depending on whether we can receive
                        // inbound connections. We should probably only announce once every 15 mins too.
-                       if !t.wantConns() {
+                       if !t.wantAnyConns() {
                                goto wait
                        }
                        // TODO: Determine if there's a listener on the port we're announcing.
@@ -1933,9 +1965,16 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
                }
        }
        if len(t.conns) >= t.maxEstablishedConns {
-               c := t.worstBadConn()
+               numOutgoing := t.numOutgoingConns()
+               numIncoming := len(t.conns) - numOutgoing
+               c := t.worstBadConn(worseConnLensOpts{
+                       // We've already established that we have too many connections at this point, so we just
+                       // need to match what kind we have too many of vs. what we're trying to add now.
+                       incomingIsBad: (numIncoming-numOutgoing > 1) && c.outgoing,
+                       outgoingIsBad: (numOutgoing-numIncoming > 1) && !c.outgoing,
+               })
                if c == nil {
-                       return errors.New("don't want conns")
+                       return errors.New("don't want conn")
                }
                c.close()
                t.deletePeerConn(c)
@@ -1950,7 +1989,20 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
        return nil
 }
 
-func (t *Torrent) wantConns() bool {
+func (t *Torrent) newConnsAllowed() bool {
+       if !t.networkingEnabled.Bool() {
+               return false
+       }
+       if t.closed.IsSet() {
+               return false
+       }
+       if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
+               return false
+       }
+       return true
+}
+
+func (t *Torrent) wantAnyConns() bool {
        if !t.networkingEnabled.Bool() {
                return false
        }
@@ -1960,7 +2012,35 @@ func (t *Torrent) wantConns() bool {
        if !t.needData() && (!t.seeding() || !t.haveAnyPieces()) {
                return false
        }
-       return len(t.conns) < t.maxEstablishedConns || t.worstBadConn() != nil
+       return len(t.conns) < t.maxEstablishedConns
+}
+
+func (t *Torrent) wantOutgoingConns() bool {
+       if !t.newConnsAllowed() {
+               return false
+       }
+       if len(t.conns) < t.maxEstablishedConns {
+               return true
+       }
+       numIncomingConns := len(t.conns) - t.numOutgoingConns()
+       return t.worstBadConn(worseConnLensOpts{
+               incomingIsBad: numIncomingConns-t.numOutgoingConns() > 1,
+               outgoingIsBad: false,
+       }) != nil
+}
+
+func (t *Torrent) wantIncomingConns() bool {
+       if !t.newConnsAllowed() {
+               return false
+       }
+       if len(t.conns) < t.maxEstablishedConns {
+               return true
+       }
+       numIncomingConns := len(t.conns) - t.numOutgoingConns()
+       return t.worstBadConn(worseConnLensOpts{
+               incomingIsBad: false,
+               outgoingIsBad: t.numOutgoingConns()-numIncomingConns > 1,
+       }) != nil
 }
 
 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
@@ -1973,7 +2053,7 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
                        return true
                }),
        }
-       wcs.initKeys()
+       wcs.initKeys(worseConnLensOpts{})
        heap.Init(&wcs)
        for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
                t.dropConnection(heap.Pop(&wcs).(*PeerConn))