]> Sergey Matveev's repositories - btrtrc.git/commitdiff
dht: Use a bloom filter to track contacted nodes during peer discovery
authorMatt Joiner <anacrolix@gmail.com>
Fri, 19 Dec 2014 23:10:22 +0000 (10:10 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Fri, 19 Dec 2014 23:10:22 +0000 (10:10 +1100)
dht/getpeers.go

index ed8b0e59db446e6436031db1098e63a67b4c0130..97cc131f5b9fc3f9e91d32ec42557975050649d9 100644 (file)
@@ -1,28 +1,22 @@
 package dht
 
 import (
+       "bitbucket.org/anacrolix/go.torrent/util"
+       "github.com/willf/bloom"
        "log"
        "net"
        "sync"
        "time"
-
-       "bitbucket.org/anacrolix/go.torrent/util"
 )
 
 type peerDiscovery struct {
        *peerStream
-       triedAddrs map[string]struct{}
-       backlog    map[string]net.Addr
+       triedAddrs *bloom.BloomFilter
        pending    int
        server     *Server
        infoHash   string
 }
 
-const (
-       parallelQueries = 100
-       backlogMaxLen   = 10000
-)
-
 func (me *peerDiscovery) Close() {
        me.peerStream.Close()
 }
@@ -51,8 +45,7 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) {
                        stop:   make(chan struct{}),
                        values: make(chan peerStreamValue),
                },
-               triedAddrs: make(map[string]struct{}, 500),
-               backlog:    make(map[string]net.Addr, parallelQueries),
+               triedAddrs: bloom.NewWithEstimates(500000, 0.01),
                server:     s,
                infoHash:   infoHash,
        }
@@ -88,26 +81,17 @@ func (me *peerDiscovery) gotNodeAddr(addr net.Addr) {
                // Not a contactable address.
                return
        }
-       if me.server.ipBlocked(util.AddrIP(addr)) {
+       if me.triedAddrs.Test([]byte(addr.String())) {
                return
        }
-       if _, ok := me.triedAddrs[addr.String()]; ok {
-               return
-       }
-       if _, ok := me.backlog[addr.String()]; ok {
+       if me.server.ipBlocked(util.AddrIP(addr)) {
                return
        }
-       if me.pending >= parallelQueries {
-               if len(me.backlog) < backlogMaxLen {
-                       me.backlog[addr.String()] = addr
-               }
-       } else {
-               me.contact(addr)
-       }
+       me.contact(addr)
 }
 
 func (me *peerDiscovery) contact(addr net.Addr) {
-       me.triedAddrs[addr.String()] = struct{}{}
+       me.triedAddrs.Add([]byte(addr.String()))
        if err := me.getPeers(addr); err != nil {
                log.Printf("error sending get_peers request to %s: %s", addr, err)
                return
@@ -117,13 +101,6 @@ func (me *peerDiscovery) contact(addr net.Addr) {
 
 func (me *peerDiscovery) transactionClosed() {
        me.pending--
-       for key, addr := range me.backlog {
-               if me.pending >= parallelQueries {
-                       break
-               }
-               delete(me.backlog, key)
-               me.contact(addr)
-       }
        if me.pending == 0 {
                me.Close()
                return