]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add dht AnnouncePeer
authorMatt Joiner <anacrolix@gmail.com>
Sun, 16 Nov 2014 19:08:33 +0000 (13:08 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 16 Nov 2014 19:08:33 +0000 (13:08 -0600)
dht/dht.go

index 1f4980b14b31aba15ea47a6310b11838e42b6572..9118df71ca967c84c6faf3d20968660abfb35a6d 100644 (file)
@@ -13,6 +13,8 @@ import (
        "sync"
        "time"
 
+       "bitbucket.org/anacrolix/go.torrent/logonce"
+
        "bitbucket.org/anacrolix/go.torrent/util"
        "github.com/nsf/libtorgo/bencode"
 )
@@ -25,6 +27,8 @@ type Server struct {
        nodes            map[string]*Node // Keyed by *net.UDPAddr.String().
        mu               sync.Mutex
        closed           chan struct{}
+
+       NumConfirmedAnnounces int
 }
 
 type ServerConfig struct {
@@ -86,6 +90,7 @@ type Node struct {
        id            string
        lastHeardFrom time.Time
        lastSentTo    time.Time
+       announceToken string
 }
 
 func (n *Node) NodeInfo() (ret NodeInfo) {
@@ -123,6 +128,32 @@ func (m Msg) T() (t string) {
        return
 }
 
+type KRPCError struct {
+       Code int
+       Msg  string
+}
+
+func (me KRPCError) Error() string {
+       return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
+}
+
+var _ error = KRPCError{}
+
+func (m Msg) Error() *KRPCError {
+       if m["y"] != "e" {
+               return nil
+       }
+       l := m["e"].([]interface{})
+       return &KRPCError{int(l[0].(int64)), l[1].(string)}
+}
+
+// Returns the token given in response to a get_peers request for future
+// announce_peer requests to that node.
+func (m Msg) AnnounceToken() string {
+       defer func() { recover() }()
+       return m["r"].(map[string]interface{})["token"].(string)
+}
+
 type transaction struct {
        remoteAddr net.Addr
        t          string
@@ -413,10 +444,10 @@ func (s *Server) timeoutTransaction(t *transaction) {
        s.removeTransaction(t)
 }
 
-func (s *Server) query(node *net.UDPAddr, q string, a map[string]string) (t *transaction, err error) {
+func (s *Server) query(node *net.UDPAddr, q string, a map[string]interface{}) (t *transaction, err error) {
        tid := s.nextTransactionID()
        if a == nil {
-               a = make(map[string]string, 1)
+               a = make(map[string]interface{}, 1)
        }
        a["id"] = s.IDString()
        d := map[string]interface{}{
@@ -488,6 +519,46 @@ func (s *Server) Ping(node *net.UDPAddr) (*transaction, error) {
        return s.query(node, "ping", nil)
 }
 
+// Announce a local peer. This can only be done to nodes that gave us an
+// announce token, which is received in responses during GetPeers. It's
+// recommended then that GetPeers is called before this method.
+func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err error) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       for _, node := range s.closestNodes(10000, infoHash, func(n *Node) bool {
+               return n.Good() && n.announceToken != ""
+       }) {
+               err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort)
+               if err != nil {
+                       break
+               }
+       }
+       return
+}
+
+func (s *Server) announcePeer(node *net.UDPAddr, infoHash string, port int, token string, impliedPort bool) error {
+       t, err := s.query(node, "announce_peer", map[string]interface{}{
+               "implied_port": func() int {
+                       if impliedPort {
+                               return 1
+                       } else {
+                               return 0
+                       }
+               }(),
+               "info_hash": infoHash,
+               "port":      port,
+               "token":     token,
+       })
+       t.setOnResponse(func(m Msg) {
+               if err := m.Error(); err != nil {
+                       logonce.Stderr.Printf("announce_peer response: %s", err)
+                       return
+               }
+               s.NumConfirmedAnnounces++
+       })
+       return err
+}
+
 type findNodeResponse struct {
        Nodes []NodeInfo
 }
@@ -593,7 +664,7 @@ func (s *Server) liftNodes(d Msg) {
 
 // Sends a find_node query to addr. targetID is the node we're looking for.
 func (s *Server) findNode(addr *net.UDPAddr, targetID string) (t *transaction, err error) {
-       t, err = s.query(addr, "find_node", map[string]string{"target": targetID})
+       t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID})
        if err != nil {
                return
        }
@@ -704,12 +775,13 @@ func (s *Server) getPeers(addr *net.UDPAddr, infoHash string) (t *transaction, e
                err = fmt.Errorf("infohash has bad length")
                return
        }
-       t, err = s.query(addr, "get_peers", map[string]string{"info_hash": infoHash})
+       t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash})
        if err != nil {
                return
        }
        t.setOnResponse(func(m Msg) {
                s.liftNodes(m)
+               s.getNode(addr).announceToken = m.AnnounceToken()
        })
        return
 }
@@ -831,10 +903,14 @@ func idDistance(a, b string) (ret int) {
 }
 
 func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
+       return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() })
+}
+
+func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node {
        sel := newKClosestNodesSelector(k, targetID)
        idNodes := make(map[string]*Node, len(s.nodes))
        for _, node := range s.nodes {
-               if !node.Good() {
+               if !filter(node) {
                        continue
                }
                sel.Push(node.id)