From ed08bd2837dda871d555978f570af08517e619aa Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 28 Aug 2014 10:06:36 +1000 Subject: [PATCH] Introduce socket/torrent limits, work in progress --- client.go | 22 +++++++++++++++++----- torrent.go | 13 +++++++++++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/client.go b/client.go index 0ffd9301..2dc4acfa 100644 --- a/client.go +++ b/client.go @@ -17,6 +17,7 @@ package torrent import ( "bufio" + "container/heap" "crypto/rand" "crypto/sha1" "errors" @@ -55,11 +56,15 @@ var ( duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided") ) -// Justification for set bits follows. -// -// Extension protocol: http://www.bittorrent.org/beps/bep_0010.html -// DHT: http://www.bittorrent.org/beps/bep_0005.html -const extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01" +const ( + // Justification for set bits follows. + // + // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html + // DHT: http://www.bittorrent.org/beps/bep_0005.html + extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01" + + socketsPerTorrent = 40 +) // Currently doesn't really queue, but should in the future. func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) { @@ -969,6 +974,10 @@ func (me *Client) addConnection(t *torrent, c *connection) bool { } } t.Conns = append(t.Conns, c) + if len(t.Conns) > socketsPerTorrent { + wcs := t.worstConnsHeap() + heap.Pop(wcs).(*connection).Close() + } return true } @@ -983,6 +992,9 @@ func (me *Client) openNewConns() { if me.halfOpen >= me.halfOpenLimit { return } + if me.halfOpen+me.handshaking+len(t.Conns) >= socketsPerTorrent { + break + } var ( k peersKey p Peer diff --git a/torrent.go b/torrent.go index 1dc6cd6c..8274091c 100644 --- a/torrent.go +++ b/torrent.go @@ -1,11 +1,13 @@ package torrent import ( + "container/heap" "container/list" "fmt" "io" "log" "net" + "sort" "sync" "bitbucket.org/anacrolix/go.torrent/util" @@ -67,6 +69,14 @@ type torrent struct { metadataHave []bool } +func (t *torrent) worstConnsHeap() (wcs *worstConnsHeap) { + wcs = new(worstConnsHeap) + *wcs = make([]*connection, 0, len(t.Conns)) + *wcs = append(*wcs, t.Conns...) + heap.Init(wcs) + return +} + func (t *torrent) CeaseNetworking() { t.stateMu.Lock() defer t.stateMu.Unlock() @@ -283,6 +293,9 @@ func (t *torrent) WriteStatus(w io.Writer) { fmt.Fprintln(w) fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers)) fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns)) + wcs := new(worstConnsHeap) + *wcs = t.Conns + sort.Sort(wcs) for _, c := range t.Conns { c.WriteStatus(w) } -- 2.48.1