"sync"
"time"
+ "bitbucket.org/anacrolix/go.torrent/logonce"
+
"bitbucket.org/anacrolix/go.torrent/util"
"github.com/nsf/libtorgo/bencode"
)
nodes map[string]*Node // Keyed by *net.UDPAddr.String().
mu sync.Mutex
closed chan struct{}
+
+ NumConfirmedAnnounces int
}
type ServerConfig struct {
id string
lastHeardFrom time.Time
lastSentTo time.Time
+ announceToken string
}
func (n *Node) NodeInfo() (ret NodeInfo) {
return
}
+type KRPCError struct {
+ Code int
+ Msg string
+}
+
+func (me KRPCError) Error() string {
+ return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
+}
+
+var _ error = KRPCError{}
+
+func (m Msg) Error() *KRPCError {
+ if m["y"] != "e" {
+ return nil
+ }
+ l := m["e"].([]interface{})
+ return &KRPCError{int(l[0].(int64)), l[1].(string)}
+}
+
+// Returns the token given in response to a get_peers request for future
+// announce_peer requests to that node.
+func (m Msg) AnnounceToken() string {
+ defer func() { recover() }()
+ return m["r"].(map[string]interface{})["token"].(string)
+}
+
type transaction struct {
remoteAddr net.Addr
t string
s.removeTransaction(t)
}
-func (s *Server) query(node *net.UDPAddr, q string, a map[string]string) (t *transaction, err error) {
+func (s *Server) query(node *net.UDPAddr, q string, a map[string]interface{}) (t *transaction, err error) {
tid := s.nextTransactionID()
if a == nil {
- a = make(map[string]string, 1)
+ a = make(map[string]interface{}, 1)
}
a["id"] = s.IDString()
d := map[string]interface{}{
return s.query(node, "ping", nil)
}
+// Announce a local peer. This can only be done to nodes that gave us an
+// announce token, which is received in responses during GetPeers. It's
+// recommended then that GetPeers is called before this method.
+func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ for _, node := range s.closestNodes(10000, infoHash, func(n *Node) bool {
+ return n.Good() && n.announceToken != ""
+ }) {
+ err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort)
+ if err != nil {
+ break
+ }
+ }
+ return
+}
+
+func (s *Server) announcePeer(node *net.UDPAddr, infoHash string, port int, token string, impliedPort bool) error {
+ t, err := s.query(node, "announce_peer", map[string]interface{}{
+ "implied_port": func() int {
+ if impliedPort {
+ return 1
+ } else {
+ return 0
+ }
+ }(),
+ "info_hash": infoHash,
+ "port": port,
+ "token": token,
+ })
+ t.setOnResponse(func(m Msg) {
+ if err := m.Error(); err != nil {
+ logonce.Stderr.Printf("announce_peer response: %s", err)
+ return
+ }
+ s.NumConfirmedAnnounces++
+ })
+ return err
+}
+
type findNodeResponse struct {
Nodes []NodeInfo
}
// Sends a find_node query to addr. targetID is the node we're looking for.
func (s *Server) findNode(addr *net.UDPAddr, targetID string) (t *transaction, err error) {
- t, err = s.query(addr, "find_node", map[string]string{"target": targetID})
+ t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID})
if err != nil {
return
}
err = fmt.Errorf("infohash has bad length")
return
}
- t, err = s.query(addr, "get_peers", map[string]string{"info_hash": infoHash})
+ t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash})
if err != nil {
return
}
t.setOnResponse(func(m Msg) {
s.liftNodes(m)
+ s.getNode(addr).announceToken = m.AnnounceToken()
})
return
}
}
func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
+ return s.closestNodes(k, targetID, func(n *Node) bool { return n.Good() })
+}
+
+func (s *Server) closestNodes(k int, targetID string, filter func(*Node) bool) []*Node {
sel := newKClosestNodesSelector(k, targetID)
idNodes := make(map[string]*Node, len(s.nodes))
for _, node := range s.nodes {
- if !node.Good() {
+ if !filter(node) {
continue
}
sel.Push(node.id)