]> Sergey Matveev's repositories - btrtrc.git/commitdiff
dht: Limit the size of peer discovery backlog
authorMatt Joiner <anacrolix@gmail.com>
Tue, 9 Dec 2014 03:57:53 +0000 (21:57 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 9 Dec 2014 03:57:53 +0000 (21:57 -0600)
dht/dht.go
dht/getpeers.go

index 836f387fb988725ac584375fb1b8ab5ec2c90fd6..d688f770b9af79d25bd4ad91204fd20ba1a1c149 100644 (file)
@@ -156,7 +156,7 @@ func (n *Node) DefinitelyGood() bool {
        if len(n.id) != 20 {
                return false
        }
-       // No reason to think ill of them if they've never responded.
+       // No reason to think ill of them if they've never been queried.
        if n.lastSentQuery.IsZero() {
                return true
        }
index 6654636b18662065e47aa420a7fd6bbb8fc39460..39ea266f388d98834c65168e7d86d6361d7a0766 100644 (file)
@@ -17,7 +17,10 @@ type peerDiscovery struct {
        infoHash   string
 }
 
-const parallelQueries = 100
+const (
+       parallelQueries = 100
+       backlogMaxLen   = 10000
+)
 
 func (me *peerDiscovery) Close() {
        me.peerStream.Close()
@@ -43,12 +46,29 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
                peerStream: &peerStream{
                        Values: make(chan peerStreamValue),
                        stop:   make(chan struct{}),
+                       values: make(chan peerStreamValue),
                },
                triedAddrs: make(map[string]struct{}, 500),
                backlog:    make(map[string]net.Addr, parallelQueries),
                server:     s,
                infoHash:   infoHash,
        }
+       // Function ferries from values to Values until discovery is halted.
+       go func() {
+               defer close(disc.Values)
+               for {
+                       select {
+                       case psv := <-disc.values:
+                               select {
+                               case disc.Values <- psv:
+                               case <-disc.stop:
+                                       return
+                               }
+                       case <-disc.stop:
+                               return
+                       }
+               }
+       }()
        disc.mu.Lock()
        for _, addr := range startAddrs {
                disc.contact(addr)
@@ -72,7 +92,9 @@ func (me *peerDiscovery) gotNodeAddr(addr net.Addr) {
                return
        }
        if me.pending >= parallelQueries {
-               me.backlog[addr.String()] = addr
+               if len(me.backlog) < backlogMaxLen {
+                       me.backlog[addr.String()] = addr
+               }
        } else {
                me.contact(addr)
        }
@@ -89,7 +111,6 @@ func (me *peerDiscovery) contact(addr net.Addr) {
 
 func (me *peerDiscovery) transactionClosed() {
        me.pending--
-       // log.Printf("pending: %d", me.pending)
        for key, addr := range me.backlog {
                if me.pending >= parallelQueries {
                        break
@@ -140,7 +161,7 @@ func (me *peerDiscovery) getPeers(addr net.Addr) error {
                                }()
                                copy(nodeInfo.ID[:], id)
                                select {
-                               case me.peerStream.Values <- peerStreamValue{
+                               case me.peerStream.values <- peerStreamValue{
                                        Peers:    vs,
                                        NodeInfo: nodeInfo,
                                }:
@@ -157,10 +178,6 @@ func (me *peerDiscovery) getPeers(addr net.Addr) error {
        return nil
 }
 
-func (me *peerDiscovery) streamValue(psv peerStreamValue) {
-       me.peerStream.Values <- psv
-}
-
 type peerStreamValue struct {
        Peers    []util.CompactPeer // Peers given in get_peers response.
        NodeInfo                    // The node that gave the response.
@@ -169,6 +186,8 @@ type peerStreamValue struct {
 type peerStream struct {
        mu     sync.Mutex
        Values chan peerStreamValue
+       // Inner chan is set to nil when on close.
+       values chan peerStreamValue
        stop   chan struct{}
 }
 
@@ -179,6 +198,5 @@ func (ps *peerStream) Close() {
        case <-ps.stop:
        default:
                close(ps.stop)
-               close(ps.Values)
        }
 }