From: Matt Joiner Date: Tue, 9 Dec 2014 03:57:53 +0000 (-0600) Subject: dht: Limit the size of peer discovery backlog X-Git-Tag: v1.0.0~1399 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=fa501ed06a8f696f0a249cfc4a6628dc9825ab66;p=btrtrc.git dht: Limit the size of peer discovery backlog --- diff --git a/dht/dht.go b/dht/dht.go index 836f387f..d688f770 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -156,7 +156,7 @@ func (n *Node) DefinitelyGood() bool { if len(n.id) != 20 { return false } - // No reason to think ill of them if they've never responded. + // No reason to think ill of them if they've never been queried. if n.lastSentQuery.IsZero() { return true } diff --git a/dht/getpeers.go b/dht/getpeers.go index 6654636b..39ea266f 100644 --- a/dht/getpeers.go +++ b/dht/getpeers.go @@ -17,7 +17,10 @@ type peerDiscovery struct { infoHash string } -const parallelQueries = 100 +const ( + parallelQueries = 100 + backlogMaxLen = 10000 +) func (me *peerDiscovery) Close() { me.peerStream.Close() @@ -43,12 +46,29 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) { peerStream: &peerStream{ Values: make(chan peerStreamValue), stop: make(chan struct{}), + values: make(chan peerStreamValue), }, triedAddrs: make(map[string]struct{}, 500), backlog: make(map[string]net.Addr, parallelQueries), server: s, infoHash: infoHash, } + // Function ferries from values to Values until discovery is halted. + go func() { + defer close(disc.Values) + for { + select { + case psv := <-disc.values: + select { + case disc.Values <- psv: + case <-disc.stop: + return + } + case <-disc.stop: + return + } + } + }() disc.mu.Lock() for _, addr := range startAddrs { disc.contact(addr) @@ -72,7 +92,9 @@ func (me *peerDiscovery) gotNodeAddr(addr net.Addr) { return } if me.pending >= parallelQueries { - me.backlog[addr.String()] = addr + if len(me.backlog) < backlogMaxLen { + me.backlog[addr.String()] = addr + } } else { me.contact(addr) } @@ -89,7 +111,6 @@ func (me *peerDiscovery) contact(addr net.Addr) { func (me *peerDiscovery) transactionClosed() { me.pending-- - // log.Printf("pending: %d", me.pending) for key, addr := range me.backlog { if me.pending >= parallelQueries { break @@ -140,7 +161,7 @@ func (me *peerDiscovery) getPeers(addr net.Addr) error { }() copy(nodeInfo.ID[:], id) select { - case me.peerStream.Values <- peerStreamValue{ + case me.peerStream.values <- peerStreamValue{ Peers: vs, NodeInfo: nodeInfo, }: @@ -157,10 +178,6 @@ func (me *peerDiscovery) getPeers(addr net.Addr) error { return nil } -func (me *peerDiscovery) streamValue(psv peerStreamValue) { - me.peerStream.Values <- psv -} - type peerStreamValue struct { Peers []util.CompactPeer // Peers given in get_peers response. NodeInfo // The node that gave the response. @@ -169,6 +186,8 @@ type peerStreamValue struct { type peerStream struct { mu sync.Mutex Values chan peerStreamValue + // Inner chan is set to nil when on close. + values chan peerStreamValue stop chan struct{} } @@ -179,6 +198,5 @@ func (ps *peerStream) Close() { case <-ps.stop: default: close(ps.stop) - close(ps.Values) } }