]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Replace pruning timer with as-required connection dropping
authorMatt Joiner <anacrolix@gmail.com>
Mon, 29 Jun 2015 14:45:26 +0000 (00:45 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 29 Jun 2015 14:45:26 +0000 (00:45 +1000)
client.go
torrent.go
torrent_test.go
worst_conns.go

index bdef0cc462f2d75a4baae36399931b25f07bc557..3128ca825a569868c59da8971b8d33aa05d9e68b 100644 (file)
--- a/client.go
+++ b/client.go
@@ -3,7 +3,6 @@ package torrent
 import (
        "bufio"
        "bytes"
-       "container/heap"
        "crypto/rand"
        "crypto/sha1"
        "encoding/hex"
@@ -1681,27 +1680,31 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
        }
 }
 
-func (me *Client) dropConnection(torrent *torrent, conn *connection) {
-       me.event.Broadcast()
-       for r := range conn.Requests {
-               me.connDeleteRequest(torrent, conn, r)
-       }
-       conn.Close()
-       for i0, c := range torrent.Conns {
-               if c != conn {
+// Returns true if connection is removed from torrent.Conns.
+func (me *Client) deleteConnection(t *torrent, c *connection) bool {
+       for i0, _c := range t.Conns {
+               if _c != c {
                        continue
                }
-               i1 := len(torrent.Conns) - 1
+               i1 := len(t.Conns) - 1
                if i0 != i1 {
-                       torrent.Conns[i0] = torrent.Conns[i1]
+                       t.Conns[i0] = t.Conns[i1]
                }
-               torrent.Conns = torrent.Conns[:i1]
-               me.openNewConns(torrent)
-               return
+               t.Conns = t.Conns[:i1]
+               return true
+       }
+       return false
+}
+
+func (me *Client) dropConnection(t *torrent, c *connection) {
+       me.event.Broadcast()
+       c.Close()
+       if me.deleteConnection(t, c) {
+               me.openNewConns(t)
        }
-       panic("connection not found")
 }
 
+// Returns true if the connection is added.
 func (me *Client) addConnection(t *torrent, c *connection) bool {
        if me.stopped() {
                return false
@@ -1721,13 +1724,19 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
                        return false
                }
        }
-       t.Conns = append(t.Conns, c)
-       // TODO: This should probably be done by a routine that kills off bad
-       // connections, and extra connections killed here instead.
-       if len(t.Conns) > socketsPerTorrent {
-               wcs := t.worstConnsHeap(me)
-               heap.Pop(wcs).(*connection).Close()
+       if len(t.Conns) >= socketsPerTorrent {
+               c := t.worstBadConn(me)
+               if c == nil {
+                       return false
+               }
+               log.Printf("%s: dropping connection to make room for new one: %s", t, c)
+               c.Close()
+               me.deleteConnection(t, c)
+       }
+       if len(t.Conns) >= socketsPerTorrent {
+               panic(len(t.Conns))
        }
+       t.Conns = append(t.Conns, c)
        return true
 }
 
@@ -1744,18 +1753,13 @@ func (t *torrent) needData() bool {
 }
 
 func (cl *Client) usefulConn(t *torrent, c *connection) bool {
-       // A 30 second grace for initial messages to go through.
-       if time.Since(c.completedHandshake) < 30*time.Second {
-               return true
+       select {
+       case <-c.closing:
+               return false
+       default:
        }
        if !t.haveInfo() {
-               if !c.supportsExtension("ut_metadata") {
-                       return false
-               }
-               if time.Since(c.completedHandshake) < 2*time.Minute {
-                       return true
-               }
-               return false
+               return c.supportsExtension("ut_metadata")
        }
        if cl.seeding(t) {
                return c.PeerInterested
@@ -1763,23 +1767,14 @@ func (cl *Client) usefulConn(t *torrent, c *connection) bool {
        return t.connHasWantedPieces(c)
 }
 
-func (t *torrent) numGoodConns(cl *Client) (num int) {
-       for _, c := range t.Conns {
-               if cl.usefulConn(t, c) {
-                       num++
-               }
-       }
-       return
-}
-
 func (me *Client) wantConns(t *torrent) bool {
        if !me.seeding(t) && !t.needData() {
                return false
        }
-       if t.numGoodConns(me) >= socketsPerTorrent {
-               return false
+       if len(t.Conns) < socketsPerTorrent {
+               return true
        }
-       return true
+       return t.worstBadConn(me) != nil
 }
 
 func (me *Client) openNewConns(t *torrent) {
@@ -2162,9 +2157,6 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er
        // From this point onwards, we can consider the torrent a part of the
        // client.
        if new {
-               t.pruneTimer = time.AfterFunc(0, func() {
-                       cl.pruneConnectionsUnlocked(T.torrent)
-               })
                if !cl.config.DisableTrackers {
                        go cl.announceTorrentTrackers(T.torrent)
                }
@@ -2175,35 +2167,6 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er
        return
 }
 
-// Prunes unused connections. This is required to make space to dial for
-// replacements.
-func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
-       select {
-       case <-t.ceasingNetworking:
-               return
-       case <-t.closing:
-               return
-       default:
-       }
-       cl.mu.Lock()
-       license := len(t.Conns) - (socketsPerTorrent+1)/2
-       for _, c := range t.Conns {
-               if license <= 0 {
-                       break
-               }
-               if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
-                       continue
-               }
-               if time.Now().Sub(c.completedHandshake) < time.Minute {
-                       continue
-               }
-               c.Close()
-               license--
-       }
-       cl.mu.Unlock()
-       t.pruneTimer.Reset(pruneInterval)
-}
-
 func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
        t, ok := me.torrents[infoHash]
        if !ok {
index 30afe1a9ecc4a9883abedf587eabe5f2836ef1f8..2d791aab455c811cfc3d30e9af2d8cd81ed621ef 100644 (file)
@@ -99,8 +99,6 @@ type torrent struct {
 
        // Closed when .Info is set.
        gotMetainfo chan struct{}
-
-       pruneTimer *time.Timer
 }
 
 func (t *torrent) pieceComplete(piece int) bool {
@@ -131,13 +129,19 @@ func (t *torrent) addrActive(addr string) bool {
        return false
 }
 
-func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) {
+func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
        wcs = &worstConns{
-               c:  append([]*connection{}, t.Conns...),
+               c:  make([]*connection, 0, len(t.Conns)),
                t:  t,
                cl: cl,
        }
-       heap.Init(wcs)
+       for _, c := range t.Conns {
+               select {
+               case <-c.closing:
+               default:
+                       wcs.c = append(wcs.c, c)
+               }
+       }
        return
 }
 
@@ -153,9 +157,6 @@ func (t *torrent) ceaseNetworking() {
        for _, c := range t.Conns {
                c.Close()
        }
-       if t.pruneTimer != nil {
-               t.pruneTimer.Stop()
-       }
 }
 
 func (t *torrent) addPeer(p Peer) {
@@ -728,3 +729,18 @@ func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
        }
        return
 }
+
+func (t *torrent) worstBadConn(cl *Client) *connection {
+       wcs := t.worstConns(cl)
+       heap.Init(wcs)
+       // A connection can only be bad if it's in the worst half, rounded down.
+       for wcs.Len() > (socketsPerTorrent+1)/2 {
+               c := heap.Pop(wcs).(*connection)
+               // Give connections 1 minute to prove themselves.
+               if time.Since(c.completedHandshake) < time.Minute {
+                       continue
+               }
+               return c
+       }
+       return nil
+}
index fdc96903a0ccf33e44f04f83f0e933482eb59b88..6134e28df26fd177e6ffb98767fb9ad400edd11f 100644 (file)
@@ -3,7 +3,6 @@ package torrent
 import (
        "sync"
        "testing"
-       "time"
 
        "github.com/anacrolix/torrent/peer_protocol"
 )
@@ -46,7 +45,6 @@ func TestTorrentRequest(t *testing.T) {
 
 func TestTorrentDoubleClose(t *testing.T) {
        tt, err := newTorrent(InfoHash{})
-       tt.pruneTimer = time.NewTimer(0)
        if err != nil {
                t.Fatal(err)
        }
index 0d7b9d0ddf0d6b27079249cc1cca5e02647e61db..0f7aa3aa7842b2c5190b259067b8b08fba7f7716 100644 (file)
@@ -29,13 +29,17 @@ func (me *worstConns) Push(x interface{}) {
 type worstConnsSortKey struct {
        useful      bool
        lastHelpful time.Time
+       connected   time.Time
 }
 
 func (me worstConnsSortKey) Less(other worstConnsSortKey) bool {
        if me.useful != other.useful {
                return !me.useful
        }
-       return me.lastHelpful.Before(other.lastHelpful)
+       if !me.lastHelpful.Equal(other.lastHelpful) {
+               return me.lastHelpful.Before(other.lastHelpful)
+       }
+       return me.connected.Before(other.connected)
 }
 
 func (me *worstConns) key(i int) (key worstConnsSortKey) {
@@ -43,9 +47,13 @@ func (me *worstConns) key(i int) (key worstConnsSortKey) {
        key.useful = me.cl.usefulConn(me.t, c)
        if me.cl.seeding(me.t) {
                key.lastHelpful = c.lastChunkSent
-       } else {
+       }
+       // Intentionally consider the last time a chunk was received when seeding,
+       // because we might go from seeding back to leeching.
+       if c.lastUsefulChunkReceived.After(key.lastHelpful) {
                key.lastHelpful = c.lastUsefulChunkReceived
        }
+       key.connected = c.completedHandshake
        return
 }