tt, new, err := cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
require.NoError(t, err)
assert.True(t, new)
+ tt.SetMaxEstablishedConns(0)
+ tt.SetMaxEstablishedConns(1)
tt.Drop()
}
}
return t.connHasWantedPieces(c)
}
+
+func (c *connection) lastHelpful() time.Time {
+ lasts := []time.Time{c.lastUsefulChunkReceived}
+ if c.t.seeding() {
+ lasts = append(lasts, c.lastChunkSent)
+ }
+ return missinggo.Max(time.Time.Before, missinggo.ConvertToSliceOfEmptyInterface(lasts)...).(time.Time)
+}
"math/rand"
"net"
"os"
- "sort"
"sync"
"time"
return false
}
-func (t *Torrent) worstConns() (wcs *worstConns) {
- wcs = &worstConns{make([]*connection, 0, len(t.conns))}
+func (t *Torrent) worstUnclosedConns() (ret []*connection) {
+ ret = make([]*connection, 0, len(t.conns))
for _, c := range t.conns {
if !c.closed.IsSet() {
- wcs.c = append(wcs.c, c)
+ ret = append(ret, c)
}
}
return
fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
fmt.Fprintf(w, "Half open: %d\n", len(t.halfOpen))
fmt.Fprintf(w, "Active peers: %d\n", len(t.conns))
- sort.Sort(&worstConns{t.conns})
+ missinggo.Sort(t.conns, worseConn)
for i, c := range t.conns {
fmt.Fprintf(w, "%2d. ", i+1)
c.WriteStatus(w, t)
return
}
+// The worst connection is one that hasn't been sent, or sent anything useful
+// for the longest. A bad connection is one that usually sends us unwanted
+// pieces, or has been in worser half of the established connections for more
+// than a minute.
func (t *Torrent) worstBadConn() *connection {
- wcs := t.worstConns()
- heap.Init(wcs)
+ wcs := missinggo.HeapFromSlice(t.worstUnclosedConns(), worseConn)
for wcs.Len() != 0 {
c := heap.Pop(wcs).(*connection)
if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
}
return t.worstBadConn() != nil
}
+
+func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
+ t.cl.mu.Lock()
+ defer t.cl.mu.Unlock()
+ oldMax = t.maxEstablishedConns
+ t.maxEstablishedConns = max
+ wcs := missinggo.HeapFromSlice(append([]*connection(nil), t.conns...), worseConn)
+ for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
+ t.dropConnection(wcs.Pop().(*connection))
+ }
+ t.openNewConns()
+ return oldMax
+}
package torrent
-import (
- "time"
-)
-
-// Implements a heap of connections by how useful they are or have been.
-type worstConns struct {
- c []*connection
-}
-
-func (wc *worstConns) Len() int { return len(wc.c) }
-func (wc *worstConns) Swap(i, j int) { wc.c[i], wc.c[j] = wc.c[j], wc.c[i] }
-
-func (wc *worstConns) Pop() (ret interface{}) {
- old := wc.c
- n := len(old)
- ret = old[n-1]
- wc.c = old[:n-1]
- return
-}
-
-func (wc *worstConns) Push(x interface{}) {
- wc.c = append(wc.c, x.(*connection))
-}
-
-type worstConnsSortKey struct {
- useful bool
- lastHelpful time.Time
- connected time.Time
-}
-
-func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool {
- if wc.useful != other.useful {
- return !wc.useful
- }
- if !wc.lastHelpful.Equal(other.lastHelpful) {
- return wc.lastHelpful.Before(other.lastHelpful)
- }
- return wc.connected.Before(other.connected)
-}
-
-func (wc *worstConns) key(i int) (key worstConnsSortKey) {
- c := wc.c[i]
- key.useful = c.useful()
- if c.t.seeding() {
- key.lastHelpful = c.lastChunkSent
+func worseConn(l, r *connection) bool {
+ if l.useful() != r.useful() {
+ return r.useful()
}
- // Intentionally consider the last time a chunk was received when seeding,
- // because we might go from seeding back to leeching.
- if c.lastUsefulChunkReceived.After(key.lastHelpful) {
- key.lastHelpful = c.lastUsefulChunkReceived
+ if !l.lastHelpful().Equal(r.lastHelpful()) {
+ return l.lastHelpful().Before(r.lastHelpful())
}
- key.connected = c.completedHandshake
- return
-}
-
-func (wc worstConns) Less(i, j int) bool {
- return wc.key(i).Less(wc.key(j))
+ return l.completedHandshake.Before(r.completedHandshake)
}