1 // Package DHT implements a DHT for use with the BitTorrent protocol,
2 // described in BEP 5: http://www.bittorrent.org/beps/bep_0005.html.
4 // Standard use involves creating a NewServer, and calling Announce on it with
5 // the details of your local torrent client and infohash of interest.
22 "github.com/anacrolix/sync"
24 "github.com/anacrolix/torrent/bencode"
25 "github.com/anacrolix/torrent/iplist"
26 "github.com/anacrolix/torrent/logonce"
27 "github.com/anacrolix/torrent/util"
32 queryResendEvery = 5 * time.Second
35 // Uniquely identifies a transaction to us.
36 type transactionKey struct {
37 RemoteAddr string // host:port
38 T string // The KRPC transaction ID.
44 transactions map[transactionKey]*Transaction
45 transactionIDInt uint64
46 nodes map[string]*node // Keyed by dHTAddr.String().
49 passive bool // Don't respond to queries.
50 ipBlockList *iplist.IPList
52 numConfirmedAnnounces int
53 bootstrapNodes []string
56 type dHTAddr interface {
58 UDPAddr() *net.UDPAddr
61 type cachedAddr struct {
66 func (ca cachedAddr) Network() string {
70 func (ca cachedAddr) String() string {
74 func (ca cachedAddr) UDPAddr() *net.UDPAddr {
75 return ca.a.(*net.UDPAddr)
78 func newDHTAddr(addr net.Addr) dHTAddr {
79 return cachedAddr{addr, addr.String()}
82 type ServerConfig struct {
83 Addr string // Listen address. Used if Conn is nil.
85 // Don't respond to queries from other nodes.
87 // DHT Bootstrap nodes
88 BootstrapNodes []string
91 type ServerStats struct {
92 // Count of nodes in the node table that responded to our last query or
93 // haven't yet been queried.
95 // Count of nodes in the node table.
97 // Transactions awaiting a response.
98 OutstandingTransactions int
99 // Individual announce_peer requests that got a success response.
100 ConfirmedAnnounces int
103 // Returns statistics for the server.
104 func (s *Server) Stats() (ss ServerStats) {
107 for _, n := range s.nodes {
108 if n.DefinitelyGood() {
112 ss.Nodes = len(s.nodes)
113 ss.OutstandingTransactions = len(s.transactions)
114 ss.ConfirmedAnnounces = s.numConfirmedAnnounces
118 // Returns the listen address for the server. Packets arriving to this address
119 // are processed by the server (unless aliens are involved).
120 func (s *Server) Addr() net.Addr {
121 return s.socket.LocalAddr()
124 func makeSocket(addr string) (socket *net.UDPConn, err error) {
125 addr_, err := net.ResolveUDPAddr("", addr)
129 socket, err = net.ListenUDP("udp", addr_)
133 // Create a new DHT server.
134 func NewServer(c *ServerConfig) (s *Server, err error) {
142 s.socket, err = makeSocket(c.Addr)
147 s.passive = c.Passive
148 s.bootstrapNodes = c.BootstrapNodes
167 log.Printf("error bootstrapping DHT: %s", err)
173 // Returns a description of the Server. Python repr-style.
174 func (s *Server) String() string {
175 return fmt.Sprintf("dht server on %s", s.socket.LocalAddr())
183 func (nid *nodeID) IsUnset() bool {
187 func nodeIDFromString(s string) (ret nodeID) {
191 ret.i.SetBytes([]byte(s))
196 func (nid0 *nodeID) Distance(nid1 *nodeID) (ret big.Int) {
197 if nid0.IsUnset() != nid1.IsUnset() {
201 ret.Xor(&nid0.i, &nid1.i)
205 func (nid *nodeID) String() string {
206 return string(nid.i.Bytes())
214 lastGotQuery time.Time
215 lastGotResponse time.Time
216 lastSentQuery time.Time
219 func (n *node) idString() string {
223 func (n *node) SetIDFromBytes(b []byte) {
228 func (n *node) SetIDFromString(s string) {
229 n.id.i.SetBytes([]byte(s))
232 func (n *node) IDNotSet() bool {
233 return n.id.i.Int64() == 0
236 func (n *node) NodeInfo() (ret NodeInfo) {
238 if n := copy(ret.ID[:], n.idString()); n != 20 {
244 func (n *node) DefinitelyGood() bool {
245 if len(n.idString()) != 20 {
248 // No reason to think ill of them if they've never been queried.
249 if n.lastSentQuery.IsZero() {
252 // They answered our last query.
253 if n.lastSentQuery.Before(n.lastGotResponse) {
259 // A wrapper around the unmarshalled KRPC dict that constitutes messages in
260 // the DHT. There are various helpers for extracting common data from the
261 // message. In normal use, Msg is abstracted away for you, but it can be of
263 type Msg map[string]interface{}
265 var _ fmt.Stringer = Msg{}
267 func (m Msg) String() string {
268 return fmt.Sprintf("%#v", m)
271 func (m Msg) T() (t string) {
280 func (m Msg) ID() string {
284 return m[m["y"].(string)].(map[string]interface{})["id"].(string)
287 // Suggested nodes in a response.
288 func (m Msg) Nodes() (nodes []NodeInfo) {
293 return m["r"].(map[string]interface{})["nodes"].(string)
298 for i := 0; i < len(b); i += 26 {
300 err := n.UnmarshalCompact([]byte(b[i : i+26]))
304 nodes = append(nodes, n)
309 type KRPCError struct {
314 func (me KRPCError) Error() string {
315 return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
318 var _ error = KRPCError{}
320 func (m Msg) Error() (ret *KRPCError) {
325 switch e := m["e"].(type) {
327 ret.Code = int(e[0].(int64))
328 ret.Msg = e[1].(string)
332 logonce.Stderr.Printf(`KRPC error "e" value has unexpected type: %T`, e)
337 // Returns the token given in response to a get_peers request for future
338 // announce_peer requests to that node.
339 func (m Msg) AnnounceToken() (token string, ok bool) {
340 defer func() { recover() }()
341 token, ok = m["r"].(map[string]interface{})["token"].(string)
345 type Transaction struct {
350 onResponse func(Msg) // Called with the server locked.
357 userOnResponse func(Msg)
360 // Set a function to be called with the response.
361 func (t *Transaction) SetResponseHandler(f func(Msg)) {
365 t.tryHandleResponse()
368 func (t *Transaction) tryHandleResponse() {
369 if t.userOnResponse == nil {
373 case r := <-t.response:
375 // Shouldn't be called more than once.
376 t.userOnResponse = nil
381 func (t *Transaction) key() transactionKey {
382 return transactionKey{
383 t.remoteAddr.String(),
388 func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration {
389 return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus)))
392 func (t *Transaction) startTimer() {
393 t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
396 func (t *Transaction) timerCallback() {
410 if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) {
411 panic("timer should have fired to get here")
415 func (t *Transaction) sendQuery() error {
416 err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
420 t.lastSend = time.Now()
424 func (t *Transaction) timeout() {
427 defer t.s.mu.Unlock()
428 t.s.nodeTimedOut(t.remoteAddr)
433 func (t *Transaction) close() {
439 t.tryHandleResponse()
444 defer t.s.mu.Unlock()
445 t.s.deleteTransaction(t)
449 func (t *Transaction) closing() bool {
458 // Abandon the transaction.
459 func (t *Transaction) Close() {
465 func (t *Transaction) handleResponse(m Msg) {
473 if t.onResponse != nil {
480 case t.response <- m:
482 panic("blocked handling response")
485 t.tryHandleResponse()
488 func (s *Server) setDefaults() (err error) {
491 h := crypto.SHA1.New()
492 ss, err := os.Hostname()
496 ss += s.socket.LocalAddr().String()
498 if b := h.Sum(id[:0:20]); len(b) != 20 {
506 s.nodes = make(map[string]*node, 10000)
510 // Packets to and from any address matching a range in the list are dropped.
511 func (s *Server) SetIPBlockList(list *iplist.IPList) {
517 func (s *Server) init() (err error) {
518 err = s.setDefaults()
522 s.closed = make(chan struct{})
523 s.transactions = make(map[transactionKey]*Transaction)
527 func (s *Server) processPacket(b []byte, addr dHTAddr) {
529 err := bencode.Unmarshal(b, &d)
532 if se, ok := err.(*bencode.SyntaxError); ok {
533 // The message was truncated.
534 if int(se.Offset) == len(b) {
537 // Some messages seem to drop to nul chars abrubtly.
538 if int(se.Offset) < len(b) && b[se.Offset] == 0 {
541 // The message isn't bencode from the first.
546 log.Printf("%s: received bad krpc message from %s: %s: %q", s, addr, err, b)
553 s.handleQuery(addr, d)
556 t := s.findResponseTransaction(d.T(), addr)
558 //log.Printf("unexpected message: %#v", d)
561 node := s.getNode(addr)
562 node.lastGotResponse = time.Now()
563 // TODO: Update node ID as this is an authoritative packet.
564 go t.handleResponse(d)
565 s.deleteTransaction(t)
568 func (s *Server) serve() error {
571 n, addr, err := s.socket.ReadFrom(b[:])
576 logonce.Stderr.Printf("received dht packet exceeds buffer size")
579 if s.ipBlocked(util.AddrIP(addr)) {
582 s.processPacket(b[:n], newDHTAddr(addr))
586 func (s *Server) ipBlocked(ip net.IP) bool {
587 if s.ipBlockList == nil {
590 return s.ipBlockList.Lookup(ip) != nil
593 // Adds directly to the node table.
594 func (s *Server) AddNode(ni NodeInfo) {
598 s.nodes = make(map[string]*node)
600 n := s.getNode(ni.Addr)
602 n.SetIDFromBytes(ni.ID[:])
606 func (s *Server) nodeByID(id string) *node {
607 for _, node := range s.nodes {
608 if node.idString() == id {
615 func (s *Server) handleQuery(source dHTAddr, m Msg) {
616 args := m["a"].(map[string]interface{})
617 node := s.getNode(source)
618 node.SetIDFromString(args["id"].(string))
619 node.lastGotQuery = time.Now()
626 s.reply(source, m["t"].(string), nil)
627 case "get_peers": // TODO: Extract common behaviour with find_node.
628 targetID := args["info_hash"].(string)
629 if len(targetID) != 20 {
632 var rNodes []NodeInfo
633 // TODO: Reply with "values" list if we have peers instead.
634 for _, node := range s.closestGoodNodes(8, targetID) {
635 rNodes = append(rNodes, node.NodeInfo())
637 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
638 for i, ni := range rNodes {
639 err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
644 s.reply(source, m["t"].(string), map[string]interface{}{
645 "nodes": string(nodesBytes),
648 case "find_node": // TODO: Extract common behaviour with get_peers.
649 targetID := args["target"].(string)
650 if len(targetID) != 20 {
651 log.Printf("bad DHT query: %v", m)
654 var rNodes []NodeInfo
655 if node := s.nodeByID(targetID); node != nil {
656 rNodes = append(rNodes, node.NodeInfo())
658 for _, node := range s.closestGoodNodes(8, targetID) {
659 rNodes = append(rNodes, node.NodeInfo())
662 nodesBytes := make([]byte, CompactNodeInfoLen*len(rNodes))
663 for i, ni := range rNodes {
664 // TODO: Put IPv6 nodes into the correct dict element.
665 if ni.Addr.UDPAddr().IP.To4() == nil {
668 err := ni.PutCompact(nodesBytes[i*CompactNodeInfoLen : (i+1)*CompactNodeInfoLen])
670 log.Printf("error compacting %#v: %s", ni, err)
674 s.reply(source, m["t"].(string), map[string]interface{}{
675 "nodes": string(nodesBytes),
677 case "announce_peer":
678 // TODO(anacrolix): Implement this lolz.
681 // TODO(anacrolix): Or reject, I don't think I want this.
683 log.Printf("%s: not handling received query: q=%s", s, m["q"])
688 func (s *Server) reply(addr dHTAddr, t string, r map[string]interface{}) {
690 r = make(map[string]interface{}, 1)
693 m := map[string]interface{}{
698 b, err := bencode.Marshal(m)
702 err = s.writeToNode(b, addr)
704 log.Printf("error replying to %s: %s", addr, err)
708 func (s *Server) getNode(addr dHTAddr) (n *node) {
709 addrStr := addr.String()
715 if len(s.nodes) < maxNodes {
721 func (s *Server) nodeTimedOut(addr dHTAddr) {
722 node, ok := s.nodes[addr.String()]
726 if node.DefinitelyGood() {
729 if len(s.nodes) < maxNodes {
732 delete(s.nodes, addr.String())
735 func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
736 if list := s.ipBlockList; list != nil {
737 if r := list.Lookup(util.AddrIP(node.UDPAddr())); r != nil {
738 err = fmt.Errorf("write to %s blocked: %s", node, r.Description)
742 n, err := s.socket.WriteTo(b, node.UDPAddr())
744 err = fmt.Errorf("error writing %d bytes to %s: %#v", len(b), node, err)
748 err = io.ErrShortWrite
754 func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *Transaction {
755 return s.transactions[transactionKey{
760 func (s *Server) nextTransactionID() string {
761 var b [binary.MaxVarintLen64]byte
762 n := binary.PutUvarint(b[:], s.transactionIDInt)
767 func (s *Server) deleteTransaction(t *Transaction) {
768 delete(s.transactions, t.key())
771 func (s *Server) addTransaction(t *Transaction) {
772 if _, ok := s.transactions[t.key()]; ok {
773 panic("transaction not unique")
775 s.transactions[t.key()] = t
778 // Returns the 20-byte server ID. This is the ID used to communicate with the
780 func (s *Server) ID() string {
787 func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onResponse func(Msg)) (t *Transaction, err error) {
788 tid := s.nextTransactionID()
790 a = make(map[string]interface{}, 1)
793 d := map[string]interface{}{
799 b, err := bencode.Marshal(d)
806 response: make(chan Msg, 1),
807 done: make(chan struct{}),
810 onResponse: onResponse,
816 s.getNode(node).lastSentQuery = time.Now()
822 // The size in bytes of a NodeInfo in its compact binary representation.
823 const CompactNodeInfoLen = 26
825 type NodeInfo struct {
830 // Writes the node info to its compact binary representation in b. See
831 // CompactNodeInfoLen.
832 func (ni *NodeInfo) PutCompact(b []byte) error {
833 if n := copy(b[:], ni.ID[:]); n != 20 {
836 ip := util.AddrIP(ni.Addr).To4()
838 return errors.New("expected ipv4 address")
840 if n := copy(b[20:], ip); n != 4 {
843 binary.BigEndian.PutUint16(b[24:], uint16(util.AddrPort(ni.Addr)))
847 func (cni *NodeInfo) UnmarshalCompact(b []byte) error {
849 return errors.New("expected 26 bytes")
851 util.CopyExact(cni.ID[:], b[:20])
852 cni.Addr = newDHTAddr(&net.UDPAddr{
853 IP: net.IPv4(b[20], b[21], b[22], b[23]),
854 Port: int(binary.BigEndian.Uint16(b[24:26])),
859 // Sends a ping query to the address given.
860 func (s *Server) Ping(node *net.UDPAddr) (*Transaction, error) {
863 return s.query(newDHTAddr(node), "ping", nil, nil)
866 func (s *Server) announcePeer(node dHTAddr, infoHash string, port int, token string, impliedPort bool) (err error) {
867 if port == 0 && !impliedPort {
868 return errors.New("nothing to announce")
870 _, err = s.query(node, "announce_peer", map[string]interface{}{
871 "implied_port": func() int {
878 "info_hash": infoHash,
882 if err := m.Error(); err != nil {
883 logonce.Stderr.Printf("announce_peer response: %s", err)
886 s.numConfirmedAnnounces++
891 // Add response nodes to node table.
892 func (s *Server) liftNodes(d Msg) {
896 for _, cni := range d.Nodes() {
897 if util.AddrPort(cni.Addr) == 0 {
898 // TODO: Why would people even do this?
901 if s.ipBlocked(util.AddrIP(cni.Addr)) {
904 n := s.getNode(cni.Addr)
905 n.SetIDFromBytes(cni.ID[:])
909 // Sends a find_node query to addr. targetID is the node we're looking for.
910 func (s *Server) findNode(addr dHTAddr, targetID string) (t *Transaction, err error) {
911 t, err = s.query(addr, "find_node", map[string]interface{}{"target": targetID}, func(d Msg) {
912 // Scrape peers from the response to put in the server's table before
913 // handing the response back to the caller.
922 // In a get_peers response, the addresses of torrent clients involved with the
923 // queried info-hash.
924 func (m Msg) Values() (vs []util.CompactPeer) {
929 rd, ok := r.(map[string]interface{})
933 v, ok := rd["values"]
937 vl, ok := v.([]interface{})
939 log.Printf("unexpected krpc values type: %T", v)
942 vs = make([]util.CompactPeer, 0, len(vl))
943 for _, i := range vl {
948 var cp util.CompactPeer
949 err := cp.UnmarshalBinary([]byte(s))
951 log.Printf("error decoding values list element: %s", err)
959 func (s *Server) getPeers(addr dHTAddr, infoHash string) (t *Transaction, err error) {
960 if len(infoHash) != 20 {
961 err = fmt.Errorf("infohash has bad length")
964 t, err = s.query(addr, "get_peers", map[string]interface{}{"info_hash": infoHash}, func(m Msg) {
966 at, ok := m.AnnounceToken()
968 s.getNode(addr).announceToken = at
974 func bootstrapAddrs(nodeAddrs []string) (addrs []*net.UDPAddr, err error) {
975 bootstrapNodes := nodeAddrs
976 if len(bootstrapNodes) == 0 {
977 bootstrapNodes = []string{
978 "router.utorrent.com:6881",
979 "router.bittorrent.com:6881",
982 for _, addrStr := range bootstrapNodes {
984 udpAddr, err := net.ResolveUDPAddr("udp4", addrStr)
988 addrs = append(addrs, udpAddr)
992 err = errors.New("nothing resolved")
997 func (s *Server) addRootNodes() error {
998 addrs, err := bootstrapAddrs(s.bootstrapNodes)
1002 for _, addr := range addrs {
1003 s.nodes[addr.String()] = &node{
1004 addr: newDHTAddr(addr),
1010 // Populates the node table.
1011 func (s *Server) bootstrap() (err error) {
1014 if len(s.nodes) == 0 {
1015 err = s.addRootNodes()
1021 var outstanding sync.WaitGroup
1022 for _, node := range s.nodes {
1024 t, err = s.findNode(node.addr, s.id)
1026 err = fmt.Errorf("error sending find_node: %s", err)
1030 t.SetResponseHandler(func(Msg) {
1034 noOutstanding := make(chan struct{})
1037 close(noOutstanding)
1044 case <-time.After(15 * time.Second):
1045 case <-noOutstanding:
1048 // log.Printf("now have %d nodes", len(s.nodes))
1049 if s.numGoodNodes() >= 160 {
1056 func (s *Server) numGoodNodes() (num int) {
1057 for _, n := range s.nodes {
1058 if n.DefinitelyGood() {
1065 // Returns how many nodes are in the node table.
1066 func (s *Server) NumNodes() int {
1072 // Exports the current node table.
1073 func (s *Server) Nodes() (nis []NodeInfo) {
1076 for _, node := range s.nodes {
1077 // if !node.Good() {
1083 if n := copy(ni.ID[:], node.idString()); n != 20 && n != 0 {
1086 nis = append(nis, ni)
1091 // Stops the server network activity. This is all that's required to clean-up a Server.
1092 func (s *Server) Close() {
1103 var maxDistance big.Int
1107 maxDistance.SetBit(&zero, 160, 1)
1110 func (s *Server) closestGoodNodes(k int, targetID string) []*node {
1111 return s.closestNodes(k, nodeIDFromString(targetID), func(n *node) bool { return n.DefinitelyGood() })
1114 func (s *Server) closestNodes(k int, target nodeID, filter func(*node) bool) []*node {
1115 sel := newKClosestNodesSelector(k, target)
1116 idNodes := make(map[string]*node, len(s.nodes))
1117 for _, node := range s.nodes {
1122 idNodes[node.idString()] = node
1125 ret := make([]*node, 0, len(ids))
1126 for _, id := range ids {
1127 ret = append(ret, idNodes[id.String()])