From: Matt Joiner Date: Fri, 19 Dec 2014 23:10:22 +0000 (+1100) Subject: dht: Use a bloom filter to track contacted nodes during peer discovery X-Git-Tag: v1.0.0~1389 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ec47c4f4dc80b1a7620a2634dc3511bd4ddb2443;p=btrtrc.git dht: Use a bloom filter to track contacted nodes during peer discovery --- diff --git a/dht/getpeers.go b/dht/getpeers.go index ed8b0e59..97cc131f 100644 --- a/dht/getpeers.go +++ b/dht/getpeers.go @@ -1,28 +1,22 @@ package dht import ( + "bitbucket.org/anacrolix/go.torrent/util" + "github.com/willf/bloom" "log" "net" "sync" "time" - - "bitbucket.org/anacrolix/go.torrent/util" ) type peerDiscovery struct { *peerStream - triedAddrs map[string]struct{} - backlog map[string]net.Addr + triedAddrs *bloom.BloomFilter pending int server *Server infoHash string } -const ( - parallelQueries = 100 - backlogMaxLen = 10000 -) - func (me *peerDiscovery) Close() { me.peerStream.Close() } @@ -51,8 +45,7 @@ func (s *Server) GetPeers(infoHash string) (*peerStream, error) { stop: make(chan struct{}), values: make(chan peerStreamValue), }, - triedAddrs: make(map[string]struct{}, 500), - backlog: make(map[string]net.Addr, parallelQueries), + triedAddrs: bloom.NewWithEstimates(500000, 0.01), server: s, infoHash: infoHash, } @@ -88,26 +81,17 @@ func (me *peerDiscovery) gotNodeAddr(addr net.Addr) { // Not a contactable address. return } - if me.server.ipBlocked(util.AddrIP(addr)) { + if me.triedAddrs.Test([]byte(addr.String())) { return } - if _, ok := me.triedAddrs[addr.String()]; ok { - return - } - if _, ok := me.backlog[addr.String()]; ok { + if me.server.ipBlocked(util.AddrIP(addr)) { return } - if me.pending >= parallelQueries { - if len(me.backlog) < backlogMaxLen { - me.backlog[addr.String()] = addr - } - } else { - me.contact(addr) - } + me.contact(addr) } func (me *peerDiscovery) contact(addr net.Addr) { - me.triedAddrs[addr.String()] = struct{}{} + me.triedAddrs.Add([]byte(addr.String())) if err := me.getPeers(addr); err != nil { log.Printf("error sending get_peers request to %s: %s", addr, err) return @@ -117,13 +101,6 @@ func (me *peerDiscovery) contact(addr net.Addr) { func (me *peerDiscovery) transactionClosed() { me.pending-- - for key, addr := range me.backlog { - if me.pending >= parallelQueries { - break - } - delete(me.backlog, key) - me.contact(addr) - } if me.pending == 0 { me.Close() return