17 "github.com/anacrolix/libtorgo/bencode"
18 "github.com/anacrolix/sync"
20 "github.com/anacrolix/torrent/iplist"
21 "github.com/anacrolix/torrent/logonce"
22 "github.com/anacrolix/torrent/util"
27 queryResendEvery = 5 * time.Second
// transactionKey uniquely identifies a transaction to us. It is used as the
// key of the server's transactions map.
type transactionKey struct {
	// RemoteAddr is the remote address in host:port form.
	RemoteAddr string
	// T is the KRPC transaction ID.
	T string
}
39 transactions map[transactionKey]*transaction
40 transactionIDInt uint64
41 nodes map[string]*Node // Keyed by dHTAddr.String().
44 passive bool // Don't respond to queries.
45 ipBlockList *iplist.IPList
47 NumConfirmedAnnounces int
50 type dHTAddr interface {
52 UDPAddr() *net.UDPAddr
55 type cachedAddr struct {
60 func (ca cachedAddr) Network() string {
64 func (ca cachedAddr) String() string {
68 func (ca cachedAddr) UDPAddr() *net.UDPAddr {
69 return ca.a.(*net.UDPAddr)
72 func newDHTAddr(addr net.Addr) dHTAddr {
73 return cachedAddr{addr, addr.String()}
76 type ServerConfig struct {
79 Passive bool // Don't respond to queries.
82 type serverStats struct {
85 NumOutstandingTransactions int
88 func (s *Server) Stats() (ss serverStats) {
91 for _, n := range s.nodes {
92 if n.DefinitelyGood() {
96 ss.NumNodes = len(s.nodes)
97 ss.NumOutstandingTransactions = len(s.transactions)
101 func (s *Server) LocalAddr() net.Addr {
102 return s.socket.LocalAddr()
105 func makeSocket(addr string) (socket *net.UDPConn, err error) {
106 addr_, err := net.ResolveUDPAddr("", addr)
110 socket, err = net.ListenUDP("udp", addr_)
114 func NewServer(c *ServerConfig) (s *Server, err error) {
122 s.socket, err = makeSocket(c.Addr)
127 s.passive = c.Passive
146 log.Printf("error bootstrapping DHT: %s", err)
152 func (s *Server) String() string {
153 return fmt.Sprintf("dht server on %s", s.socket.LocalAddr())
161 func (nid *nodeID) IsUnset() bool {
165 func nodeIDFromString(s string) (ret nodeID) {
169 ret.i.SetBytes([]byte(s))
174 func (nid0 *nodeID) Distance(nid1 *nodeID) (ret big.Int) {
175 if nid0.IsUnset() != nid1.IsUnset() {
179 ret.Xor(&nid0.i, &nid1.i)
183 func (nid *nodeID) String() string {
184 return string(nid.i.Bytes())
192 lastGotQuery time.Time
193 lastGotResponse time.Time
194 lastSentQuery time.Time
197 func (n *Node) idString() string {
201 func (n *Node) SetIDFromBytes(b []byte) {
206 func (n *Node) SetIDFromString(s string) {
207 n.id.i.SetBytes([]byte(s))
210 func (n *Node) IDNotSet() bool {
211 return n.id.i.Int64() == 0
214 func (n *Node) NodeInfo() (ret NodeInfo) {
216 if n := copy(ret.ID[:], n.idString()); n != 20 {
222 func (n *Node) DefinitelyGood() bool {
223 if len(n.idString()) != 20 {
226 // No reason to think ill of them if they've never been queried.
227 if n.lastSentQuery.IsZero() {
230 // They answered our last query.
231 if n.lastSentQuery.Before(n.lastGotResponse) {
// Msg is a KRPC message held as a generic dictionary: string keys mapped to
// values of arbitrary type.
type Msg map[string]interface{}

// Compile-time check that Msg satisfies fmt.Stringer.
var _ fmt.Stringer = Msg{}

// String renders the message in Go-syntax form (the %#v verb).
func (m Msg) String() string {
	const goSyntax = "%#v"
	return fmt.Sprintf(goSyntax, m)
}
245 func (m Msg) T() (t string) {
254 func (m Msg) ID() string {
258 return m[m["y"].(string)].(map[string]interface{})["id"].(string)
261 // Suggested nodes in a response.
262 func (m Msg) Nodes() (nodes []NodeInfo) {
267 return m["r"].(map[string]interface{})["nodes"].(string)
272 for i := 0; i < len(b); i += 26 {
274 err := n.UnmarshalCompact([]byte(b[i : i+26]))
278 nodes = append(nodes, n)
283 type KRPCError struct {
288 func (me KRPCError) Error() string {
289 return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
292 var _ error = KRPCError{}
294 func (m Msg) Error() (ret *KRPCError) {
299 switch e := m["e"].(type) {
301 ret.Code = int(e[0].(int64))
302 ret.Msg = e[1].(string)
306 logonce.Stderr.Printf(`KRPC error "e" value has unexpected type: %T`, e)
311 // Returns the token given in response to a get_peers request for future
312 // announce_peer requests to that node.
313 func (m Msg) AnnounceToken() (token string, ok bool) {
314 defer func() { recover() }()
315 token, ok = m["r"].(map[string]interface{})["token"].(string)
319 type transaction struct {
324 onResponse func(Msg) // Called with the server locked.
331 userOnResponse func(Msg)
334 func (t *transaction) SetResponseHandler(f func(Msg)) {
338 t.tryHandleResponse()
341 func (t *transaction) tryHandleResponse() {
342 if t.userOnResponse == nil {
346 case r := <-t.response:
348 // Shouldn't be called more than once.
349 t.userOnResponse = nil
354 func (t *transaction) Key() transactionKey {
355 return transactionKey{
356 t.remoteAddr.String(),
// jitterDuration returns average shifted by a random offset, so that results
// are spread over a window of width plusMinus centred on average: the value
// lies in [average-plusMinus/2, average-plusMinus/2+plusMinus).
//
// A non-positive plusMinus returns average unchanged. rand.Int63n panics when
// its argument is <= 0, so that case is handled before drawing.
func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration {
	if plusMinus <= 0 {
		return average
	}
	offset := time.Duration(rand.Int63n(int64(plusMinus)))
	return average - plusMinus/2 + offset
}
365 func (t *transaction) startTimer() {
366 t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
369 func (t *transaction) timerCallback() {
383 if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) {
384 panic("timer should have fired to get here")
388 func (t *transaction) sendQuery() error {
389 err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
393 t.lastSend = time.Now()
397 func (t *transaction) timeout() {
400 defer t.s.mu.Unlock()
401 t.s.nodeTimedOut(t.remoteAddr)
406 func (t *transaction) close() {
412 t.tryHandleResponse()
417 defer t.s.mu.Unlock()
418 t.s.deleteTransaction(t)
422 func (t *transaction) closing() bool {
431 func (t *transaction) Close() {
437 func (t *transaction) handleResponse(m Msg) {
445 if t.onResponse != nil {
452 case t.response <- m:
454 panic("blocked handling response")
457 t.tryHandleResponse()
460 func (s *Server) setDefaults() (err error) {
463 h := crypto.SHA1.New()
464 ss, err := os.Hostname()
468 ss += s.socket.LocalAddr().String()
470 if b := h.Sum(id[:0:20]); len(b) != 20 {
478 s.nodes = make(map[string]*Node, 10000)
482 func (s *Server) SetIPBlockList(list *iplist.IPList) {
488 func (s *Server) init() (err error) {
489 err = s.setDefaults()
493 s.closed = make(chan struct{})
494 s.transactions = make(map[transactionKey]*transaction)
498 func (s *Server) processPacket(b []byte, addr dHTAddr) {
500 err := bencode.Unmarshal(b, &d)
503 if se, ok := err.(*bencode.SyntaxError); ok {
504 // The message was truncated.
505 if int(se.Offset) == len(b) {
508 // Some messages seem to drop to nul chars abruptly.
509 if int(se.Offset) < len(b) && b[se.Offset] == 0 {
512 // The message isn't bencode from the first.
517 log.Printf("%s: received bad krpc message from %s: %s: %q", s, addr, err, b)
524 s.handleQuery(addr, d)
527 t := s.findResponseTransaction(d.T(), addr)
529 //log.Printf("unexpected message: %#v", d)
532 node := s.getNode(addr)
533 node.lastGotResponse = time.Now()
534 // TODO: Update node ID as this is an authoritative packet.
535 go t.handleResponse(d)
536 s.deleteTransaction(t)
539 func (s *Server) serve() error {
542 n, addr, err := s.socket.ReadFrom(b[:])
547 logonce.Stderr.Printf("received dht packet exceeds buffer size")
550 s.processPacket(b[:n], newDHTAddr(addr))
554 func (s *Server) ipBlocked(ip net.IP) bool {
555 if s.ipBlockList == nil {
558 return s.ipBlockList.Lookup(ip) != nil
561 func (s *Server) AddNode(ni NodeInfo) {
565 s.nodes = make(map[string]*Node)
567 n := s.getNode(ni.Addr)
569 n.SetIDFromBytes(ni.ID[:])
573 func (s *Server) nodeByID(id string) *Node {
574 for _, node := range s.nodes {
575 if node.idString() == id {
582 func (s *Server) handleQuery(source dHTAddr, m Msg) {
583 args := m["a"].(map[string]interface{})
584 node := s.getNode(source)
585 node.SetIDFromString(args["id"].(string))
586 node.lastGotQuery = time.Now()
593 s.reply(source, m["t"].(string), nil)
594 case "get_peers": // TODO: Extract common behaviour with find_node.
595 targetID := args["info_hash"].(string)
596 if len(targetID) != 20 {
599 var rNodes []NodeInfo
600 // TODO: Reply with "values" list if we have peers instead.
601 for _, node := range s.closestGoodNodes(8, targetID) {
602 rNodes = append(rNodes, node.NodeInfo())
604 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
605 for i, ni := range rNodes {
606 err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
611 s.reply(source, m["t"].(string), map[string]interface{}{
612 "nodes": string(nodesBytes),
615 case "find_node": // TODO: Extract common behaviour with get_peers.
616 targetID := args["target"].(string)
617 if len(targetID) != 20 {
618 log.Printf("bad DHT query: %v", m)
621 var rNodes []NodeInfo
622 if node := s.nodeByID(targetID); node != nil {
623 rNodes = append(rNodes, node.NodeInfo())
625 for _, node := range s.closestGoodNodes(8, targetID) {
626 rNodes = append(rNodes, node.NodeInfo())
629 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
630 for i, ni := range rNodes {
631 // TODO: Put IPv6 nodes into the correct dict element.
632 if ni.Addr.UDPAddr().IP.To4() == nil {
635 err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
637 log.Printf("error compacting %#v: %s", ni, err)
641 s.reply(source, m["t"].(string), map[string]interface{}{
642 "nodes": string(nodesBytes),
644 case "announce_peer":
645 // TODO(anacrolix): Implement this lolz.
648 // TODO(anacrolix): Or reject, I don't think I want this.
650 log.Printf("%s: not handling received query: q=%s", s, m["q"])
655 func (s *Server) reply(addr dHTAddr, t string, r map[string]interface{}) {
657 r = make(map[string]interface{}, 1)
659 r["id"] = s.IDString()
660 m := map[string]interface{}{
665 b, err := bencode.Marshal(m)
669 err = s.writeToNode(b, addr)
671 log.Printf("error replying to %s: %s", addr, err)
675 func (s *Server) getNode(addr dHTAddr) (n *Node) {
676 addrStr := addr.String()
682 if len(s.nodes) < maxNodes {
688 func (s *Server) nodeTimedOut(addr dHTAddr) {
689 node, ok := s.nodes[addr.String()]
693 if node.DefinitelyGood() {
696 if len(s.nodes) < maxNodes {
699 delete(s.nodes, addr.String())
702 func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
703 if list := s.ipBlockList; list != nil {
704 if r := list.Lookup(util.AddrIP(node.UDPAddr())); r != nil {
705 err = fmt.Errorf("write to %s blocked: %s", node, r.Description)
709 n, err := s.socket.WriteTo(b, node.UDPAddr())
711 err = fmt.Errorf("error writing %d bytes to %s: %#v", len(b), node, err)
715 err = io.ErrShortWrite
721 func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *transaction {
722 return s.transactions[transactionKey{
727 func (s *Server) nextTransactionID() string {
728 var b [binary.MaxVarintLen64]byte
729 n := binary.PutUvarint(b[:], s.transactionIDInt)
734 func (s *Server) deleteTransaction(t *transaction) {
735 delete(s.transactions, t.Key())
738 func (s *Server) addTransaction(t *transaction) {
739 if _, ok := s.transactions[t.Key()]; ok {
740 panic("transaction not unique")
742 s.transactions[t.Key()] = t
745 func (s *Server) IDString() string {
752 func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *transaction, err error) {
753 tid := s.nextTransactionID()
755 a = make(map[string]interface{}, 1)
757 a["id"] = s.IDString()
758 d := map[string]interface{}{
764 b, err := bencode.Marshal(d)
771 response: make(chan Msg, 1),
772 done: make(chan struct{}),
775 onResponse: onResponse,
781 s.getNode(node).lastSentQuery = time.Now()
787 const CompactNodeInfoLen = 26
789 type NodeInfo struct {
794 func (ni *NodeInfo) PutCompact(b []byte) error {
795 if n := copy(b[:], ni.ID[:]); n != 20 {
798 ip := util.AddrIP(ni.Addr).To4()
800 return errors.New("expected ipv4 address")
802 if n := copy(b[20:], ip); n != 4 {
805 binary.BigEndian.PutUint16(b[24:], uint16(util.AddrPort(ni.Addr)))
809 func (cni *NodeInfo) UnmarshalCompact(b []byte) error {
811 return errors.New("expected 26 bytes")
813 util.CopyExact(cni.ID[:], b[:20])
814 cni.Addr = newDHTAddr(&net.UDPAddr{
815 IP: net.IPv4(b[20], b[21], b[22], b[23]),
816 Port: int(binary.BigEndian.Uint16(b[24:26])),
821 func (s *Server) Ping(node *net.UDPAddr) (*transaction, error) {
824 return s.query(newDHTAddr(node), "ping", nil, nil)
827 // Announce a local peer. This can only be done to nodes that gave us an
828 // announce token, which is received in responses during GetPeers. It's
829 // recommended then that GetPeers is called before this method.
830 func (s *Server) AnnouncePeer(port int, impliedPort bool, infoHash string) (err error) {
833 for _, node := range s.closestNodes(160, nodeIDFromString(infoHash), func(n *Node) bool {
834 return n.announceToken != ""
836 err = s.announcePeer(node.addr, infoHash, port, node.announceToken, impliedPort)
844 func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) (err error) {
845 if port == 0 && !impliedPort {
846 return errors.New("nothing to announce")
848 _, err = s.query(node, "announce_peer", map[string]interface{}{
849 "implied_port": func() int {
856 "info_hash": infoHash,
860 if err := m.Error(); err != nil {
861 logonce.Stderr.Printf("announce_peer response: %s", err)
864 s.NumConfirmedAnnounces++
869 // Add response nodes to node table.
870 func (s *Server) liftNodes(d Msg) {
874 for _, cni := range d.Nodes() {
875 if util.AddrPort(cni.Addr) == 0 {
876 // TODO: Why would people even do this?
879 if s.ipBlocked(util.AddrIP(cni.Addr)) {
882 n := s.getNode(cni.Addr)
883 n.SetIDFromBytes(cni.ID[:])
887 // Sends a find_node query to addr. targetID is the node we're looking for.
888 func (s *Server) findNode(addr dHTAddr, targetID string) (t *transaction, err error) {
889 t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) {
890 // Scrape peers from the response to put in the server's table before
891 // handing the response back to the caller.
900 // In a get_peers response, the addresses of torrent clients involved with the
901 // queried info-hash.
902 func (m Msg) Values() (vs []util.CompactPeer) {
907 rd, ok := r.(map[string]interface{})
911 v, ok := rd["values"]
915 vl, ok := v.([]interface{})
917 log.Printf("unexpected krpc values type: %T", v)
920 vs = make([]util.CompactPeer, 0, len(vl))
921 for _, i := range vl {
926 var cp util.CompactPeer
927 err := cp.UnmarshalBinary([]byte(s))
929 log.Printf("error decoding values list element: %s", err)
937 func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *transaction, err error) {
938 if len(infoHash) != 20 {
939 err = fmt.Errorf("infohash has bad length")
942 t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m Msg) {
944 at, ok := m.AnnounceToken()
946 s.getNode(addr).announceToken = at
952 func bootstrapAddrs() (addrs []*net.UDPAddr, err error) {
953 for _, addrStr := range []string{
954 "router.utorrent.com:6881",
955 "router.bittorrent.com:6881",
957 udpAddr, err := net.ResolveUDPAddr("udp4", addrStr)
961 addrs = append(addrs, udpAddr)
964 err = errors.New("nothing resolved")
969 func (s *Server) addRootNodes() error {
970 addrs, err := bootstrapAddrs()
974 for _, addr := range addrs {
975 s.nodes[addr.String()] = &Node{
976 addr: newDHTAddr(addr),
982 // Populates the node table.
983 func (s *Server) bootstrap() (err error) {
986 if len(s.nodes) == 0 {
987 err = s.addRootNodes()
993 var outstanding sync.WaitGroup
994 for _, node := range s.nodes {
996 t, err = s.findNode(node.addr, s.id)
998 err = fmt.Errorf("error sending find_node: %s", err)
1002 t.SetResponseHandler(func(Msg) {
1006 noOutstanding := make(chan struct{})
1009 close(noOutstanding)
1016 case <-time.After(15 * time.Second):
1017 case <-noOutstanding:
1020 // log.Printf("now have %d nodes", len(s.nodes))
1021 if s.numGoodNodes() >= 160 {
1028 func (s *Server) numGoodNodes() (num int) {
1029 for _, n := range s.nodes {
1030 if n.DefinitelyGood() {
1037 func (s *Server) NumNodes() int {
1043 func (s *Server) Nodes() (nis []NodeInfo) {
1046 for _, node := range s.nodes {
1047 // if !node.Good() {
1053 if n := copy(ni.ID[:], node.idString()); n != 20 && n != 0 {
1056 nis = append(nis, ni)
1061 func (s *Server) Close() {
1072 var maxDistance big.Int
1076 maxDistance.SetBit(&zero, 160, 1)
1079 func (s *Server) closestGoodNodes(k int, targetID string) []*Node {
1080 return s.closestNodes(k, nodeIDFromString(targetID), func(n *Node) bool { return n.DefinitelyGood() })
1083 func (s *Server) closestNodes(k int, target nodeID, filter func(*Node) bool) []*Node {
1084 sel := newKClosestNodesSelector(k, target)
1085 idNodes := make(map[string]*Node, len(s.nodes))
1086 for _, node := range s.nodes {
1091 idNodes[node.idString()] = node
1094 ret := make([]*Node, 0, len(ids))
1095 for _, id := range ids {
1096 ret = append(ret, idNodes[id.String()])