From 1460e835c2c7238cb392c4ae161ce266c5bb3fc2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 16 Nov 2014 13:08:33 -0600 Subject: [PATCH] Add dht AnnouncePeer --- dht/dht.go | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/dht/dht.go b/dht/dht.go index 1f4980b1..9118df71 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "bitbucket.org/anacrolix/go.torrent/logonce" + "bitbucket.org/anacrolix/go.torrent/util" "github.com/nsf/libtorgo/bencode" ) @@ -25,6 +27,8 @@ type Server struct { nodes map[string]*Node // Keyed by *net.UDPAddr.String(). mu sync.Mutex closed chan struct{} + + NumConfirmedAnnounces int } type ServerConfig struct { @@ -86,6 +90,7 @@ type Node struct { id string lastHeardFrom time.Time lastSentTo time.Time + announceToken string } func (n *Node) NodeInfo() (ret NodeInfo) { @@ -123,6 +128,32 @@ func (m Msg) T() (t string) { return } +type KRPCError struct { + Code int + Msg string +} + +func (me KRPCError) Error() string { + return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg) +} + +var _ error = KRPCError{} + +func (m Msg) Error() *KRPCError { + if m["y"] != "e" { + return nil + } + l := m["e"].([]interface{}) + return &KRPCError{int(l[0].(int64)), l[1].(string)} +} + +// Returns the token given in response to a get_peers request for future +// announce_peer requests to that node. +func (m Msg) AnnounceToken() string { + defer func() { recover() }() + return m["r"].(map[string]interface{})["token"].(string) +} + type transaction struct { remoteAddr net.Addr t string @@ -413,10 +444,10 @@ func (s *Server) timeoutTransaction(t *transaction) { s.removeTransaction(t) } -func (s *Server) query(node *net.UDPAddr, q string, a map[string]string) (t *transaction, err error) { +func (s *Server) query(node *net.UDPAddr, q string, a map[string]interface{}) (t *transaction, err error) { tid := s.nextTransactionID() if a == nil { - a = make(map[string]string, 1) + a = make(map[string]interface{}, 1) } a["id"] = s.IDString() d := map[string]interface{}{ @@ -488,6 +519,46 @@ func (s *Server) Ping(node *net.UDPAddr) (*transaction, error) { return s.query(node, "ping", nil) } +// Announce a local peer. This can only be done to nodes that gave us an +// announce token, which is received in responses during GetPeers. It's +// recommended then that GetPeers is called before this method. +func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err error) { + s.mu.Lock() + defer s.mu.Unlock() + for _, node := range s.closestNodes(10000, infoHash, func(n *Node) bool { + return n.Good() && n.announceToken != "" + }) { + err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort) + if err != nil { + break + } + } + return +} + +func (s *Server) announcePeer(node *net.UDPAddr, infoHash string, port int, token string, impliedPort bool) error { + t, err := s.query(node, "announce_peer", map[string]interface{}{ + "implied_port": func() int { + if impliedPort { + return 1 + } else { + return 0 + } + }(), + "info_hash": infoHash, + "port": port, + "token": token, + }) + t.setOnResponse(func(m Msg) { + if err := m.Error(); err != nil { + logonce.Stderr.Printf("announce_peer response: %s", err) + return + } + s.NumConfirmedAnnounces++ + }) + return err +} + type findNodeResponse struct { Nodes []NodeInfo } @@ -593,7 +664,7 @@ func (s *Server) liftNodes(d Msg) { // Sends a find_node query to addr. targetID is the node we're looking for. func (s *Server) findNode(addr *net.UDPAddr, targetID string) (t *transaction, err error) { - t, err = s.query(addr, "find_node", map[string]string{"target": targetID}) + t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}) if err != nil { return } @@ -704,12 +775,13 @@ func (s *Server) getPeers(addr *net.UDPAddr, infoHash string) (t *transaction, e err = fmt.Errorf("infohash has bad length") return } - t, err = s.query(addr, "get_peers", map[string]string{"info_hash": infoHash}) + t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}) if err != nil { return } t.setOnResponse(func(m Msg) { s.liftNodes(m) + s.getNode(addr).announceToken = m.AnnounceToken() }) return } @@ -831,10 +903,14 @@ func idDistance(a, b string) (ret int) { } func (s *Server) closestGoodNodes(k int, targetID string) []*Node { + return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() }) +} + +func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node { sel := newKClosestNodesSelector(k, targetID) idNodes := make(map[string]*Node, len(s.nodes)) for _, node := range s.nodes { - if !node.Good() { + if !filter(node) { continue } sel.Push(node.id) -- 2.48.1