From 78eb25535da57f0135212327403f7bc27322060f Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 8 Dec 2014 19:09:11 -0600 Subject: [PATCH] dht: Cap nodes to 10k, drop nodes that timeout if we're out of space --- dht/dht.go | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/dht/dht.go b/dht/dht.go index 99419eb2..89b9ee24 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -23,6 +23,8 @@ import ( "github.com/anacrolix/libtorgo/bencode" ) +const maxNodes = 10000 + // Uniquely identifies a transaction to us. type transactionKey struct { RemoteAddr string // host:port @@ -68,7 +70,7 @@ func (s *Server) Stats() (ss serverStats) { s.mu.Lock() defer s.mu.Unlock() for _, n := range s.nodes { - if n.Good() { + if n.DefinitelyGood() { ss.NumGoodNodes++ } } @@ -150,7 +152,7 @@ func (n *Node) NodeInfo() (ret NodeInfo) { return } -func (n *Node) Good() bool { +func (n *Node) DefinitelyGood() bool { if len(n.id) != 20 { return false } @@ -162,9 +164,6 @@ func (n *Node) Good() bool { if n.lastSentQuery.Before(n.lastGotResponse) { return true } - if time.Now().Sub(n.lastSentQuery) >= 2*time.Minute { - return false - } return true } @@ -280,6 +279,11 @@ func (t *transaction) sendQuery() error { } func (t *transaction) timeout() { + go func() { + t.s.mu.Lock() + defer t.s.mu.Unlock() + t.s.nodeTimedOut(t.remoteAddr) + }() t.close() } @@ -287,6 +291,7 @@ func (t *transaction) close() { if t.closing() { return } + t.queryPacket = nil close(t.Response) close(t.done) t.timer.Stop() @@ -323,6 +328,7 @@ func (t *transaction) handleResponse(m Msg) { if t.onResponse != nil { t.onResponse(m) } + t.queryPacket = nil select { case t.Response <- m: default: @@ -388,7 +394,7 @@ func (s *Server) processPacket(b []byte, addr dHTAddr) { return } } - log.Printf("%s: received bad krpc message: %s: %q", s, err, b) + log.Printf("%s: received bad krpc message from %s: %s: %q", s, addr, err, b) }() return } @@ -544,10 +550,25 @@ func (s *Server) getNode(addr dHTAddr) (n *Node) { n = &Node{ addr: addr, } - s.nodes[addr.String()] = n + if len(s.nodes) < maxNodes { + s.nodes[addr.String()] = n + } } return } +func (s *Server) nodeTimedOut(addr dHTAddr) { + node, ok := s.nodes[addr.String()] + if !ok { + return + } + if node.DefinitelyGood() { + return + } + if len(s.nodes) < maxNodes { + return + } + delete(s.nodes, addr.String()) +} func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) { if list := s.ipBlockList; list != nil { @@ -682,7 +703,7 @@ func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err s.mu.Lock() defer s.mu.Unlock() for _, node := range s.closestNodes(160, infoHash, func(n *Node) bool { - return n.Good() && n.announceToken != "" + return n.announceToken != "" }) { err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort) if err != nil { @@ -950,7 +971,7 @@ func (s *Server) bootstrap() (err error) { func (s *Server) numGoodNodes() (num int) { for _, n := range s.nodes { - if n.Good() { + if n.DefinitelyGood() { num++ } } @@ -1107,7 +1128,7 @@ func idDistance(a, b string) (ret bigIntDistance) { // } func (s *Server) closestGoodNodes(k int, targetID string) []*Node { - return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() }) + return s.closestNodes(k, targetID, func(n *Node) bool { return n.DefinitelyGood() }) } func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node { -- 2.48.1