import (
"bufio"
"bytes"
- "container/heap"
"crypto/rand"
"crypto/sha1"
"encoding/hex"
}
}
-func (me *Client) dropConnection(torrent *torrent, conn *connection) {
- me.event.Broadcast()
- for r := range conn.Requests {
- me.connDeleteRequest(torrent, conn, r)
- }
- conn.Close()
- for i0, c := range torrent.Conns {
- if c != conn {
+// Returns true if connection is removed from torrent.Conns.
+func (me *Client) deleteConnection(t *torrent, c *connection) bool {
+ for i0, _c := range t.Conns {
+ if _c != c {
continue
}
- i1 := len(torrent.Conns) - 1
+ i1 := len(t.Conns) - 1
if i0 != i1 {
- torrent.Conns[i0] = torrent.Conns[i1]
+ t.Conns[i0] = t.Conns[i1]
}
- torrent.Conns = torrent.Conns[:i1]
- me.openNewConns(torrent)
- return
+ t.Conns = t.Conns[:i1]
+ return true
+ }
+ return false
+}
+
+func (me *Client) dropConnection(t *torrent, c *connection) {
+ me.event.Broadcast()
+ c.Close()
+ if me.deleteConnection(t, c) {
+ me.openNewConns(t)
}
- panic("connection not found")
}
+// Returns true if the connection is added.
func (me *Client) addConnection(t *torrent, c *connection) bool {
if me.stopped() {
return false
return false
}
}
- t.Conns = append(t.Conns, c)
- // TODO: This should probably be done by a routine that kills off bad
- // connections, and extra connections killed here instead.
- if len(t.Conns) > socketsPerTorrent {
- wcs := t.worstConnsHeap(me)
- heap.Pop(wcs).(*connection).Close()
+ if len(t.Conns) >= socketsPerTorrent {
+ c := t.worstBadConn(me)
+ if c == nil {
+ return false
+ }
+ log.Printf("%s: dropping connection to make room for new one: %s", t, c)
+ c.Close()
+ me.deleteConnection(t, c)
+ }
+ if len(t.Conns) >= socketsPerTorrent {
+ panic(len(t.Conns))
}
+ t.Conns = append(t.Conns, c)
return true
}
}
func (cl *Client) usefulConn(t *torrent, c *connection) bool {
- // A 30 second grace for initial messages to go through.
- if time.Since(c.completedHandshake) < 30*time.Second {
- return true
+ select {
+ case <-c.closing:
+ return false
+ default:
}
if !t.haveInfo() {
- if !c.supportsExtension("ut_metadata") {
- return false
- }
- if time.Since(c.completedHandshake) < 2*time.Minute {
- return true
- }
- return false
+ return c.supportsExtension("ut_metadata")
}
if cl.seeding(t) {
return c.PeerInterested
return t.connHasWantedPieces(c)
}
-func (t *torrent) numGoodConns(cl *Client) (num int) {
- for _, c := range t.Conns {
- if cl.usefulConn(t, c) {
- num++
- }
- }
- return
-}
-
func (me *Client) wantConns(t *torrent) bool {
if !me.seeding(t) && !t.needData() {
return false
}
- if t.numGoodConns(me) >= socketsPerTorrent {
- return false
+ if len(t.Conns) < socketsPerTorrent {
+ return true
}
- return true
+ return t.worstBadConn(me) != nil
}
func (me *Client) openNewConns(t *torrent) {
// From this point onwards, we can consider the torrent a part of the
// client.
if new {
- t.pruneTimer = time.AfterFunc(0, func() {
- cl.pruneConnectionsUnlocked(T.torrent)
- })
if !cl.config.DisableTrackers {
go cl.announceTorrentTrackers(T.torrent)
}
return
}
-// Prunes unused connections. This is required to make space to dial for
-// replacements.
-func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
- select {
- case <-t.ceasingNetworking:
- return
- case <-t.closing:
- return
- default:
- }
- cl.mu.Lock()
- license := len(t.Conns) - (socketsPerTorrent+1)/2
- for _, c := range t.Conns {
- if license <= 0 {
- break
- }
- if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
- continue
- }
- if time.Now().Sub(c.completedHandshake) < time.Minute {
- continue
- }
- c.Close()
- license--
- }
- cl.mu.Unlock()
- t.pruneTimer.Reset(pruneInterval)
-}
-
func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
t, ok := me.torrents[infoHash]
if !ok {
// Closed when .Info is set.
gotMetainfo chan struct{}
-
- pruneTimer *time.Timer
}
func (t *torrent) pieceComplete(piece int) bool {
return false
}
-func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) {
+func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
wcs = &worstConns{
- c: append([]*connection{}, t.Conns...),
+ c: make([]*connection, 0, len(t.Conns)),
t: t,
cl: cl,
}
- heap.Init(wcs)
+ for _, c := range t.Conns {
+ select {
+ case <-c.closing:
+ default:
+ wcs.c = append(wcs.c, c)
+ }
+ }
return
}
for _, c := range t.Conns {
c.Close()
}
- if t.pruneTimer != nil {
- t.pruneTimer.Stop()
- }
}
func (t *torrent) addPeer(p Peer) {
}
return
}
+
+func (t *torrent) worstBadConn(cl *Client) *connection {
+ wcs := t.worstConns(cl)
+ heap.Init(wcs)
+ // A connection can only be bad if it's in the worst half, rounded down.
+ for wcs.Len() > (socketsPerTorrent+1)/2 {
+ c := heap.Pop(wcs).(*connection)
+ // Give connections 1 minute to prove themselves.
+ if time.Since(c.completedHandshake) < time.Minute {
+ continue
+ }
+ return c
+ }
+ return nil
+}
type worstConnsSortKey struct {
useful bool
lastHelpful time.Time
+ connected time.Time
}
func (me worstConnsSortKey) Less(other worstConnsSortKey) bool {
if me.useful != other.useful {
return !me.useful
}
- return me.lastHelpful.Before(other.lastHelpful)
+ if !me.lastHelpful.Equal(other.lastHelpful) {
+ return me.lastHelpful.Before(other.lastHelpful)
+ }
+ return me.connected.Before(other.connected)
}
func (me *worstConns) key(i int) (key worstConnsSortKey) {
key.useful = me.cl.usefulConn(me.t, c)
if me.cl.seeding(me.t) {
key.lastHelpful = c.lastChunkSent
- } else {
+ }
+ // Intentionally consider the last time a chunk was received when seeding,
+ // because we might go from seeding back to leeching.
+ if c.lastUsefulChunkReceived.After(key.lastHelpful) {
key.lastHelpful = c.lastUsefulChunkReceived
}
+ key.connected = c.completedHandshake
return
}