]> Sergey Matveev's repositories - btrtrc.git/commitdiff
dht: Cap nodes to 10k, drop nodes that timeout if we're out of space
authorMatt Joiner <anacrolix@gmail.com>
Tue, 9 Dec 2014 01:09:11 +0000 (19:09 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 9 Dec 2014 01:09:11 +0000 (19:09 -0600)
dht/dht.go

index 99419eb20b853e43ff198377a0372d5b89ef3cbf..89b9ee24ccd37b3bb4277c0412f54d3e3dc01d21 100644 (file)
@@ -23,6 +23,8 @@ import (
        "github.com/anacrolix/libtorgo/bencode"
 )
 
+const maxNodes = 10000
+
 // Uniquely identifies a transaction to us.
 type transactionKey struct {
        RemoteAddr string // host:port
@@ -68,7 +70,7 @@ func (s *Server) Stats() (ss serverStats) {
        s.mu.Lock()
        defer s.mu.Unlock()
        for _, n := range s.nodes {
-               if n.Good() {
+               if n.DefinitelyGood() {
                        ss.NumGoodNodes++
                }
        }
@@ -150,7 +152,7 @@ func (n *Node) NodeInfo() (ret NodeInfo) {
        return
 }
 
-func (n *Node) Good() bool {
+func (n *Node) DefinitelyGood() bool {
        if len(n.id) != 20 {
                return false
        }
@@ -162,9 +164,6 @@ func (n *Node) Good() bool {
        if n.lastSentQuery.Before(n.lastGotResponse) {
                return true
        }
-       if time.Now().Sub(n.lastSentQuery) >= 2*time.Minute {
-               return false
-       }
        return true
 }
 
@@ -280,6 +279,11 @@ func (t *transaction) sendQuery() error {
 }
 
 func (t *transaction) timeout() {
+       go func() {
+               t.s.mu.Lock()
+               defer t.s.mu.Unlock()
+               t.s.nodeTimedOut(t.remoteAddr)
+       }()
        t.close()
 }
 
@@ -287,6 +291,7 @@ func (t *transaction) close() {
        if t.closing() {
                return
        }
+       t.queryPacket = nil
        close(t.Response)
        close(t.done)
        t.timer.Stop()
@@ -323,6 +328,7 @@ func (t *transaction) handleResponse(m Msg) {
        if t.onResponse != nil {
                t.onResponse(m)
        }
+       t.queryPacket = nil
        select {
        case t.Response <- m:
        default:
@@ -388,7 +394,7 @@ func (s *Server) processPacket(b []byte, addr dHTAddr) {
                                        return
                                }
                        }
-                       log.Printf("%s: received bad krpc message: %s: %q", s, err, b)
+                       log.Printf("%s: received bad krpc message from %s: %s: %q", s, addr, err, b)
                }()
                return
        }
@@ -544,10 +550,25 @@ func (s *Server) getNode(addr dHTAddr) (n *Node) {
                n = &Node{
                        addr: addr,
                }
-               s.nodes[addr.String()] = n
+               if len(s.nodes) < maxNodes {
+                       s.nodes[addr.String()] = n
+               }
        }
        return
 }
+func (s *Server) nodeTimedOut(addr dHTAddr) {
+       node, ok := s.nodes[addr.String()]
+       if !ok {
+               return
+       }
+       if node.DefinitelyGood() {
+               return
+       }
+       if len(s.nodes) < maxNodes {
+               return
+       }
+       delete(s.nodes, addr.String())
+}
 
 func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
        if list := s.ipBlockList; list != nil {
@@ -682,7 +703,7 @@ func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err
        s.mu.Lock()
        defer s.mu.Unlock()
        for _, node := range s.closestNodes(160, infoHash, func(n *Node) bool {
-               return n.Good() && n.announceToken != ""
+               return n.announceToken != ""
        }) {
                err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort)
                if err != nil {
@@ -950,7 +971,7 @@ func (s *Server) bootstrap() (err error) {
 
 func (s *Server) numGoodNodes() (num int) {
        for _, n := range s.nodes {
-               if n.Good() {
+               if n.DefinitelyGood() {
                        num++
                }
        }
@@ -1107,7 +1128,7 @@ func idDistance(a, b string) (ret bigIntDistance) {
 // }
 
 func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
-       return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() })
+       return s.closestNodes(k, targetID, func(n *Node) bool { return n.DefinitelyGood() })
 }
 
 func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node {