From: Matt Joiner <anacrolix@gmail.com>
Date: Sun, 16 Nov 2014 19:08:33 +0000 (-0600)
Subject: Add dht AnnouncePeer
X-Git-Tag: v1.0.0~1546
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=1460e835c2c7238cb392c4ae161ce266c5bb3fc2;p=btrtrc.git

Add dht AnnouncePeer
---

diff --git a/dht/dht.go b/dht/dht.go
index 1f4980b1..9118df71 100644
--- a/dht/dht.go
+++ b/dht/dht.go
@@ -13,6 +13,8 @@ import (
 	"sync"
 	"time"
 
+	"bitbucket.org/anacrolix/go.torrent/logonce"
+
 	"bitbucket.org/anacrolix/go.torrent/util"
 	"github.com/nsf/libtorgo/bencode"
 )
@@ -25,6 +27,8 @@ type Server struct {
 	nodes            map[string]*Node // Keyed by *net.UDPAddr.String().
 	mu               sync.Mutex
 	closed           chan struct{}
+
+	NumConfirmedAnnounces int
 }
 
 type ServerConfig struct {
@@ -86,6 +90,7 @@ type Node struct {
 	id            string
 	lastHeardFrom time.Time
 	lastSentTo    time.Time
+	announceToken string
 }
 
 func (n *Node) NodeInfo() (ret NodeInfo) {
@@ -123,6 +128,32 @@ func (m Msg) T() (t string) {
 	return
 }
 
+type KRPCError struct {
+	Code int
+	Msg  string
+}
+
+func (me KRPCError) Error() string {
+	return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
+}
+
+var _ error = KRPCError{}
+
+func (m Msg) Error() *KRPCError {
+	if m["y"] != "e" {
+		return nil
+	}
+	l := m["e"].([]interface{})
+	return &KRPCError{int(l[0].(int64)), l[1].(string)}
+}
+
+// Returns the token given in response to a get_peers request for future
+// announce_peer requests to that node.
+func (m Msg) AnnounceToken() string {
+	defer func() { recover() }()
+	return m["r"].(map[string]interface{})["token"].(string)
+}
+
 type transaction struct {
 	remoteAddr net.Addr
 	t          string
@@ -413,10 +444,10 @@ func (s *Server) timeoutTransaction(t *transaction) {
 	s.removeTransaction(t)
 }
 
-func (s *Server) query(node *net.UDPAddr, q string, a map[string]string) (t *transaction, err error) {
+func (s *Server) query(node *net.UDPAddr, q string, a map[string]interface{}) (t *transaction, err error) {
 	tid := s.nextTransactionID()
 	if a == nil {
-		a = make(map[string]string, 1)
+		a = make(map[string]interface{}, 1)
 	}
 	a["id"] = s.IDString()
 	d := map[string]interface{}{
@@ -488,6 +519,46 @@ func (s *Server) Ping(node *net.UDPAddr) (*transaction, error) {
 	return s.query(node, "ping", nil)
 }
 
+// Announce a local peer. This can only be done to nodes that gave us an
+// announce token, which is received in responses during GetPeers. It's
+// recommended then that GetPeers is called before this method.
+func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, node := range s.closestNodes(10000, infoHash, func(n *Node) bool {
+		return n.Good() && n.announceToken != ""
+	}) {
+		err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort)
+		if err != nil {
+			break
+		}
+	}
+	return
+}
+
+func (s *Server) announcePeer(node *net.UDPAddr, infoHash string, port int, token string, impliedPort bool) error {
+	t, err := s.query(node, "announce_peer", map[string]interface{}{
+		"implied_port": func() int {
+			if impliedPort {
+				return 1
+			} else {
+				return 0
+			}
+		}(),
+		"info_hash": infoHash,
+		"port":      port,
+		"token":     token,
+	})
+	t.setOnResponse(func(m Msg) {
+		if err := m.Error(); err != nil {
+			logonce.Stderr.Printf("announce_peer response: %s", err)
+			return
+		}
+		s.NumConfirmedAnnounces++
+	})
+	return err
+}
+
 type findNodeResponse struct {
 	Nodes []NodeInfo
 }
@@ -593,7 +664,7 @@ func (s *Server) liftNodes(d Msg) {
 
 // Sends a find_node query to addr. targetID is the node we're looking for.
 func (s *Server) findNode(addr *net.UDPAddr, targetID string) (t *transaction, err error) {
-	t, err = s.query(addr, "find_node", map[string]string{"target": targetID})
+	t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID})
 	if err != nil {
 		return
 	}
@@ -704,12 +775,13 @@ func (s *Server) getPeers(addr *net.UDPAddr, infoHash string) (t *transaction, e
 		err = fmt.Errorf("infohash has bad length")
 		return
 	}
-	t, err = s.query(addr, "get_peers", map[string]string{"info_hash": infoHash})
+	t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash})
 	if err != nil {
 		return
 	}
 	t.setOnResponse(func(m Msg) {
 		s.liftNodes(m)
+		s.getNode(addr).announceToken = m.AnnounceToken()
 	})
 	return
 }
@@ -831,10 +903,14 @@ func idDistance(a, b string) (ret int) {
 }
 
 func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
+	return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() })
+}
+
+func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node {
 	sel := newKClosestNodesSelector(k, targetID)
 	idNodes := make(map[string]*Node, len(s.nodes))
 	for _, node := range s.nodes {
-		if !node.Good() {
+		if !filter(node) {
 			continue
 		}
 		sel.Push(node.id)